#!/bin/bash

# Logical-replication consistency test.
#
# Initialises pgbench tables on the master, copies them to the replica,
# connects the two via publication pub1 / subscription sub1, runs pgbench on
# the master and then polls until both servers hold identical table contents
# (or gives up and dumps diagnostics).
#
# usage: <script> [scale] [clients] [duration] [date_str] [clean_only]
#   an empty or missing argument keeps its default.
#
# assume both instances are running, on port 6972 (master) and 6973 (replica)

port1=6972
port2=6973

logfile1=$HOME/pg_stuff/pg_installations/pgsql.logical_replication/logfile.logical_replication
logfile2=$HOME/pg_stuff/pg_installations/pgsql.logical_replication2/logfile.logical_replication2

# clear logs
echo > "$logfile1"
echo > "$logfile2"

logfile_ts=$( date +"%Y%m%d_%H_%M_%S" )

scale=${1:-1}
clients=${2:-1}
#INIT_WAIT=${3:-0}
duration=${3:-60}
date_str=${4:-$logfile_ts}
CLEAN_ONLY=${5:-}

threads=8
# threads=1

master_version=$(  psql -qtAXc "show server_version" -p $port1 )
replica_version=$( psql -qtAXc "show server_version" -p $port2 )
master_commit_hash=$(  psql -qtAXc "show server_version" -p $port1 | cut -d _ -f 6 )
replica_commit_hash=$( psql -qtAXc "show server_version" -p $port2 | cut -d _ -f 6 )
master_start_time=$(  psql -qtAXc "select pg_postmaster_start_time()" -p $port1 )
replica_start_time=$( psql -qtAXc "select pg_postmaster_start_time()" -p $port2 )

patch_md5_sql="select md5(comments) from information_schema.sql_packages where feature_name = 'patch md5'"
master_patch_md5=$(  echo "$patch_md5_sql" | psql -qtAXp $port1 )
replica_patch_md5=$( echo "$patch_md5_sql" | psql -qtAXp $port2 )

master_s_c=$(  psql -qtAXc "show synchronous_commit" -p $port1 )
replica_s_c=$( psql -qtAXc "show synchronous_commit" -p $port2 )
master_assert=$(  psql -qtAXc "show debug_assertions" -p $port1 )
replica_assert=$( psql -qtAXc "show debug_assertions" -p $port2 )

echo "============================================================================"
echo "-- scale $scale clients $clients duration $duration CLEAN_ONLY=$CLEAN_ONLY"
echo "============================================================================"
echo "-- hostname: $( hostname -s )"
echo "-- timestamp: $date_str"
#echo -n "-- "; ps -ef f | grep 6972 | grep -Evw 'grep|xterm|screen|SCREEN' | less -iS
#echo -n "-- "; ps -ef f | grep 6973 | grep -Evw 'grep|xterm|screen|SCREEN' | less -iS
echo "-- master_start_time $master_start_time replica_start_time $replica_start_time"

# Both servers must run the same patched build, otherwise a diff means nothing.
if [[ "$master_patch_md5" == "$replica_patch_md5" ]]; then
  echo "-- master patch-md5 [$master_patch_md5]"
  echo "-- replica patch-md5 [$replica_patch_md5] (ok)"
else
  echo "-- master patch-md5 [$master_patch_md5] - replica patch-md5 NOT the same, bailing out"
  echo "-- replica patch-md5 [$replica_patch_md5] - replica patch-md5 NOT the same, bailing out"
  exit 1
fi

echo "-- synchronous_commit, master [$master_s_c] replica [$replica_s_c]"
echo "-- master_assert [$master_assert] replica_assert [$replica_assert]"
# left unquoted on purpose: collapses md5sum's two-space separator to one
echo "-- self md5 "$( md5sum "$0" )

unset PGSERVICEFILE PGSERVICE    # PGPORT PGDATA PGHOST
export PGDATABASE=testdb

pgdata_master=$HOME/pg_stuff/pg_installations/pgsql.logical_replication/data
pgdata_replica=$HOME/pg_stuff/pg_installations/pgsql.logical_replication2/data

