]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Make dblink interruptible, via new libpqsrv APIs.
authorNoah Misch <noah@leadboat.com>
Mon, 8 Jan 2024 19:39:56 +0000 (11:39 -0800)
committerNoah Misch <noah@leadboat.com>
Thu, 3 Apr 2025 16:34:01 +0000 (09:34 -0700)
This replaces dblink's blocking libpq calls, allowing cancellation and
allowing DROP DATABASE (of a database not involved in the query).  Apart
from explicit dblink_cancel_query() calls, dblink still doesn't cancel
the remote side.  The replacement for the blocking calls consists of
new, general-purpose query execution wrappers in the libpqsrv facility.
Out-of-tree extensions should adopt these.

The original commit d3c5f37dd543498cc7c678815d3921823beec9e9 did not
back-patch.  Back-patch now to v16-v13, bringing coverage to all supported
versions.  This back-patch omits the orignal's refactoring in postgres_fdw.

Discussion: https://postgr.es/m/20231122012945.74@rfd.leadboat.com

contrib/dblink/dblink.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/include/libpq/libpq-be-fe-helpers.h

index 0cc339e7d5278878dae877680e133b982116fcf5..234fb15fe7e0522d9dd0c220cc9c482322414a6a 100644 (file)
@@ -48,6 +48,7 @@
 #include "funcapi.h"
 #include "lib/stringinfo.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "parser/scansup.h"
@@ -59,6 +60,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/varlena.h"
+#include "utils/wait_event.h"
 
 PG_MODULE_MAGIC;
 
