diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 13e0d23..e757bba 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -10459,7 +10459,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, ti->oid = pstrdup(de->d_name); ti->path = pstrdup(buflinkpath.data); ti->rpath = relpath ? pstrdup(relpath) : NULL; - ti->size = infotbssize ? sendTablespace(fullpath, true) : -1; + ti->size = infotbssize ? sendTablespace(fullpath, true, InvalidXLogRecPtr) : -1; if (tablespaces) *tablespaces = lappend(*tablespaces, ti); diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index c2978a9..3560da1 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -41,6 +41,7 @@ #include "utils/ps_status.h" #include "utils/relcache.h" #include "utils/timestamp.h" +#include "utils/pg_lsn.h" typedef struct @@ -52,13 +53,22 @@ typedef struct bool includewal; uint32 maxrate; bool sendtblspcmapfile; + XLogRecPtr prev_backup_start_lsn; } basebackup_options; static int64 sendDir(const char *path, int basepathlen, bool sizeonly, - List *tablespaces, bool sendtblspclinks); + List *tablespaces, bool sendtblspclinks, XLogRecPtr prev_backup_start_lsn); static bool sendFile(const char *readfilename, const char *tarfilename, - struct stat *statbuf, bool missing_ok, Oid dboid); + struct stat *statbuf, bool missing_ok, Oid dboid, + XLogRecPtr prev_backup_start_lsn); +static bool sendFileMap(const char *readfilename, const char *tarfilename, + struct stat *statbuf, bool missing_ok, Oid dboid, + XLogRecPtr prev_backup_start_lsn, int *expected_write_size); +static bool sendFilePartial(const char *readfilename, const char *tarfilename, + struct stat *statbuf, bool missing_ok, Oid dboid, + XLogRecPtr prev_backup_start_lsn, int expected_write_size); + static void sendFileWithContent(const char *filename, const char *content); static int64 _tarWriteHeader(const char *filename, const char *linktarget, struct stat *statbuf, bool sizeonly); @@ -275,7 +285,8 @@ perform_base_backup(basebackup_options *opt) /* Add a node for the base directory at the end */ ti = palloc0(sizeof(tablespaceinfo)); - ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1; + ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, + true, opt->prev_backup_start_lsn) : -1; tablespaces = lappend(tablespaces, ti); /* Send tablespace header */ @@ -331,10 +342,10 @@ perform_base_backup(basebackup_options *opt) if (tblspc_map_file && opt->sendtblspcmapfile) { sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data); - sendDir(".", 1, false, tablespaces, false); + sendDir(".", 1, false, tablespaces, false, opt->prev_backup_start_lsn); } else - sendDir(".", 1, false, tablespaces, true); + sendDir(".", 1, false, tablespaces, true, opt->prev_backup_start_lsn); /* ... and pg_control after everything else. */ if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0) @@ -342,10 +353,10 @@ perform_base_backup(basebackup_options *opt) (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", XLOG_CONTROL_FILE))); - sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid); + sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid, InvalidXLogRecPtr); } else - sendTablespace(ti->path, false); + sendTablespace(ti->path, false, opt->prev_backup_start_lsn); /* * If we're including WAL, and this is the main data directory we @@ -592,7 +603,7 @@ perform_base_backup(basebackup_options *opt) (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", pathbuf))); - sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid); + sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid, InvalidXLogRecPtr); /* unconditionally mark file as archived */ StatusFilePath(pathbuf, fname, ".done"); @@ -650,6 +661,7 @@ parse_basebackup_options(List *options, basebackup_options *opt) bool o_maxrate = false; bool o_tablespace_map = false; bool o_noverify_checksums = false; + bool o_prev_backup_start_lsn = false; MemSet(opt, 0, sizeof(*opt)); foreach(lopt, options) @@ -738,6 +750,25 @@ parse_basebackup_options(List *options, basebackup_options *opt) noverify_checksums = true; o_noverify_checksums = true; } + else if (strcmp(defel->defname, "prev_backup_start_lsn") == 0) + { + char *prev_backup_start_lsn_str; + XLogRecPtr prev_backup_start_lsn; + bool have_error = false; + + if (o_prev_backup_start_lsn) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("duplicate option \"%s\"", defel->defname))); + + prev_backup_start_lsn_str = strVal(defel->arg); + elog(WARNING, "prev_backup_start_lsn_str: %s", prev_backup_start_lsn_str); + prev_backup_start_lsn = pg_lsn_in_internal(prev_backup_start_lsn_str, &have_error); + //TODO handle parsing error + + opt->prev_backup_start_lsn = (XLogRecPtr) prev_backup_start_lsn; + o_prev_backup_start_lsn = true; + } else elog(ERROR, "option \"%s\" not recognized", defel->defname); @@ -966,7 +997,9 @@ sendFileWithContent(const char *filename, const char *content) * Only used to send auxiliary tablespaces, not PGDATA. */ int64 -sendTablespace(char *path, bool sizeonly) +sendTablespace(char* path, bool sizeonly, + XLogRecPtr prev_backup_start_lsn) + { int64 size; char pathbuf[MAXPGPATH]; @@ -999,7 +1032,9 @@ sendTablespace(char *path, bool sizeonly) sizeonly); /* Send all the files in the tablespace version directory */ - size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true); + + size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true, + prev_backup_start_lsn); return size; } @@ -1018,7 +1053,7 @@ sendTablespace(char *path, bool sizeonly) */ static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, - bool sendtblspclinks) + bool sendtblspclinks, XLogRecPtr prev_backup_start_lsn) { DIR *dir; struct dirent *de; @@ -1294,7 +1329,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, skip_this_dir = true; if (!skip_this_dir) - size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks); + size += sendDir(pathbuf, basepathlen, sizeonly, + tablespaces, sendtblspclinks, prev_backup_start_lsn); } else if (S_ISREG(statbuf.st_mode)) { @@ -1302,7 +1338,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, if (!sizeonly) sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, - true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid); + true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid, + prev_backup_start_lsn); if (sent || sizeonly) { @@ -1363,10 +1400,14 @@ is_checksummed_file(const char *fullpath, const char *filename) * * Returns true if the file was successfully sent, false if 'missing_ok', * and the file did not exist. + * + * If prev_backup_start_lsn is not InvalidXLogRecPtr, send .partial file, + * containing blocks for incremental backup and .blockmap file. */ + static bool sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf, - bool missing_ok, Oid dboid) + bool missing_ok, Oid dboid, XLogRecPtr prev_backup_start_lsn) { FILE *fp; BlockNumber blkno = 0; @@ -1383,6 +1424,21 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf int segmentno = 0; char *segmentpath; bool verify_checksum = false; + bool file_has_map = false; + int expected_write_size = 0; + + /* Send map, if requesred. */ + if (prev_backup_start_lsn) + file_has_map = sendFileMap(readfilename, tarfilename, statbuf, + missing_ok, dboid, prev_backup_start_lsn, &expected_write_size); + + /* + * If possible, send incremental version of file + * all non-relation files will be send in code below. + */ + if (file_has_map) + return sendFilePartial(readfilename, tarfilename, statbuf, + missing_ok, dboid, prev_backup_start_lsn, expected_write_size); fp = AllocateFile(readfilename, "rb"); if (fp == NULL) @@ -1447,6 +1503,8 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf verify_checksum = false; } + /* iterate over pages to get info we need. + * ither it is checksum verification or collecting a map */ if (verify_checksum) { for (i = 0; i < cnt / BLCKSZ; i++) @@ -1468,15 +1526,15 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf if (phdr->pd_checksum != checksum) { /* - * Retry the block on the first failure. It's - * possible that we read the first 4K page of the - * block just before postgres updated the entire block - * so it ends up looking torn to us. We only need to - * retry once because the LSN should be updated to - * something we can ignore on the next pass. If the - * error happens again then it is a true validation - * failure. - */ + * Retry the block on the first failure. It's + * possible that we read the first 4K page of the + * block just before postgres updated the entire block + * so it ends up looking torn to us. We only need to + * retry once because the LSN should be updated to + * something we can ignore on the next pass. If the + * error happens again then it is a true validation + * failure. + */ if (block_retry == false) { /* Reread the failed block */ @@ -1484,7 +1542,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf { ereport(ERROR, (errcode_for_file_access(), - errmsg("could not fseek in file \"%s\": %m", + errmsg("could not fseek in file \"%s\": %m", readfilename))); } @@ -1492,7 +1550,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf { ereport(ERROR, (errcode_for_file_access(), - errmsg("could not reread block %d of file \"%s\": %m", + errmsg("could not reread block %d of file \"%s\": %m", blkno, readfilename))); } @@ -1500,7 +1558,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf { ereport(ERROR, (errcode_for_file_access(), - errmsg("could not fseek in file \"%s\": %m", + errmsg("could not fseek in file \"%s\": %m", readfilename))); } @@ -1593,6 +1651,232 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf } +static bool +sendFileMap(const char *readfilename, const char *tarfilename, struct stat *statbuf, + bool missing_ok, Oid dboid, XLogRecPtr prev_backup_start_lsn, + int *expected_write_size) +{ + FILE *fp; + BlockNumber blkno = 0; + char buf[TAR_SEND_SIZE]; + off_t cnt; + int i; + pgoff_t len = 0; + char *page; + size_t pad; + char *tarfilename_blockmap = NULL; + BlockNumber *pagemap = NULL; + char *filename; + int statbuf_size = statbuf->st_size; + int pagemap_real_size; + int n_blocks_to_send = 0; + + Assert(prev_backup_start_lsn != InvalidXLogRecPtr); + + tarfilename_blockmap = psprintf("%s.blockmap", tarfilename); + + /* + * Get the filename (excluding path). As last_dir_separator() + * includes the last directory separator, we chop that off by + * incrementing the pointer. + + */ + filename = last_dir_separator(readfilename) + 1; + + /* + * Handle all non relation files here. + * Do nothing. + */ + if (!is_checksummed_file(readfilename, filename) || + !S_ISREG(statbuf->st_mode) || + (filename[0] == 't' && isdigit(filename[1])) || // exclude all temp files + !isdigit(filename[0]) || // relfiles always start with number + strstr(filename, "_")) // exclude all fork files + { + elog(INFO, "sendFileMap %s, no datafile", filename); + return false; + } + elog(INFO, "sendFileMap %s, datafile", filename); + + fp = AllocateFile(readfilename, "rb"); + if (fp == NULL) + { + if (errno == ENOENT && missing_ok) + return false; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", readfilename))); + } + + /* allocate pagemap of the size enough to write all file blocks */ + pagemap = palloc0((statbuf->st_size / BLCKSZ)*sizeof(BlockNumber)); + + while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0) + { + /* iterate over pages to collect a map */ + for (i = 0; i < cnt / BLCKSZ; i++) + { + page = buf + BLCKSZ * i; + /* add block to map */ + if (!PageIsNew(page) && PageGetLSN(page) > prev_backup_start_lsn) + { + pagemap[n_blocks_to_send] = blkno; + elog(INFO, "expected_write_size %d add to map blkno %d pagemap[n_blocks_to_send] %d of file %s page lsn %X/%X prev_backup_start_lsn %X/%X", + *expected_write_size, blkno, pagemap[n_blocks_to_send], readfilename, (uint32) (PageGetLSN(page) >> 32), (uint32) PageGetLSN(page), + (uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn); + *expected_write_size += BLCKSZ; + n_blocks_to_send++; + } + blkno++; + } + + len += cnt; + + if (len >= statbuf->st_size) + { + /* + * Reached end of file. The file could be longer, if it was + * extended while we were sending it, but for a base backup we can + * ignore such extended data. It will be restored from WAL. + */ + break; + } + } + + pagemap_real_size = n_blocks_to_send*sizeof(BlockNumber); + + statbuf->st_size = pagemap_real_size; + _tarWriteHeader(tarfilename_blockmap, NULL, statbuf, false); + + if (pagemap_real_size) + { + pq_putmessage('d', (char *) pagemap, pagemap_real_size); + + /* + * Pad to 512 byte boundary, per tar format requirements. (This small + * piece of data is probably not worth throttling.) + */ + pad = ((pagemap_real_size + 511) & ~511) - pagemap_real_size; + if (pad > 0) + { + MemSet(buf, 0, pad); + pq_putmessage('d', buf, pad); + } + } + + statbuf->st_size = statbuf_size; + FreeFile(fp); + + pfree(pagemap); + return true; +} + +static bool +sendFilePartial(const char *readfilename, const char *tarfilename, struct stat *statbuf, + bool missing_ok, Oid dboid, XLogRecPtr prev_backup_start_lsn, + int expected_write_size) +{ + FILE *fp; + BlockNumber blkno = 0; + char buf[TAR_SEND_SIZE]; + char sendbuf[TAR_SEND_SIZE]; + int n_blocks_to_send = 0; + off_t cnt; + int i; + pgoff_t len = 0; + char *page; + char *tarfilename_partial = NULL; + int pad; + int statbuf_size; + int write_len = 0; + + Assert(prev_backup_start_lsn != InvalidXLogRecPtr); + + tarfilename_partial = psprintf("%s.partial", tarfilename); + + statbuf_size = statbuf->st_size; + statbuf->st_size = expected_write_size; + _tarWriteHeader(tarfilename_partial, NULL, statbuf, false); + statbuf->st_size = statbuf_size; + + fp = AllocateFile(readfilename, "rb"); + if (fp == NULL) + { + if (errno == ENOENT && missing_ok) + return false; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", readfilename))); + } + + while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0) + { + /* iterate over pages to collect a map */ + for (i = 0; i < cnt / BLCKSZ; i++) + { + page = buf + BLCKSZ * i; + + if (!PageIsNew(page) && PageGetLSN(page) > prev_backup_start_lsn) + { + elog(INFO, "add to sendbuf blkno %d, n_blocks_to_send %d of file %s page lsn %X/%X prev_backup_start_lsn %X/%X", + blkno, n_blocks_to_send, readfilename, (uint32) (PageGetLSN(page) >> 32), (uint32) PageGetLSN(page), + (uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn); + memcpy(sendbuf + BLCKSZ * n_blocks_to_send, page, BLCKSZ); + n_blocks_to_send++; + } + blkno++; + } + + { + elog(INFO, "send n_blocks_to_send %d of file %s", + n_blocks_to_send, readfilename); + /* Send the chunk as a CopyData message */ + write_len += n_blocks_to_send*BLCKSZ; + if (pq_putmessage('d', sendbuf, n_blocks_to_send*BLCKSZ)) + ereport(ERROR, + (errmsg("base backup could not send data, aborting backup"))); + n_blocks_to_send = 0; + } + + len += cnt; + + if (len >= statbuf->st_size) + { + /* + * Reached end of file. The file could be longer, if it was + * extended while we were sending it, but for a base backup we can + * ignore such extended data. It will be restored from WAL. + */ + break; + } + } + + if (write_len < expected_write_size) + { + MemSet(buf, 0, sizeof(buf)); + while (write_len < expected_write_size) + { + cnt = Min(sizeof(buf), expected_write_size - write_len); + pq_putmessage('d', buf, cnt); + write_len += cnt; + throttle(cnt); + } + } + + /* Pad to 512 byte boundary, per tar format requirements */ + pad = ((write_len + 511) & ~511) - write_len; + if (pad > 0) + { + char buf[512]; + + MemSet(buf, 0, pad); + pq_putmessage('d', buf, pad); + } + + FreeFile(fp); + return true; +} + static int64 _tarWriteHeader(const char *filename, const char *linktarget, struct stat *statbuf, bool sizeonly) diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index c4e11cc..cb883a8 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -87,6 +87,7 @@ static SQLCmd *make_sqlcmd(void); %token K_EXPORT_SNAPSHOT %token K_NOEXPORT_SNAPSHOT %token K_USE_SNAPSHOT +%token K_PREV_BACKUP_START_LSN %type command %type base_backup start_replication start_logical_replication @@ -103,6 +104,7 @@ static SQLCmd *make_sqlcmd(void); %type create_slot_opt_list %type create_slot_opt + %% firstcmd: command opt_semicolon @@ -155,7 +157,7 @@ var_name: IDENT { $$ = $1; } /* * BASE_BACKUP [LABEL '