#######################################
# Display the 4 pgbench tables' accumulated content as md5s.
# a,b,t,h stand for: pgbench_accounts, -branches, -tellers, -history
#
# Prints one line per server: row counts, the first 9 chars of each
# table's md5, and " master" / " replica".  The replica line is printed
# without a newline so the verdict lands on it: "... replica ok" or
# "... replica NOK".  The caller greps for 'replica ok'.
# Globals:   port1, port2 (read)
# Outputs:   the report on stdout
# Returns:   1 (after printing a message) when a server does not have
#            exactly 4 pgbench tables, 0 otherwise
#######################################
function cb()
{
  local port num_tables
  local md5_a md5_b md5_t md5_h cnt_a cnt_b cnt_t cnt_h
  # Distinct sentinels, so the final comparison cannot report "ok" for a
  # port whose checksum was never computed.
  local -a md5_total=()
  md5_total[$port1]='-1'
  md5_total[$port2]='-2'

  # Check the servers actually being compared (not whatever the default
  # port is).  The message goes to stdout: cb runs inside $(...), and the
  # previous '>> $outf' redirected to an undefined variable.
  for port in $port1 $port2
  do
    num_tables=$( echo "select count(*) from pg_class where relkind = 'r' and relname ~ '^pgbench_'" | psql -qtAXp $port )
    if [[ $num_tables -ne 4 ]]
    then
      echo "pgbench tables not 4 (port $port) - exit"
      return 1
    fi
  done

  for port in $port1 $port2
  do
    md5_a=$(echo "select * from pgbench_accounts order by aid"|psql -qtAXp $port|md5sum|cut -b 1-9)
    md5_b=$(echo "select * from pgbench_branches order by bid"|psql -qtAXp $port|md5sum|cut -b 1-9)
    md5_t=$(echo "select * from pgbench_tellers  order by tid"|psql -qtAXp $port|md5sum|cut -b 1-9)
    md5_h=$(echo "select * from pgbench_history  order by hid"|psql -qtAXp $port|md5sum|cut -b 1-9)
    cnt_a=$(echo "select count(*) from pgbench_accounts" |psql -qtAXp $port)
    cnt_b=$(echo "select count(*) from pgbench_branches" |psql -qtAXp $port)
    cnt_t=$(echo "select count(*) from pgbench_tellers"  |psql -qtAXp $port)
    cnt_h=$(echo "select count(*) from pgbench_history"  |psql -qtAXp $port)
    md5_total[$port]=$( echo "${md5_a} ${md5_b} ${md5_t} ${md5_h}" | md5sum )
    printf "$port a,b,t,h: %8d %6d %6d %6d" $cnt_a $cnt_b $cnt_t $cnt_h
    echo -n " $md5_a $md5_b $md5_t $md5_h"
    if   [[ $port -eq $port1 ]]; then echo    " master"
    elif [[ $port -eq $port2 ]]; then echo -n " replica"
    else                              echo    " ERROR "
    fi
  done

  if [[ "${md5_total[$port1]}" == "${md5_total[$port2]}" ]]
  then
    echo " ok"
  else
    echo " NOK"
  fi
}

