*** a/doc/src/sgml/ref/pg_receivexlog.sgml --- b/doc/src/sgml/ref/pg_receivexlog.sgml *************** *** 196,201 **** PostgreSQL documentation --- 196,211 ---- + + + + + status packets sent to server as soon as after fsync. + + + + + *** a/src/bin/pg_basebackup/pg_basebackup.c --- b/src/bin/pg_basebackup/pg_basebackup.c *************** *** 371,377 **** LogStreamerMain(logstreamer_param *param) if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL, 0)) /* * Any errors will already have been reported in the function process, --- 371,377 ---- if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL, 0, false)) /* * Any errors will already have been reported in the function process, *** a/src/bin/pg_basebackup/pg_receivexlog.c --- b/src/bin/pg_basebackup/pg_receivexlog.c *************** *** 38,44 **** static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int fsync_interval = 0; /* 0 = default */ static volatile bool time_to_abort = false; ! static void usage(void); static XLogRecPtr FindStreamingStart(uint32 *tli); --- 38,44 ---- static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int fsync_interval = 0; /* 0 = default */ static volatile bool time_to_abort = false; ! static volatile bool reply_fsync = false; static void usage(void); static XLogRecPtr FindStreamingStart(uint32 *tli); *************** *** 74,79 **** usage(void) --- 74,80 ---- printf(_(" -p, --port=PORT database server port number\n")); printf(_(" -s, --status-interval=INTERVAL\n" " time between status packets sent to server (in seconds)\n")); + printf(_(" -r, --reply-fsync status packets sent to server as soon as after fsync\n")); printf(_(" -U, --username=NAME connect as specified database user\n")); printf(_(" -w, --no-password never prompt for password\n")); printf(_(" -W, --password force password prompt (should happen automatically)\n")); *************** *** 334,340 **** StreamLog(void) ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, stop_streaming, standby_message_timeout, ".partial", ! fsync_interval); PQfinish(conn); } --- 335,341 ---- ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, stop_streaming, standby_message_timeout, ".partial", ! fsync_interval, reply_fsync); PQfinish(conn); } *************** *** 368,373 **** main(int argc, char **argv) --- 369,375 ---- {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"status-interval", required_argument, NULL, 's'}, + {"reply-fsync", no_argument, NULL, 'r'}, {"slot", required_argument, NULL, 'S'}, {"verbose", no_argument, NULL, 'v'}, {NULL, 0, NULL, 0} *************** *** 394,400 **** main(int argc, char **argv) } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nF:wWv", long_options, &option_index)) != -1) { switch (c) --- 396,402 ---- } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:rS:nF:wWv", long_options, &option_index)) != -1) { switch (c) *************** *** 435,440 **** main(int argc, char **argv) --- 437,445 ---- exit(1); } break; + case 'r': + reply_fsync = true; + break; case 'S': replication_slot = pg_strdup(optarg); break; *** a/src/bin/pg_basebackup/receivelog.c --- b/src/bin/pg_basebackup/receivelog.c *************** *** 33,65 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static int64 last_fsync = -1; /* timestamp of last WAL file flush */ static bool still_sending = true; /* feedback still needs to be sent? */ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos, ! int fsync_interval); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, ! XLogRecPtr blockpos, int64 *last_status); static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, XLogRecPtr *blockpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, ! char *partial_suffix); static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf, XLogRecPtr blockpos, char *basedir, char *partial_suffix, ! XLogRecPtr *stoppos); static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, ! char *partial_suffix, XLogRecPtr *stoppos); static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, ! int64 last_status, int fsync_interval, XLogRecPtr blockpos); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); /* * Open a new WAL file in the specified directory. --- 33,69 ---- static int64 last_fsync = -1; /* timestamp of last WAL file flush */ static bool still_sending = true; /* feedback still needs to be sent? */ + static int64 last_status = -1; /* timestamp of last feedback message */ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos, ! int fsync_interval, bool reply_fsync); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, ! XLogRecPtr blockpos); static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, XLogRecPtr *blockpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, ! char *partial_suffix, bool reply_fsync); static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf, XLogRecPtr blockpos, char *basedir, char *partial_suffix, ! XLogRecPtr *stoppos, bool reply_fsync); static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, ! char *partial_suffix, XLogRecPtr *stoppos, ! bool reply_fsync); static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, ! int fsync_interval, XLogRecPtr blockpos); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); + static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, + bool replyRequested); /* * Open a new WAL file in the specified directory. *************** *** 154,160 **** open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, * and returns false, otherwise returns true. */ static bool ! close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) { off_t currpos; --- 158,164 ---- * and returns false, otherwise returns true. */ static bool ! close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool reply_fsync) { off_t currpos; *************** *** 210,215 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) --- 214,226 ---- lastFlushPosition = pos; last_fsync = feGetCurrentTimestamp(); + /* Send feedback! */ + if (reply_fsync) + { + if (!sendFeedback(conn, pos, last_fsync, false)) + return false; + last_status = last_fsync; + } return true; } *************** *** 443,448 **** CheckServerVersionForStreaming(PGconn *conn) --- 454,462 ---- * fsync_interval controls how often we flush to the received WAL file, * in milliseconds. * + * If 'reply_fsync' is true, status packets sent to server as soon as + * after fsync. + * * Note: The log position *must* be at a log segment start! */ bool *************** *** 450,456 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! int fsync_interval) { char query[128]; char slotcmd[128]; --- 464,470 ---- char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! int fsync_interval, bool reply_fsync) { char query[128]; char slotcmd[128]; *************** *** 595,601 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos, fsync_interval); if (res == NULL) goto error; --- 609,615 ---- /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos, fsync_interval, reply_fsync); if (res == NULL) goto error; *************** *** 760,769 **** static PGresult * HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos, int fsync_interval) { char *copybuf = NULL; - int64 last_status = -1; XLogRecPtr blockpos = startpos; still_sending = true; --- 774,782 ---- HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos, int fsync_interval, bool reply_fsync) { char *copybuf = NULL; XLogRecPtr blockpos = startpos; still_sending = true; *************** *** 778,784 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Check if we should continue streaming, or abort at this point. */ if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, ! stream_stop, partial_suffix, stoppos)) goto error; now = feGetCurrentTimestamp(); --- 791,797 ---- * Check if we should continue streaming, or abort at this point. */ if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, ! stream_stop, partial_suffix, stoppos, reply_fsync)) goto error; now = feGetCurrentTimestamp(); *************** *** 802,807 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, --- 815,827 ---- lastFlushPosition = blockpos; last_fsync = now; + /* Send feedback! */ + if (reply_fsync) + { + if (!sendFeedback(conn, blockpos, now, false)) + goto error; + last_status = now; + } } /* *************** *** 821,827 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Calculate how long send/receive loops should sleep */ sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout, ! last_status, fsync_interval, blockpos); r = CopyStreamReceive(conn, sleeptime, ©buf); while (r != 0) --- 841,847 ---- * Calculate how long send/receive loops should sleep */ sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout, ! fsync_interval, blockpos); r = CopyStreamReceive(conn, sleeptime, ©buf); while (r != 0) *************** *** 831,837 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (r == -2) { PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, ! basedir, partial_suffix, stoppos); if (res == NULL) goto error; else --- 851,857 ---- if (r == -2) { PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, ! basedir, partial_suffix, stoppos, reply_fsync); if (res == NULL) goto error; else *************** *** 841,861 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Check the message type. */ if (copybuf[0] == 'k') { ! if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos, ! &last_status)) goto error; } else if (copybuf[0] == 'w') { if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, ! timeline, basedir, stream_stop, partial_suffix)) goto error; /* * Check if we should continue streaming, or abort at this point. */ if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, ! stream_stop, partial_suffix, stoppos)) goto error; } else --- 861,880 ---- /* Check the message type. */ if (copybuf[0] == 'k') { ! if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos)) goto error; } else if (copybuf[0] == 'w') { if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, ! timeline, basedir, stream_stop, partial_suffix, reply_fsync)) goto error; /* * Check if we should continue streaming, or abort at this point. */ if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, ! stream_stop, partial_suffix, stoppos, reply_fsync)) goto error; } else *************** *** 995,1001 **** CopyStreamReceive(PGconn *conn, long timeout, char **buffer) */ static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, ! XLogRecPtr blockpos, int64 *last_status) { int pos; bool replyRequested; --- 1014,1020 ---- */ static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, ! XLogRecPtr blockpos) { int pos; bool replyRequested; *************** *** 1024,1030 **** ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, now = feGetCurrentTimestamp(); if (!sendFeedback(conn, blockpos, now, false)) return false; ! *last_status = now; } return true; --- 1043,1049 ---- now = feGetCurrentTimestamp(); if (!sendFeedback(conn, blockpos, now, false)) return false; ! last_status = now; } return true; *************** *** 1037,1043 **** static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, XLogRecPtr *blockpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, ! char *partial_suffix) { int xlogoff; int bytes_left; --- 1056,1062 ---- ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, XLogRecPtr *blockpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, ! char *partial_suffix, bool reply_fsync) { int xlogoff; int bytes_left; *************** *** 1145,1151 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, /* Did we reach the end of a WAL segment? */ if (*blockpos % XLOG_SEG_SIZE == 0) { ! if (!close_walfile(basedir, partial_suffix, *blockpos)) /* Error message written in close_walfile() */ return false; --- 1164,1170 ---- /* Did we reach the end of a WAL segment? */ if (*blockpos % XLOG_SEG_SIZE == 0) { ! if (!close_walfile(basedir, partial_suffix, *blockpos, reply_fsync)) /* Error message written in close_walfile() */ return false; *************** *** 1175,1181 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, static PGresult * HandleEndOfCopyStream(PGconn *conn, char *copybuf, XLogRecPtr blockpos, char *basedir, char *partial_suffix, ! XLogRecPtr *stoppos) { PGresult *res = PQgetResult(conn); --- 1194,1200 ---- static PGresult * HandleEndOfCopyStream(PGconn *conn, char *copybuf, XLogRecPtr blockpos, char *basedir, char *partial_suffix, ! XLogRecPtr *stoppos, bool reply_fsync) { PGresult *res = PQgetResult(conn); *************** *** 1186,1192 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf, */ if (still_sending) { ! if (!close_walfile(basedir, partial_suffix, blockpos)) { /* Error message written in close_walfile() */ PQclear(res); --- 1205,1211 ---- */ if (still_sending) { ! if (!close_walfile(basedir, partial_suffix, blockpos, reply_fsync)) { /* Error message written in close_walfile() */ PQclear(res); *************** *** 1218,1228 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf, static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, ! char *partial_suffix, XLogRecPtr *stoppos) { if (still_sending && stream_stop(blockpos, timeline, false)) { ! if (!close_walfile(basedir, partial_suffix, blockpos)) { /* Potential error message is written by close_walfile */ return false; --- 1237,1248 ---- static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, ! char *partial_suffix, XLogRecPtr *stoppos, ! bool reply_fsync) { if (still_sending && stream_stop(blockpos, timeline, false)) { ! if (!close_walfile(basedir, partial_suffix, blockpos, reply_fsync)) { /* Potential error message is written by close_walfile */ return false; *************** *** 1244,1250 **** CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, */ static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, ! int64 last_status, int fsync_interval, XLogRecPtr blockpos) { int64 targettime = 0; int64 status_targettime = 0; --- 1264,1270 ---- */ static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, ! int fsync_interval, XLogRecPtr blockpos) { int64 targettime = 0; int64 status_targettime = 0; *** a/src/bin/pg_basebackup/receivelog.h --- b/src/bin/pg_basebackup/receivelog.h *************** *** 17,20 **** extern bool ReceiveXlogStream(PGconn *conn, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! int fsync_interval); --- 17,21 ---- stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! int fsync_interval, ! bool reply_fsync);