]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
Convert rlm_sql_postgresql to async
authorNick Porter <nick@portercomputing.co.uk>
Tue, 16 Jul 2024 17:43:11 +0000 (18:43 +0100)
committerNick Porter <nick@portercomputing.co.uk>
Mon, 29 Jul 2024 18:14:52 +0000 (19:14 +0100)
src/modules/rlm_sql/drivers/rlm_sql_postgresql/rlm_sql_postgresql.c

index b1c577895c2adca06b3802aa5722de7f3eed29b1..31dcaacc250914217669801630eb8e7812609751 100644 (file)
@@ -72,6 +72,9 @@ typedef struct {
        int             num_fields;
        int             affected_rows;
        char            **row;
+       connection_t    *conn;                  //!< Generic connection structure for this connection.
+       int             fd;                     //!< fd for this connection's I/O events.
+       fr_sql_query_t  *query_ctx;             //!< Current query running on this connection.
 } rlm_sql_postgres_conn_t;
 
 static conf_parser_t driver_config[] = {
@@ -199,7 +202,7 @@ static sql_rcode_t sql_classify_error(rlm_sql_postgresql_t *inst, ExecStatusType
                return RLM_SQL_ERROR;
        }
 
-       DEBUG2("sqlstate %s matched %s: %s (%s)", error_code,
+       DEBUG3("sqlstate %s matched %s: %s (%s)", error_code,
               entry->sql_state, entry->meaning, fr_table_str_by_value(sql_rcode_table, entry->rcode, "<DEFAULT>"));
 
        /*
@@ -220,193 +223,348 @@ static sql_rcode_t sql_classify_error(UNUSED PGresult const *result)
 }
 #endif
 
-static int _sql_socket_destructor(rlm_sql_postgres_conn_t *conn)
+static void _sql_connect_io_notify(fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
 {
-       DEBUG2("Socket destructor called, closing socket");
+       rlm_sql_postgres_conn_t         *c = talloc_get_type_abort(uctx, rlm_sql_postgres_conn_t);
+       PostgresPollingStatusType       status;
 
-       if (!conn->db) return 0;
+       fr_event_fd_delete(el, fd, FR_EVENT_FILTER_IO);
 
-       /* PQfinish also frees the memory used by the PGconn structure */
-       PQfinish(conn->db);
+       status = PQconnectPoll(c->db);
 
-       return 0;
+       /*
+        *      Documentation says:
+        *              Caution: do not assume that the socket remains the same across PQconnectPoll calls.
+        *      So we get the socket again.
+        */
+       c->fd = PQsocket(c->db);
+       switch (status) {
+       case PGRES_POLLING_OK:
+               DEBUG2("Connected to database '%s' on '%s' server version %i, protocol version %i, backend PID %i ",
+                      PQdb(c->db), PQhost(c->db), PQserverVersion(c->db), PQprotocolVersion(c->db),
+                      PQbackendPID(c->db));
+               PQsetnonblocking(c->db, 1);
+               connection_signal_connected(c->conn);
+               return;
+
+       case PGRES_POLLING_FAILED:
+       error:
+               ERROR("Connection failed: %s", PQerrorMessage(c->db));
+               connection_signal_reconnect(c->conn, CONNECTION_FAILED);
+               return;
+
+       case PGRES_POLLING_READING:
+               if (fr_event_fd_insert(c, NULL, c->conn->el, c->fd, _sql_connect_io_notify, NULL, NULL, c) != 0) goto error;
+               return;
+
+       case PGRES_POLLING_WRITING:
+               if (fr_event_fd_insert(c, NULL, c->conn->el, c->fd, NULL, _sql_connect_io_notify, NULL, c) != 0) goto error;
+               return;
+
+       default:
+               goto error;
+
+       }
 }
 
-static int CC_HINT(nonnull) sql_socket_init(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t const *config,
-                                           UNUSED fr_time_delta_t timeout)
+static connection_state_t _sql_connection_init(void **h, connection_t *conn, void *uctx)
 {
-       rlm_sql_postgresql_t    *inst = talloc_get_type_abort(handle->inst->driver_submodule->data, rlm_sql_postgresql_t);
-       rlm_sql_postgres_conn_t *conn;
+       rlm_sql_t const                 *sql = talloc_get_type_abort_const(uctx, rlm_sql_t);
+       rlm_sql_postgresql_t const      *inst = talloc_get_type_abort(sql->driver_submodule->data, rlm_sql_postgresql_t);
+       rlm_sql_postgres_conn_t         *c;
+       PostgresPollingStatusType       status;
+
+       MEM(c = talloc_zero(conn, rlm_sql_postgres_conn_t));
+       c->conn = conn;
+       c->fd = -1;
 
-       MEM(conn = handle->conn = talloc_zero(handle, rlm_sql_postgres_conn_t));
-       talloc_set_destructor(conn, _sql_socket_destructor);
+       DEBUG2("Starting connection to PostgreSQL server using parameters: %s", inst->db_string);
 
-       DEBUG2("Connecting using parameters: %s", inst->db_string);
-       conn->db = PQconnectdb(inst->db_string);
-       if (!conn->db) {
+       c->db = PQconnectStart(inst->db_string);
+       if (!c->db) {
                ERROR("Connection failed: Out of memory");
-               return -1;
+               talloc_free(c);
+               return CONNECTION_STATE_FAILED;
        }
-       if (PQstatus(conn->db) != CONNECTION_OK) {
-               ERROR("Connection failed: %s", PQerrorMessage(conn->db));
-               PQfinish(conn->db);
-               conn->db = NULL;
-               return -1;
+
+       switch (PQstatus(c->db)) {
+       case CONNECTION_OK:
+               c->fd = PQsocket(c->db);
+               DEBUG2("Connected to database '%s' on '%s' server version %i, protocol version %i, backend PID %i ",
+                      PQdb(c->db), PQhost(c->db), PQserverVersion(c->db), PQprotocolVersion(c->db),
+                      PQbackendPID(c->db));
+               PQsetnonblocking(c->db, 1);
+               connection_signal_connected(c->conn);
+               return CONNECTION_STATE_CONNECTING;
+
+       case CONNECTION_BAD:
+               ERROR("Connection failed: %s", PQerrorMessage(c->db));
+       error:
+               PQfinish(c->db);
+               talloc_free(c);
+               return CONNECTION_STATE_FAILED;
+
+       default:
+               break;
+
        }
 
-       DEBUG2("Connected to database '%s' on '%s' server version %i, protocol version %i, backend PID %i ",
-              PQdb(conn->db), PQhost(conn->db), PQserverVersion(conn->db), PQprotocolVersion(conn->db),
-              PQbackendPID(conn->db));
+       status = PQconnectPoll(c->db);
+       c->fd = PQsocket(c->db);
+       if (fr_event_fd_insert(c, NULL, c->conn->el, c->fd,
+                              status == PGRES_POLLING_READING ? _sql_connect_io_notify : NULL,
+                              status == PGRES_POLLING_WRITING ? _sql_connect_io_notify : NULL, NULL, c) != 0) goto error;
 
-       return 0;
+       DEBUG2("Connecting to database '%s' on '%s', fd %d", PQdb(c->db), PQhost(c->db), c->fd);
+
+       *h = c;
+
+       return CONNECTION_STATE_CONNECTING;
 }
 
-static unlang_action_t sql_query(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx)
+static void _sql_connection_close(fr_event_list_t *el, void *h, UNUSED void *uctx)
 {
-       fr_sql_query_t          *query_ctx = talloc_get_type_abort(uctx, fr_sql_query_t);
-       rlm_sql_postgres_conn_t *conn = query_ctx->handle->conn;
-       rlm_sql_postgresql_t    *inst = talloc_get_type_abort(query_ctx->inst->driver_submodule->data, rlm_sql_postgresql_t);
-       fr_time_delta_t         timeout = query_ctx->inst->config.query_timeout;
-       fr_time_t               start;
-       int                     sockfd;
-       PGresult                *tmp_result;
-       int                     numfields = 0;
-       ExecStatusType          status;
+       rlm_sql_postgres_conn_t *c = talloc_get_type_abort(h, rlm_sql_postgres_conn_t);
 
-       if (!conn->db) {
-               ERROR("Socket not connected");
-       reconnect:
-               query_ctx->rcode = RLM_SQL_RECONNECT;
-               RETURN_MODULE_FAIL;
+       if (c->fd >= 0) {
+               fr_event_fd_delete(el, c->fd, FR_EVENT_FILTER_IO);
+               c->fd = -1;
        }
 
-       sockfd = PQsocket(conn->db);
-       if (sockfd < 0) {
-               ERROR("Unable to obtain socket: %s", PQerrorMessage(conn->db));
-               goto reconnect;
+       if (c->result) {
+               PQclear(c->result);
+               c->result = NULL;
        }
 
-       if (!PQsendQuery(conn->db, query_ctx->query_str)) {
-               ERROR("Failed to send query: %s", PQerrorMessage(conn->db));
-               goto reconnect;
+       /* PQfinish also frees the memory used by the PGconn structure */
+       PQfinish(c->db);
+       c->query_ctx = NULL;
+       talloc_free(h);
+}
+
+static connection_t *sql_trunk_connection_alloc(trunk_connection_t *tconn, fr_event_list_t *el,
+                                               connection_conf_t const *conn_conf,
+                                               char const *log_prefix, void *uctx)
+{
+       connection_t            *conn;
+       rlm_sql_thread_t        *thread = talloc_get_type_abort(uctx, rlm_sql_thread_t);
+
+       conn = connection_alloc(tconn, el,
+                               &(connection_funcs_t) {
+                                       .init = _sql_connection_init,
+                                       .close = _sql_connection_close
+                               },
+                               conn_conf, log_prefix, thread->inst);
+       if (!conn) {
+               PERROR("Failed allocating state handler for new SQL connection");
+               return NULL;
        }
 
-       /*
-        *  We try to avoid blocking by waiting until the driver indicates that
-        *  the result is ready or our timeout expires
-        */
-       start = fr_time();
-       while (PQisBusy(conn->db)) {
-               int             r;
-               fd_set          read_fd;
-               fr_time_delta_t elapsed = fr_time_delta_wrap(0);
-
-               FD_ZERO(&read_fd);
-               FD_SET(sockfd, &read_fd);
-
-               if (fr_time_delta_ispos(timeout)) {
-                       elapsed = fr_time_sub(fr_time(), start);
-                       if (fr_time_delta_gteq(elapsed, timeout)) goto too_long;
-               }
+       return conn;
+}
 
-               r = select(sockfd + 1, &read_fd, NULL, NULL, fr_time_delta_ispos(timeout) ?
-                          &fr_time_delta_to_timeval(fr_time_delta_sub(timeout, elapsed)) : NULL);
-               if (r == 0) {
-               too_long:
-                       ERROR("Socket read timeout after %d seconds", (int) fr_time_delta_to_sec(timeout));
-                       goto reconnect;
-               }
-               if (r < 0) {
-                       if (errno == EINTR) continue;
-                       ERROR("Failed in select: %s", fr_syserror(errno));
-                       goto reconnect;
-               }
-               if (!PQconsumeInput(conn->db)) {
-                       ERROR("Failed reading input: %s", PQerrorMessage(conn->db));
-                       goto reconnect;
+TRUNK_NOTIFY_FUNC(sql_trunk_connection_notify, rlm_sql_postgres_conn_t)
+
+static void sql_trunk_request_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn,
+                                 connection_t *conn, UNUSED void *uctx)
+{
+       rlm_sql_postgres_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_postgres_conn_t);
+       request_t               *request;
+       trunk_request_t         *treq;
+       fr_sql_query_t          *query_ctx;
+       int                     err;
+
+       if (trunk_connection_pop_request(&treq, tconn) != 0) return;
+       if (!treq) return;
+
+       query_ctx = talloc_get_type_abort(treq->preq, fr_sql_query_t);
+       request = query_ctx->request;
+
+       switch (query_ctx->status) {
+       case SQL_QUERY_PREPARED:
+               ROPTIONAL(RDEBUG2, DEBUG2, "Executing query: %s", query_ctx->query_str);
+               err = PQsendQuery(sql_conn->db, query_ctx->query_str);
+               query_ctx->tconn = tconn;
+               if (!err) {
+                       ROPTIONAL(RERROR, ERROR, "Failed to send query: %s", PQerrorMessage(sql_conn->db));
+                       trunk_request_signal_fail(treq);
+                       return;
                }
+
+               query_ctx->status = SQL_QUERY_SUBMITTED;
+               sql_conn->query_ctx = query_ctx;
+               trunk_request_signal_sent(treq);
+               return;
+
+       default:
+               return;
        }
+}
 
-       /*
-        *  Returns a PGresult pointer or possibly a null pointer.
-        *  A non-null pointer will generally be returned except in
-        *  out-of-memory conditions or serious errors such as inability
-        *  to send the command to the server. If a null pointer is
-        *  returned, it should be treated like a PGRES_FATAL_ERROR
-        *  result.
-        */
-       conn->result = PQgetResult(conn->db);
+static void sql_trunk_request_demux(UNUSED fr_event_list_t *el, UNUSED trunk_connection_t *tconn,
+                                   connection_t *conn, UNUSED void *uctx)
+{
+       rlm_sql_postgres_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_postgres_conn_t);
+       rlm_sql_postgresql_t    *inst;
+       fr_sql_query_t          *query_ctx;
+       request_t               *request;
+       PGresult                *tmp_result;
+       ExecStatusType          status;
+       int                     numfields;
 
-       /* Discard results for appended queries */
-       while ((tmp_result = PQgetResult(conn->db)) != NULL)
-               PQclear(tmp_result);
+       query_ctx = sql_conn->query_ctx;
+       request = query_ctx->request;
+       inst = talloc_get_type_abort(query_ctx->inst->driver_submodule->data, rlm_sql_postgresql_t);
 
-       /*
-        *  As this error COULD be a connection error OR an out-of-memory
-        *  condition return value WILL be wrong SOME of the time
-        *  regardless! Pick your poison...
-        */
-       if (!conn->result) {
-               ERROR("Failed getting query result: %s", PQerrorMessage(conn->db));
-               goto reconnect;
-       }
+       switch (query_ctx->status) {
+       case SQL_QUERY_SUBMITTED:
+               if (PQconsumeInput(sql_conn->db) == 0) {
+                       ROPTIONAL(RERROR, ERROR, "SQL query failed: %s", PQerrorMessage(sql_conn->db));
+                       query_ctx->rcode = RLM_SQL_ERROR;
+                       break;
+               }
+               if (PQisBusy(sql_conn->db)) return;
+
+               query_ctx->status = SQL_QUERY_RETURNED;
+
+               sql_conn->result = PQgetResult(sql_conn->db);
+
+               /* Discard results for appended queries */
+               while ((tmp_result = PQgetResult(sql_conn->db)) != NULL)
+                       PQclear(tmp_result);
 
-       status = PQresultStatus(conn->result);
-       switch (status){
-       /*
-        *  Successful completion of a command returning no data.
-        */
-       case PGRES_COMMAND_OK:
                /*
-                *  Affected_rows function only returns the number of affected rows of a command
-                *  returning no data...
+                *  As this error COULD be a connection error OR an out-of-memory
+                *  condition return value WILL be wrong SOME of the time
+                *  regardless! Pick your poison...
+                */
+               if (!sql_conn->result) {
+                       ROPTIONAL(RERROR, ERROR, "Failed getting query result: %s", PQerrorMessage(sql_conn->db));
+                       query_ctx->rcode = RLM_SQL_RECONNECT;
+                       break;
+               }
+
+               status = PQresultStatus(sql_conn->result);
+               switch (status){
+               /*
+                *  Successful completion of a command returning no data.
+                */
+               case PGRES_COMMAND_OK:
+                       /*
+                        *  Affected_rows function only returns the number of affected rows of a command
+                        *  returning no data...
+                        */
+                       sql_conn->affected_rows = affected_rows(sql_conn->result);
+                       ROPTIONAL(RDEBUG2, DEBUG2, "query affected rows = %i", sql_conn->affected_rows);
+                       break;
+               /*
+                *  Successful completion of a command returning data (such as a SELECT or SHOW).
                 */
-               conn->affected_rows = affected_rows(conn->result);
-               DEBUG2("query affected rows = %i", conn->affected_rows);
-               break;
-       /*
-        *  Successful completion of a command returning data (such as a SELECT or SHOW).
-        */
 #ifdef HAVE_PGRES_SINGLE_TUPLE
-       case PGRES_SINGLE_TUPLE:
+               case PGRES_SINGLE_TUPLE:
 #endif
-       case PGRES_TUPLES_OK:
-               conn->cur_row = 0;
-               conn->affected_rows = PQntuples(conn->result);
-               numfields = PQnfields(conn->result); /*Check row storing functions..*/
-               DEBUG2("query returned rows = %i, fields = %i", conn->affected_rows, numfields);
-               break;
+               case PGRES_TUPLES_OK:
+                       sql_conn->cur_row = 0;
+                       sql_conn->affected_rows = PQntuples(sql_conn->result);
+                       numfields = PQnfields(sql_conn->result); /*Check row storing functions..*/
+                       ROPTIONAL(RDEBUG2, DEBUG2, "query returned rows = %i, fields = %i", sql_conn->affected_rows, numfields);
+                       break;
 
 #ifdef HAVE_PGRES_COPY_BOTH
-       case PGRES_COPY_BOTH:
+               case PGRES_COPY_BOTH:
 #endif
-       case PGRES_COPY_OUT:
-       case PGRES_COPY_IN:
-               DEBUG2("Data transfer started");
-               break;
+               case PGRES_COPY_OUT:
+               case PGRES_COPY_IN:
+                       DEBUG2("Data transfer started");
+                       break;
 
-       /*
-        *  Weird.. this shouldn't happen.
-        */
-       case PGRES_EMPTY_QUERY:
-       case PGRES_BAD_RESPONSE:        /* The server's response was not understood */
-       case PGRES_NONFATAL_ERROR:
-       case PGRES_FATAL_ERROR:
+               /*
+                *  Weird.. this shouldn't happen.
+                */
+               case PGRES_EMPTY_QUERY:
+               case PGRES_BAD_RESPONSE:        /* The server's response was not understood */
+               case PGRES_NONFATAL_ERROR:
+               case PGRES_FATAL_ERROR:
 #ifdef HAVE_PGRES_PIPELINE_SYNC
-       case PGRES_PIPELINE_SYNC:
-       case PGRES_PIPELINE_ABORTED:
+               case PGRES_PIPELINE_SYNC:
+               case PGRES_PIPELINE_ABORTED:
 #endif
+                       break;
+               }
+
+               query_ctx->rcode = sql_classify_error(inst, status, sql_conn->result);
                break;
+
+       default:
+               fr_assert(0);
        }
 
-       query_ctx->rcode = sql_classify_error(inst, status, conn->result);
+       if (request) unlang_interpret_mark_runnable(request);
+}
+
+static void sql_request_cancel(connection_t *conn, void *preq, trunk_cancel_reason_t reason,
+                              UNUSED void *uctx)
+{
+       fr_sql_query_t          *query_ctx = talloc_get_type_abort(preq, fr_sql_query_t);
+       rlm_sql_postgres_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_postgres_conn_t);
+
+       if (!query_ctx->treq) return;
+       if (reason != TRUNK_CANCEL_REASON_SIGNAL) return;
+       if (sql_conn->query_ctx == query_ctx) sql_conn->query_ctx = NULL;
+}
+
+static void sql_request_cancel_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn,
+                                  connection_t *conn, UNUSED void *uctx)
+{
+       trunk_request_t         *treq;
+       PGcancel                *cancel;
+       rlm_sql_postgres_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_postgres_conn_t);
+       char                    errbuf[256];
+       PGresult                *tmp_result;
+
+       if ((trunk_connection_pop_cancellation(&treq, tconn)) == 0) {
+               cancel = PQgetCancel(sql_conn->db);
+               if (!cancel) goto complete;
+               if (PQcancel(cancel, errbuf, sizeof(errbuf)) == 0) {
+                       ERROR("Failed to cancel query: %s", errbuf);
+               }
+               PQfreeCancel(cancel);
+
+               /*
+                *      The documentation says that regardless of the result of
+                *      PQcancel, the normal processing of PQgetResult must happen.
+                */
+               while ((tmp_result = PQgetResult(sql_conn->db)) != NULL)
+                       PQclear(tmp_result);
+
+       complete:
+               trunk_request_signal_cancel_complete(treq);
+       }
+}
+
+static void sql_request_fail(request_t *request, void *preq, UNUSED void *rctx,
+                            UNUSED trunk_request_state_t state, UNUSED void *uctx)
+{
+       fr_sql_query_t  *query_ctx = talloc_get_type_abort(preq, fr_sql_query_t);
+
+       query_ctx->treq = NULL;
+       query_ctx->rcode = RLM_SQL_ERROR;
+
+       if (request) unlang_interpret_mark_runnable(request);
+}
+
+static unlang_action_t sql_query_resume(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx)
+{
+       fr_sql_query_t  *query_ctx = talloc_get_type_abort(uctx, fr_sql_query_t);
+
        if (query_ctx->rcode != RLM_SQL_OK) RETURN_MODULE_FAIL;
+
        RETURN_MODULE_OK;
 }
 
 static sql_rcode_t sql_fields(char const **out[], fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
 {
-       rlm_sql_postgres_conn_t *conn = query_ctx->handle->conn;
+       rlm_sql_postgres_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_postgres_conn_t);
 
        int             fields, i;
        char const      **names;
@@ -425,9 +583,8 @@ static sql_rcode_t sql_fields(char const **out[], fr_sql_query_t *query_ctx, UNU
 static unlang_action_t sql_fetch_row(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx)
 {
        fr_sql_query_t          *query_ctx = talloc_get_type_abort(uctx, fr_sql_query_t);
-       rlm_sql_handle_t        *handle = query_ctx->handle;
        int                     records, i, len;
-       rlm_sql_postgres_conn_t *conn = handle->conn;
+       rlm_sql_postgres_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_postgres_conn_t);
 
        query_ctx->row = NULL;
 
@@ -457,7 +614,16 @@ static unlang_action_t sql_fetch_row(rlm_rcode_t *p_result, UNUSED int *priority
 
 static sql_rcode_t sql_free_result(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
 {
-       rlm_sql_postgres_conn_t *conn = query_ctx->handle->conn;
+       rlm_sql_postgres_conn_t *conn;
+
+       if (query_ctx->treq && !(query_ctx->treq->state &
+           (TRUNK_REQUEST_STATE_SENT | TRUNK_REQUEST_STATE_REAPABLE | TRUNK_REQUEST_STATE_COMPLETE))) return RLM_SQL_OK;
+
+       if (!query_ctx->tconn || !query_ctx->tconn->conn || !query_ctx->tconn->conn->h) return RLM_SQL_ERROR;
+
+       if (!(query_ctx->tconn->state & TRUNK_CONN_PROCESSING)) return RLM_SQL_ERROR;
+
+       conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_postgres_conn_t);
 
        if (conn->result != NULL) {
                PQclear(conn->result);
@@ -483,7 +649,7 @@ static sql_rcode_t sql_free_result(fr_sql_query_t *query_ctx, UNUSED rlm_sql_con
 static size_t sql_error(TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen,
                        fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
 {
-       rlm_sql_postgres_conn_t *conn = query_ctx->handle->conn;
+       rlm_sql_postgres_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_postgres_conn_t);
        char const              *p, *q;
        size_t                  i = 0;
 
@@ -507,7 +673,7 @@ static size_t sql_error(TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen,
 
 static int sql_affected_rows(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
 {
-       rlm_sql_postgres_conn_t *conn = query_ctx->handle->conn;
+       rlm_sql_postgres_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_postgres_conn_t);
 
        return conn->affected_rows;
 }
@@ -515,8 +681,8 @@ static int sql_affected_rows(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t
 static size_t sql_escape_func(request_t *request, char *out, size_t outlen, char const *in, void *arg)
 {
        size_t                  inlen, ret;
-       rlm_sql_handle_t        *handle = talloc_get_type_abort(arg, rlm_sql_handle_t);
-       rlm_sql_postgres_conn_t *conn = handle->conn;
+       connection_t            *c = talloc_get_type_abort(arg, connection_t);
+       rlm_sql_postgres_conn_t *conn = talloc_get_type_abort(c->h, rlm_sql_postgres_conn_t);
        int                     err;
 
        /* Check for potential buffer overflow */
@@ -656,6 +822,34 @@ static int mod_load(void)
        return 0;
 }
 
+static void *sql_escape_arg_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, void *uctx)
+{
+       rlm_sql_t const *inst = talloc_get_type_abort_const(uctx, rlm_sql_t);
+       connection_t    *conn;
+
+       conn = connection_alloc(ctx, el,
+                               &(connection_funcs_t){
+                                       .init = _sql_connection_init,
+                                       .close = _sql_connection_close,
+                               },
+                               inst->config.trunk_conf.conn_conf,
+                               inst->name, inst);
+
+       if (!conn) {
+               PERROR("Failed allocating state handler for SQL escape connection");
+               return NULL;
+       }
+
+       connection_signal_init(conn);
+       return conn;
+}
+
+static void sql_escape_arg_free(void *uctx)
+{
+       connection_t    *conn = talloc_get_type_abort(uctx, connection_t);
+       connection_signal_halt(conn);
+}
+
 /* Exported to rlm_sql */
 extern rlm_sql_driver_t rlm_sql_postgresql;
 rlm_sql_driver_t rlm_sql_postgresql = {
@@ -668,14 +862,25 @@ rlm_sql_driver_t rlm_sql_postgresql = {
                .instantiate                    = mod_instantiate
        },
        .flags                          = RLM_SQL_RCODE_FLAGS_ALT_QUERY,
-       .sql_socket_init                = sql_socket_init,
-       .sql_query                      = sql_query,
-       .sql_select_query               = sql_query,
+       .sql_query_resume               = sql_query_resume,
+       .sql_select_query_resume        = sql_query_resume,
        .sql_fields                     = sql_fields,
        .sql_fetch_row                  = sql_fetch_row,
        .sql_error                      = sql_error,
        .sql_finish_query               = sql_free_result,
        .sql_finish_select_query        = sql_free_result,
        .sql_affected_rows              = sql_affected_rows,
-       .sql_escape_func                = sql_escape_func
+       .sql_escape_func                = sql_escape_func,
+       .sql_escape_arg_alloc           = sql_escape_arg_alloc,
+       .sql_escape_arg_free            = sql_escape_arg_free,
+       .uses_trunks                    = true,
+       .trunk_io_funcs = {
+               .connection_alloc       = sql_trunk_connection_alloc,
+               .connection_notify      = sql_trunk_connection_notify,
+               .request_mux            = sql_trunk_request_mux,
+               .request_demux          = sql_trunk_request_demux,
+               .request_cancel         = sql_request_cancel,
+               .request_cancel_mux     = sql_request_cancel_mux,
+               .request_fail           = sql_request_fail,
+       }
 };