# function cb_new()
# {
#   # display the 4 pgbench tables' accumulated content as md5s
#   # a,b,t,h stand for: pgbench_accounts, -branches, -tellers, -history
#   md5_total_6972='-1'
#   md5_total_6973='-2'
#   num_tables=$( echo "select count(*) from pg_class where relkind = 'r' and relname ~ '^pgbench_'" | psql -qtAX )
#   if [[ $num_tables -ne 4 ]]
#   then
#     echo "pgbench tables not 4 - exit" >> $outf
#     exit
#   fi
#   for port in $port1 $port2
#   do
#     arr=$( echo "  select count(*) from pgbench_accounts
#          union all select count(*) from pgbench_branches
#          union all select count(*) from pgbench_tellers
#          union all select count(*) from pgbench_history " | psql -qtAXp $port )
#     set -- $arr
#     cnt_a=$1
#     cnt_b=$2
#     cnt_t=$3
#     cnt_h=$4
#     # md5_a=$(echo "select * from pgbench_accounts order by aid"|psql -qtAXp$port|md5sum|cut -b 1-9)
#     # md5_a=$(echo "select sum(aid),sum(bid),sum(abalance),sum(length(filler)) from pgbench_accounts"|psql -qtAXp$port|md5sum|cut -b 1-9)
#     arr=$(echo "select array_to_string( array (
#                 (select md5(cast(sum(aid)+sum(bid)+sum(abalance)+coalesce(sum(length(filler)),0) as text)) from pgbench_accounts a)
#       union all (select case when $cnt_b = 0 then md5('') else md5(cast( sum(bid)+sum(bbalance)+coalesce(sum(length(filler)),0) as text)) end from pgbench_branches b)
#       union all (select case when $cnt_t = 0 then md5('') else md5(cast( sum(tid)+sum(bid)+sum(tbalance)+coalesce(sum(length(filler)),0) as text)) end from pgbench_tellers t)
#       union all (select case when $cnt_h = 0 then md5('') else md5(cast(( sum(tid) + sum(bid) + sum(aid) + sum(delta) + coalesce( cast(sum(extract(epoch from mtime)) as bigint), 0)
#                                                                  + coalesce(sum(length(filler)), 0) + sum(hid) ) as text)) end from pgbench_history h)
#     ), ' ')"|psql -qtAXp $port )
#     set -- $arr
#     md5_a=$1 md5_b=$2 md5_t=$3 md5_h=$4
#     # md5_a='' md5_b=$1 md5_t=$2 md5_h=$3
#     # echo -ne "md5_b=[$md5_b]\nmd5_t=[$md5_t]\nmd5_h=[$md5_h]"
#
#     # md5_a=$(echo "select sum(aid),sum(bid),sum(abalance),sum(length(filler)) from pgbench_accounts"|psql -qtAXp$port|md5sum|cut -b 1-9)
#     # md5_b=$(echo "select * from pgbench_branches order by bid"|psql -qtAXp$port|md5sum|cut -b 1-9)
#     # md5_t=$(echo "select * from pgbench_tellers  order by tid"|psql -qtAXp$port|md5sum|cut -b 1-9)
#     ## md5_h=$(echo "select * from pgbench_history  order by hid"|psql -qtAXp$port|md5sum|cut -b 1-9)
#     # md5_h=$(echo "select sum(tid),sum(bid),sum(aid),sum(delta),cast(sum(extract(epoch from mtime)) as bigint),sum(length(filler)), sum(hid) from pgbench_history"|psql -qtAXp$port|md5sum|cut -b 1-9)
#
#     # cnt_a='' #$(echo "select count(*) from pgbench_accounts"|psql -qtAXp $port)
#     # cnt_b=$(echo "select count(*) from pgbench_branches"|psql -qtAXp $port)
#     # cnt_t=$(echo "select count(*) from pgbench_tellers" |psql -qtAXp $port)
#     # cnt_h=$(echo "select count(*) from pgbench_history" |psql -qtAXp $port)
#     md5_total[$port]=$( echo "${md5_a} ${md5_b} ${md5_t} ${md5_h}" | md5sum )
#     printf "$port a,b,t,h: %8d %6d %6d %6d" ${cnt_a} ${cnt_b} ${cnt_t} ${cnt_h}
#     # printf "$port a,b,t,h:        ? %6d %6d %6d" ${cnt_b} ${cnt_t} ${cnt_h}
#     echo -n " ${md5_a:1:9} ${md5_b:1:9} ${md5_t:1:9} ${md5_h:1:9}"
#     # echo -n " ${md5_b:1:9} ${md5_t:1:9} ${md5_h:1:9}"
#     if   [[ $port -eq $port1 ]]; then echo    " master"
#     elif [[ $port -eq $port2 ]]; then echo -n " replica"
#     else                              echo    " ERROR "
#     fi
#   done
#   if [[ "${md5_total[$port1]}" == "${md5_total[$port2]}" ]]
#   then
#     echo " ok"
#   else
#     echo " NOK"
#   fi
# }

