]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
Update rlm_sql_unixodbc to use trunk connections and async where possible
authorNick Porter <nick@portercomputing.co.uk>
Wed, 9 Oct 2024 18:13:47 +0000 (19:13 +0100)
committerNick Porter <nick@portercomputing.co.uk>
Fri, 11 Oct 2024 13:59:16 +0000 (14:59 +0100)
Async support is driver specific, so some will run synchronous and some
async.

Tested with:
 - Microsoft ODBC Driver 18 for SQL Server
 - FreeTDS driver (Sybase/MS SQL)
 - MariaDB Connector/ODBC(Unicode)
 - MySQL ODBC 9.0 Unicode Driver
 - PostgreSQL Unicode

of which only Microsoft ODBC provided async support.

raddb/mods-available/sql
src/modules/rlm_sql/drivers/rlm_sql_unixodbc/rlm_sql_unixodbc.c
src/modules/rlm_sql/rlm_sql.h

index 876930b66d683e6781d35038805171092495b2ea..a4c0b7e67dd6ea677e1bd0b6794a8f644b73a89b 100644 (file)
@@ -194,7 +194,7 @@ sql {
 #      logfile = ${logdir}/sqllog.sql
 
        #
-       #  query_timeout:: Set the maximum query duration for `cassandra`.
+       #  query_timeout:: Set the maximum query duration for `cassandra` and `unixodbc`
        #
 #      query_timeout = 5
 
index 57a8b67296f2c02d46a3b2434f5cdf3f1239b03d..1d9456bee18415f38d14c15777dcca7fa29a3106 100644 (file)
@@ -28,135 +28,487 @@ USES_APPLE_DEPRECATED_API
 
 #include <sqltypes.h>
 #include "rlm_sql.h"
+#include "rlm_sql_trunk.h"
 
 typedef struct {
-       SQLHENV env;
-       SQLHDBC dbc;
-       SQLHSTMT stmt;
-       rlm_sql_row_t row;
-       void *conn;
+       SQLHENV                 env;            /* Environment handle */
+       SQLHDBC                 dbc;            /* Database connection handle */
+       SQLHSTMT                stmt;           /* Statement handle */
+       SQLSMALLINT             colcount;       /* Number of columns in last result */
+       rlm_sql_row_t           row;            /* Results row */
+       connection_t            *conn;          /* Generic connection structure for this connection */
+       rlm_sql_config_t const  *config;        /* SQL instance configuration */
+       SQLUSMALLINT            async_mode;     /* What Async mode does this driver support */
+       fr_sql_query_t          *query_ctx;     /* Current query running on the connection */
+       fr_event_timer_t const  *read_ev;       /* Timer event for polling reading this connection */
+       fr_event_timer_t const  *write_ev;      /* Timer event for polling writing this connection */
+       uint                    select_interval;        /* How frequently this connection gets polled for select queries */
+       uint                    query_interval; /* How frequently this connection gets polled for other queries */
+       uint                    poll_count;     /* How many polls have been done for the current query */
 } rlm_sql_unixodbc_conn_t;
 
 USES_APPLE_DEPRECATED_API
 #include <sql.h>
 #include <sqlext.h>
 
-/* Forward declarations */
-static sql_rcode_t sql_check_error(long error_handle, rlm_sql_handle_t *handle);
-static sql_rcode_t sql_free_result(fr_sql_query_t *query_ctx, rlm_sql_config_t const *config);
-static int sql_num_fields(rlm_sql_handle_t *handle, rlm_sql_config_t const *config);
+/** Checks the error code to determine if the connection needs to be re-esttablished
+ *
+ * @param ret Return code from a failed unixodbc call.
+ * @param handle_type Type of ODBC handle
+ * @param handle ODBC handle
+ * @return
+ *     - #RLM_SQL_OK on success.
+ *     - #RLM_SQL_ALT_QUERY if alternate queries should be tried.
+ *     - #RLM_SQL_RECONNECT if reconnect is needed.
+ *     - #RLM_SQL_ERROR on error.
+ */
+static sql_rcode_t sql_check_error(SQLRETURN ret, SQLSMALLINT handle_type, SQLHANDLE handle)
+{
+       SQLCHAR         state[6];
+       SQLCHAR         error[256];
+       SQLINTEGER      errornum = 0;
+       SQLSMALLINT     length = 255;
+       int             res = RLM_SQL_ERROR;
+
+       if (SQL_SUCCEEDED(ret)) return 0; /* on success, just return 0 */
+
+       error[0] = state[0] = '\0';
+
+       SQLGetDiagRec(handle_type, handle, 1, state, &errornum, error, sizeof(error), &length);
+
+       if (state[0] == '0') {
+               switch (state[1]) {
+               /* SQLSTATE 01 class contains info and warning messages */
+               case '1':
+                       INFO("%s %s", state, error);
+                       FALL_THROUGH;
+               case '0':               /* SQLSTATE 00 class means success */
+                       res = RLM_SQL_OK;
+                       break;
 
-static int _sql_socket_destructor(rlm_sql_unixodbc_conn_t *conn)
+               /* SQLSTATE 08 class describes various connection errors */
+               case '8':
+                       ERROR("SQL down %s %s", state, error);
+                       res = RLM_SQL_RECONNECT;
+                       break;
+
+               /* any other SQLSTATE means error */
+               default:
+                       ERROR("%s %s", state, error);
+                       break;
+               }
+       } else {
+               /* SQLSTATE 23000 is "Integrity constraint violation" - such as duplicate key */
+               if (strcmp((char const *)state, "23000") == 0) {
+                       res = RLM_SQL_ALT_QUERY;
+               } else {
+                       ERROR("%s %s", state, error);
+               }
+       }
+
+       return res;
+}
+
+static void _sql_connection_close(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
 {
-       DEBUG2("Socket destructor called, closing socket");
+       rlm_sql_unixodbc_conn_t *c = talloc_get_type_abort(h, rlm_sql_unixodbc_conn_t);
+
+       if (c->read_ev) fr_event_timer_delete(&c->read_ev);
+       if (c->write_ev) fr_event_timer_delete(&c->write_ev);
+
+       if (c->stmt) SQLFreeHandle(SQL_HANDLE_STMT, c->stmt);
+
+       if (c->dbc) {
+               SQLDisconnect(c->dbc);
+               SQLFreeHandle(SQL_HANDLE_DBC, c->dbc);
+       }
 
-       if (conn->stmt) SQLFreeStmt(conn->stmt, SQL_DROP);
+       if (c->env) SQLFreeHandle(SQL_HANDLE_ENV, c->env);
+       talloc_free(h);
+}
+
+static connection_state_t sql_trunk_connection_init_stmt(rlm_sql_unixodbc_conn_t *c)
+{
+       char            buff[256], verbuf[10];
+       SQLRETURN       ret;
+       SQLULEN         timeout;
+
+       SQLGetInfo(c->dbc, SQL_DRIVER_NAME, buff, sizeof(buff), NULL);
+       SQLGetInfo(c->dbc, SQL_DRIVER_ODBC_VER, verbuf, sizeof(verbuf), NULL);
+       SQLGetInfo(c->dbc, SQL_ASYNC_MODE, &c->async_mode, 0, NULL);
+       switch(c->async_mode) {
+       case SQL_AM_NONE:
+               DEBUG2("Using driver %s, ODBC version %s.  Driver does not support async operations", buff, verbuf);
+               break;
+       case SQL_AM_CONNECTION:
+               DEBUG2("Using driver %s, ODBC version %s.  Async operation is set per connection", buff, verbuf);
+               ret = SQLSetConnectAttr(c->dbc, SQL_ATTR_ASYNC_ENABLE, (SQLPOINTER)SQL_ASYNC_ENABLE_ON, SQL_IS_UINTEGER);
+               sql_check_error(ret, SQL_HANDLE_DBC, c->dbc);
+               break;
+       case SQL_AM_STATEMENT:
+               DEBUG2("Using driver %s, ODBC version %s.  Async operation is set per statement", buff, verbuf);
+               break;
+       }
 
-       if (conn->dbc) {
-               SQLDisconnect(conn->dbc);
-               SQLFreeConnect(conn->dbc);
+       /* Allocate the stmt handle */
+       ret = SQLAllocHandle(SQL_HANDLE_STMT, c->dbc, &c->stmt);
+       if (sql_check_error(ret, SQL_HANDLE_DBC, c->dbc)) {
+               ERROR("Can't allocate the stmt");
+               _sql_connection_close(NULL, c, NULL);
+               return CONNECTION_STATE_FAILED;
+       }
+       if (c->async_mode == SQL_AM_STATEMENT) {
+               ret = SQLSetStmtAttr(c->stmt, SQL_ATTR_ASYNC_ENABLE, (SQLPOINTER)SQL_ASYNC_ENABLE_ON, 0);
+               sql_check_error(ret, SQL_HANDLE_STMT, c->stmt);
        }
 
-       if (conn->env) SQLFreeEnv(conn->env);
+       timeout = fr_time_delta_to_sec(c->config->query_timeout);
+       SQLSetStmtAttr(c->stmt, SQL_ATTR_QUERY_TIMEOUT, (SQLPOINTER)timeout, SQL_IS_UINTEGER);
 
-       return 0;
+       return CONNECTION_STATE_CONNECTED;
 }
 
-static sql_rcode_t sql_socket_init(rlm_sql_handle_t *handle, rlm_sql_config_t const *config,
-                                  fr_time_delta_t timeout)
+static void sql_trunk_connection_init_poll(fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
 {
-       rlm_sql_unixodbc_conn_t *conn;
-       long err_handle;
-       uint32_t timeout_ms = fr_time_delta_to_msec(timeout);
+       rlm_sql_unixodbc_conn_t *c = talloc_get_type_abort(uctx, rlm_sql_unixodbc_conn_t);
+       SQLRETURN               ret;
+
+       ret = SQLConnect(c->dbc,
+                        UNCONST(SQLCHAR *, c->config->sql_server), strlen(c->config->sql_server),
+                        UNCONST(SQLCHAR *, c->config->sql_login), strlen(c->config->sql_login),
+                        UNCONST(SQLCHAR *, c->config->sql_password), strlen(c->config->sql_password));
+
+       if (ret == SQL_STILL_EXECUTING) {
+               if (fr_event_timer_in(c, el, &c->read_ev, fr_time_delta_from_usec(c->query_interval),
+                                     sql_trunk_connection_init_poll, c) < 0) {
+                       ERROR("Unable to insert polling event");
+                       connection_signal_reconnect(c->conn, CONNECTION_FAILED);
+               }
+               return;
+       }
 
-       MEM(conn = handle->conn = talloc_zero(handle, rlm_sql_unixodbc_conn_t));
-       talloc_set_destructor(conn, _sql_socket_destructor);
+       if (sql_check_error(ret, SQL_HANDLE_DBC, c->dbc)) {
+               ERROR("Connection failed");
+               connection_signal_reconnect(c->conn, CONNECTION_FAILED);
+       }
+
+       if (sql_trunk_connection_init_stmt(c) == CONNECTION_STATE_CONNECTED) connection_signal_connected(c->conn);
+}
 
-       /* 1. Allocate environment handle and register version */
-       err_handle = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &conn->env);
-       if (sql_check_error(err_handle, handle)) {
+CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
+static connection_state_t _sql_connection_init(void **h, connection_t *conn, void *uctx)
+{
+       rlm_sql_t const         *sql = talloc_get_type_abort_const(uctx, rlm_sql_t);
+       rlm_sql_config_t const  *config = &sql->config;
+       rlm_sql_unixodbc_conn_t *c;
+       SQLRETURN               ret;
+       SQLULEN                 timeout = fr_time_delta_to_sec(sql->config.trunk_conf.conn_conf->connection_timeout);
+
+       MEM(c = talloc_zero(conn, rlm_sql_unixodbc_conn_t));
+       *c = (rlm_sql_unixodbc_conn_t) {
+               .conn = conn,
+               .config = config,
+               .select_interval = 1000,        /* Default starting poll interval - 1ms*/
+               .query_interval = 1000,
+       };
+
+       /* Allocate environment handle and register version */
+       ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &c->env);
+       if (ret == SQL_ERROR) {
                ERROR("Can't allocate environment handle");
-               return RLM_SQL_ERROR;
+       error:
+               _sql_connection_close(NULL, c, NULL);
+               return CONNECTION_STATE_FAILED;
        }
 
-       err_handle = SQLSetEnvAttr(conn->env, SQL_ATTR_ODBC_VERSION, (void*)SQL_OV_ODBC3, 0);
-       if (sql_check_error(err_handle, handle)) {
+       ret = SQLSetEnvAttr(c->env, SQL_ATTR_ODBC_VERSION, (void*)SQL_OV_ODBC3_80, 0);
+       if (sql_check_error(ret, SQL_HANDLE_ENV, c->env)) {
                ERROR("Can't register ODBC version");
-               return RLM_SQL_ERROR;
+               goto error;
        }
 
-       /* 2. Allocate connection handle */
-       err_handle = SQLAllocHandle(SQL_HANDLE_DBC, conn->env, &conn->dbc);
-       if (sql_check_error(err_handle, handle)) {
+       /* Allocate connection handle */
+       ret = SQLAllocHandle(SQL_HANDLE_DBC, c->env, &c->dbc);
+       if (sql_check_error(ret, SQL_HANDLE_ENV, c->env)) {
                ERROR("Can't allocate connection handle");
-               return RLM_SQL_ERROR;
+               goto error;
        }
 
-       /* Set the connection timeout */
-       SQLSetConnectAttr(conn->dbc, SQL_ATTR_LOGIN_TIMEOUT, &timeout_ms, SQL_IS_UINTEGER);
-
-       /* 3. Connect to the datasource */
-       err_handle = SQLConnect(conn->dbc,
-                               UNCONST(SQLCHAR *, config->sql_server), strlen(config->sql_server),
-                               UNCONST(SQLCHAR *, config->sql_login), strlen(config->sql_login),
-                               UNCONST(SQLCHAR *, config->sql_password), strlen(config->sql_password));
+       /*
+        *      Set the login / connection timeout
+        *
+        *      Note SQLSetConnectionAttr and SQLSetStmtAttr have an insane parameter passing
+        *      model.  The 3rd parameter can be an integer, or a pointer to a string
+        *      so integers get cast to pointers to match the function signature.
+        */
+       SQLSetConnectAttr(c->dbc, SQL_ATTR_LOGIN_TIMEOUT, (SQLPOINTER)timeout, SQL_IS_UINTEGER);
+       SQLSetConnectAttr(c->dbc, SQL_ATTR_CONNECTION_TIMEOUT, (SQLPOINTER)timeout, SQL_IS_UINTEGER);
+
+       /* Set the connection handle to Async */
+       SQLSetConnectAttr(c->dbc, SQL_ATTR_ASYNC_DBC_FUNCTIONS_ENABLE, (SQLPOINTER)SQL_ASYNC_DBC_ENABLE_ON, SQL_IS_UINTEGER);
+
+       /* Connect to the datasource */
+       ret = SQLConnect(c->dbc,
+                        UNCONST(SQLCHAR *, config->sql_server), strlen(config->sql_server),
+                        UNCONST(SQLCHAR *, config->sql_login), strlen(config->sql_login),
+                        UNCONST(SQLCHAR *, config->sql_password), strlen(config->sql_password));
+
+       if (ret == SQL_STILL_EXECUTING) {
+               if (fr_event_timer_in(c, conn->el, &c->read_ev, fr_time_delta_from_usec(c->query_interval),
+                                     sql_trunk_connection_init_poll, c) < 0) {
+                       ERROR("Unable to insert polling event");
+                       goto error;
+               }
+               *h = c;
+               return CONNECTION_STATE_CONNECTING;
+       }
 
-       if (sql_check_error(err_handle, handle)) {
+       if (sql_check_error(ret, SQL_HANDLE_DBC, c->dbc)) {
                ERROR("Connection failed");
-               return RLM_SQL_ERROR;
+               goto error;
        }
 
-       /* 4. Allocate the stmt */
-       err_handle = SQLAllocHandle(SQL_HANDLE_STMT, conn->dbc, &conn->stmt);
-       if (sql_check_error(err_handle, handle)) {
-               ERROR("Can't allocate the stmt");
-               return RLM_SQL_ERROR;
+       *h = c;
+       return sql_trunk_connection_init_stmt(c);
+}
+
+SQL_TRUNK_CONNECTION_ALLOC
+
+CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
+static void sql_trunk_request_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn, connection_t *conn, UNUSED void *uctx)
+{
+       rlm_sql_unixodbc_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_unixodbc_conn_t);
+       request_t               *request;
+       trunk_request_t         *treq;
+       fr_sql_query_t          *query_ctx;
+       SQLRETURN               ret;
+
+       if (trunk_connection_pop_request(&treq, tconn) != 0) return;
+       if (!treq) return;
+
+       query_ctx = talloc_get_type_abort(treq->preq, fr_sql_query_t);
+       request = query_ctx->request;
+
+       switch(query_ctx->status) {
+       case SQL_QUERY_PREPARED:
+               ROPTIONAL(RDEBUG2, DEBUG2, "Executing query: %s", query_ctx->query_str);
+               ret = SQLExecDirect(sql_conn->stmt, UNCONST(SQLCHAR *, query_ctx->query_str), strlen(query_ctx->query_str));
+               query_ctx->tconn = tconn;
+
+               if (ret == SQL_STILL_EXECUTING) {
+                       ROPTIONAL(RDEBUG3, DEBUG3, "Awaiting response");
+                       query_ctx->status = SQL_QUERY_SUBMITTED;
+                       sql_conn->query_ctx = query_ctx;
+                       sql_conn->poll_count = 0;
+                       trunk_request_signal_sent(treq);
+                       return;
+               }
+
+               query_ctx->rcode = sql_check_error(ret, SQL_HANDLE_STMT, sql_conn->stmt);
+               switch(query_ctx->rcode) {
+               case RLM_SQL_OK:
+               case RLM_SQL_ALT_QUERY:
+                       break;
+
+               default:
+                       query_ctx->status = SQL_QUERY_FAILED;
+                       trunk_request_signal_fail(treq);
+                       if (query_ctx->rcode == RLM_SQL_RECONNECT) connection_signal_reconnect(conn, CONNECTION_FAILED);
+                       return;
+               }
+               query_ctx->status = SQL_QUERY_RETURNED;
+               break;
+
+       default:
+               return;
        }
 
-    return RLM_SQL_OK;
+       ROPTIONAL(RDEBUG3, DEBUG3, "Got immediate response");
+       trunk_request_signal_reapable(treq);
+       if (request) unlang_interpret_mark_runnable(request);
 }
 
-static unlang_action_t sql_query(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx)
+CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
+static void sql_request_cancel(connection_t *conn, void *preq, trunk_cancel_reason_t reason,
+                               UNUSED void *uctx)
 {
-       fr_sql_query_t          *query_ctx = talloc_get_type_abort(uctx, fr_sql_query_t);
-       rlm_sql_unixodbc_conn_t *conn = query_ctx->handle->conn;
-       long err_handle;
-
-       /* Executing query */
-       err_handle = SQLExecDirect(conn->stmt, UNCONST(SQLCHAR *, query_ctx->query_str), strlen(query_ctx->query_str));
-       if ((query_ctx->rcode = sql_check_error(err_handle, query_ctx->handle))) {
-               if(query_ctx->rcode == RLM_SQL_RECONNECT) {
-                       DEBUG("rlm_sql will attempt to reconnect");
+       fr_sql_query_t          *query_ctx = talloc_get_type_abort(preq, fr_sql_query_t);
+       rlm_sql_unixodbc_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_unixodbc_conn_t);
+
+       if (!query_ctx->treq) return;
+       if (reason != TRUNK_CANCEL_REASON_SIGNAL) return;
+       if (sql_conn->query_ctx == query_ctx) sql_conn->query_ctx = NULL;
+}
+
+CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
+static void sql_request_cancel_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn,
+                                  connection_t *conn, UNUSED void *uctx)
+{
+       trunk_request_t         *treq;
+       rlm_sql_unixodbc_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_unixodbc_conn_t);
+       SQLRETURN               ret;
+       fr_sql_query_t          *query_ctx;
+
+       if ((trunk_connection_pop_cancellation(&treq, tconn)) != 0) return;
+       if (!treq) return;
+
+       query_ctx = talloc_get_type_abort(treq->preq, fr_sql_query_t);
+       ret = SQLCancel(sql_conn->stmt);
+       query_ctx->status = SQL_QUERY_CANCELLED;
+       if (ret == SQL_STILL_EXECUTING) {
+               trunk_request_signal_cancel_sent(treq);
+               return;
+       }
+       trunk_request_signal_cancel_complete(treq);
+}
+
+static void sql_trunk_connection_read_poll(fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
+{
+       rlm_sql_unixodbc_conn_t *c = talloc_get_type_abort(uctx, rlm_sql_unixodbc_conn_t);
+       fr_sql_query_t          *query_ctx = c->query_ctx;
+       SQLRETURN               ret;
+       trunk_request_t         *treq = query_ctx->treq;
+       request_t               *request = query_ctx->request;
+
+       switch (query_ctx->status) {
+       case SQL_QUERY_SUBMITTED:
+               ret = SQLExecDirect(c->stmt, UNCONST(SQLCHAR *, query_ctx->query_str), strlen(query_ctx->query_str));
+               c->poll_count++;
+               /* Back off the poll interval, up to half the query timeout */
+               if (c->poll_count > 2) {
+                       if (query_ctx->type == SQL_QUERY_SELECT) {
+                               if (c->select_interval < fr_time_delta_to_usec(c->config->query_timeout)/2) c->select_interval += 100;
+                       } else {
+                               if (c->select_interval < fr_time_delta_to_usec(c->config->query_timeout)/2) c->query_interval += 100;
+                       }
                }
-               RETURN_MODULE_FAIL;
+               if (ret == SQL_STILL_EXECUTING) {
+                       ROPTIONAL(RDEBUG3, DEBUG3, "Still awaiting response");
+                       if (fr_event_timer_in(c, el, &c->read_ev,
+                                             fr_time_delta_from_usec(query_ctx->type == SQL_QUERY_SELECT ? c->select_interval : c->query_interval),
+                                             sql_trunk_connection_read_poll, c) < 0) {
+                               ERROR("Unable to insert polling event");
+                       }
+                       return;
+               }
+
+               query_ctx->rcode = sql_check_error(ret, SQL_HANDLE_STMT, c->stmt);
+               switch(query_ctx->rcode) {
+               case RLM_SQL_OK:
+               case RLM_SQL_ALT_QUERY:
+                       /* If we only polled once, reduce the interval*/
+                       if (c->poll_count == 1) {
+                               if (query_ctx->type == SQL_QUERY_SELECT) {
+                                       c->select_interval /= 2;
+                               } else {
+                                       c->query_interval /= 2;
+                               }
+                       }
+                       break;
+
+               default:
+                       query_ctx->status = SQL_QUERY_FAILED;
+                       trunk_request_signal_fail(treq);
+                       if (query_ctx->rcode == RLM_SQL_RECONNECT) connection_signal_reconnect(c->conn, CONNECTION_FAILED);
+                       return;
+               }
+               break;
+
+       case SQL_QUERY_CANCELLED:
+               ret = SQLCancel(c->stmt);
+               if (ret == SQL_STILL_EXECUTING) {
+                       ROPTIONAL(RDEBUG3, DEBUG3, "Still awaiting response");
+                       if (fr_event_timer_in(c, el, &c->read_ev, fr_time_delta_from_usec(query_ctx->type == SQL_QUERY_SELECT ? c->select_interval : c->query_interval),
+                                             sql_trunk_connection_read_poll, c) < 0) {
+                               ERROR("Unable to insert polling event");
+                       }
+                       return;
+               }
+               trunk_request_signal_cancel_complete(treq);
+               return;
+
+       default:
+               return;
        }
-       RETURN_MODULE_OK;
+
+       if (request) unlang_interpret_mark_runnable(request);
+}
+
+static void sql_trunk_connection_write_poll(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
+{
+       trunk_connection_t      *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
+
+       trunk_connection_signal_writable(tconn);
 }
 
-static unlang_action_t sql_select_query(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx)
+/*
+ *     UnixODBC doesn't support event driven async, so in this case
+ *     we have to resort to polling.
+ *
+ *     This "notify" callback sets up the appropriate polling events.
+ */
+CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
+static void sql_trunk_connection_notify(UNUSED trunk_connection_t *tconn, connection_t *conn, UNUSED fr_event_list_t *el,
+                                       trunk_connection_event_t notify_on, UNUSED void *uctx)
+{
+       rlm_sql_unixodbc_conn_t *c = talloc_get_type_abort(conn->h, rlm_sql_unixodbc_conn_t);
+       fr_sql_query_t          *query_ctx = c->query_ctx;
+       uint                    poll_interval = (query_ctx && query_ctx->type != SQL_QUERY_SELECT) ? c->query_interval : c->select_interval;
+       switch (notify_on) {
+       case TRUNK_CONN_EVENT_NONE:
+               if (c->read_ev) fr_event_timer_delete(&c->read_ev);
+               if (c->write_ev) fr_event_timer_delete(&c->write_ev);
+               return;
+
+       case TRUNK_CONN_EVENT_BOTH:
+       case TRUNK_CONN_EVENT_READ:
+               if (c->query_ctx) {
+                       if (fr_event_timer_in(c, el, &c->read_ev, fr_time_delta_from_usec(poll_interval),
+                                             sql_trunk_connection_read_poll, c) < 0) {
+                               ERROR("Unable to insert polling event");
+                       }
+               }
+               if (notify_on == TRUNK_CONN_EVENT_READ) return;
+
+               FALL_THROUGH;
+
+       case TRUNK_CONN_EVENT_WRITE:
+               if (fr_event_timer_in(c, el, &c->write_ev, fr_time_delta_from_usec(0),
+                                     sql_trunk_connection_write_poll, tconn) < 0) {
+                       ERROR("Unable to insert polling event");
+               }
+               return;
+       }
+}
+
+SQL_QUERY_FAIL
+SQL_QUERY_RESUME
+
+static unlang_action_t sql_select_query_resume(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx)
 {
        fr_sql_query_t          *query_ctx = talloc_get_type_abort(uctx, fr_sql_query_t);
-       rlm_sql_unixodbc_conn_t *conn = query_ctx->handle->conn;
-       SQLINTEGER i;
-       SQLLEN len;
-       int colcount;
+       rlm_sql_unixodbc_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_unixodbc_conn_t);
+       SQLINTEGER              i;
+       SQLLEN                  len;
+       SQLRETURN               ret = SQL_STILL_EXECUTING;
 
-       /* Only state = 0 means success */
-       if ((sql_query(p_result, NULL, request, query_ctx) == UNLANG_ACTION_CALCULATE_RESULT) &&
-           (query_ctx->rcode != RLM_SQL_OK)) RETURN_MODULE_FAIL;
+       if (query_ctx->rcode != RLM_SQL_OK) RETURN_MODULE_FAIL;
 
-       colcount = sql_num_fields(query_ctx->handle, &query_ctx->inst->config);
-       if (colcount < 0) {
+       while (ret == SQL_STILL_EXECUTING) {
+               ret = SQLNumResultCols(conn->stmt, &conn->colcount);
+       }
+       if (sql_check_error(ret, SQL_HANDLE_STMT, conn->stmt)) {
                query_ctx->rcode = RLM_SQL_ERROR;
                RETURN_MODULE_FAIL;
        }
 
        /* Reserving memory for result */
-       conn->row = talloc_zero_array(conn, char *, colcount + 1); /* Space for pointers */
+       conn->row = talloc_zero_array(conn, char *, conn->colcount + 1); /* Space for pointers */
 
-       for (i = 1; i <= colcount; i++) {
+       for (i = 1; i <= conn->colcount; i++) {
                len = 0;
-               SQLColAttributes(conn->stmt, ((SQLUSMALLINT) i), SQL_DESC_LENGTH, NULL, 0, NULL, &len);
+               /* SQLColAttribute can in theory run Async */
+               while (SQLColAttribute(conn->stmt, (SQLUSMALLINT) i, SQL_DESC_LENGTH, NULL, 0, NULL, &len) == SQL_STILL_EXECUTING);
                conn->row[i - 1] = talloc_array(conn->row, char, ++len);
                SQLBindCol(conn->stmt, i, SQL_C_CHAR, (SQLCHAR *)conn->row[i - 1], len, NULL);
        }
@@ -164,40 +516,30 @@ static unlang_action_t sql_select_query(rlm_rcode_t *p_result, UNUSED int *prior
        RETURN_MODULE_OK;
 }
 
-static int sql_num_fields(rlm_sql_handle_t *handle, rlm_sql_config_t const *config)
-{
-       rlm_sql_unixodbc_conn_t *conn = handle->conn;
-       long err_handle;
-       SQLSMALLINT num_fields = 0;
-
-       err_handle = SQLNumResultCols(conn->stmt,&num_fields);
-       if (sql_check_error(err_handle, handle)) return -1;
-
-       return num_fields;
-}
-
 static sql_rcode_t sql_fields(char const **out[], fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
 {
-       rlm_sql_unixodbc_conn_t *conn = query_ctx->handle->conn;
-
-       SQLSMALLINT     fields, len, i;
+       rlm_sql_unixodbc_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_unixodbc_conn_t);
+       SQLSMALLINT             len, i;
+       SQLRETURN               ret;
+       char const              **names;
+       char                    field[128];
 
-       char const      **names;
-       char            field[128];
+       if (conn->colcount == 0) return RLM_SQL_ERROR;
 
-       SQLNumResultCols(conn->stmt, &fields);
-       if (fields == 0) return RLM_SQL_ERROR;
+       MEM(names = talloc_array(query_ctx, char const *, conn->colcount));
 
-       MEM(names = talloc_array(query_ctx, char const *, fields));
-
-       for (i = 0; i < fields; i++) {
+       for (i = 0; i < conn->colcount; i++) {
                char *p;
-
-               switch (SQLColAttribute(conn->stmt, i, SQL_DESC_BASE_COLUMN_NAME,
-                                       field, sizeof(field), &len, NULL)) {
+               ret = SQL_STILL_EXECUTING;
+               while (ret == SQL_STILL_EXECUTING) {
+                       ret = SQLColAttribute(conn->stmt, i + 1, SQL_DESC_NAME,
+                                             field, sizeof(field), &len, NULL);
+               }
+               switch (ret) {
                case SQL_INVALID_HANDLE:
                case SQL_ERROR:
                        ERROR("Failed retrieving field name at index %i", i);
+                       sql_check_error(ret, SQL_HANDLE_STMT, conn->stmt);
                        talloc_free(names);
                        return RLM_SQL_ERROR;
 
@@ -217,19 +559,20 @@ static sql_rcode_t sql_fields(char const **out[], fr_sql_query_t *query_ctx, UNU
 static unlang_action_t sql_fetch_row(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx)
 {
        fr_sql_query_t          *query_ctx = talloc_get_type_abort(uctx, fr_sql_query_t);
-       rlm_sql_handle_t        *handle = query_ctx->handle;
-       rlm_sql_unixodbc_conn_t *conn = handle->conn;
-       long                    err_handle;
+       rlm_sql_unixodbc_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_unixodbc_conn_t);
+       SQLRETURN               ret = SQL_STILL_EXECUTING;
 
        query_ctx->row = NULL;
 
-       err_handle = SQLFetch(conn->stmt);
-       if (err_handle == SQL_NO_DATA_FOUND) {
+       while (ret == SQL_STILL_EXECUTING) {
+               ret = SQLFetch(conn->stmt);
+       }
+       if (ret == SQL_NO_DATA_FOUND) {
                query_ctx->rcode = RLM_SQL_NO_MORE_ROWS;
                RETURN_MODULE_OK;
        }
 
-       query_ctx->rcode = sql_check_error(err_handle, handle);
+       query_ctx->rcode = sql_check_error(ret, SQL_HANDLE_STMT, conn->stmt);
        if (query_ctx->rcode != RLM_SQL_OK) RETURN_MODULE_FAIL;
 
        query_ctx->row = conn->row;
@@ -238,11 +581,36 @@ static unlang_action_t sql_fetch_row(rlm_rcode_t *p_result, UNUSED int *priority
        RETURN_MODULE_OK;
 }
 
-static sql_rcode_t sql_finish_select_query(fr_sql_query_t *query_ctx, rlm_sql_config_t const *config)
+static sql_rcode_t sql_free_result(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
 {
-       rlm_sql_unixodbc_conn_t *conn = query_ctx->handle->conn;
+       rlm_sql_unixodbc_conn_t *conn = query_ctx->tconn->conn->h;
 
-       sql_free_result(query_ctx, config);
+       TALLOC_FREE(conn->row);
+       conn->colcount = 0;
+
+       return RLM_SQL_OK;
+}
+
+static sql_rcode_t sql_finish_query(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
+{
+       rlm_sql_unixodbc_conn_t *conn;
+
+       /*
+        *      If the query is not in a state which would return results, then do nothing.
+        */
+       if (query_ctx->treq && !(query_ctx->treq->state &
+           (TRUNK_REQUEST_STATE_SENT | TRUNK_REQUEST_STATE_REAPABLE | TRUNK_REQUEST_STATE_COMPLETE))) return RLM_SQL_OK;
+
+       /*
+        *      If the connection doesn't exist there's nothing to do
+        */
+       if (!query_ctx->tconn || !query_ctx->tconn->conn || !query_ctx->tconn->conn->h) return RLM_SQL_ERROR;
+
+       conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_unixodbc_conn_t);
+
+       TALLOC_FREE(conn->row);
+       conn->colcount = 0;
+       conn->query_ctx = NULL;
 
        /*
         *      SQL_CLOSE - The cursor (if any) associated with the statement
@@ -258,25 +626,7 @@ static sql_rcode_t sql_finish_select_query(fr_sql_query_t *query_ctx, rlm_sql_co
         */
        SQLFreeStmt(conn->stmt, SQL_CLOSE);
 
-       return 0;
-}
-
-static sql_rcode_t sql_finish_query(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
-{
-       rlm_sql_unixodbc_conn_t *conn = query_ctx->handle->conn;
-
-       SQLFreeStmt(conn->stmt, SQL_CLOSE);
-
-       return 0;
-}
-
-static sql_rcode_t sql_free_result(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
-{
-       rlm_sql_unixodbc_conn_t *conn = query_ctx->handle->conn;
-
-       TALLOC_FREE(conn->row);
-
-       return 0;
+       return RLM_SQL_OK;
 }
 
 /** Retrieves any errors associated with the query context
@@ -293,78 +643,44 @@ static sql_rcode_t sql_free_result(fr_sql_query_t *query_ctx, UNUSED rlm_sql_con
 static size_t sql_error(TALLOC_CTX *ctx, sql_log_entry_t out[], NDEBUG_UNUSED size_t outlen,
                        fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
 {
-       rlm_sql_unixodbc_conn_t         *conn = query_ctx->handle->conn;
-       SQLCHAR                         state[256];
-       SQLCHAR                         errbuff[256];
-       SQLINTEGER                      errnum = 0;
-       SQLSMALLINT                     length = 255;
+       rlm_sql_unixodbc_conn_t *conn = query_ctx->tconn->conn->h;
+       SQLCHAR                 state[256];
+       SQLCHAR                 errbuff[256];
+       SQLINTEGER              errnum = 0;
+       SQLSMALLINT             length = 255;
+       size_t                  i = 0;
 
-       fr_assert(outlen > 0);
+       fr_assert(outlen > 2);
 
+       /*
+        *      Depending on which handles exist at the time of calling there
+        *      may be 1, 2 or 3 handles to check errors on.
+        */
        errbuff[0] = state[0] = '\0';
-       SQLError(conn->env, conn->dbc, conn->stmt, state, &errnum,
-                errbuff, sizeof(errbuff), &length);
-       if (errnum == 0) return 0;
-
-       out[0].type = L_ERR;
-       out[0].msg = talloc_typed_asprintf(ctx, "%s: %s", state, errbuff);
-
-       return 1;
-}
-
-/** Checks the error code to determine if the connection needs to be re-esttablished
- *
- * @param error_handle Return code from a failed unixodbc call.
- * @param handle rlm_sql connection handle.
- * @return
- *     - #RLM_SQL_OK on success.
- *     - #RLM_SQL_RECONNECT if reconnect is needed.
- *     - #RLM_SQL_ERROR on error.
- */
-static sql_rcode_t sql_check_error(long error_handle, rlm_sql_handle_t *handle)
-{
-       SQLCHAR state[256];
-       SQLCHAR error[256];
-       SQLINTEGER errornum = 0;
-       SQLSMALLINT length = 255;
-       int res = -1;
-
-       rlm_sql_unixodbc_conn_t *conn = handle->conn;
-
-       if (SQL_SUCCEEDED(error_handle)) return 0; /* on success, just return 0 */
-
-       error[0] = state[0] = '\0';
-
-       SQLError(conn->env, conn->dbc, conn->stmt, state, &errornum,
-                error, sizeof(error), &length);
-
-       if (state[0] == '0') {
-               switch (state[1]) {
-               /* SQLSTATE 01 class contains info and warning messages */
-               case '1':
-                       INFO("%s %s", state, error);
-                       FALL_THROUGH;
-               case '0':               /* SQLSTATE 00 class means success */
-                       res = RLM_SQL_OK;
-                       break;
+       SQLGetDiagRec(SQL_HANDLE_ENV, conn->env, 1, state, &errnum, errbuff, sizeof(errbuff), &length);
+       if (errnum != 0) {
+               out[i].type = L_ERR;
+               out[i].msg = talloc_typed_asprintf(ctx, "%s: %s", state, errbuff);
+               i++;
+       }
+       if (conn->dbc == SQL_NULL_HANDLE) return i;
 
-               /* SQLSTATE 08 class describes various connection errors */
-               case '8':
-                       ERROR("SQL down %s %s", state, error);
-                       res = RLM_SQL_RECONNECT;
-                       break;
+       SQLGetDiagRec(SQL_HANDLE_DBC, conn->dbc, 1, state, &errnum, errbuff, sizeof(errbuff), &length);
+       if (errnum != 0) {
+               out[i].type = L_ERR;
+               out[i].msg = talloc_typed_asprintf(ctx, "%s: %s", state, errbuff);
+               i++;
+       }
+       if (conn->stmt == SQL_NULL_HANDLE) return i;
 
-               /* any other SQLSTATE means error */
-               default:
-                       ERROR("%s %s", state, error);
-                       res = RLM_SQL_ERROR;
-                       break;
-               }
-       } else {
-               ERROR("%s %s", state, error);
+       SQLGetDiagRec(SQL_HANDLE_STMT, conn->stmt, 1, state, &errnum, errbuff, sizeof(errbuff), &length);
+       if (errnum != 0) {
+               out[i].type = L_ERR;
+               out[i].msg = talloc_typed_asprintf(ctx, "%s: %s", state, errbuff);
+               i++;
        }
 
-       return res;
+       return i;
 }
 
 /*************************************************************************
@@ -375,14 +691,14 @@ static sql_rcode_t sql_check_error(long error_handle, rlm_sql_handle_t *handle)
  *            or insert)
  *
  *************************************************************************/
-static int sql_affected_rows(fr_sql_query_t *query_ctx, rlm_sql_config_t const *config)
+static int sql_affected_rows(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
 {
-       rlm_sql_unixodbc_conn_t *conn = query_ctx->handle->conn;
-       long error_handle;
-       SQLLEN affected_rows;
+       rlm_sql_unixodbc_conn_t *conn = query_ctx->tconn->conn->h;
+       SQLRETURN               ret;
+       SQLLEN                  affected_rows;
 
-       error_handle = SQLRowCount(conn->stmt, &affected_rows);
-       if (sql_check_error(error_handle, query_ctx->handle)) return -1;
+       ret = SQLRowCount(conn->stmt, &affected_rows);
+       if (sql_check_error(ret, SQL_HANDLE_STMT, conn->stmt)) return -1;
 
        return affected_rows;
 }
@@ -395,14 +711,23 @@ rlm_sql_driver_t rlm_sql_unixodbc = {
                .magic                          = MODULE_MAGIC_INIT,
                .name                           = "sql_unixodbc"
        },
-       .sql_socket_init                = sql_socket_init,
-       .sql_query                      = sql_query,
-       .sql_select_query               = sql_select_query,
+       .flags                          = RLM_SQL_RCODE_FLAGS_ALT_QUERY,
+       .sql_query_resume               = sql_query_resume,
+       .sql_select_query_resume        = sql_select_query_resume,
        .sql_affected_rows              = sql_affected_rows,
        .sql_fields                     = sql_fields,
        .sql_fetch_row                  = sql_fetch_row,
        .sql_free_result                = sql_free_result,
        .sql_error                      = sql_error,
        .sql_finish_query               = sql_finish_query,
-       .sql_finish_select_query        = sql_finish_select_query
+       .sql_finish_select_query        = sql_finish_query,
+       .uses_trunks                    = true,
+       .trunk_io_funcs = {
+               .connection_alloc       = sql_trunk_connection_alloc,
+               .connection_notify      = sql_trunk_connection_notify,
+               .request_mux            = sql_trunk_request_mux,
+               .request_cancel_mux     = sql_request_cancel_mux,
+               .request_cancel         = sql_request_cancel,
+               .request_fail           = sql_request_fail,
+       }
 };
index e2bb5474ce51e9c2d9f425f664fe0f9694426f57..3e64e239fc7f33050fba583eacc3e87fdd85bf17 100644 (file)
@@ -130,7 +130,8 @@ typedef enum {
        SQL_QUERY_SUBMITTED,                                    //!< Submitted for execution.
        SQL_QUERY_RETURNED,                                     //!< Query has executed.
        SQL_QUERY_FETCHING_RESULTS,                             //!< Fetching results from server.
-       SQL_QUERY_RESULTS_FETCHED                               //!< Results fetched from the server.
+       SQL_QUERY_RESULTS_FETCHED,                              //!< Results fetched from the server.
+       SQL_QUERY_CANCELLED                                     //!< A cancellation has been sent to the server.
 } fr_sql_query_status_t;
 
 typedef struct {