diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index d9b7c4d0d4..a0ac5b4ef7 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -44,6 +44,7 @@ COPY { table_name [ ( column_name [, ...] ) FORCE_NULL ( column_name [, ...] ) ENCODING 'encoding_name' + ERROR_LIMIT 'limit_number' @@ -355,6 +356,26 @@ COPY { table_name [ ( + + ERROR_LIMIT + + + Enables ignoring of errored out rows up to limit_number. If limit_number is set + to -1, then all errors will be ignored. + + + + Currently, only unique or exclusion constraint violations + and row formatting errors are ignored. Malformed + rows will raise warnings, while constraint violating rows + will be returned to the caller. + + + + + WHERE diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index e17d8c760f..c911b3d0c2 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -24,6 +24,7 @@ #include "access/tableam.h" #include "access/xact.h" #include "access/xlog.h" +#include "access/printtup.h" #include "catalog/dependency.h" #include "catalog/pg_authid.h" #include "catalog/pg_type.h" @@ -48,7 +49,9 @@ #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" +#include "storage/lmgr.h" #include "tcop/tcopprot.h" +#include "tcop/pquery.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -154,6 +157,7 @@ typedef struct CopyStateData List *convert_select; /* list of column names (can be NIL) */ bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ Node *whereClause; /* WHERE condition (or NULL) */ + int error_limit; /* total number of errors to ignore */ /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ @@ -183,6 +187,9 @@ typedef struct CopyStateData bool volatile_defexprs; /* is any of defexprs volatile? 
*/ List *range_table; ExprState *qualexpr; + bool ignore_error; /* is ignore error specified? */ + bool ignore_all_error; /* is error_limit -1 (ignore all error) + * specified? */ TransitionCaptureState *transition_capture; @@ -837,7 +844,7 @@ CopyLoadRawBuf(CopyState cstate) void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, - uint64 *processed) + uint64 *processed, DestReceiver *dest) { CopyState cstate; bool is_from = stmt->is_from; @@ -1068,7 +1075,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); cstate->whereClause = whereClause; - *processed = CopyFrom(cstate); /* copy from file to database */ + *processed = CopyFrom(cstate, dest); /* copy from file to database */ EndCopyFrom(cstate); } else @@ -1290,6 +1297,18 @@ ProcessCopyOptions(ParseState *pstate, defel->defname), parser_errposition(pstate, defel->location))); } + else if (strcmp(defel->defname, "error_limit") == 0) + { + if (cstate->ignore_error) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + cstate->error_limit = defGetInt64(defel); + cstate->ignore_error = true; + if (cstate->error_limit == -1) + cstate->ignore_all_error = true; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1440,6 +1459,10 @@ ProcessCopyOptions(ParseState *pstate, ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); + if (cstate->ignore_error && !cstate->is_copy_from) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ERROR LIMIT only available using COPY FROM"))); } /* @@ -2653,7 +2676,7 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, * Copy FROM file to relation. 
*/ uint64 -CopyFrom(CopyState cstate) +CopyFrom(CopyState cstate, DestReceiver *dest) { ResultRelInfo *resultRelInfo; ResultRelInfo *target_resultRelInfo; @@ -2675,6 +2698,7 @@ CopyFrom(CopyState cstate) bool has_before_insert_row_trig; bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; + Portal portal = NULL; Assert(cstate->rel); @@ -2838,7 +2862,19 @@ CopyFrom(CopyState cstate) /* Verify the named relation is a valid target for INSERT */ CheckValidResultRel(resultRelInfo, CMD_INSERT); - ExecOpenIndices(resultRelInfo, false); + if (cstate->ignore_error) + { + TupleDesc tupDesc; + + ExecOpenIndices(resultRelInfo, true); + tupDesc = RelationGetDescr(cstate->rel); + + portal = GetPortalByName(""); + SetRemoteDestReceiverParams(dest, portal); + dest->rStartup(dest, (int) CMD_SELECT, tupDesc); + } + else + ExecOpenIndices(resultRelInfo, false); estate->es_result_relations = resultRelInfo; estate->es_num_result_relations = 1; @@ -2943,6 +2979,13 @@ CopyFrom(CopyState cstate) */ insertMethod = CIM_SINGLE; } + else if (cstate->ignore_error) + { + /* + * Can't support speculative insertion in multi-inserts. + */ + insertMethod = CIM_SINGLE; + } else { /* @@ -3286,6 +3329,63 @@ CopyFrom(CopyState cstate) */ myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } + else if ((cstate->error_limit > 0 || cstate->ignore_all_error) && resultRelInfo->ri_NumIndices > 0) + { + /* Perform a speculative insertion. */ + uint32 specToken; + ItemPointerData conflictTid; + bool specConflict; + + /* + * Do a non-conclusive check for conflicts first. + */ + specConflict = false; + + if (!ExecCheckIndexConstraints(myslot, estate, &conflictTid, + NIL)) + { + (void) dest->receiveSlot(myslot, dest); + cstate->error_limit--; + continue; + } + + /* + * Acquire our speculative insertion lock. 
+ */ + specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId()); + + /* insert the tuple, with the speculative token */ + table_tuple_insert_speculative(resultRelInfo->ri_RelationDesc, myslot, + estate->es_output_cid, + 0, + NULL, + specToken); + + /* insert index entries for tuple */ + recheckIndexes = ExecInsertIndexTuples(myslot, estate, true, + &specConflict, + NIL); + + /* adjust the tuple's state accordingly */ + table_tuple_complete_speculative(resultRelInfo->ri_RelationDesc, myslot, + specToken, !specConflict); + + /* + * Wake up anyone waiting for our decision. + */ + SpeculativeInsertionLockRelease(GetCurrentTransactionId()); + + /* + * If there was a conflict, return it and proceed to + * the next record, if there is one. + */ + if (specConflict) + { + (void) dest->receiveSlot(myslot, dest); + cstate->error_limit--; + continue; + } + } else { /* OK, store the tuple and create index entries for it */ @@ -3703,7 +3803,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - +next_line: if (!cstate->binary) { char **field_strings; @@ -3718,9 +3818,21 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* check for overflowing fields */ if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + { + if (cstate->error_limit > 0 || cstate->ignore_all_error) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- extra data after last expected column", + cstate->line_buf.data))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + } fieldno = 0; @@ -3732,10 +3844,22 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, Form_pg_attribute att = 
TupleDescAttr(tupDesc, m); if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); + { + if (cstate->error_limit > 0 || cstate->ignore_all_error) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- missing data for column \"%s\"", + cstate->line_buf.data, NameStr(att->attname)))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + } string = field_strings[fieldno++]; if (cstate->convert_select_flags && @@ -3822,10 +3946,23 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, } if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + { + if (cstate->error_limit > 0 || cstate->ignore_all_error) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- row field count is %d, expected %d", + cstate->line_buf.data, (int) fld_count, attr_count))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + } i = 0; foreach(cur, cstate->attnumlist) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 7881079e96..521696be29 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -791,7 +791,7 @@ copy_table(Relation rel) cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL); /* Do the copy */ - (void) CopyFrom(cstate); + (void) CopyFrom(cstate, NULL); logicalrep_rel_close(relmapentry, NoLock); } diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 
e984545780..cb7b0c80d2 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -550,7 +550,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, DoCopy(pstate, (CopyStmt *) parsetree, pstmt->stmt_location, pstmt->stmt_len, - &processed); + &processed, dest); if (completionTag) snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "COPY " UINT64_FORMAT, processed); diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c index 90f6380170..1dfda330be 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -1037,6 +1037,7 @@ ProcessResult(PGresult **results) { bool success = true; bool first_cycle = true; + bool is_copy_in = false; for (;;) { @@ -1160,6 +1161,7 @@ ProcessResult(PGresult **results) copystream, PQbinaryTuples(*results), &copy_result) && success; + is_copy_in = true; } ResetCancelConn(); @@ -1190,6 +1192,11 @@ ProcessResult(PGresult **results) first_cycle = false; } + /* Print returned result for COPY FROM with error_limit. */ + if (is_copy_in && !success && PQresultStatus(*results) != + PGRES_FATAL_ERROR) + (void) PrintQueryTuples(*results); + SetResultVariables(*results, success); /* may need this to recover from conn loss during COPY */ diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index bbe0105d77..16fc8e6a82 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -25,7 +25,7 @@ typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); extern void DoCopy(ParseState *state, const CopyStmt *stmt, int stmt_location, int stmt_len, - uint64 *processed); + uint64 *processed, DestReceiver *dest); extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options); extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, @@ -37,7 +37,7 @@ extern bool NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields); extern void CopyFromErrorCallback(void *arg); -extern uint64 CopyFrom(CopyState cstate); +extern 
uint64 CopyFrom(CopyState cstate, DestReceiver *dest); extern DestReceiver *CreateCopyDestReceiver(void); diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out index c53ed3ebf5..b1a17a8683 100644 --- a/src/test/regress/expected/copy2.out +++ b/src/test/regress/expected/copy2.out @@ -55,6 +55,15 @@ LINE 1: COPY x TO stdout WHERE a = 1; ^ COPY x from stdin WHERE a = 50004; COPY x from stdin WHERE a > 60003; +COPY x from stdin WITH(ERROR_LIMIT 5); +WARNING: skipping "70001 22 32" --- missing data for column "d" +WARNING: skipping "70002 23 33 43 53 54" --- extra data after last expected column +WARNING: skipping "70003 24 34 44" --- missing data for column "e" + + a | b | c | d | e +---+---+---+---+--- +(0 rows) + COPY x from stdin WHERE f > 60003; ERROR: column "f" does not exist LINE 1: COPY x from stdin WHERE f > 60003; @@ -102,12 +111,14 @@ SELECT * FROM x; 50004 | 25 | 35 | 45 | before trigger fired 60004 | 25 | 35 | 45 | before trigger fired 60005 | 26 | 36 | 46 | before trigger fired + 70004 | 25 | 35 | 45 | before trigger fired + 70005 | 26 | 36 | 46 | before trigger fired 1 | 1 | stuff | test_1 | after trigger fired 2 | 2 | stuff | test_2 | after trigger fired 3 | 3 | stuff | test_3 | after trigger fired 4 | 4 | stuff | test_4 | after trigger fired 5 | 5 | stuff | test_5 | after trigger fired -(28 rows) +(30 rows) -- check copy out COPY x TO stdout; @@ -134,6 +145,8 @@ COPY x TO stdout; 50004 25 35 45 before trigger fired 60004 25 35 45 before trigger fired 60005 26 36 46 before trigger fired +70004 25 35 45 before trigger fired +70005 26 36 46 before trigger fired 1 1 stuff test_1 after trigger fired 2 2 stuff test_2 after trigger fired 3 3 stuff test_3 after trigger fired @@ -163,6 +176,8 @@ Delimiter before trigger fired 35 before trigger fired 35 before trigger fired 36 before trigger fired +35 before trigger fired +36 before trigger fired stuff after trigger fired stuff after trigger fired stuff after trigger fired @@ 
-192,6 +207,8 @@ I'm null before trigger fired 25 before trigger fired 25 before trigger fired 26 before trigger fired +25 before trigger fired +26 before trigger fired 1 after trigger fired 2 after trigger fired 3 after trigger fired diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql index 902f4fac19..2378f428fc 100644 --- a/src/test/regress/sql/copy2.sql +++ b/src/test/regress/sql/copy2.sql @@ -110,6 +110,14 @@ COPY x from stdin WHERE a > 60003; 60005 26 36 46 56 \. +COPY x from stdin WITH(ERROR_LIMIT 5); +70001 22 32 +70002 23 33 43 53 54 +70003 24 34 44 +70004 25 35 45 55 +70005 26 36 46 56 +\. + COPY x from stdin WHERE f > 60003; COPY x from stdin WHERE a = max(x.b);