*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 196,201 **** PostgreSQL documentation
--- 196,211 ----
+
+
+
+
+ status packets sent to server as soon as after fsync.
+
+
+
+
+
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 371,377 **** LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL, 0))
/*
* Any errors will already have been reported in the function process,
--- 371,377 ----
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL, 0, false))
/*
* Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 38,44 **** static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 0; /* 0 = default */
static volatile bool time_to_abort = false;
!
static void usage(void);
static XLogRecPtr FindStreamingStart(uint32 *tli);
--- 38,44 ----
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 0; /* 0 = default */
static volatile bool time_to_abort = false;
! static volatile bool reply_fsync = false;
static void usage(void);
static XLogRecPtr FindStreamingStart(uint32 *tli);
***************
*** 74,79 **** usage(void)
--- 74,80 ----
printf(_(" -p, --port=PORT database server port number\n"));
printf(_(" -s, --status-interval=INTERVAL\n"
" time between status packets sent to server (in seconds)\n"));
+ printf(_(" -r, --reply-fsync status packets sent to server as soon as after fsync\n"));
printf(_(" -U, --username=NAME connect as specified database user\n"));
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
***************
*** 334,340 **** StreamLog(void)
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, ".partial",
! fsync_interval);
PQfinish(conn);
}
--- 335,341 ----
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, ".partial",
! fsync_interval, reply_fsync);
PQfinish(conn);
}
***************
*** 368,373 **** main(int argc, char **argv)
--- 369,375 ----
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
+ {"reply-fsync", no_argument, NULL, 'r'},
{"slot", required_argument, NULL, 'S'},
{"verbose", no_argument, NULL, 'v'},
{NULL, 0, NULL, 0}
***************
*** 394,400 **** main(int argc, char **argv)
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nF:wWv",
long_options, &option_index)) != -1)
{
switch (c)
--- 396,402 ----
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:rS:nF:wWv",
long_options, &option_index)) != -1)
{
switch (c)
***************
*** 435,440 **** main(int argc, char **argv)
--- 437,445 ----
exit(1);
}
break;
+ case 'r':
+ reply_fsync = true;
+ break;
case 'S':
replication_slot = pg_strdup(optarg);
break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 33,65 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static int64 last_fsync = -1; /* timestamp of last WAL file flush */
static bool still_sending = true; /* feedback still needs to be sent? */
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos,
! int fsync_interval);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
! XLogRecPtr blockpos, int64 *last_status);
static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr *blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
! char *partial_suffix);
static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
! XLogRecPtr *stoppos);
static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop,
! char *partial_suffix, XLogRecPtr *stoppos);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
! int64 last_status, int fsync_interval,
XLogRecPtr blockpos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
/*
* Open a new WAL file in the specified directory.
--- 33,69 ----
static int64 last_fsync = -1; /* timestamp of last WAL file flush */
static bool still_sending = true; /* feedback still needs to be sent? */
+ static int64 last_status = -1; /* timestamp of last feedback message */
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos,
! int fsync_interval, bool reply_fsync);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
! XLogRecPtr blockpos);
static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr *blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
! char *partial_suffix, bool reply_fsync);
static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
! XLogRecPtr *stoppos, bool reply_fsync);
static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop,
! char *partial_suffix, XLogRecPtr *stoppos,
! bool reply_fsync);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
! int fsync_interval,
XLogRecPtr blockpos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
+ static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now,
+ bool replyRequested);
/*
* Open a new WAL file in the specified directory.
***************
*** 154,160 **** open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* and returns false, otherwise returns true.
*/
static bool
! close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
{
off_t currpos;
--- 158,164 ----
* and returns false, otherwise returns true.
*/
static bool
! close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool reply_fsync)
{
off_t currpos;
***************
*** 210,215 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
--- 214,226 ----
lastFlushPosition = pos;
last_fsync = feGetCurrentTimestamp();
+ /* Send feedback! */
+ if (reply_fsync)
+ {
+ if (!sendFeedback(conn, pos, last_fsync, false))
+ return false;
+ last_status = last_fsync;
+ }
return true;
}
***************
*** 443,448 **** CheckServerVersionForStreaming(PGconn *conn)
--- 454,462 ----
* fsync_interval controls how often we flush to the received WAL file,
* in milliseconds.
*
+ * If 'reply_fsync' is true, status packets sent to server as soon as
+ * after fsync.
+ *
* Note: The log position *must* be at a log segment start!
*/
bool
***************
*** 450,456 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! int fsync_interval)
{
char query[128];
char slotcmd[128];
--- 464,470 ----
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! int fsync_interval, bool reply_fsync)
{
char query[128];
char slotcmd[128];
***************
*** 595,601 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos, fsync_interval);
if (res == NULL)
goto error;
--- 609,615 ----
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos, fsync_interval, reply_fsync);
if (res == NULL)
goto error;
***************
*** 760,769 **** static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos, int fsync_interval)
{
char *copybuf = NULL;
- int64 last_status = -1;
XLogRecPtr blockpos = startpos;
still_sending = true;
--- 774,782 ----
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos, int fsync_interval, bool reply_fsync)
{
char *copybuf = NULL;
XLogRecPtr blockpos = startpos;
still_sending = true;
***************
*** 778,784 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Check if we should continue streaming, or abort at this point.
*/
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
! stream_stop, partial_suffix, stoppos))
goto error;
now = feGetCurrentTimestamp();
--- 791,797 ----
* Check if we should continue streaming, or abort at this point.
*/
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
! stream_stop, partial_suffix, stoppos, reply_fsync))
goto error;
now = feGetCurrentTimestamp();
***************
*** 802,807 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 815,827 ----
lastFlushPosition = blockpos;
last_fsync = now;
+ /* Send feedback! */
+ if (reply_fsync)
+ {
+ if (!sendFeedback(conn, blockpos, now, false))
+ goto error;
+ last_status = now;
+ }
}
/*
***************
*** 821,827 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Calculate how long send/receive loops should sleep
*/
sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
! last_status, fsync_interval, blockpos);
r = CopyStreamReceive(conn, sleeptime, ©buf);
while (r != 0)
--- 841,847 ----
* Calculate how long send/receive loops should sleep
*/
sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
! fsync_interval, blockpos);
r = CopyStreamReceive(conn, sleeptime, ©buf);
while (r != 0)
***************
*** 831,837 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if (r == -2)
{
PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
! basedir, partial_suffix, stoppos);
if (res == NULL)
goto error;
else
--- 851,857 ----
if (r == -2)
{
PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
! basedir, partial_suffix, stoppos, reply_fsync);
if (res == NULL)
goto error;
else
***************
*** 841,861 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Check the message type. */
if (copybuf[0] == 'k')
{
! if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
! &last_status))
goto error;
}
else if (copybuf[0] == 'w')
{
if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
! timeline, basedir, stream_stop, partial_suffix))
goto error;
/*
* Check if we should continue streaming, or abort at this point.
*/
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
! stream_stop, partial_suffix, stoppos))
goto error;
}
else
--- 861,880 ----
/* Check the message type. */
if (copybuf[0] == 'k')
{
! if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos))
goto error;
}
else if (copybuf[0] == 'w')
{
if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
! timeline, basedir, stream_stop, partial_suffix, reply_fsync))
goto error;
/*
* Check if we should continue streaming, or abort at this point.
*/
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
! stream_stop, partial_suffix, stoppos, reply_fsync))
goto error;
}
else
***************
*** 995,1001 **** CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
*/
static bool
ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
! XLogRecPtr blockpos, int64 *last_status)
{
int pos;
bool replyRequested;
--- 1014,1020 ----
*/
static bool
ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
! XLogRecPtr blockpos)
{
int pos;
bool replyRequested;
***************
*** 1024,1030 **** ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
now = feGetCurrentTimestamp();
if (!sendFeedback(conn, blockpos, now, false))
return false;
! *last_status = now;
}
return true;
--- 1043,1049 ----
now = feGetCurrentTimestamp();
if (!sendFeedback(conn, blockpos, now, false))
return false;
! last_status = now;
}
return true;
***************
*** 1037,1043 **** static bool
ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr *blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
! char *partial_suffix)
{
int xlogoff;
int bytes_left;
--- 1056,1062 ----
ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr *blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
! char *partial_suffix, bool reply_fsync)
{
int xlogoff;
int bytes_left;
***************
*** 1145,1151 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
/* Did we reach the end of a WAL segment? */
if (*blockpos % XLOG_SEG_SIZE == 0)
{
! if (!close_walfile(basedir, partial_suffix, *blockpos))
/* Error message written in close_walfile() */
return false;
--- 1164,1170 ----
/* Did we reach the end of a WAL segment? */
if (*blockpos % XLOG_SEG_SIZE == 0)
{
! if (!close_walfile(basedir, partial_suffix, *blockpos, reply_fsync))
/* Error message written in close_walfile() */
return false;
***************
*** 1175,1181 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
static PGresult *
HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
! XLogRecPtr *stoppos)
{
PGresult *res = PQgetResult(conn);
--- 1194,1200 ----
static PGresult *
HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
! XLogRecPtr *stoppos, bool reply_fsync)
{
PGresult *res = PQgetResult(conn);
***************
*** 1186,1192 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf,
*/
if (still_sending)
{
! if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Error message written in close_walfile() */
PQclear(res);
--- 1205,1211 ----
*/
if (still_sending)
{
! if (!close_walfile(basedir, partial_suffix, blockpos, reply_fsync))
{
/* Error message written in close_walfile() */
PQclear(res);
***************
*** 1218,1228 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf,
static bool
CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
! char *partial_suffix, XLogRecPtr *stoppos)
{
if (still_sending && stream_stop(blockpos, timeline, false))
{
! if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Potential error message is written by close_walfile */
return false;
--- 1237,1248 ----
static bool
CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
! char *partial_suffix, XLogRecPtr *stoppos,
! bool reply_fsync)
{
if (still_sending && stream_stop(blockpos, timeline, false))
{
! if (!close_walfile(basedir, partial_suffix, blockpos, reply_fsync))
{
/* Potential error message is written by close_walfile */
return false;
***************
*** 1244,1250 **** CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
*/
static long
CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
! int64 last_status, int fsync_interval, XLogRecPtr blockpos)
{
int64 targettime = 0;
int64 status_targettime = 0;
--- 1264,1270 ----
*/
static long
CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
! int fsync_interval, XLogRecPtr blockpos)
{
int64 targettime = 0;
int64 status_targettime = 0;
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 17,20 **** extern bool ReceiveXlogStream(PGconn *conn,
stream_stop_callback stream_stop,
int standby_message_timeout,
char *partial_suffix,
! int fsync_interval);
--- 17,21 ----
stream_stop_callback stream_stop,
int standby_message_timeout,
char *partial_suffix,
! int fsync_interval,
! bool reply_fsync);