diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index b7bbcf6..8fae3a9 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -445,6 +445,7 @@ libpqrcv_PQexec(const char *query) if (PQresultStatus(lastResult) == PGRES_COPY_IN || PQresultStatus(lastResult) == PGRES_COPY_OUT || PQresultStatus(lastResult) == PGRES_COPY_BOTH || + PQresultStatus(lastResult) == PGRES_FATAL_ERROR || PQstatus(streamConn) == CONNECTION_BAD) break; } diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 828f18e..b84e5ed 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1801,6 +1801,10 @@ getCopyResult(PGconn *conn, ExecStatusType copytype) return pqPrepareAsyncResult(conn); } + /* If error has occured, return a PGRES_FATAL_ERROR result */ + if (conn->result && conn->result->resultStatus == PGRES_FATAL_ERROR) + return pqPrepareAsyncResult(conn); + /* If we have an async result for the COPY, return that */ if (conn->result && conn->result->resultStatus == copytype) return pqPrepareAsyncResult(conn); @@ -1991,6 +1995,9 @@ PQexecFinish(PGconn *conn) * We have to stop if we see copy in/out/both, however. We will resume * parsing after application performs the data transfer. * + * Stop if we are in copy mode and error has occurred, the pending results + * will be discarded during next execution in PQexecStart. + * * Also stop if the connection is lost (else we'll loop infinitely). */ lastResult = NULL; @@ -2020,6 +2027,11 @@ PQexecFinish(PGconn *conn) result->resultStatus == PGRES_COPY_BOTH || conn->status == CONNECTION_BAD) break; + else if ((conn->asyncStatus == PGASYNC_COPY_IN || + conn->asyncStatus == PGASYNC_COPY_OUT || + conn->asyncStatus == PGASYNC_COPY_BOTH) && + result->resultStatus == PGRES_FATAL_ERROR) + break; } return lastResult; diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 641804c..4ee1610 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -49,7 +49,9 @@ static int getParamDescriptions(PGconn *conn); static int getAnotherTuple(PGconn *conn, int msgLength); static int getParameterStatus(PGconn *conn); static int getNotify(PGconn *conn); -static int getCopyStart(PGconn *conn, ExecStatusType copytype); +static int getCopyStart(PGconn *conn, + ExecStatusType copytype, + int msgLength); static int getReadyForQuery(PGconn *conn); static void reportErrorPosition(PQExpBuffer msg, const char *query, int loc, int encoding); @@ -362,22 +364,26 @@ pqParseInput3(PGconn *conn) } break; case 'G': /* Start Copy In */ - if (getCopyStart(conn, PGRES_COPY_IN)) + if (getCopyStart(conn, PGRES_COPY_IN, msgLength)) return; + + /* getCopyStart() moves inStart itself */ conn->asyncStatus = PGASYNC_COPY_IN; - break; + continue; case 'H': /* Start Copy Out */ - if (getCopyStart(conn, PGRES_COPY_OUT)) + if (getCopyStart(conn, PGRES_COPY_OUT, msgLength)) return; + + /* getCopyStart() moves inStart itself */ conn->asyncStatus = PGASYNC_COPY_OUT; conn->copy_already_done = 0; - break; + continue; case 'W': /* Start Copy Both */ - if (getCopyStart(conn, PGRES_COPY_BOTH)) + if (getCopyStart(conn, PGRES_COPY_BOTH, msgLength)) return; conn->asyncStatus = PGASYNC_COPY_BOTH; conn->copy_already_done = 0; - break; + continue; case 'd': /* Copy Data */ /* @@ -630,7 +636,7 @@ advance_and_error: /* * parseInput subroutine to read a 't' (ParameterDescription) message. * We'll build a new PGresult structure containing the parameter data. - * Returns: 0 if completed message, EOF if not enough data yet. + * Returns: 0 if completed message, EOF in case of error. * * Note that if we run out of data, we have to release the partially * constructed PGresult, and rebuild it again next time. Fortunately, @@ -1343,31 +1349,45 @@ getNotify(PGconn *conn) * parseInput already read the message type and length. */ static int -getCopyStart(PGconn *conn, ExecStatusType copytype) +getCopyStart(PGconn *conn, ExecStatusType copytype, int msgLength) { PGresult *result; int nfields; int i; + const char *errmsg; result = PQmakeEmptyPGresult(conn, copytype); if (!result) - goto failure; + { + errmsg = NULL; + goto advance_and_error; + } if (pqGetc(&conn->copy_is_binary, conn)) - goto failure; + { + errmsg = libpq_gettext("extraneous data in COPY start message"); + goto advance_and_error; + } result->binary = conn->copy_is_binary; + /* the next two bytes are the number of fields */ - if (pqGetInt(&(result->numAttributes), 2, conn)) - goto failure; + if (pqGetInt(&result->numAttributes, 2, conn)) + { + errmsg = libpq_gettext("extraneous data in COPY start message"); + goto advance_and_error; + } nfields = result->numAttributes; /* allocate space for the attribute descriptors */ - if (nfields > 0) + if (result && nfields > 0) { result->attDescs = (PGresAttDesc *) pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE); if (!result->attDescs) - goto failure; + { + errmsg = NULL; + goto advance_and_error; + } MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc)); } @@ -1376,23 +1396,57 @@ getCopyStart(PGconn *conn, ExecStatusType copytype) int format; if (pqGetInt(&format, 2, conn)) - goto failure; + { + errmsg = libpq_gettext("extraneous data in COPY start message"); + goto advance_and_error; + } /* * Since pqGetInt treats 2-byte integers as unsigned, we need to * coerce these results to signed form. */ format = (int) ((int16) format); - result->attDescs[i].format = format; + if (result && result->attDescs) + result->attDescs[i].format = format; } /* Success! */ conn->result = result; + + /* + * Advance inStart to show that the copy related message has been + * processed. + */ + conn->inStart = conn->inCursor; + return 0; -failure: - PQclear(result); - return EOF; +advance_and_error: + /* Discard unsaved result, if any */ + if (result && result != conn->result) + PQclear(result); + + /* Discard the failed message by pretending we read it */ + conn->inStart += 5 + msgLength; + + /* + * Replace partially constructed result with an error result. First + * discard the old result to try to win back some memory. + */ + pqClearAsyncResult(conn); + + /* + * If preceding code didn't provide an error message, assume "out of + * memory" was meant. The advantage of having this special case is that + * freeing the old result first greatly improves the odds that gettext() + * will succeed in providing a translation. + */ + if (!errmsg) + errmsg = libpq_gettext("out of memory"); + printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg); + pqSaveErrorResult(conn); + + return 0; } /*