#######################################
# Remove leftovers of a previous run: subscription sub1 (on the replica),
# publication pub1 (on the master) and replication slot 'sub1' on either
# server.  If anything is still left afterwards, dump all of the master's
# WAL to unclean.<date_str>.txt.bz2, sleep 60 s and exit the script with 1.
# Arguments: $1 - label echoed first (identifies the call site)
# Globals:   port1, port2, pgdata_master, date_str (read)
#######################################
function clean_pubsub()
{
  local sub_count sub_repl_slot_count pub_count pub_repl_slot_count uncleanf w

  # manual on/off switch for the whole cleanup
  if [[ 1 -eq 1 ]]
  then
    echo "$1"

    sub_count=$( echo "select count(*) from pg_subscription" | psql -qtAXp $port2 )
    if [[ $sub_count -ne 0 ]]
    then
      echo "sub_count -ne 0 : deleting sub1 (plain)"
      echo "drop subscription if exists sub1" | psql -qXp $port2
      # Second attempt with the 'nodrop slot' variant (syntax of the PG
      # development build this targets) -- presumably for when the plain
      # drop failed because the remote slot could not be dropped.
      echo "sub_count -ne 0 : deleting sub1 (nodrop)"
      echo "drop subscription if exists sub1 nodrop slot" | psql -qXp $port2
    fi

    sub_repl_slot_count=$( echo "select count(*) from pg_replication_slots" | psql -qtAXp $port2 )
    if [[ $sub_repl_slot_count -ne 0 ]]
    then
      echo "sub_repl_slot_count -ne 0 - deleting"
      # The slot was counted on the replica, so drop it there.  This used to
      # run against $port1, which left the replica's slot in place.
      echo "select pg_drop_replication_slot('sub1')" | psql -Xp $port2
    fi

    pub_count=$( echo "select count(*) from pg_publication" | psql -qtAXp $port1 )
    if [[ $pub_count -ne 0 ]]
    then
      echo "pub_count -ne 0 - deleting pub1"
      echo "drop publication if exists pub1" | psql -qXp $port1
    fi

    pub_repl_slot_count=$( echo "select count(*) from pg_replication_slots" | psql -qtAXp $port1 )
    if [[ $pub_repl_slot_count -ne 0 ]]
    then
      echo "pub_repl_slot_count -ne 0 - deleting (sub1)"
      echo "select pg_drop_replication_slot('sub1')" | psql -qXp $port1
    fi

    # re-count everything to verify the cleanup worked
    pub_count=$(           echo "select count(*) from pg_publication"       | psql -qtAXp $port1 )
    pub_repl_slot_count=$( echo "select count(*) from pg_replication_slots" | psql -qtAXp $port1 )
    sub_count=$(           echo "select count(*) from pg_subscription"      | psql -qtAXp $port2 )
    sub_repl_slot_count=$( echo "select count(*) from pg_replication_slots" | psql -qtAXp $port2 )

    if [[ $pub_count -ne 0 ]] \
      || [[ $sub_count -ne 0 ]] \
      || [[ $pub_repl_slot_count -ne 0 ]] \
      || [[ $sub_repl_slot_count -ne 0 ]]
    then
      echo " pub_count $pub_count pub_repl_slot_count $pub_repl_slot_count sub_count $sub_count sub_repl_slot_count $sub_repl_slot_count "
      uncleanf=unclean.$date_str.txt.bz2
      echo "-- imperfect cleanup, pg_waldump to $uncleanf, waiting 60 s, then exit"
      # glob instead of parsing 'ls'; both yield the segments in sorted order
      for w in "$pgdata_master"/pg_wal/0*
      do
        echo "--"
        echo "-- file: $w"
        echo "--"
        pg_waldump "$w"
      done 2>&1 | bzip2 > "$uncleanf"
      sleep 60
      exit 1
    fi
  fi
}

