From: Nick Porter Date: Tue, 16 Jul 2024 17:43:11 +0000 (+0100) Subject: Convert rlm_sql_postgresql to async X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c092c52fd216f1e7e39dcb310de3667db206b2ec;p=thirdparty%2Ffreeradius-server.git Convert rlm_sql_postgresql to async --- diff --git a/src/modules/rlm_sql/drivers/rlm_sql_postgresql/rlm_sql_postgresql.c b/src/modules/rlm_sql/drivers/rlm_sql_postgresql/rlm_sql_postgresql.c index b1c577895c2..31dcaacc250 100644 --- a/src/modules/rlm_sql/drivers/rlm_sql_postgresql/rlm_sql_postgresql.c +++ b/src/modules/rlm_sql/drivers/rlm_sql_postgresql/rlm_sql_postgresql.c @@ -72,6 +72,9 @@ typedef struct { int num_fields; int affected_rows; char **row; + connection_t *conn; //!< Generic connection structure for this connection. + int fd; //!< fd for this connection's I/O events. + fr_sql_query_t *query_ctx; //!< Current query running on this connection. } rlm_sql_postgres_conn_t; static conf_parser_t driver_config[] = { @@ -199,7 +202,7 @@ static sql_rcode_t sql_classify_error(rlm_sql_postgresql_t *inst, ExecStatusType return RLM_SQL_ERROR; } - DEBUG2("sqlstate %s matched %s: %s (%s)", error_code, + DEBUG3("sqlstate %s matched %s: %s (%s)", error_code, entry->sql_state, entry->meaning, fr_table_str_by_value(sql_rcode_table, entry->rcode, "")); /* @@ -220,193 +223,348 @@ static sql_rcode_t sql_classify_error(UNUSED PGresult const *result) } #endif -static int _sql_socket_destructor(rlm_sql_postgres_conn_t *conn) +static void _sql_connect_io_notify(fr_event_list_t *el, int fd, UNUSED int flags, void *uctx) { - DEBUG2("Socket destructor called, closing socket"); + rlm_sql_postgres_conn_t *c = talloc_get_type_abort(uctx, rlm_sql_postgres_conn_t); + PostgresPollingStatusType status; - if (!conn->db) return 0; + fr_event_fd_delete(el, fd, FR_EVENT_FILTER_IO); - /* PQfinish also frees the memory used by the PGconn structure */ - PQfinish(conn->db); + status = PQconnectPoll(c->db); - return 0; + /* + * Documentation says: + * Caution: do not assume that the socket remains the same across PQconnectPoll calls. + * So we get the socket again. + */ + c->fd = PQsocket(c->db); + switch (status) { + case PGRES_POLLING_OK: + DEBUG2("Connected to database '%s' on '%s' server version %i, protocol version %i, backend PID %i ", + PQdb(c->db), PQhost(c->db), PQserverVersion(c->db), PQprotocolVersion(c->db), + PQbackendPID(c->db)); + PQsetnonblocking(c->db, 1); + connection_signal_connected(c->conn); + return; + + case PGRES_POLLING_FAILED: + error: + ERROR("Connection failed: %s", PQerrorMessage(c->db)); + connection_signal_reconnect(c->conn, CONNECTION_FAILED); + return; + + case PGRES_POLLING_READING: + if (fr_event_fd_insert(c, NULL, c->conn->el, c->fd, _sql_connect_io_notify, NULL, NULL, c) != 0) goto error; + return; + + case PGRES_POLLING_WRITING: + if (fr_event_fd_insert(c, NULL, c->conn->el, c->fd, NULL, _sql_connect_io_notify, NULL, c) != 0) goto error; + return; + + default: + goto error; + + } } -static int CC_HINT(nonnull) sql_socket_init(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t const *config, - UNUSED fr_time_delta_t timeout) +static connection_state_t _sql_connection_init(void **h, connection_t *conn, void *uctx) { - rlm_sql_postgresql_t *inst = talloc_get_type_abort(handle->inst->driver_submodule->data, rlm_sql_postgresql_t); - rlm_sql_postgres_conn_t *conn; + rlm_sql_t const *sql = talloc_get_type_abort_const(uctx, rlm_sql_t); + rlm_sql_postgresql_t const *inst = talloc_get_type_abort(sql->driver_submodule->data, rlm_sql_postgresql_t); + rlm_sql_postgres_conn_t *c; + PostgresPollingStatusType status; + + MEM(c = talloc_zero(conn, rlm_sql_postgres_conn_t)); + c->conn = conn; + c->fd = -1; - MEM(conn = handle->conn = talloc_zero(handle, rlm_sql_postgres_conn_t)); - talloc_set_destructor(conn, _sql_socket_destructor); + DEBUG2("Starting connection to PostgreSQL server using parameters: %s", inst->db_string); - DEBUG2("Connecting using parameters: %s", inst->db_string); - conn->db = PQconnectdb(inst->db_string); - if (!conn->db) { + c->db = PQconnectStart(inst->db_string); + if (!c->db) { ERROR("Connection failed: Out of memory"); - return -1; + talloc_free(c); + return CONNECTION_STATE_FAILED; } - if (PQstatus(conn->db) != CONNECTION_OK) { - ERROR("Connection failed: %s", PQerrorMessage(conn->db)); - PQfinish(conn->db); - conn->db = NULL; - return -1; + + switch (PQstatus(c->db)) { + case CONNECTION_OK: + c->fd = PQsocket(c->db); + DEBUG2("Connected to database '%s' on '%s' server version %i, protocol version %i, backend PID %i ", + PQdb(c->db), PQhost(c->db), PQserverVersion(c->db), PQprotocolVersion(c->db), + PQbackendPID(c->db)); + PQsetnonblocking(c->db, 1); + connection_signal_connected(c->conn); + return CONNECTION_STATE_CONNECTING; + + case CONNECTION_BAD: + ERROR("Connection failed: %s", PQerrorMessage(c->db)); + error: + PQfinish(c->db); + talloc_free(c); + return CONNECTION_STATE_FAILED; + + default: + break; + } - DEBUG2("Connected to database '%s' on '%s' server version %i, protocol version %i, backend PID %i ", - PQdb(conn->db), PQhost(conn->db), PQserverVersion(conn->db), PQprotocolVersion(conn->db), - PQbackendPID(conn->db)); + status = PQconnectPoll(c->db); + c->fd = PQsocket(c->db); + if (fr_event_fd_insert(c, NULL, c->conn->el, c->fd, + status == PGRES_POLLING_READING ? _sql_connect_io_notify : NULL, + status == PGRES_POLLING_WRITING ? _sql_connect_io_notify : NULL, NULL, c) != 0) goto error; - return 0; + DEBUG2("Connecting to database '%s' on '%s', fd %d", PQdb(c->db), PQhost(c->db), c->fd); + + *h = c; + + return CONNECTION_STATE_CONNECTING; } -static unlang_action_t sql_query(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx) +static void _sql_connection_close(fr_event_list_t *el, void *h, UNUSED void *uctx) { - fr_sql_query_t *query_ctx = talloc_get_type_abort(uctx, fr_sql_query_t); - rlm_sql_postgres_conn_t *conn = query_ctx->handle->conn; - rlm_sql_postgresql_t *inst = talloc_get_type_abort(query_ctx->inst->driver_submodule->data, rlm_sql_postgresql_t); - fr_time_delta_t timeout = query_ctx->inst->config.query_timeout; - fr_time_t start; - int sockfd; - PGresult *tmp_result; - int numfields = 0; - ExecStatusType status; + rlm_sql_postgres_conn_t *c = talloc_get_type_abort(h, rlm_sql_postgres_conn_t); - if (!conn->db) { - ERROR("Socket not connected"); - reconnect: - query_ctx->rcode = RLM_SQL_RECONNECT; - RETURN_MODULE_FAIL; + if (c->fd >= 0) { + fr_event_fd_delete(el, c->fd, FR_EVENT_FILTER_IO); + c->fd = -1; } - sockfd = PQsocket(conn->db); - if (sockfd < 0) { - ERROR("Unable to obtain socket: %s", PQerrorMessage(conn->db)); - goto reconnect; + if (c->result) { + PQclear(c->result); + c->result = NULL; } - if (!PQsendQuery(conn->db, query_ctx->query_str)) { - ERROR("Failed to send query: %s", PQerrorMessage(conn->db)); - goto reconnect; + /* PQfinish also frees the memory used by the PGconn structure */ + PQfinish(c->db); + c->query_ctx = NULL; + talloc_free(h); +} + +static connection_t *sql_trunk_connection_alloc(trunk_connection_t *tconn, fr_event_list_t *el, + connection_conf_t const *conn_conf, + char const *log_prefix, void *uctx) +{ + connection_t *conn; + rlm_sql_thread_t *thread = talloc_get_type_abort(uctx, rlm_sql_thread_t); + + conn = connection_alloc(tconn, el, + &(connection_funcs_t) { + .init = _sql_connection_init, + .close = _sql_connection_close + }, + conn_conf, log_prefix, thread->inst); + if (!conn) { + PERROR("Failed allocating state handler for new SQL connection"); + return NULL; } - /* - * We try to avoid blocking by waiting until the driver indicates that - * the result is ready or our timeout expires - */ - start = fr_time(); - while (PQisBusy(conn->db)) { - int r; - fd_set read_fd; - fr_time_delta_t elapsed = fr_time_delta_wrap(0); - - FD_ZERO(&read_fd); - FD_SET(sockfd, &read_fd); - - if (fr_time_delta_ispos(timeout)) { - elapsed = fr_time_sub(fr_time(), start); - if (fr_time_delta_gteq(elapsed, timeout)) goto too_long; - } + return conn; +} - r = select(sockfd + 1, &read_fd, NULL, NULL, fr_time_delta_ispos(timeout) ? - &fr_time_delta_to_timeval(fr_time_delta_sub(timeout, elapsed)) : NULL); - if (r == 0) { - too_long: - ERROR("Socket read timeout after %d seconds", (int) fr_time_delta_to_sec(timeout)); - goto reconnect; - } - if (r < 0) { - if (errno == EINTR) continue; - ERROR("Failed in select: %s", fr_syserror(errno)); - goto reconnect; - } - if (!PQconsumeInput(conn->db)) { - ERROR("Failed reading input: %s", PQerrorMessage(conn->db)); - goto reconnect; +TRUNK_NOTIFY_FUNC(sql_trunk_connection_notify, rlm_sql_postgres_conn_t) + +static void sql_trunk_request_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn, + connection_t *conn, UNUSED void *uctx) +{ + rlm_sql_postgres_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_postgres_conn_t); + request_t *request; + trunk_request_t *treq; + fr_sql_query_t *query_ctx; + int err; + + if (trunk_connection_pop_request(&treq, tconn) != 0) return; + if (!treq) return; + + query_ctx = talloc_get_type_abort(treq->preq, fr_sql_query_t); + request = query_ctx->request; + + switch (query_ctx->status) { + case SQL_QUERY_PREPARED: + ROPTIONAL(RDEBUG2, DEBUG2, "Executing query: %s", query_ctx->query_str); + err = PQsendQuery(sql_conn->db, query_ctx->query_str); + query_ctx->tconn = tconn; + if (!err) { + ROPTIONAL(RERROR, ERROR, "Failed to send query: %s", PQerrorMessage(sql_conn->db)); + trunk_request_signal_fail(treq); + return; } + + query_ctx->status = SQL_QUERY_SUBMITTED; + sql_conn->query_ctx = query_ctx; + trunk_request_signal_sent(treq); + return; + + default: + return; } +} - /* - * Returns a PGresult pointer or possibly a null pointer. - * A non-null pointer will generally be returned except in - * out-of-memory conditions or serious errors such as inability - * to send the command to the server. If a null pointer is - * returned, it should be treated like a PGRES_FATAL_ERROR - * result. - */ - conn->result = PQgetResult(conn->db); +static void sql_trunk_request_demux(UNUSED fr_event_list_t *el, UNUSED trunk_connection_t *tconn, + connection_t *conn, UNUSED void *uctx) +{ + rlm_sql_postgres_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_postgres_conn_t); + rlm_sql_postgresql_t *inst; + fr_sql_query_t *query_ctx; + request_t *request; + PGresult *tmp_result; + ExecStatusType status; + int numfields; - /* Discard results for appended queries */ - while ((tmp_result = PQgetResult(conn->db)) != NULL) - PQclear(tmp_result); + query_ctx = sql_conn->query_ctx; + request = query_ctx->request; + inst = talloc_get_type_abort(query_ctx->inst->driver_submodule->data, rlm_sql_postgresql_t); - /* - * As this error COULD be a connection error OR an out-of-memory - * condition return value WILL be wrong SOME of the time - * regardless! Pick your poison... - */ - if (!conn->result) { - ERROR("Failed getting query result: %s", PQerrorMessage(conn->db)); - goto reconnect; - } + switch (query_ctx->status) { + case SQL_QUERY_SUBMITTED: + if (PQconsumeInput(sql_conn->db) == 0) { + ROPTIONAL(RERROR, ERROR, "SQL query failed: %s", PQerrorMessage(sql_conn->db)); + query_ctx->rcode = RLM_SQL_ERROR; + break; + } + if (PQisBusy(sql_conn->db)) return; + + query_ctx->status = SQL_QUERY_RETURNED; + + sql_conn->result = PQgetResult(sql_conn->db); + + /* Discard results for appended queries */ + while ((tmp_result = PQgetResult(sql_conn->db)) != NULL) + PQclear(tmp_result); - status = PQresultStatus(conn->result); - switch (status){ - /* - * Successful completion of a command returning no data. - */ - case PGRES_COMMAND_OK: /* - * Affected_rows function only returns the number of affected rows of a command - * returning no data... + * As this error COULD be a connection error OR an out-of-memory + * condition return value WILL be wrong SOME of the time + * regardless! Pick your poison... + */ + if (!sql_conn->result) { + ROPTIONAL(RERROR, ERROR, "Failed getting query result: %s", PQerrorMessage(sql_conn->db)); + query_ctx->rcode = RLM_SQL_RECONNECT; + break; + } + + status = PQresultStatus(sql_conn->result); + switch (status){ + /* + * Successful completion of a command returning no data. + */ + case PGRES_COMMAND_OK: + /* + * Affected_rows function only returns the number of affected rows of a command + * returning no data... + */ + sql_conn->affected_rows = affected_rows(sql_conn->result); + ROPTIONAL(RDEBUG2, DEBUG2, "query affected rows = %i", sql_conn->affected_rows); + break; + /* + * Successful completion of a command returning data (such as a SELECT or SHOW). */ - conn->affected_rows = affected_rows(conn->result); - DEBUG2("query affected rows = %i", conn->affected_rows); - break; - /* - * Successful completion of a command returning data (such as a SELECT or SHOW). - */ #ifdef HAVE_PGRES_SINGLE_TUPLE - case PGRES_SINGLE_TUPLE: + case PGRES_SINGLE_TUPLE: #endif - case PGRES_TUPLES_OK: - conn->cur_row = 0; - conn->affected_rows = PQntuples(conn->result); - numfields = PQnfields(conn->result); /*Check row storing functions..*/ - DEBUG2("query returned rows = %i, fields = %i", conn->affected_rows, numfields); - break; + case PGRES_TUPLES_OK: + sql_conn->cur_row = 0; + sql_conn->affected_rows = PQntuples(sql_conn->result); + numfields = PQnfields(sql_conn->result); /*Check row storing functions..*/ + ROPTIONAL(RDEBUG2, DEBUG2, "query returned rows = %i, fields = %i", sql_conn->affected_rows, numfields); + break; #ifdef HAVE_PGRES_COPY_BOTH - case PGRES_COPY_BOTH: + case PGRES_COPY_BOTH: #endif - case PGRES_COPY_OUT: - case PGRES_COPY_IN: - DEBUG2("Data transfer started"); - break; + case PGRES_COPY_OUT: + case PGRES_COPY_IN: + DEBUG2("Data transfer started"); + break; - /* - * Weird.. this shouldn't happen. - */ - case PGRES_EMPTY_QUERY: - case PGRES_BAD_RESPONSE: /* The server's response was not understood */ - case PGRES_NONFATAL_ERROR: - case PGRES_FATAL_ERROR: + /* + * Weird.. this shouldn't happen. + */ + case PGRES_EMPTY_QUERY: + case PGRES_BAD_RESPONSE: /* The server's response was not understood */ + case PGRES_NONFATAL_ERROR: + case PGRES_FATAL_ERROR: #ifdef HAVE_PGRES_PIPELINE_SYNC - case PGRES_PIPELINE_SYNC: - case PGRES_PIPELINE_ABORTED: + case PGRES_PIPELINE_SYNC: + case PGRES_PIPELINE_ABORTED: #endif + break; + } + + query_ctx->rcode = sql_classify_error(inst, status, sql_conn->result); break; + + default: + fr_assert(0); } - query_ctx->rcode = sql_classify_error(inst, status, conn->result); + if (request) unlang_interpret_mark_runnable(request); +} + +static void sql_request_cancel(connection_t *conn, void *preq, trunk_cancel_reason_t reason, + UNUSED void *uctx) +{ + fr_sql_query_t *query_ctx = talloc_get_type_abort(preq, fr_sql_query_t); + rlm_sql_postgres_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_postgres_conn_t); + + if (!query_ctx->treq) return; + if (reason != TRUNK_CANCEL_REASON_SIGNAL) return; + if (sql_conn->query_ctx == query_ctx) sql_conn->query_ctx = NULL; +} + +static void sql_request_cancel_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn, + connection_t *conn, UNUSED void *uctx) +{ + trunk_request_t *treq; + PGcancel *cancel; + rlm_sql_postgres_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_postgres_conn_t); + char errbuf[256]; + PGresult *tmp_result; + + if ((trunk_connection_pop_cancellation(&treq, tconn)) == 0) { + cancel = PQgetCancel(sql_conn->db); + if (!cancel) goto complete; + if (PQcancel(cancel, errbuf, sizeof(errbuf)) == 0) { + ERROR("Failed to cancel query: %s", errbuf); + } + PQfreeCancel(cancel); + + /* + * The documentation says that regardless of the result of + * PQcancel, the normal processing of PQgetResult must happen. + */ + while ((tmp_result = PQgetResult(sql_conn->db)) != NULL) + PQclear(tmp_result); + + complete: + trunk_request_signal_cancel_complete(treq); + } +} + +static void sql_request_fail(request_t *request, void *preq, UNUSED void *rctx, + UNUSED trunk_request_state_t state, UNUSED void *uctx) +{ + fr_sql_query_t *query_ctx = talloc_get_type_abort(preq, fr_sql_query_t); + + query_ctx->treq = NULL; + query_ctx->rcode = RLM_SQL_ERROR; + + if (request) unlang_interpret_mark_runnable(request); +} + +static unlang_action_t sql_query_resume(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx) +{ + fr_sql_query_t *query_ctx = talloc_get_type_abort(uctx, fr_sql_query_t); + if (query_ctx->rcode != RLM_SQL_OK) RETURN_MODULE_FAIL; + RETURN_MODULE_OK; } static sql_rcode_t sql_fields(char const **out[], fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config) { - rlm_sql_postgres_conn_t *conn = query_ctx->handle->conn; + rlm_sql_postgres_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_postgres_conn_t); int fields, i; char const **names; @@ -425,9 +583,8 @@ static sql_rcode_t sql_fields(char const **out[], fr_sql_query_t *query_ctx, UNU static unlang_action_t sql_fetch_row(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx) { fr_sql_query_t *query_ctx = talloc_get_type_abort(uctx, fr_sql_query_t); - rlm_sql_handle_t *handle = query_ctx->handle; int records, i, len; - rlm_sql_postgres_conn_t *conn = handle->conn; + rlm_sql_postgres_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_postgres_conn_t); query_ctx->row = NULL; @@ -457,7 +614,16 @@ static unlang_action_t sql_fetch_row(rlm_rcode_t *p_result, UNUSED int *priority static sql_rcode_t sql_free_result(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config) { - rlm_sql_postgres_conn_t *conn = query_ctx->handle->conn; + rlm_sql_postgres_conn_t *conn; + + if (query_ctx->treq && !(query_ctx->treq->state & + (TRUNK_REQUEST_STATE_SENT | TRUNK_REQUEST_STATE_REAPABLE | TRUNK_REQUEST_STATE_COMPLETE))) return RLM_SQL_OK; + + if (!query_ctx->tconn || !query_ctx->tconn->conn || !query_ctx->tconn->conn->h) return RLM_SQL_ERROR; + + if (!(query_ctx->tconn->state & TRUNK_CONN_PROCESSING)) return RLM_SQL_ERROR; + + conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_postgres_conn_t); if (conn->result != NULL) { PQclear(conn->result); @@ -483,7 +649,7 @@ static sql_rcode_t sql_free_result(fr_sql_query_t *query_ctx, UNUSED rlm_sql_con static size_t sql_error(TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen, fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config) { - rlm_sql_postgres_conn_t *conn = query_ctx->handle->conn; + rlm_sql_postgres_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_postgres_conn_t); char const *p, *q; size_t i = 0; @@ -507,7 +673,7 @@ static size_t sql_error(TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen, static int sql_affected_rows(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config) { - rlm_sql_postgres_conn_t *conn = query_ctx->handle->conn; + rlm_sql_postgres_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_postgres_conn_t); return conn->affected_rows; } @@ -515,8 +681,8 @@ static int sql_affected_rows(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t static size_t sql_escape_func(request_t *request, char *out, size_t outlen, char const *in, void *arg) { size_t inlen, ret; - rlm_sql_handle_t *handle = talloc_get_type_abort(arg, rlm_sql_handle_t); - rlm_sql_postgres_conn_t *conn = handle->conn; + connection_t *c = talloc_get_type_abort(arg, connection_t); + rlm_sql_postgres_conn_t *conn = talloc_get_type_abort(c->h, rlm_sql_postgres_conn_t); int err; /* Check for potential buffer overflow */ @@ -656,6 +822,34 @@ static int mod_load(void) return 0; } +static void *sql_escape_arg_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, void *uctx) +{ + rlm_sql_t const *inst = talloc_get_type_abort_const(uctx, rlm_sql_t); + connection_t *conn; + + conn = connection_alloc(ctx, el, + &(connection_funcs_t){ + .init = _sql_connection_init, + .close = _sql_connection_close, + }, + inst->config.trunk_conf.conn_conf, + inst->name, inst); + + if (!conn) { + PERROR("Failed allocating state handler for SQL escape connection"); + return NULL; + } + + connection_signal_init(conn); + return conn; +} + +static void sql_escape_arg_free(void *uctx) +{ + connection_t *conn = talloc_get_type_abort(uctx, connection_t); + connection_signal_halt(conn); +} + /* Exported to rlm_sql */ extern rlm_sql_driver_t rlm_sql_postgresql; rlm_sql_driver_t rlm_sql_postgresql = { @@ -668,14 +862,25 @@ rlm_sql_driver_t rlm_sql_postgresql = { .instantiate = mod_instantiate }, .flags = RLM_SQL_RCODE_FLAGS_ALT_QUERY, - .sql_socket_init = sql_socket_init, - .sql_query = sql_query, - .sql_select_query = sql_query, + .sql_query_resume = sql_query_resume, + .sql_select_query_resume = sql_query_resume, .sql_fields = sql_fields, .sql_fetch_row = sql_fetch_row, .sql_error = sql_error, .sql_finish_query = sql_free_result, .sql_finish_select_query = sql_free_result, .sql_affected_rows = sql_affected_rows, - .sql_escape_func = sql_escape_func + .sql_escape_func = sql_escape_func, + .sql_escape_arg_alloc = sql_escape_arg_alloc, + .sql_escape_arg_free = sql_escape_arg_free, + .uses_trunks = true, + .trunk_io_funcs = { + .connection_alloc = sql_trunk_connection_alloc, + .connection_notify = sql_trunk_connection_notify, + .request_mux = sql_trunk_request_mux, + .request_demux = sql_trunk_request_demux, + .request_cancel = sql_request_cancel, + .request_cancel_mux = sql_request_cancel_mux, + .request_fail = sql_request_fail, + } };