@@ -478,7 +480,7 @@ dblink_open(PG_FUNCTION_ARGS)
        /* If we are not in a transaction, start one */
        if (PQtransactionStatus(conn) == PQTRANS_IDLE)
        {
-               res = PQexec(conn, "BEGIN");
+               res = libpqsrv_exec(conn, "BEGIN", PG_WAIT_EXTENSION);
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
                        dblink_res_internalerror(conn, res, "begin error");
                PQclear(res);
@@ -497,7 +499,7 @@ dblink_open(PG_FUNCTION_ARGS)
                (rconn->openCursorCount)++;
 
        appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
-       res = PQexec(conn, buf.data);
+       res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
        if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                dblink_res_error(conn, conname, res, fail,
@@ -566,7 +568,7 @@ dblink_close(PG_FUNCTION_ARGS)
        appendStringInfo(&buf, "CLOSE %s", curname);
 
        /* close the cursor */
-       res = PQexec(conn, buf.data);
+       res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
        if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                dblink_res_error(conn, conname, res, fail,
@@ -586,7 +588,7 @@ dblink_close(PG_FUNCTION_ARGS)
                {
                        rconn->newXactForCursor = false;
 
-                       res = PQexec(conn, "COMMIT");
+                       res = libpqsrv_exec(conn, "COMMIT", PG_WAIT_EXTENSION);
                        if (PQresultStatus(res) != PGRES_COMMAND_OK)
                                dblink_res_internalerror(conn, res, "commit error");
                        PQclear(res);
@@ -668,7 +670,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
         * PGresult will be long-lived even though we are still in a short-lived
         * memory context.
         */
-       res = PQexec(conn, buf.data);
+       res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
        if (!res ||
                (PQresultStatus(res) != PGRES_COMMAND_OK &&
                 PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -816,7 +818,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
                else
                {
                        /* async result retrieval, do it the old way */
-                       PGresult   *res = PQgetResult(conn);
+                       PGresult   *res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
 
                        /* NULL means we're all done with the async results */
                        if (res)
@@ -1127,7 +1129,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
                PQclear(sinfo.last_res);
                PQclear(sinfo.cur_res);
                /* and clear out any pending data in libpq */
-               while ((res = PQgetResult(conn)) != NULL)
+               while ((res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION)) !=
+                          NULL)
                        PQclear(res);
                PG_RE_THROW();
        }
@@ -1154,7 +1157,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
        {
                CHECK_FOR_INTERRUPTS();
 
-               sinfo->cur_res = PQgetResult(conn);
+               sinfo->cur_res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
                if (!sinfo->cur_res)
                        break;
 
@@ -1482,7 +1485,7 @@ dblink_exec(PG_FUNCTION_ARGS)
                if (!conn)
                        dblink_conn_not_avail(conname);
 
-               res = PQexec(conn, sql);
+               res = libpqsrv_exec(conn, sql, PG_WAIT_EXTENSION);
                if (!res ||
                        (PQresultStatus(res) != PGRES_COMMAND_OK &&
                         PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -2744,8 +2747,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
 
        /*
         * If we don't get a message from the PGresult, try the PGconn.  This is
-        * needed because for connection-level failures, PQexec may just return
-        * NULL, not a PGresult at all.
+        * needed because for connection-level failures, PQgetResult may just
+        * return NULL, not a PGresult at all.
         */
        if (message_primary == NULL)
                message_primary = pchomp(PQerrorMessage(conn));
index a468420dcaff05ea9a4c1870db4017001c2ffdfe..60f906dce601fad09a341a9e0c8e4148212ff3b9 100644 (file)
@@ -639,12 +639,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * Send a query and wait for the results by using the asynchronous libpq
  * functions and socket readiness events.
  *
- * We must not use the regular blocking libpq functions like PQexec()
- * since they are uninterruptible by signals on some platforms, such as
- * Windows.
- *
- * The function is modeled on PQexec() in libpq, but only implements
- * those parts that are in use in the walreceiver api.
+ * The function is modeled on libpqsrv_exec(), with the behavior difference
+ * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
+ * skips try/catch, since all errors terminate the process.
  *
  * May return NULL, rather than an error result, on failure.
  */
index 41e3bb4376ae820a3f3dfb66b9c53e428f08c4e7..a4b3e805b9dd5b93125e3f9bf383e5f6b7e07e44 100644 (file)
@@ -49,6 +49,8 @@
 
 static inline void libpqsrv_connect_prepare(void);
 static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
+static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
+static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
 
 
 /*
@@ -239,4 +241,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
        PG_END_TRY();
 }
 
+/*
+ * PQexec() wrapper that processes interrupts.
+ *
+ * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
+ * interrupts while pushing the query text to the server.  Consider that
+ * setting if query strings can be long relative to TCP buffer size.
+ *
+ * This has the preconditions of PQsendQuery(), not those of PQexec().  Most
+ * notably, PQexec() would silently discard any prior query results.
+ */
+static inline PGresult *
+libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
+{
+       if (!PQsendQuery(conn, query))
+               return NULL;
+       return libpqsrv_get_result_last(conn, wait_event_info);
+}
+
+/*
+ * PQexecParams() wrapper that processes interrupts.
+ *
+ * See notes at libpqsrv_exec().
+ */
+static inline PGresult *
+libpqsrv_exec_params(PGconn *conn,
+                                        const char *command,
+                                        int nParams,
+                                        const Oid *paramTypes,
+                                        const char *const *paramValues,
+                                        const int *paramLengths,
+                                        const int *paramFormats,
+                                        int resultFormat,
+                                        uint32 wait_event_info)
+{
+       if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
+                                                  paramLengths, paramFormats, resultFormat))
+               return NULL;
+       return libpqsrv_get_result_last(conn, wait_event_info);
+}
+
+/*
+ * Like PQexec(), loop over PQgetResult() until it returns NULL or another
+ * terminal state.  Return the last non-NULL result or the terminal state.
+ */
+static inline PGresult *
+libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
+{
+       PGresult   *volatile lastResult = NULL;
+
+       /* In what follows, do not leak any PGresults on an error. */
+       PG_TRY();
+       {
+               for (;;)
+               {
+                       /* Wait for, and collect, the next PGresult. */
+                       PGresult   *result;
+
+                       result = libpqsrv_get_result(conn, wait_event_info);
+                       if (result == NULL)
+                               break;                  /* query is complete, or failure */
+
+                       /*
+                        * Emulate PQexec()'s behavior of returning the last result when
+                        * there are many.
+                        */
+                       PQclear(lastResult);
+                       lastResult = result;
+
+                       if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
+                               PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+                               PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
+                               PQstatus(conn) == CONNECTION_BAD)
+                               break;
+               }
+       }
+       PG_CATCH();
+       {
+               PQclear(lastResult);
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
+
+       return lastResult;
+}
+
+/*
+ * Perform the equivalent of PQgetResult(), but watch for interrupts.
+ */
+static inline PGresult *
+libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
+{
+       /*
+        * Collect data until PQgetResult is ready to get the result without
+        * blocking.
+        */
+       while (PQisBusy(conn))
+       {
+               int                     rc;
+
+               rc = WaitLatchOrSocket(MyLatch,
+                                                          WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
+                                                          WL_SOCKET_READABLE,
+                                                          PQsocket(conn),
+                                                          0,
+                                                          wait_event_info);
+
+               /* Interrupted? */
+               if (rc & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       CHECK_FOR_INTERRUPTS();
+               }
+
+               /* Consume whatever data is available from the socket */
+               if (PQconsumeInput(conn) == 0)
+               {
+                       /* trouble; expect PQgetResult() to return NULL */
+                       break;
+               }
+       }
+
+       /* Now we can collect and return the next PGresult */
+       return PQgetResult(conn);
+}
+
 #endif                                                 /* LIBPQ_BE_FE_HELPERS_H */