#######################################
# Dump the current WAL segment of each server (as reported by
# pg_walfile_name) through pg_waldump into
#   waldump.<date_str>_<HHMM>.<1|2>.<unchanged>.<segment>.txt.bz2
# where 1 = master and 2 = replica.
# Globals: port1, port2, pgdata_master, pgdata_replica, date_str,
#          unchanged (read)
# NOTE(review): pg_current_wal_location() is the name used by the PG10
#   development build this script targets; released PostgreSQL 10+ calls
#   it pg_current_wal_lsn() -- confirm before running on a newer server.
#######################################
wal_info_on_fail() {
  local now_str walfile1 walfile2 x walf nr bz2f lines
  now_str=$(date +"%H%M")
  walfile1=$( echo "select pg_walfile_name(lsn) from (values (pg_current_wal_location())) as f(lsn)" | psql -qtAXp $port1 )
  walfile2=$( echo "select pg_walfile_name(lsn) from (values (pg_current_wal_location())) as f(lsn)" | psql -qtAXp $port2 )
  for x in \
    "$pgdata_master/pg_wal/${walfile1} 1" \
    "$pgdata_replica/pg_wal/${walfile2} 2"
  do
    echo "-- parsing [$x]"
    read -r walf nr <<< "$x"
    # Name the dump after the segment actually dumped; previously the
    # master's segment name ($walfile1) was used for the replica's file too.
    bz2f=waldump.${date_str}_${now_str}.$nr.$unchanged.${walf##*/}.txt.bz2
    echo "-- walf [$walf]"
    echo "-- bz2f [$bz2f]"
    lines=$( pg_waldump "$walf" 2>&1 | wc -l )
    echo "-- lines $lines"
    echo "-- (before) pg_waldump $walf"
    if [[ $lines -gt 0 ]]
    then
      echo -en "-- file: $walf\n-- $lines lines\n"
      echo -en "-- pg_waldump $walf 2>&1 | bzip2 > $bz2f\n"
      pg_waldump "$walf" 2>&1 | bzip2 > "$bz2f"
      #echo -en "--\n-- the above was: $walf\n--\n"
    else
      echo "-- walfile_name [$walf]"
      echo "-- $lines lines"
    fi
  done
}

#######################################
# Print pg_subscription_rel and pg_replication_origin_status from the
# subscriber (replica) and pg_replication_slots from the publisher (master),
# for post-mortem of a failed run.
# Globals: port1, port2 (read)
#######################################
table_info_on_fail() {
  # And finally if you could dump the contents of pg_subscription_rel,
  # pg_replication_origin_status on subscriber and pg_replication_slots on
  # publisher at the end of the failed run that would also help.
  echo "table pg_subscription_rel; table pg_replication_origin_status;" | psql -aqX -p $port2   # on subscriber and
  echo "table pg_replication_slots;" | psql -aqX -p $port1                                     # ... on publisher
}

# invoke the function:
clean_pubsub "clean-at-start-call"

# 5th argument given: only clean up, run nothing
if [[ -n "$CLEAN_ONLY" ]]
then
  exit 0
fi

# logfile_ts=$( date +"%Y%m%d_%H%M_%s%N" )

# Fresh pgbench tables on the master (plus a primary key on
# pgbench_history), then copy them to the replica via pg_dump | pg_restore.
drop_sql="drop table if exists pgbench_accounts; drop table if exists pgbench_branches; drop table if exists pgbench_tellers; drop table if exists pgbench_history;"
echo "$drop_sql" | psql -qXp $port1 \
  && echo "$drop_sql" | psql -qXp $port2 \
  && pgbench -p $port1 -qis "${scale//_/}" \
  && echo "
alter table pgbench_history add column hid serial primary key;
-- alter table pgbench_history replica identity full;
-- delete from pgbench_accounts where aid > 40;
" | psql -q1Xp $port1 \
  && pg_dump -F c --no-create-subscription-slots -p $port1 \
       -t pgbench_history \
       -t pgbench_accounts \
       -t pgbench_branches \
       -t pgbench_tellers \
     | pg_restore -1 -p $port2 -d testdb

# echo "-- (no diffs expected... )"
# echo "$(cb)"
# cb_text0=$(cb)

# empty the tables on replica:
# echo " delete from pgbench_accounts; delete from pgbench_branches; delete from pgbench_tellers ; delete from pgbench_history ; " | psql -q -X -p $port2
echo "truncate pgbench_accounts,pgbench_branches,pgbench_tellers,pgbench_history ; " | psql -qXp $port2

# echo "-- (pre-replication, diffs *are* expected... )"
# echo "$(cb)"

echo "create publication pub1 for all tables;" | psql -p $port1 -aqtAX
echo "create subscription sub1 connection 'port=${port1}' publication pub1 with (disabled); alter subscription sub1 enable; " | psql -p $port2 -aqtAX

## if [[ ${INIT_WAIT//_/} -gt 0 ]]
## then
##   #echo "select 'ok' as ok, now() as ts, pid, state, usename, backend_start backend_xmin, state, sent_location application_name from pg_stat_replication" | psql -qXp $port1
##   echo "-- wait ${INIT_WAIT//_/}s"
##   sleep ${INIT_WAIT//_/}
## fi
#
# state1=$(echo "select string_agg(state,',') states from pg_stat_replication" | psql -qtAXp $port1 )
# startup_count=$(echo "select count(*) from pg_stat_replication where state = 'startup'" | psql -qtAXp $port1 )
#
# #while [[ ! "$state1" == "streaming" ]]
# while [[ "$startup_count" -gt "0" ]]
# do
#   echo "-- delay"
#   echo "select * from pg_stat_replication" | psql -qXp $port1
#   echo "-- delay, master not yet in 'streaming' state [$state1]"
#   sleep 1
#   startup_count=$(echo "select count(*) from pg_stat_replication where state = 'startup'" | psql -qtAXp $port1 )
#   #state1=$(echo "select state from pg_stat_replication" | psql -qtAXp $port1 )
# done
# echo "-- state ok"
# echo "select 'ok' as ok, now() as ts, pid, state, usename, backend_start backend_xmin, state, sent_location application_name from pg_stat_replication" | psql -qXp $port1

RUN_PGBENCH=1
if [[ $RUN_PGBENCH -eq 1 ]]
then
  # Progress report every duration/5 s.  pgbench rejects -P values below 1,
  # so for a duration under 5 s clamp to 1 instead of aborting the run.
  pseconds=$(( duration / 5 ))
  if (( pseconds < 1 )); then
    pseconds=1
  fi
  echo "-- pgbench -c $clients -j $threads -T $duration -P $pseconds -n -p $port1 -- scale $scale"
  # -p $port1: load the master explicitly (as the init step does) instead of
  # whatever server PGPORT happens to point at.
  pgbench -c $clients -j $threads -T $duration -P $pseconds -n -p $port1   # scale $scale
else
  echo "-- not running pgbench..."
fi

# md5_a_primary=$(echo "select * from pgbench_accounts order by aid"|psql -qtAXp $port1|md5sum|cut -b 1-9)
# md5_b_primary=$(echo "select * from pgbench_branches order by bid"|psql -qtAXp $port1|md5sum|cut -b 1-9)
# md5_t_primary=$(echo "select * from pgbench_tellers  order by tid"|psql -qtAXp $port1|md5sum|cut -b 1-9)
# md5_h_primary=$(echo "select * from pgbench_history  order by hid"|psql -qtAXp $port1|md5sum|cut -b 1-9)
# cnt_a_primary=$(echo "select count(*) from pgbench_accounts" |psql -qtAXp $port1)
# cnt_b_primary=$(echo "select count(*) from pgbench_branches" |psql -qtAXp $port1)
# cnt_t_primary=$(echo "select count(*) from pgbench_tellers"  |psql -qtAXp $port1)
# cnt_h_primary=$(echo "select count(*) from pgbench_history"  |psql -qtAXp $port1)

# waiting1=$(( ${scale//_/} * 4 ))
# if [[ $waiting1 -gt 60 ]]
# then
#   waiting1=60
# fi
# echo "-- waiting ${waiting1}s... (scale * 4, or 60)"
# sleep $waiting1

waiting1=0
echo "-- waiting ${waiting1}s... (always)"
sleep $waiting1
wait_total=$waiting1

date +"%Y.%m.%d %H:%M:%S"
echo "-- getting md5 (cb)"
cb_text1=$(cb)
cb_text1_md5=$( echo "$cb_text1" | md5sum )
echo "${cb_text1} ${cb_text1_md5:1:9}"
date +"%Y.%m.%d %H:%M:%S"

# Poll every wait_chunk seconds until cb reports "replica ok".  After 5
# unchanged polls, dump diagnostics once.  Give up (with diagnostics) after
# more than wait_max seconds or more than 20 consecutive unchanged polls.
loop_counter=0
wait_chunk=5
wait_max=3600
unchanged=0
while true; do
  if echo "$cb_text1" | grep -qw 'replica ok'; then
    echo "-- All is well."
    echo "-- ${wait_total} seconds total. scale $scale clients $clients -T $duration"
    break
  fi

  if [[ $unchanged -eq 5 ]]
  then
    #now_str=$(date +"%M%S")
    #unfinishedf1=unfinished.${date_str}_${now_str}.1.$unchanged.txt.bz2
    #unfinishedf2=unfinished.${date_str}_${now_str}.2.$unchanged.txt.bz2
    #echo -ne "--\n-- pg_waldump to $unfinishedf1 (and replica)...\n--\n"
    #for w in $(ls -1 $pgdata_master/pg_wal/0* ); do echo -ne "--\n-- file: ${w}\n--\n"; pg_waldump $w; done 2>&1 | bzip2 > $unfinishedf1
    #for w in $(ls -1 $pgdata_replica/pg_wal/0* ); do echo -ne "--\n-- file: ${w}\n--\n"; pg_waldump $w; done 2>&1 | bzip2 > $unfinishedf2
    table_info_on_fail
    wal_info_on_fail
  fi

  waited_already=$(( loop_counter * wait_chunk ))
  if [[ $waited_already -gt $wait_max ]]
  then
    echo "-- Not good, but breaking out of wait (waited more than ${wait_max} s)"
    # was 'info_on_fail', which is defined nowhere ("command not found")
    table_info_on_fail
    wal_info_on_fail
    echo "-- (wait_total ${wait_total} s)"
    break
  fi

  echo "-- wait another ${wait_chunk} s (total ${wait_total} s) (unchanged $unchanged)"
  sleep $wait_chunk
  wait_total=$(( wait_total + wait_chunk ))

  echo "-- getting md5 (cb)"
  cb_text1=$(cb)
  cb_text1_md5_new=$( echo "$cb_text1" | md5sum )
  if [[ "$cb_text1_md5_new" == "$cb_text1_md5" ]]
  then
    unchanged=$(( unchanged + 1 ))
  else
    unchanged=0
  fi
  echo "${cb_text1} ${cb_text1_md5_new:1:9}"

  if [[ $unchanged -gt 20 ]]
  then
    echo "-- Not good, but breaking out of wait ($unchanged times no change)"
    table_info_on_fail
    wal_info_on_fail
    echo "-- (wait_total ${wait_total} s)"
    break
  fi

  cb_text1_md5=$cb_text1_md5_new
  loop_counter=$(( loop_counter + 1 ))
done

# wait 20s, then invoke the cleanup function:
echo "-- waiting 20s, then end-cleaning"
sleep 20
clean_pubsub "clean-at-end-call"

# Archive both server logs, tagged ok/NOK by the final comparison.
# dest_dir='.'
dest_dir='logfiles'
_time_=$( date +"%H%M")
# create whatever dest_dir is set to (this used to hard-code 'logfiles')
mkdir -p "$dest_dir"

if echo "${cb_text1}" | grep -qw 'replica ok'
then
  verdict=ok
else
  verdict=NOK
fi
cp "$logfile1" "${dest_dir}/logrep.$date_str.1.${_time_}.scale_${scale}.clients_$clients.$verdict.log"
cp "$logfile2" "${dest_dir}/logrep.$date_str.2.${_time_}.scale_${scale}.clients_$clients.$verdict.log"