]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib-sql: Use generic sql connection pooling code for mysql/pgsql.
authorTimo Sirainen <tss@iki.fi>
Tue, 4 May 2010 14:55:23 +0000 (17:55 +0300)
committerTimo Sirainen <tss@iki.fi>
Tue, 4 May 2010 14:55:23 +0000 (17:55 +0300)
It's possible to give multiple host settings to do load balancing / HA.
If one host is down, another one is tried. All queries are automatically
retried in another host if they fail in first one.

Since PostgreSQL support async queries, Dovecot can create multiple
connections to the database as needed, so it can do lookups in parallel. The
number of connections can be changed with maxconns=n in connect_query, the
default is 5.

--HG--
branch : HEAD

doc/example-config/dovecot-sql.conf.ext
src/lib-sql/Makefile.am
src/lib-sql/driver-mysql.c
src/lib-sql/driver-pgsql.c
src/lib-sql/driver-sqlite.c
src/lib-sql/driver-sqlpool.c [new file with mode: 0644]
src/lib-sql/sql-api-private.h
src/lib-sql/sql-api.c
src/lib-sql/sql-api.h

index 8e40c1dfafe0e881390adb1598c58df5e38c369c..ebe3b7c236f2a15cc6f0d404c4b487de16fb6dc6 100644 (file)
 
 # Database connection string. This is driver-specific setting.
 #
+# HA / round-robin load-balancing is supported by giving multiple host
+# settings, like: host=sql1.host.org host=sql2.host.org
+#
 # pgsql:
 #   For available options, see the PostgreSQL documention for the
 #   PQconnectdb function of libpq.
+#   Use maxconns=n (default 5) to change how many connections Dovecot can
+#   create to pgsql.
 #
 # mysql:
 #   Basic options emulate PostgreSQL option names:
@@ -50,8 +55,6 @@
 #   You can connect to UNIX sockets by using host: host=/var/run/mysql.sock
 #   Note that currently you can't use spaces in parameters.
 #
-#   MySQL supports multiple host parameters for load balancing / HA.
-#
 # sqlite:
 #   The path to the database file.
 #
index 1742ca91a52b2cfbb9b41023b25b40993dfc33b8..13fb83a344179391003c66bd38965a3d06c0ce4e 100644 (file)
@@ -38,6 +38,7 @@ if ! SQL_PLUGINS
 driver_sources = \
        driver-mysql.c \
        driver-pgsql.c \
+       driver-sqlpool.c \
        driver-sqlite.c
 endif
 
index 3fcbcae9a39b9f33a7c17e3c02610130744a6707..56dd9bcc526e9d8758ba3f4dac35e0b935aceb5a 100644 (file)
 #include <mysql.h>
 #include <errmsg.h>
 
-/* Abort connect() if it can't connect within this time. */
-#define MYSQL_CONNECT_FAILURE_TIMEOUT 10
-
-/* Minimum delay between reconnecting to same server */
-#define CONNECT_MIN_DELAY 1
-/* Maximum time to avoiding reconnecting to same server */
-#define CONNECT_MAX_DELAY (60*30)
-/* If no servers are connected but a query is requested, try reconnecting to
-   next server which has been disconnected longer than this (with a single
-   server setup this is really the "max delay" and the CONNECT_MAX_DELAY
-   is never used). */
-#define CONNECT_RESET_DELAY 15
-
 struct mysql_db {
        struct sql_db api;
 
        pool_t pool;
-       const char *user, *password, *dbname, *unix_socket;
+       const char *user, *password, *dbname, *host, *unix_socket;
        const char *ssl_cert, *ssl_key, *ssl_ca, *ssl_ca_path, *ssl_cipher;
        const char *option_file, *option_group;
        unsigned int port, client_flags;
 
-       ARRAY_DEFINE(connections, struct mysql_connection);
-       unsigned int next_query_connection;
-};
-
-struct mysql_connection {
-       struct mysql_db *db;
-
        MYSQL *mysql;
-       const char *host;
-
-       unsigned int connect_delay;
-       unsigned int connect_failure_count;
+       unsigned int next_query_connection;
 
-       time_t last_connect;
-       unsigned int connected:1;
        unsigned int ssl_set:1;
 };
 
 struct mysql_result {
        struct sql_result api;
-       struct mysql_connection *conn;
 
        MYSQL_RES *result;
         MYSQL_ROW row;
@@ -67,137 +41,83 @@ struct mysql_transaction_context {
        struct sql_transaction_context ctx;
 
        pool_t query_pool;
-       struct mysql_query_list *head, *tail;
-
        const char *error;
 
        unsigned int failed:1;
 };
 
-struct mysql_query_list {
-       struct mysql_query_list *next;
-       const char *query;
-       unsigned int *affected_rows;
-};
-
 extern const struct sql_db driver_mysql_db;
 extern const struct sql_result driver_mysql_result;
 extern const struct sql_result driver_mysql_error_result;
 
-static bool driver_mysql_connect(struct mysql_connection *conn)
+static int driver_mysql_connect(struct sql_db *_db)
 {
-       struct mysql_db *db = conn->db;
+       struct mysql_db *db = (struct mysql_db *)_db;
        const char *unix_socket, *host;
        unsigned long client_flags = db->client_flags;
-       time_t now;
        bool failed;
 
-       if (conn->connected)
-               return TRUE;
-
-       /* don't try reconnecting more than once a second */
-       now = time(NULL);
-       if (conn->last_connect + (time_t)conn->connect_delay > now)
-               return FALSE;
-       conn->last_connect = now;
+       i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
 
-       if (*conn->host == '/') {
-               unix_socket = conn->host;
+       if (*db->host == '/') {
+               unix_socket = db->host;
                host = NULL;
        } else {
                unix_socket = NULL;
-               host = conn->host;
+               host = db->host;
        }
 
        if (db->option_file != NULL) {
-               mysql_options(conn->mysql, MYSQL_READ_DEFAULT_FILE,
+               mysql_options(db->mysql, MYSQL_READ_DEFAULT_FILE,
                              db->option_file);
        }
 
-       mysql_options(conn->mysql, MYSQL_READ_DEFAULT_GROUP,
+       mysql_options(db->mysql, MYSQL_READ_DEFAULT_GROUP,
                      db->option_group != NULL ? db->option_group : "client");
 
-       if (!conn->ssl_set && (db->ssl_ca != NULL || db->ssl_ca_path != NULL)) {
+       if (!db->ssl_set && (db->ssl_ca != NULL || db->ssl_ca_path != NULL)) {
 #ifdef HAVE_MYSQL_SSL
-               mysql_ssl_set(conn->mysql, db->ssl_key, db->ssl_cert,
+               mysql_ssl_set(db->mysql, db->ssl_key, db->ssl_cert,
                              db->ssl_ca, db->ssl_ca_path
 #ifdef HAVE_MYSQL_SSL_CIPHER
                              , db->ssl_cipher
 #endif
                             );
-               conn->ssl_set = TRUE;
+               db->ssl_set = TRUE;
 #else
                i_fatal("mysql: SSL support not compiled in "
                        "(remove ssl_ca and ssl_ca_path settings)");
 #endif
        }
 
-       alarm(MYSQL_CONNECT_FAILURE_TIMEOUT);
+       alarm(SQL_CONNECT_TIMEOUT_SECS);
 #ifdef CLIENT_MULTI_RESULTS
        client_flags |= CLIENT_MULTI_RESULTS;
 #endif
        /* CLIENT_MULTI_RESULTS allows the use of stored procedures */
-       failed = mysql_real_connect(conn->mysql, host, db->user, db->password,
+       failed = mysql_real_connect(db->mysql, host, db->user, db->password,
                                    db->dbname, db->port, unix_socket,
                                    client_flags) == NULL;
        alarm(0);
        if (failed) {
-               if (conn->connect_failure_count > 0) {
-                       /* increase delay between reconnections to this
-                          server */
-                       conn->connect_delay *= 5;
-                       if (conn->connect_delay > CONNECT_MAX_DELAY)
-                               conn->connect_delay = CONNECT_MAX_DELAY;
-               }
-               conn->connect_failure_count++;
-
+               sql_db_set_state(&db->api, SQL_DB_STATE_DISCONNECTED);
                i_error("mysql: Connect failed to %s (%s): %s - "
                        "waiting for %u seconds before retry",
                        host != NULL ? host : unix_socket, db->dbname,
-                       mysql_error(conn->mysql), conn->connect_delay);
-               return FALSE;
+                       mysql_error(db->mysql), db->api.connect_delay);
+               return -1;
        } else {
                i_info("mysql: Connected to %s%s (%s)",
                       host != NULL ? host : unix_socket,
-                      conn->ssl_set ? " using SSL" : "", db->dbname);
-
-               conn->connect_failure_count = 0;
-               conn->connect_delay = CONNECT_MIN_DELAY;
-               conn->connected = TRUE;
-               return TRUE;
-       }
-}
-
-static int driver_mysql_connect_all(struct sql_db *_db)
-{
-       struct mysql_db *db = (struct mysql_db *)_db;
-       struct mysql_connection *conn;
-       int ret = -1;
+                      db->ssl_set ? " using SSL" : "", db->dbname);
 
-       array_foreach_modifiable(&db->connections, conn) {
-               if (driver_mysql_connect(conn))
-                       ret = 1;
+               sql_db_set_state(&db->api, SQL_DB_STATE_IDLE);
+               return 1;
        }
-       return ret;
 }
 
-static void driver_mysql_connection_add(struct mysql_db *db, const char *host)
+static void driver_mysql_disconnect(struct sql_db *_db ATTR_UNUSED)
 {
-       struct mysql_connection *conn;
-
-       conn = array_append_space(&db->connections);
-       conn->db = db;
-       conn->host = p_strdup(db->pool, host);
-       conn->mysql = mysql_init(NULL);
-       if (conn->mysql == NULL)
-               i_fatal("mysql_init() failed");
-
-       conn->connect_delay = CONNECT_MIN_DELAY;
-}
-
-static void driver_mysql_connection_free(struct mysql_connection *conn)
-{
-       mysql_close(conn->mysql);
 }
 
 static void driver_mysql_parse_connect_string(struct mysql_db *db,
@@ -221,7 +141,7 @@ static void driver_mysql_parse_connect_string(struct mysql_db *db,
                field = NULL;
                if (strcmp(name, "host") == 0 ||
                    strcmp(name, "hostaddr") == 0)
-                       driver_mysql_connection_add(db, value);
+                       field = &db->host;
                else if (strcmp(name, "user") == 0)
                        field = &db->user;
                else if (strcmp(name, "password") == 0)
@@ -253,8 +173,12 @@ static void driver_mysql_parse_connect_string(struct mysql_db *db,
                        *field = p_strdup(db->pool, value);
        }
 
-       if (array_count(&db->connections) == 0)
+       if (db->host == NULL)
                i_fatal("mysql: No hosts given in connect string");
+
+       db->mysql = mysql_init(NULL);
+       if (db->mysql == NULL)
+               i_fatal("mysql_init() failed");
 }
 
 static struct sql_db *driver_mysql_init_v(const char *connect_string)
@@ -266,7 +190,6 @@ static struct sql_db *driver_mysql_init_v(const char *connect_string)
        db = p_new(pool, struct mysql_db, 1);
        db->pool = pool;
        db->api = driver_mysql_db;
-       p_array_init(&db->connections, db->pool, 6);
 
        T_BEGIN {
                driver_mysql_parse_connect_string(db, connect_string);
@@ -277,88 +200,26 @@ static struct sql_db *driver_mysql_init_v(const char *connect_string)
 static void driver_mysql_deinit_v(struct sql_db *_db)
 {
        struct mysql_db *db = (struct mysql_db *)_db;
-       struct mysql_connection *conn;
-
-       array_foreach_modifiable(&db->connections, conn)
-               (void)driver_mysql_connection_free(conn);
 
+       mysql_close(db->mysql);
        array_free(&_db->module_contexts);
        pool_unref(&db->pool);
 }
 
-static enum sql_db_flags
-driver_mysql_get_flags(struct sql_db *db ATTR_UNUSED)
-{
-       return SQL_DB_FLAG_BLOCKING;
-}
-
-static int driver_mysql_connection_do_query(struct mysql_connection *conn,
-                                           const char *query)
+static int driver_mysql_do_query(struct mysql_db *db, const char *query)
 {
-       int i;
-
-       for (i = 0; i < 2; i++) {
-               if (!driver_mysql_connect(conn))
-                       return 0;
-
-               if (mysql_query(conn->mysql, query) == 0)
-                       return 1;
-
-               /* failed */
-               switch (mysql_errno(conn->mysql)) {
-               case CR_SERVER_GONE_ERROR:
-               case CR_SERVER_LOST:
-                       /* connection lost - try immediate reconnect */
-                       conn->connected = FALSE;
-                       break;
-               default:
-                       return -1;
-               }
-       }
-
-       /* connected -> lost it -> connected -> lost again */
-       return 0;
-}
-
-static int driver_mysql_do_query(struct mysql_db *db, const char *query,
-                                struct mysql_connection **conn_r)
-{
-       struct mysql_connection *conn;
-       unsigned int i, start, count;
-       bool reset;
-       int ret;
-
-       conn = array_get_modifiable(&db->connections, &count);
-
-       /* go through the connections in round robin. if the connection
-          isn't available, try next one that is. */
-       start = db->next_query_connection % count;
-       db->next_query_connection++;
-
-       for (reset = FALSE;; reset = TRUE) {
-               i = start;
-               do {
-                       ret = driver_mysql_connection_do_query(&conn[i], query);
-                       if (ret != 0) {
-                               /* success / failure */
-                               *conn_r = &conn[i];
-                               return ret;
-                       }
-
-                       /* not connected, try next one */
-                       i = (i + 1) % count;
-               } while (i != start);
-
-               if (reset)
-                       break;
+       if (mysql_query(db->mysql, query) == 0)
+               return 1;
 
-               /* none are connected. connect_delays may have gotten too high,
-                  reset all of them to see if some are still alive. */
-               for (i = 0; i < count; i++)
-                       conn[i].connect_delay = CONNECT_RESET_DELAY;
+       /* failed */
+       switch (mysql_errno(db->mysql)) {
+       case CR_SERVER_GONE_ERROR:
+       case CR_SERVER_LOST:
+               sql_db_set_state(&db->api, SQL_DB_STATE_DISCONNECTED);
+               break;
+       default:
+               return -1;
        }
-
-       *conn_r = NULL;
        return 0;
 }
 
@@ -366,40 +227,29 @@ static const char *
 driver_mysql_escape_string(struct sql_db *_db, const char *string)
 {
        struct mysql_db *db = (struct mysql_db *)_db;
-       struct mysql_connection *conn;
-       unsigned int i, count;
        size_t len = strlen(string);
        char *to;
 
-       /* All the connections should be identical, so just use the first
-          connected one */
-       conn = array_get_modifiable(&db->connections, &count);
-       for (i = 0; i < count; i++) {
-               if (conn[i].connected)
-                       break;
+       if (_db->state == SQL_DB_STATE_DISCONNECTED) {
+               /* try connecting */
+               (void)sql_connect(&db->api);
        }
-       if (i == count) {
-               /* so, try connecting.. */
-               for (i = 0; i < count; i++) {
-                       if (driver_mysql_connect(&conn[i]))
-                               break;
-               }
-               if (i == count) {
-                       /* FIXME: we don't have a valid connection, so fallback
-                          to using default escaping. the next query will most
-                          likely fail anyway so it shouldn't matter that much
-                          what we return here.. Anyway, this API needs
-                          changing so that the escaping function could already
-                          fail the query reliably. */
-                       to = t_buffer_get(len * 2 + 1);
-                       len = mysql_escape_string(to, string, len);
-                       t_buffer_alloc(len + 1);
-                       return to;
-               }
+
+       if (db->mysql == NULL) {
+               /* FIXME: we don't have a valid connection, so fallback
+                  to using default escaping. the next query will most
+                  likely fail anyway so it shouldn't matter that much
+                  what we return here.. Anyway, this API needs
+                  changing so that the escaping function could already
+                  fail the query reliably. */
+               to = t_buffer_get(len * 2 + 1);
+               len = mysql_escape_string(to, string, len);
+               t_buffer_alloc(len + 1);
+               return to;
        }
 
        to = t_buffer_get(len * 2 + 1);
-       len = mysql_real_escape_string(conn[i].mysql, to, string, len);
+       len = mysql_real_escape_string(db->mysql, to, string, len);
        t_buffer_alloc(len + 1);
        return to;
 }
@@ -407,9 +257,8 @@ driver_mysql_escape_string(struct sql_db *_db, const char *string)
 static void driver_mysql_exec(struct sql_db *_db, const char *query)
 {
        struct mysql_db *db = (struct mysql_db *)_db;
-       struct mysql_connection *conn;
 
-       (void)driver_mysql_do_query(db, query, &conn);
+       (void)driver_mysql_do_query(db, query);
 }
 
 static void driver_mysql_query(struct sql_db *db, const char *query,
@@ -428,7 +277,6 @@ static struct sql_result *
 driver_mysql_query_s(struct sql_db *_db, const char *query)
 {
        struct mysql_db *db = (struct mysql_db *)_db;
-       struct mysql_connection *conn;
        struct mysql_result *result;
        int ret;
 
@@ -436,25 +284,25 @@ driver_mysql_query_s(struct sql_db *_db, const char *query)
        result->api = driver_mysql_result;
        result->api.db = _db;
 
-       switch (driver_mysql_do_query(db, query, &conn)) {
+       switch (driver_mysql_do_query(db, query)) {
        case 0:
                /* not connected */
                result->api = sql_not_connected_result;
                break;
        case 1:
                /* query ok */
-               result->result = mysql_store_result(conn->mysql);
+               result->result = mysql_store_result(db->mysql);
 #ifdef CLIENT_MULTI_RESULTS
                /* Because we've enabled CLIENT_MULTI_RESULTS, we need to read
                   (ignore) extra results - there should not be any.
                   ret is: -1 = done, >0 = error, 0 = more results. */
-               while ((ret = mysql_next_result(conn->mysql)) == 0) ;
+               while ((ret = mysql_next_result(db->mysql)) == 0) ;
 #else
                ret = -1;
 #endif
 
                if (ret < 0 &&
-                   (result->result != NULL || mysql_errno(conn->mysql) == 0))
+                   (result->result != NULL || mysql_errno(db->mysql) == 0))
                        break;
 
                /* failed */
@@ -468,7 +316,6 @@ driver_mysql_query_s(struct sql_db *_db, const char *query)
        }
 
        result->api.refcount = 1;
-       result->conn = conn;
        return &result->api;
 }
 
@@ -488,6 +335,7 @@ static void driver_mysql_result_free(struct sql_result *_result)
 static int driver_mysql_result_next_row(struct sql_result *_result)
 {
        struct mysql_result *result = (struct mysql_result *)_result;
+       struct mysql_db *db = (struct mysql_db *)_result->db;
 
        if (result->result == NULL) {
                /* no results */
@@ -498,7 +346,7 @@ static int driver_mysql_result_next_row(struct sql_result *_result)
        if (result->row != NULL)
                return 1;
 
-       return mysql_errno(result->conn->mysql) != 0 ? -1 : 0;
+       return mysql_errno(db->mysql) != 0 ? -1 : 0;
 }
 
 static void driver_mysql_result_fetch_fields(struct mysql_result *result)
@@ -583,9 +431,9 @@ driver_mysql_result_get_values(struct sql_result *_result)
 
 static const char *driver_mysql_result_get_error(struct sql_result *_result)
 {
-       struct mysql_result *result = (struct mysql_result *)_result;
+       struct mysql_db *db = (struct mysql_db *)_result->db;
 
-       return mysql_error(result->conn->mysql);
+       return mysql_error(db->mysql);
 }
 
 static struct sql_transaction_context *
@@ -626,12 +474,13 @@ static int transaction_send_query(struct mysql_transaction_context *ctx,
                ctx->error = sql_result_get_error(result);
                ctx->failed = TRUE;
                ret = -1;
-       } else if (ctx->head != NULL && ctx->head->affected_rows != NULL) {
-               struct mysql_result *my_result = (struct mysql_result *)result;
+       } else if (ctx->ctx.head != NULL &&
+                  ctx->ctx.head->affected_rows != NULL) {
+               struct mysql_db *db = (struct mysql_db *)result->db;
 
-               rows = mysql_affected_rows(my_result->conn->mysql);
+               rows = mysql_affected_rows(db->mysql);
                i_assert(rows != (my_ulonglong)-1);
-               *ctx->head->affected_rows = rows;
+               *ctx->ctx.head->affected_rows = rows;
        }
        sql_result_unref(result);
        return ret;
@@ -647,14 +496,14 @@ driver_mysql_transaction_commit_s(struct sql_transaction_context *_ctx,
 
        *error_r = NULL;
 
-       if (ctx->head != NULL) {
+       if (_ctx->head != NULL) {
                /* try to use a transaction in any case,
                   even if it doesn't work. */
                (void)transaction_send_query(ctx, "BEGIN");
-               while (ctx->head != NULL) {
-                       if (transaction_send_query(ctx, ctx->head->query) < 0)
+               while (_ctx->head != NULL) {
+                       if (transaction_send_query(ctx, _ctx->head->query) < 0)
                                break;
-                       ctx->head = ctx->head->next;
+                       _ctx->head = _ctx->head->next;
                }
                ret = transaction_send_query(ctx, "COMMIT");
                *error_r = ctx->error;
@@ -679,27 +528,20 @@ driver_mysql_update(struct sql_transaction_context *_ctx, const char *query,
 {
        struct mysql_transaction_context *ctx =
                (struct mysql_transaction_context *)_ctx;
-       struct mysql_query_list *list;
-
-       list = p_new(ctx->query_pool, struct mysql_query_list, 1);
-       list->query = p_strdup(ctx->query_pool, query);
-       list->affected_rows = affected_rows;
 
-       if (ctx->head == NULL)
-               ctx->head = list;
-       else
-               ctx->tail->next = list;
-       ctx->tail = list;
+       sql_transaction_add_query(&ctx->ctx, ctx->query_pool,
+                                 query, affected_rows);
 }
 
 const struct sql_db driver_mysql_db = {
-       "mysql",
+       .name = "mysql",
+       .flags = SQL_DB_FLAG_BLOCKING | SQL_DB_FLAG_POOLED,
 
        .v = {
                driver_mysql_init_v,
                driver_mysql_deinit_v,
-               driver_mysql_get_flags,
-               driver_mysql_connect_all,
+               driver_mysql_connect,
+               driver_mysql_disconnect,
                driver_mysql_escape_string,
                driver_mysql_exec,
                driver_mysql_query,
@@ -741,7 +583,8 @@ const struct sql_result driver_mysql_error_result = {
                driver_mysql_result_error_next_row,
                NULL, NULL, NULL, NULL, NULL, NULL, NULL,
                driver_mysql_result_get_error
-       }
+       },
+       .failed_try_retry = TRUE
 };
 
 void driver_mysql_init(void);
index ef25888fa20fbcd6191ead514a3095130fddb26b..bd8a090bd6ae815720a427379517d856c9a97831 100644 (file)
@@ -3,16 +3,12 @@
 #include "lib.h"
 #include "array.h"
 #include "ioloop.h"
-#include "ioloop-internal.h" /* kind of dirty, but it should be fine.. */
 #include "sql-api-private.h"
 
 #ifdef BUILD_PGSQL
 #include <stdlib.h>
-#include <time.h>
 #include <libpq-fe.h>
 
-#define QUERY_TIMEOUT_SECS 6
-
 struct pgsql_db {
        struct sql_db api;
 
@@ -21,20 +17,16 @@ struct pgsql_db {
        PGconn *pg;
 
        struct io *io;
+       struct timeout *to_connect;
        enum io_condition io_dir;
 
-       struct pgsql_queue *queue, **queue_tail;
-       struct timeout *queue_to;
-
+       struct pgsql_result *cur_result;
        struct ioloop *ioloop;
        struct sql_result *sync_result;
 
        char *error;
-       time_t last_connect;
-       unsigned int connecting:1;
-       unsigned int connected:1;
-       unsigned int querying:1;
-       unsigned int query_finished:1;
+
+       unsigned int fatal_error:1;
 };
 
 struct pgsql_binary_value {
@@ -45,8 +37,8 @@ struct pgsql_binary_value {
 struct pgsql_result {
        struct sql_result api;
        PGresult *pgres;
+       struct timeout *to;
 
-       char *query;
        unsigned int rownum, rows;
        unsigned int fields_count;
        const char **fields;
@@ -57,15 +49,7 @@ struct pgsql_result {
        sql_query_callback_t *callback;
        void *context;
 
-       unsigned int retry_query:1;
-};
-
-struct pgsql_queue {
-       struct pgsql_queue *next;
-
-       time_t created;
-       char *query;
-       struct pgsql_result *result;
+       unsigned int timeout:1;
 };
 
 struct pgsql_transaction_context {
@@ -76,7 +60,6 @@ struct pgsql_transaction_context {
        void *context;
 
        pool_t query_pool;
-       struct pgsql_query_list *head, *tail;
        const char *error;
 
        unsigned int begin_succeeded:1;
@@ -84,26 +67,15 @@ struct pgsql_transaction_context {
        unsigned int failed:1;
 };
 
-struct pgsql_query_list {
-       struct pgsql_query_list *next;
-       struct pgsql_transaction_context *ctx;
-
-       const char *query;
-       unsigned int *affected_rows;
-};
 extern const struct sql_db driver_pgsql_db;
 extern const struct sql_result driver_pgsql_result;
 
-static void
-driver_pgsql_query_full(struct sql_db *db, const char *query,
-                       sql_query_callback_t *callback, void *context,
-                       bool retry_query);
-static void queue_send_next(struct pgsql_db *db);
 static void result_finish(struct pgsql_result *result);
 
 static void driver_pgsql_close(struct pgsql_db *db)
 {
        db->io_dir = 0;
+       db->fatal_error = FALSE;
 
        PQfinish(db->pg);
        db->pg = NULL;
@@ -113,10 +85,15 @@ static void driver_pgsql_close(struct pgsql_db *db)
                   so use io_remove_closed(). */
                io_remove_closed(&db->io);
        }
+       if (db->to_connect != NULL)
+               timeout_remove(&db->to_connect);
+
+       sql_db_set_state(&db->api, SQL_DB_STATE_DISCONNECTED);
 
-       db->connecting = FALSE;
-       db->connected = FALSE;
-        db->querying = FALSE;
+       if (db->ioloop != NULL) {
+               /* running a sync query, stop it */
+               io_loop_stop(db->ioloop);
+       }
 }
 
 static const char *last_error(struct pgsql_db *db)
@@ -150,9 +127,6 @@ static void connect_callback(struct pgsql_db *db)
                io_dir = IO_WRITE;
                break;
        case PGRES_POLLING_OK:
-               i_info("pgsql: Connected to %s", PQdb(db->pg));
-               db->connecting = FALSE;
-               db->connected = TRUE;
                break;
        case PGRES_POLLING_FAILED:
                i_error("pgsql: Connect failed to %s: %s",
@@ -169,20 +143,33 @@ static void connect_callback(struct pgsql_db *db)
                db->io_dir = io_dir;
        }
 
-       if (db->connected && db->queue != NULL)
-               queue_send_next(db);
+       if (io_dir == 0) {
+               i_info("pgsql: Connected to %s", PQdb(db->pg));
+               if (db->to_connect != NULL)
+                       timeout_remove(&db->to_connect);
+               sql_db_set_state(&db->api, SQL_DB_STATE_IDLE);
+               if (db->ioloop != NULL) {
+                       /* driver_pgsql_sync_init() waiting for connection to
+                          finish */
+                       io_loop_stop(db->ioloop);
+               }
+       }
+}
+
+static void driver_pgsql_connect_timeout(struct pgsql_db *db)
+{
+       unsigned int secs = ioloop_time - db->api.last_connect_try;
+
+       i_error("pgsql: Connect failed to %s: Timeout after %u seconds",
+               PQdb(db->pg), secs);
+       driver_pgsql_close(db);
 }
 
 static int driver_pgsql_connect(struct sql_db *_db)
 {
        struct pgsql_db *db = (struct pgsql_db *)_db;
-       time_t now;
 
-       /* don't try reconnecting more than once a second */
-       now = time(NULL);
-       if (db->connecting || db->last_connect == now)
-               return db->connected ? 1 : (db->connecting ? 0 : -1);
-       db->last_connect = now;
+       i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
 
        db->pg = PQconnectStart(db->connect_string);
        if (db->pg == NULL)
@@ -193,28 +180,33 @@ static int driver_pgsql_connect(struct sql_db *_db)
                        PQdb(db->pg), last_error(db));
                driver_pgsql_close(db);
                return -1;
-       } else {
-               /* nonblocking connecting begins. */
-               if (PQsetnonblocking(db->pg, 1) < 0)
-                       i_error("pgsql: PQsetnonblocking() failed");
-               db->io = io_add(PQsocket(db->pg), IO_WRITE,
-                               connect_callback, db);
-               db->io_dir = IO_WRITE;
-               db->connecting = TRUE;
-               return 0;
        }
+
+       /* nonblocking connecting begins. */
+       if (PQsetnonblocking(db->pg, 1) < 0)
+               i_error("pgsql: PQsetnonblocking() failed");
+       db->to_connect = timeout_add(SQL_CONNECT_TIMEOUT_SECS * 1000,
+                                    driver_pgsql_connect_timeout, db);
+       db->io = io_add(PQsocket(db->pg), IO_WRITE, connect_callback, db);
+       db->io_dir = IO_WRITE;
+       sql_db_set_state(&db->api, SQL_DB_STATE_CONNECTING);
+       return 0;
+}
+
+static void driver_pgsql_disconnect(struct sql_db *_db)
+{
+       struct pgsql_db *db = (struct pgsql_db *)_db;
+
+       driver_pgsql_close(db);
 }
 
 static struct sql_db *driver_pgsql_init_v(const char *connect_string)
 {
        struct pgsql_db *db;
 
-       i_assert(connect_string != NULL);
-
        db = i_new(struct pgsql_db, 1);
        db->connect_string = i_strdup(connect_string);
        db->api = driver_pgsql_db;
-       db->queue_tail = &db->queue;
        return &db->api;
 }
 
@@ -222,18 +214,9 @@ static void driver_pgsql_deinit_v(struct sql_db *_db)
 {
        struct pgsql_db *db = (struct pgsql_db *)_db;
 
-       while (db->queue != NULL) {
-               struct pgsql_queue *next = db->queue->next;
-
-                result_finish(db->queue->result);
-               i_free(db->queue->query);
-               i_free(db->queue);
-
-               db->queue = next;
-       }
+       if (db->cur_result != NULL && db->cur_result->to != NULL)
+                result_finish(db->cur_result);
 
-       if (db->queue_to != NULL)
-               timeout_remove(&db->queue_to);
         driver_pgsql_close(db);
        i_free(db->error);
        i_free(db->connect_string);
@@ -241,10 +224,14 @@ static void driver_pgsql_deinit_v(struct sql_db *_db)
        i_free(db);
 }
 
-static enum sql_db_flags
-driver_pgsql_get_flags(struct sql_db *db ATTR_UNUSED)
+static void driver_pgsql_set_idle(struct pgsql_db *db)
 {
-       return 0;
+       i_assert(db->api.state == SQL_DB_STATE_BUSY);
+
+       if (db->fatal_error)
+               driver_pgsql_close(db);
+       else
+               sql_db_set_state(&db->api, SQL_DB_STATE_IDLE);
 }
 
 static void consume_results(struct pgsql_db *db)
@@ -261,31 +248,42 @@ static void consume_results(struct pgsql_db *db)
                PQclear(pgres);
        }
 
-       if (PQstatus(db->pg) == CONNECTION_BAD)
+       if (PQstatus(db->pg) == CONNECTION_BAD) {
                io_remove_closed(&db->io);
-       else
+               driver_pgsql_close(db);
+       } else {
                io_remove(&db->io);
-
-       db->querying = FALSE;
-       if (db->queue != NULL && db->connected)
-               queue_send_next(db);
+               driver_pgsql_set_idle(db);
+       }
 }
 
 static void driver_pgsql_result_free(struct sql_result *_result)
 {
        struct pgsql_db *db = (struct pgsql_db *)_result->db;
         struct pgsql_result *result = (struct pgsql_result *)_result;
+       bool success;
 
-       if (result->api.callback)
+       if (result->api.callback) {
+               /* we're coming here from a user's sql_result_free() that's
+                  being called from a callback. we'll do this later,
+                  so ignore. */
                return;
+       }
+
+       i_assert(db->cur_result == result);
+       i_assert(result->callback == NULL);
 
        if (_result == db->sync_result)
                db->sync_result = NULL;
+       db->cur_result = NULL;
 
+       success = result->pgres != NULL && !db->fatal_error;
        if (result->pgres != NULL) {
                PQclear(result->pgres);
                result->pgres = NULL;
+       }
 
+       if (success) {
                /* we'll have to read the rest of the results as well */
                i_assert(db->io == NULL);
                db->io = io_add(PQsocket(db->pg), IO_READ,
@@ -293,7 +291,7 @@ static void driver_pgsql_result_free(struct sql_result *_result)
                db->io_dir = IO_READ;
                consume_results(db);
        } else {
-               db->querying = FALSE;
+               driver_pgsql_set_idle(db);
        }
 
        if (array_is_created(&result->binary_values)) {
@@ -306,54 +304,38 @@ static void driver_pgsql_result_free(struct sql_result *_result)
 
        i_free(result->fields);
        i_free(result->values);
-       i_free(result->query);
        i_free(result);
-
-       if (db->queue != NULL && !db->querying && db->connected)
-               queue_send_next(db);
 }
 
 static void result_finish(struct pgsql_result *result)
 {
        struct pgsql_db *db = (struct pgsql_db *)result->api.db;
-       bool free_result = TRUE, retry = FALSE;
-       bool disconnected;
+       bool free_result = TRUE;
+
+       timeout_remove(&result->to);
 
        /* if connection to server was lost, we don't yet see that the
           connection is bad. we only see the fatal error, so assume it also
           means disconnection. */
-       disconnected = PQstatus(db->pg) == CONNECTION_BAD ||
-               PQresultStatus(result->pgres) == PGRES_FATAL_ERROR;
-       if (disconnected && result->retry_query) {
-               /* retry the query */
-               i_error("pgsql: Query failed, retrying: %s", last_error(db));
-               retry = TRUE;
-       } else if (result->callback != NULL) {
-               result->api.callback = TRUE;
-               T_BEGIN {
-                       result->callback(&result->api, result->context);
-               } T_END;
-               result->api.callback = FALSE;
-               free_result = db->sync_result != &result->api;
-               if (db->queue == NULL && db->ioloop != NULL)
-                       io_loop_stop(db->ioloop);
-       }                            
-
-       if (disconnected) {
-               /* disconnected */
-               if (result->pgres != NULL) {
-                       PQclear(result->pgres);
-                       result->pgres = NULL;
-               }
-               driver_pgsql_close(db);
+       if (PQstatus(db->pg) == CONNECTION_BAD || result->pgres == NULL ||
+           PQresultStatus(result->pgres) == PGRES_FATAL_ERROR)
+               db->fatal_error = TRUE;
+
+       if (db->fatal_error) {
+               result->api.failed = TRUE;
+               result->api.failed_try_retry = TRUE;
+       }
+       result->api.callback = TRUE;
+       T_BEGIN {
+               result->callback(&result->api, result->context);
+       } T_END;
+       result->api.callback = FALSE;
+       result->callback = NULL;
+
+       free_result = db->sync_result != &result->api;
+       if (db->ioloop != NULL)
+               io_loop_stop(db->ioloop);
 
-               if (retry) {
-                       /* retry the query */
-                       driver_pgsql_query_full(&db->api, result->query,
-                                               result->callback,
-                                               result->context, FALSE);
-               }
-       }
        if (free_result)
                sql_result_unref(&result->api);
 }
@@ -363,7 +345,6 @@ static void get_result(struct pgsql_result *result)
         struct pgsql_db *db = (struct pgsql_db *)result->api.db;
 
        if (!PQconsumeInput(db->pg)) {
-               db->connected = FALSE;
                result_finish(result);
                return;
        }
@@ -396,7 +377,6 @@ static void flush_callback(struct pgsql_result *result)
        io_remove(&db->io);
 
        if (ret < 0) {
-               db->connected = FALSE;
                result_finish(result);
        } else {
                /* all flushed */
@@ -404,29 +384,34 @@ static void flush_callback(struct pgsql_result *result)
        }
 }
 
-static void send_query(struct pgsql_result *result, const char *query)
+static void query_timeout(struct pgsql_result *result)
+{
+       i_error("pgsql: Query timed out, aborting");
+       result->timeout = TRUE;
+       result_finish(result);
+}
+
+static void do_query(struct pgsql_result *result, const char *query)
 {
         struct pgsql_db *db = (struct pgsql_db *)result->api.db;
        int ret;
 
+       i_assert(SQL_DB_IS_READY(&db->api));
+       i_assert(db->cur_result == NULL);
        i_assert(db->io == NULL);
-       i_assert(!db->querying);
-       i_assert(db->connected);
 
-       if (!PQsendQuery(db->pg, query)) {
-               db->connected = FALSE;
-               result_finish(result);
-               return;
-       }
+       db->cur_result = result;
+       result->to = timeout_add(SQL_QUERY_TIMEOUT_SECS * 1000,
+                                query_timeout, result);
 
-       ret = PQflush(db->pg);
-       if (ret < 0) {
-               db->connected = FALSE;
+       if (!PQsendQuery(db->pg, query) ||
+           (ret = PQflush(db->pg)) < 0) {
+               /* failed to send query */
                result_finish(result);
                return;
        }
 
-       db->querying = TRUE;
+       sql_db_set_state(&db->api, SQL_DB_STATE_BUSY);
        if (ret > 0) {
                /* write blocks */
                db->io = io_add(PQsocket(db->pg), IO_WRITE,
@@ -437,125 +422,6 @@ static void send_query(struct pgsql_result *result, const char *query)
        }
 }
 
-static struct pgsql_queue *queue_unlink_first(struct pgsql_db *db)
-{
-       struct pgsql_queue *queue;
-
-       queue = db->queue;
-       db->queue = queue->next;
-
-       if (db->queue == NULL)
-               db->queue_tail = &db->queue;
-       return queue;
-}
-
-static void queue_send_next(struct pgsql_db *db)
-{
-       struct pgsql_queue *queue;
-
-       queue = queue_unlink_first(db);
-       send_query(queue->result, queue->query);
-
-       i_free(queue->query);
-       i_free(queue);
-}
-
-static void queue_abort_next(struct pgsql_db *db)
-{
-       struct pgsql_queue *queue;
-
-       queue = queue_unlink_first(db);
-
-       queue->result->callback(&sql_not_connected_result,
-                               queue->result->context);
-       i_free(queue->result);
-       i_free(queue->query);
-       i_free(queue);
-
-       if (db->queue == NULL && db->ioloop != NULL)
-               io_loop_stop(db->ioloop);
-}
-
-static void queue_drop_timed_out_queries(struct pgsql_db *db)
-{
-       while (db->queue != NULL &&
-              db->queue->created + QUERY_TIMEOUT_SECS < ioloop_time)
-               queue_abort_next(db);
-
-}
-
-static void queue_timeout(struct pgsql_db *db)
-{
-       if (db->querying)
-               return;
-
-       if (!db->connected) {
-               queue_drop_timed_out_queries(db);
-               driver_pgsql_connect(&db->api);
-               return;
-       }
-
-       if (db->queue != NULL)
-               queue_send_next(db);
-
-       if (db->queue == NULL)
-               timeout_remove(&db->queue_to);
-}
-
-static void
-driver_pgsql_queue_query(struct pgsql_result *result, const char *query)
-{
-        struct pgsql_db *db = (struct pgsql_db *)result->api.db;
-       struct pgsql_queue *queue;
-
-       queue = i_new(struct pgsql_queue, 1);
-       queue->created = time(NULL);
-       queue->query = i_strdup(query);
-       queue->result = result;
-
-       *db->queue_tail = queue;
-       db->queue_tail = &queue->next;
-
-       if (db->queue_to == NULL)
-               db->queue_to = timeout_add(5000, queue_timeout, db);
-}
-
-static void do_query(struct pgsql_result *result, const char *query)
-{
-        struct pgsql_db *db = (struct pgsql_db *)result->api.db;
-
-       i_assert(db->sync_result == NULL);
-
-       if (db->querying) {
-               /* only one query at a time */
-               driver_pgsql_queue_query(result, query);
-               return;
-       }
-
-       if (!db->connected) {
-               /* try connecting again */
-               driver_pgsql_connect(&db->api);
-               driver_pgsql_queue_query(result, query);
-               return;
-       }
-
-       if (db->queue == NULL)
-               send_query(result, query);
-       else {
-               /* there's already queries queued, send them first */
-               driver_pgsql_queue_query(result, query);
-               queue_send_next(db);
-       }
-}
-
-static void exec_callback(struct sql_result *_result,
-                         void *context ATTR_UNUSED)
-{
-       struct pgsql_db *db = (struct pgsql_db *)_result->db;
-
-       i_error("pgsql: sql_exec() failed: %s", last_error(db));
-}
-
 static const char *
 driver_pgsql_escape_string(struct sql_db *_db, const char *string)
 {
@@ -564,22 +430,32 @@ driver_pgsql_escape_string(struct sql_db *_db, const char *string)
        char *to;
 
 #ifdef HAVE_PQESCAPE_STRING_CONN
-       if (!db->connected) {
+       if (db->api.state == SQL_DB_STATE_DISCONNECTED) {
                /* try connecting again */
-               (void)driver_pgsql_connect(&db->api);
+               (void)sql_connect(&db->api);
        }
-       to = t_buffer_get(len * 2 + 1);
-       len = PQescapeStringConn(db->pg, to, string, len, NULL);
-#else
-       to = t_buffer_get(len * 2 + 1);
-       len = PQescapeString(to, string, len);
+       if (db->api.state != SQL_DB_STATE_DISCONNECTED) {
+               to = t_buffer_get(len * 2 + 1);
+               len = PQescapeStringConn(db->pg, to, string, len, NULL);
+       } else
 #endif
+       {
+               to = t_buffer_get(len * 2 + 1);
+               len = PQescapeString(to, string, len);
+       }
        t_buffer_alloc(len + 1);
        return to;
 }
 
-static void driver_pgsql_exec_full(struct sql_db *db, const char *query,
-                                  bool retry_query)
+static void exec_callback(struct sql_result *_result,
+                         void *context ATTR_UNUSED)
+{
+       struct pgsql_db *db = (struct pgsql_db *)_result->db;
+
+       i_error("pgsql: sql_exec() failed: %s", last_error(db));
+}
+
+static void driver_pgsql_exec(struct sql_db *db, const char *query)
 {
        struct pgsql_result *result;
 
@@ -588,22 +464,11 @@ static void driver_pgsql_exec_full(struct sql_db *db, const char *query,
        result->api.db = db;
        result->api.refcount = 1;
        result->callback = exec_callback;
-       if (retry_query) {
-               result->query = i_strdup(query);
-               result->retry_query = TRUE;
-       }
        do_query(result, query);
 }
 
-static void driver_pgsql_exec(struct sql_db *db, const char *query)
-{
-       driver_pgsql_exec_full(db, query, TRUE);
-}
-
-static void
-driver_pgsql_query_full(struct sql_db *db, const char *query,
-                       sql_query_callback_t *callback, void *context,
-                       bool retry_query)
+static void driver_pgsql_query(struct sql_db *db, const char *query,
+                              sql_query_callback_t *callback, void *context)
 {
        struct pgsql_result *result;
 
@@ -613,74 +478,65 @@ driver_pgsql_query_full(struct sql_db *db, const char *query,
        result->api.refcount = 1;
        result->callback = callback;
        result->context = context;
-       if (retry_query) {
-               result->query = i_strdup(query);
-               result->retry_query = TRUE;
-       }
        do_query(result, query);
 }
 
-static void driver_pgsql_query(struct sql_db *db, const char *query,
-                              sql_query_callback_t *callback, void *context)
-{
-       driver_pgsql_query_full(db, query, callback, context, TRUE);
-}
-
 static void pgsql_query_s_callback(struct sql_result *result, void *context)
 {
         struct pgsql_db *db = context;
 
-       db->query_finished = TRUE;
        db->sync_result = result;
 }
 
-static struct sql_result *
-driver_pgsql_query_s(struct sql_db *_db, const char *query)
+static void driver_pgsql_sync_init(struct pgsql_db *db)
 {
-        struct pgsql_db *db = (struct pgsql_db *)_db;
-       struct sql_result *result;
-       struct io old_io;
-
-       if (db->queue_to != NULL) {
-               /* we're creating a new ioloop, make sure the timeout gets
-                  added there. */
-               timeout_remove(&db->queue_to);
+       if (db->io == NULL) {
+               db->ioloop = io_loop_create();
+               return;
        }
 
-       if (db->io == NULL)
-               db->ioloop = io_loop_create();
-       else {
-               /* have to move our existing I/O handler to new I/O loop */
-               old_io = *db->io;
-               io_remove(&db->io);
+       i_assert(db->api.state == SQL_DB_STATE_CONNECTING);
 
-               db->ioloop = io_loop_create();
+       /* have to move our existing I/O handler to new I/O loop */
+       io_remove(&db->io);
 
-               db->io = io_add(PQsocket(db->pg), old_io.condition,
-                               old_io.callback, old_io.context);
-       }
+       db->ioloop = io_loop_create();
+       db->io = io_add(PQsocket(db->pg), db->io_dir, connect_callback, db);
+       /* wait for connecting to finish */
+       io_loop_run(db->ioloop);
+}
 
-       db->query_finished = FALSE;
-       if (query != NULL)
-               driver_pgsql_query(_db, query, pgsql_query_s_callback, db);
+static void driver_pgsql_sync_deinit(struct pgsql_db *db)
+{
+       io_loop_destroy(&db->ioloop);
+}
 
-       if (!db->query_finished) {
-               if ((db->connected || db->connecting) && db->io != NULL)
-                       io_loop_run(db->ioloop);
-               else
-                       queue_abort_next(db);
+static struct sql_result *
+driver_pgsql_sync_query(struct pgsql_db *db, const char *query)
+{
+       struct sql_result *result;
 
-               if (db->io != NULL) {
-                       i_assert(db->sync_result == &sql_not_connected_result);
-                       io_remove(&db->io);
-               }
-               if (db->queue_to != NULL)
-                       timeout_remove(&db->queue_to);
-       } else {
-               i_assert(db->io == NULL);
-               i_assert(db->queue_to == NULL);
+       i_assert(db->sync_result == NULL);
+
+       switch (db->api.state) {
+       case SQL_DB_STATE_CONNECTING:
+       case SQL_DB_STATE_BUSY:
+               i_unreached();
+       case SQL_DB_STATE_DISCONNECTED:
+               sql_not_connected_result.refcount++;
+               return &sql_not_connected_result;
+       case SQL_DB_STATE_IDLE:
+               break;
+       }
+
+       driver_pgsql_query(&db->api, query, pgsql_query_s_callback, db);
+       if (db->sync_result == NULL)
+               io_loop_run(db->ioloop);
+
+       if (db->io != NULL) {
+               i_assert(db->fatal_error);
+               io_remove_closed(&db->io);
        }
-       io_loop_destroy(&db->ioloop);
 
        result = db->sync_result;
        if (result == &sql_not_connected_result) {
@@ -693,6 +549,18 @@ driver_pgsql_query_s(struct sql_db *_db, const char *query)
        return result;
 }
 
+static struct sql_result *
+driver_pgsql_query_s(struct sql_db *_db, const char *query)
+{
+       struct pgsql_db *db = (struct pgsql_db *)_db;
+       struct sql_result *result;
+
+       driver_pgsql_sync_init(db);
+       result = driver_pgsql_sync_query(db, query);
+       driver_pgsql_sync_deinit(db);
+       return result;
+}
+
 static int driver_pgsql_result_next_row(struct sql_result *_result)
 {
        struct pgsql_result *result = (struct pgsql_result *)_result;
@@ -712,8 +580,10 @@ static int driver_pgsql_result_next_row(struct sql_result *_result)
                        return 0;
        }
 
-       if (result->pgres == NULL)
+       if (result->pgres == NULL) {
+               _result->failed = TRUE;
                return -1;
+       }
 
        switch (PQresultStatus(result->pgres)) {
        case PGRES_COMMAND_OK:
@@ -725,10 +595,12 @@ static int driver_pgsql_result_next_row(struct sql_result *_result)
        case PGRES_EMPTY_QUERY:
        case PGRES_NONFATAL_ERROR:
                /* nonfatal error */
+               _result->failed = TRUE;
                return -1;
        default:
                /* treat as fatal error */
-               db->connected = FALSE;
+               _result->failed = TRUE;
+               db->fatal_error = TRUE;
                return -1;
        }
 }
@@ -862,7 +734,9 @@ static const char *driver_pgsql_result_get_error(struct sql_result *_result)
 
        i_free_and_null(db->error);
 
-       if (result->pgres == NULL) {
+       if (result->timeout) {
+               db->error = i_strdup("Query timed out");
+       } else if (result->pgres == NULL) {
                /* connection error */
                db->error = i_strdup(last_error(db));
        } else {
@@ -905,7 +779,7 @@ driver_pgsql_transaction_unref(struct pgsql_transaction_context *ctx)
 
 static void
 transaction_begin_callback(struct sql_result *result,
-                           struct pgsql_transaction_context *ctx)
+                          struct pgsql_transaction_context *ctx)
 {
        if (sql_result_next_row(result) < 0) {
                ctx->begin_failed = TRUE;
@@ -930,17 +804,18 @@ transaction_commit_callback(struct sql_result *result,
 
 static void
 transaction_update_callback(struct sql_result *result,
-                           struct pgsql_query_list *list)
+                           struct sql_transaction_query *query)
 {
-       struct pgsql_transaction_context *ctx = list->ctx;
+       struct pgsql_transaction_context *ctx =
+               (struct pgsql_transaction_context *)query->trans;
 
        if (sql_result_next_row(result) < 0) {
                ctx->failed = TRUE;
                ctx->error = sql_result_get_error(result);
-       } else if (list->affected_rows != NULL) {
+       } else if (query->affected_rows != NULL) {
                struct pgsql_result *pg_result = (struct pgsql_result *)result;
 
-               *list->affected_rows = atoi(PQcmdTuples(pg_result->pgres));
+               *query->affected_rows = atoi(PQcmdTuples(pg_result->pgres));
        }
        driver_pgsql_transaction_unref(ctx);
 }
@@ -955,78 +830,110 @@ driver_pgsql_transaction_commit(struct sql_transaction_context *_ctx,
        ctx->callback = callback;
        ctx->context = context;
 
-       if (ctx->failed || ctx->head == NULL) {
+       if (ctx->failed || _ctx->head == NULL) {
                callback(ctx->failed ? ctx->error : NULL, context);
                driver_pgsql_transaction_unref(ctx);
-       } else if (ctx->head->next == NULL) {
+       } else if (_ctx->head->next == NULL) {
                /* just a single query, send it */
-               sql_query(_ctx->db, ctx->head->query,
+               sql_query(_ctx->db, _ctx->head->query,
                          transaction_commit_callback, ctx);
        } else {
                /* multiple queries, use a transaction */
                ctx->refcount++;
                sql_query(_ctx->db, "BEGIN", transaction_begin_callback, ctx);
-               while (ctx->head != NULL) {
+               while (_ctx->head != NULL) {
                        ctx->refcount++;
-                       sql_query(_ctx->db, ctx->head->query,
-                                 transaction_update_callback, ctx->head);
-                       ctx->head = ctx->head->next;
+                       sql_query(_ctx->db, _ctx->head->query,
+                                 transaction_update_callback, _ctx->head);
+                       _ctx->head = _ctx->head->next;
                }
                sql_query(_ctx->db, "COMMIT", transaction_commit_callback, ctx);
        }
 }
 
+static void
+commit_multi_fail(struct pgsql_transaction_context *ctx,
+                 struct sql_result *result, const char *query)
+{
+       ctx->failed = TRUE;
+       ctx->error = t_strdup_printf("%s (query: %s)",
+                                    sql_result_get_error(result), query);
+       sql_result_unref(result);
+}
+
+static struct sql_result *
+driver_pgsql_transaction_commit_multi(struct pgsql_transaction_context *ctx)
+{
+       struct pgsql_db *db = (struct pgsql_db *)ctx->ctx.db;
+       struct sql_result *result;
+       struct sql_transaction_query *query;
+
+       result = driver_pgsql_sync_query(db, "BEGIN");
+       if (sql_result_next_row(result) < 0) {
+               commit_multi_fail(ctx, result, "BEGIN");
+               return NULL;
+       }
+       sql_result_unref(result);
+
+       /* send queries */
+       for (query = ctx->ctx.head; query != NULL; query = query->next) {
+               result = driver_pgsql_sync_query(db, query->query);
+               if (sql_result_next_row(result) < 0) {
+                       commit_multi_fail(ctx, result, query->query);
+                       break;
+               }
+               if (query->affected_rows != NULL) {
+                       struct pgsql_result *pg_result =
+                               (struct pgsql_result *)result;
+
+                       *query->affected_rows =
+                               atoi(PQcmdTuples(pg_result->pgres));
+               }
+               sql_result_unref(result);
+       }
+
+       return driver_pgsql_sync_query(db, ctx->failed ?
+                                      "ROLLBACK" : "COMMIT");
+}
+
 static int
 driver_pgsql_transaction_commit_s(struct sql_transaction_context *_ctx,
                                  const char **error_r)
 {
        struct pgsql_transaction_context *ctx =
                (struct pgsql_transaction_context *)_ctx;
+       struct pgsql_db *db = (struct pgsql_db *)_ctx->db;
+       struct sql_transaction_query *single_query = NULL;
        struct sql_result *result;
 
        *error_r = NULL;
 
-       if (ctx->failed || ctx->head == NULL) {
+       if (ctx->failed || _ctx->head == NULL) {
                /* nothing to be done */
                result = NULL;
-       } else if (ctx->head->next == NULL) {
+       } else if (_ctx->head->next == NULL) {
                /* just a single query, send it */
-               result = sql_query_s(_ctx->db, ctx->head->query);
+               single_query = _ctx->head;
+               result = sql_query_s(_ctx->db, single_query->query);
        } else {
                /* multiple queries, use a transaction */
-               ctx->refcount++;
-               sql_query(_ctx->db, "BEGIN", transaction_begin_callback, ctx);
-               while (ctx->head != NULL) {
-                       ctx->refcount++;
-                       sql_query(_ctx->db, ctx->head->query,
-                                 transaction_update_callback, ctx->head);
-                       ctx->head = ctx->head->next;
-               }
-               if (ctx->refcount > 1) {
-                       /* flush the previous queries */
-                       (void)driver_pgsql_query_s(_ctx->db, NULL);
-               }
-
-               if (ctx->begin_failed) {
-                       result = NULL;
-               } else if (ctx->failed) {
-                       result = sql_query_s(_ctx->db, "ROLLBACK");
-               } else {
-                       result = sql_query_s(_ctx->db, "COMMIT");
-               }
+               driver_pgsql_sync_init(db);
+               result = driver_pgsql_transaction_commit_multi(ctx);
+               driver_pgsql_sync_deinit(db);
        }
 
-       if (ctx->failed)
+       if (ctx->failed) {
+               i_assert(ctx->error != NULL);
                *error_r = ctx->error;
-       else if (result != NULL) {
+       else if (result != NULL) {
                if (sql_result_next_row(result) < 0)
                        *error_r = sql_result_get_error(result);
-               else if (ctx->head != NULL &&
-                        ctx->head->affected_rows != NULL) {
+               else if (single_query != NULL &&
+                        single_query->affected_rows != NULL) {
                        struct pgsql_result *pg_result =
                                (struct pgsql_result *)result;
 
-                       *ctx->head->affected_rows =
+                       *single_query->affected_rows =
                                atoi(PQcmdTuples(pg_result->pgres));
                }
        }
@@ -1054,28 +961,19 @@ driver_pgsql_update(struct sql_transaction_context *_ctx, const char *query,
 {
        struct pgsql_transaction_context *ctx =
                (struct pgsql_transaction_context *)_ctx;
-       struct pgsql_query_list *list;
-
-       list = p_new(ctx->query_pool, struct pgsql_query_list, 1);
-       list->ctx = ctx;
-       list->query = p_strdup(ctx->query_pool, query);
-       list->affected_rows = affected_rows;
 
-       if (ctx->head == NULL)
-               ctx->head = list;
-       else
-               ctx->tail->next = list;
-       ctx->tail = list;
+       sql_transaction_add_query(_ctx, ctx->query_pool, query, affected_rows);
 }
 
 const struct sql_db driver_pgsql_db = {
-       "pgsql",
+       .name = "pgsql",
+       .flags = SQL_DB_FLAG_POOLED,
 
        .v = {
                driver_pgsql_init_v,
                driver_pgsql_deinit_v,
-               driver_pgsql_get_flags,
                driver_pgsql_connect,
+               driver_pgsql_disconnect,
                driver_pgsql_escape_string,
                driver_pgsql_exec,
                driver_pgsql_query,
index d631c15b97e23870c7863bd76b0c860193167982..67e087b0d24bc161f6e7f25cb6f20673ac62b067 100644 (file)
@@ -54,10 +54,19 @@ static int driver_sqlite_connect(struct sql_db *_db)
                i_error("sqlite: open(%s) failed: %s", db->dbfile,
                        sqlite3_errmsg(db->sqlite));
                sqlite3_close(db->sqlite);
+               db->sqlite = NULL;
                return -1;
        }
 }
 
+static void driver_sqlite_disconnect(struct sql_db *_db)
+{
+       struct sqlite_db *db = (struct sqlite_db *)_db;
+
+       sqlite3_close(db->sqlite);
+       db->sqlite = NULL;
+}
+
 static struct sql_db *driver_sqlite_init_v(const char *connect_string)
 {
        struct sqlite_db *db;
@@ -84,12 +93,6 @@ static void driver_sqlite_deinit_v(struct sql_db *_db)
        pool_unref(&db->pool);
 }
 
-static enum sql_db_flags
-driver_sqlite_get_flags(struct sql_db *db ATTR_UNUSED)
-{
-       return SQL_DB_FLAG_BLOCKING;
-}
-
 static const char *
 driver_sqlite_escape_string(struct sql_db *_db ATTR_UNUSED,
                            const char *string)
@@ -387,13 +390,14 @@ driver_sqlite_update(struct sql_transaction_context *_ctx, const char *query,
 }
 
 const struct sql_db driver_sqlite_db = {
-       "sqlite",
+       .name = "sqlite",
+       .flags = SQL_DB_FLAG_BLOCKING,
 
        .v = {
                driver_sqlite_init_v,
                driver_sqlite_deinit_v,
-               driver_sqlite_get_flags,
                driver_sqlite_connect,
+               driver_sqlite_disconnect,
                driver_sqlite_escape_string,
                driver_sqlite_exec,
                driver_sqlite_query,
diff --git a/src/lib-sql/driver-sqlpool.c b/src/lib-sql/driver-sqlpool.c
new file mode 100644 (file)
index 0000000..0e12320
--- /dev/null
@@ -0,0 +1,756 @@
+/* Copyright (c) 2010 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "llist.h"
+#include "ioloop.h"
+#include "sql-api-private.h"
+
+#include <time.h>
+
+#define QUERY_TIMEOUT_SECS 6
+
+struct sqlpool_host {
+       char *connect_string;
+
+       unsigned int connection_count;
+};
+
+struct sqlpool_connection {
+       struct sql_db *db;
+       unsigned int host_idx;
+};
+
+struct sqlpool_db {
+       struct sql_db api;
+
+       pool_t pool;
+       const struct sql_db *driver;
+       unsigned int connection_limit;
+
+       ARRAY_DEFINE(hosts, struct sqlpool_host);
+       /* all connections from all hosts */
+       ARRAY_DEFINE(all_connections, struct sqlpool_connection);
+       /* index of last connection in all_connections that was used to
+          send a query. */
+       unsigned int last_query_conn_idx;
+
+       /* queued requests */
+       struct sqlpool_request *requests_head, *requests_tail;
+       struct timeout *request_to;
+};
+
+struct sqlpool_request {
+       struct sqlpool_request *prev, *next;
+
+       struct sqlpool_db *db;
+       time_t created;
+
+       unsigned int host_idx;
+       unsigned int retried:1;
+
+       /* requests are a) queries */
+       char *query;
+       sql_query_callback_t *callback;
+       void *context;
+
+       /* b) transaction waiters */
+       struct sqlpool_transaction_context *trans;
+};
+
+struct sqlpool_transaction_context {
+       struct sql_transaction_context ctx;
+
+       sql_commit_callback_t *callback;
+       void *context;
+
+       struct sqlpool_request *commit_request;
+       struct sql_transaction_context *conn_trans;
+
+       pool_t query_pool;
+};
+
+extern struct sql_db driver_sqlpool_db;
+
+static void
+driver_sqlpool_query_callback(struct sql_result *result,
+                             struct sqlpool_request *request);
+static void
+driver_sqlpool_commit_callback(const char *error,
+                              struct sqlpool_transaction_context *ctx);
+
+static struct sqlpool_request *
+sqlpool_request_new(struct sqlpool_db *db, const char *query)
+{
+       struct sqlpool_request *request;
+
+       request = i_new(struct sqlpool_request, 1);
+       request->db = db;
+       request->created = time(NULL);
+       request->query = i_strdup(query);
+       return request;
+}
+
+static void
+sqlpool_request_free(struct sqlpool_request **_request)
+{
+       struct sqlpool_request *request = *_request;
+
+       *_request = NULL;
+
+       i_assert(request->prev == NULL && request->next == NULL);
+       i_free(request->query);
+       i_free(request);
+}
+
+static void
+sqlpool_request_abort(struct sqlpool_request **_request)
+{
+       struct sqlpool_request *request = *_request;
+
+       *_request = NULL;
+
+       if (request->callback != NULL)
+               request->callback(&sql_not_connected_result, request->context);
+
+       i_assert(request->prev != NULL ||
+                request->db->requests_head == request);
+       DLLIST2_REMOVE(&request->db->requests_head,
+                      &request->db->requests_tail, request);
+       sqlpool_request_free(&request);
+}
+
+static void
+driver_sqlpool_new_conn_trans(struct sqlpool_transaction_context *trans,
+                             struct sql_db *conndb)
+{
+       trans->conn_trans = sql_transaction_begin(conndb);
+       /* backend will use our queries list (we might still append more
+          queries to the list) */
+       trans->conn_trans->head = trans->ctx.head;
+       trans->conn_trans->tail = trans->ctx.tail;
+}
+
+static void
+sqlpool_request_handle_transaction(struct sql_db *conndb,
+                                  struct sqlpool_transaction_context *trans)
+{
+       i_assert(trans->conn_trans == NULL);
+
+       sqlpool_request_free(&trans->commit_request);
+       driver_sqlpool_new_conn_trans(trans, conndb);
+
+       if (trans->callback != NULL) {
+               /* commit() already called, finish it */
+               sql_transaction_commit(&trans->conn_trans,
+                                      driver_sqlpool_commit_callback, trans);
+       }
+}
+
+static void
+sqlpool_request_send_next(struct sqlpool_db *db, struct sql_db *conndb)
+{
+       struct sqlpool_request *request;
+
+       if (db->requests_head == NULL || !SQL_DB_IS_READY(conndb))
+               return;
+
+       request = db->requests_head;
+       DLLIST2_REMOVE(&db->requests_head, &db->requests_tail, request);
+       timeout_reset(db->request_to);
+
+       if (request->query != NULL) {
+               sql_query(conndb, request->query,
+                         driver_sqlpool_query_callback, request);
+       } else if (request->trans != NULL) {
+               sqlpool_request_handle_transaction(conndb, request->trans);
+       } else {
+               i_unreached();
+       }
+}
+
+static void sqlpool_reconnect(struct sql_db *conndb)
+{
+       timeout_remove(&conndb->to_reconnect);
+       (void)sql_connect(conndb);
+}
+
+static void
+sqlpool_state_changed(struct sql_db *conndb, enum sql_db_state prev_state,
+                     void *context)
+{
+       struct sqlpool_db *db = context;
+
+       if (conndb->state == SQL_DB_STATE_IDLE) {
+               conndb->connect_failure_count = 0;
+               conndb->connect_delay = SQL_CONNECT_MIN_DELAY;
+               sqlpool_request_send_next(db, conndb);
+       }
+
+       if (prev_state == SQL_DB_STATE_CONNECTING &&
+           conndb->state == SQL_DB_STATE_DISCONNECTED) {
+               /* connect failed */
+               if (conndb->connect_failure_count > 0) {
+                       /* increase delay between reconnections to this
+                          server */
+                       conndb->connect_delay *= 5;
+                       if (conndb->connect_delay > SQL_CONNECT_MAX_DELAY)
+                               conndb->connect_delay = SQL_CONNECT_MAX_DELAY;
+               }
+               conndb->connect_failure_count++;
+
+               /* reconnect after the delay */
+               if (conndb->to_reconnect != NULL)
+                       timeout_remove(&conndb->to_reconnect);
+               conndb->to_reconnect = timeout_add(conndb->connect_delay * 1000,
+                                                  sqlpool_reconnect, conndb);
+       }
+}
+
+static struct sqlpool_host *
+sqlpool_find_host_with_least_connections(struct sqlpool_db *db,
+                                        unsigned int *host_idx_r)
+{
+       struct sqlpool_host *hosts, *min = NULL;
+       unsigned int i, count;
+
+       hosts = array_get_modifiable(&db->hosts, &count);
+       for (i = 0; i < count; i++) {
+               if (min == NULL ||
+                   min->connection_count > hosts[i].connection_count) {
+                       min = &hosts[i];
+                       *host_idx_r = i;
+               }
+       }
+       i_assert(min != NULL);
+       return min;
+}
+
+static struct sqlpool_connection *sqlpool_add_connection(struct sqlpool_db *db)
+{
+       struct sql_db *conndb;
+       struct sqlpool_host *host;
+       struct sqlpool_connection *conn;
+       unsigned int host_idx;
+
+       host = sqlpool_find_host_with_least_connections(db, &host_idx);
+       if (host->connection_count >= db->connection_limit)
+               return NULL;
+       host->connection_count++;
+
+       conndb = db->driver->v.init(host->connect_string);
+       i_array_init(&conndb->module_contexts, 5);
+
+       conndb->state_change_callback = sqlpool_state_changed;
+       conndb->state_change_context = db;
+       conndb->connect_delay = SQL_CONNECT_MIN_DELAY;
+
+       conn = array_append_space(&db->all_connections);
+       conn->host_idx = host_idx;
+       conn->db = conndb;
+       return conn;
+}
+
+static const struct sqlpool_connection *
+sqlpool_find_available_connection(struct sqlpool_db *db,
+                                 unsigned int unwanted_host_idx,
+                                 bool *all_disconnected_r)
+{
+       const struct sqlpool_connection *conns;
+       unsigned int i, count;
+
+       *all_disconnected_r = TRUE;
+
+       conns = array_get(&db->all_connections, &count);
+       for (i = 0; i < count; i++) {
+               unsigned int idx = (i + db->last_query_conn_idx) % count;
+               struct sql_db *conndb = conns[idx].db;
+
+               if (conns[idx].host_idx == unwanted_host_idx)
+                       continue;
+
+               if (!SQL_DB_IS_READY(conndb)) {
+                       /* see if we could reconnect to it immediately */
+                       (void)sql_connect(conndb);
+               }
+               if (SQL_DB_IS_READY(conndb)) {
+                       db->last_query_conn_idx = idx;
+                       *all_disconnected_r = FALSE;
+                       return &conns[idx];
+               }
+               if (conndb->state != SQL_DB_STATE_DISCONNECTED)
+                       *all_disconnected_r = FALSE;
+       }
+       return NULL;
+}
+
+static bool
+driver_sqlpool_get_connection(struct sqlpool_db *db,
+                             unsigned int unwanted_host_idx,
+                             const struct sqlpool_connection **conn_r)
+{
+       const struct sqlpool_connection *conn, *conns;
+       unsigned int i, count;
+       bool all_disconnected;
+
+       conn = sqlpool_find_available_connection(db, unwanted_host_idx,
+                                                &all_disconnected);
+       if (conn == NULL && unwanted_host_idx != -1U) {
+               /* maybe there are no wanted hosts. use any of them. */
+               conn = sqlpool_find_available_connection(db, -1U,
+                                                        &all_disconnected);
+       }
+       if (conn == NULL && all_disconnected) {
+               /* no connected connections. connect_delays may have gotten too
+                  high, reset all of them to see if some are still alive. */
+               conns = array_get(&db->all_connections, &count);
+               for (i = 0; i < count; i++) {
+                       struct sql_db *conndb = conns[i].db;
+
+                       if (conndb->connect_delay > SQL_CONNECT_RESET_DELAY)
+                               conndb->connect_delay = SQL_CONNECT_RESET_DELAY;
+               }
+               conn = sqlpool_find_available_connection(db, -1U,
+                                                        &all_disconnected);
+       }
+       if (conn == NULL) {
+               /* still nothing. try creating new connections */
+               conn = sqlpool_add_connection(db);
+               if (conn == NULL || !SQL_DB_IS_READY(conn->db))
+                       return FALSE;
+       }
+       *conn_r = conn;
+       return TRUE;
+}
+
+static bool
+driver_sqlpool_get_sync_connection(struct sqlpool_db *db,
+                                  const struct sqlpool_connection **conn_r)
+{
+       const struct sqlpool_connection *conns;
+       unsigned int i, count;
+
+       if (driver_sqlpool_get_connection(db, -1U, conn_r))
+               return TRUE;
+
+       /* no idling connections, but maybe we can find one that's trying to
+          connect to server, and we can use it once it's finished */
+       conns = array_get(&db->all_connections, &count);
+       for (i = 0; i < count; i++) {
+               if (conns[i].db->state == SQL_DB_STATE_CONNECTING) {
+                       *conn_r = &conns[i];
+                       return TRUE;
+               }
+       }
+       return FALSE;
+}
+
+static void
+driver_sqlpool_parse_hosts(struct sqlpool_db *db, const char *connect_string)
+{
+       const char *const *args, *key, *value, *const *hostnamep;
+       struct sqlpool_host *host;
+       ARRAY_TYPE(const_string) hostnames, connect_args;
+
+       t_array_init(&hostnames, 8);
+       t_array_init(&connect_args, 32);
+
+       /* connect string is a space separated list. it may contain
+          backend-specific strings which we'll pass as-is. we'll only care
+          about our own settings, plus the host settings. */
+       args = t_strsplit_spaces(connect_string, " ");
+       for (; *args != NULL; args++) {
+               value = strchr(*args, '=');
+               if (value == NULL) {
+                       key = *args;
+                       value = "";
+               } else {
+                       key = t_strdup_until(*args, value);
+                       value++;
+               }
+
+               if (strcmp(key, "maxconns") == 0) {
+                       if (str_to_uint(value, &db->connection_limit) < 0) {
+                               i_fatal("Invalid value for maxconns: %s",
+                                       value);
+                       }
+               } else if (strcmp(key, "host") == 0) {
+                       array_append(&hostnames, &value, 1);
+               } else {
+                       array_append(&connect_args, args, 1);
+               }
+       }
+
+       /* build a new connect string without our settings or hosts */
+       (void)array_append_space(&connect_args);
+       connect_string = t_strarray_join(array_idx(&connect_args, 0), " ");
+
+       if (array_count(&hostnames) == 0) {
+               /* no hosts specified. create a default one. */
+               host = array_append_space(&db->hosts);
+               host->connect_string = i_strdup(connect_string);
+       } else {
+               if (*connect_string == '\0')
+                       connect_string = NULL;
+
+               array_foreach(&hostnames, hostnamep) {
+                       host = array_append_space(&db->hosts);
+                       host->connect_string =
+                               i_strconcat("host=", *hostnamep, " ",
+                                           connect_string, NULL);
+               }
+       }
+
+       if (db->connection_limit == 0)
+               db->connection_limit = SQL_DEFAULT_CONNECTION_LIMIT;
+}
+
+struct sql_db *
+driver_sqlpool_init(const char *connect_string, const struct sql_db *driver)
+{
+       struct sqlpool_db *db;
+
+       i_assert(connect_string != NULL);
+
+       db = i_new(struct sqlpool_db, 1);
+       db->driver = driver;
+       db->api = driver_sqlpool_db;
+       db->api.flags = driver->flags;
+       i_array_init(&db->hosts, 8);
+
+       T_BEGIN {
+               driver_sqlpool_parse_hosts(db, connect_string);
+       } T_END;
+
+       i_array_init(&db->all_connections, 16);
+       /* always have at least one backend connection initialized */
+       (void)sqlpool_add_connection(db);
+       return &db->api;
+}
+
+static void driver_sqlpool_deinit(struct sql_db *_db)
+{
+       struct sqlpool_db *db = (struct sqlpool_db *)_db;
+       struct sqlpool_host *host;
+       struct sqlpool_connection *conn;
+
+       array_foreach_modifiable(&db->all_connections, conn)
+               sql_deinit(&conn->db);
+       array_clear(&db->all_connections);
+
+       while (db->requests_head != NULL) {
+               struct sqlpool_request *request = db->requests_head;
+
+               sqlpool_request_abort(&request);
+       }
+       if (db->request_to != NULL)
+               timeout_remove(&db->request_to);
+
+       array_foreach_modifiable(&db->hosts, host)
+               i_free(host->connect_string);
+
+       i_assert(array_count(&db->all_connections) == 0);
+       array_free(&db->hosts);
+       array_free(&db->all_connections);
+       array_free(&_db->module_contexts);
+       i_free(db);
+}
+
+static int driver_sqlpool_connect(struct sql_db *_db)
+{
+       struct sqlpool_db *db = (struct sqlpool_db *)_db;
+       const struct sqlpool_connection *conn;
+       int ret = -1, ret2;
+
+       array_foreach(&db->all_connections, conn) {
+               ret2 = sql_connect(conn->db);
+               if (ret2 > 0)
+                       return 1;
+               if (ret2 == 0)
+                       ret = 0;
+       }
+       return ret;
+}
+
+static void driver_sqlpool_disconnect(struct sql_db *_db)
+{
+       struct sqlpool_db *db = (struct sqlpool_db *)_db;
+       const struct sqlpool_connection *conn;
+
+       array_foreach(&db->all_connections, conn)
+               sql_disconnect(conn->db);
+}
+
+static const char *
+driver_sqlpool_escape_string(struct sql_db *_db, const char *string)
+{
+       struct sqlpool_db *db = (struct sqlpool_db *)_db;
+       const struct sqlpool_connection *conn;
+
+       /* we always have at least one connection */
+       conn = array_idx(&db->all_connections, 0);
+       return sql_escape_string(conn->db, string);
+}
+
+static void driver_sqlpool_timeout(struct sqlpool_db *db)
+{
+       while (db->requests_head != NULL) {
+               struct sqlpool_request *request = db->requests_head;
+
+               if (request->created + SQL_QUERY_TIMEOUT_SECS > ioloop_time)
+                       break;
+
+               i_error("%s: Query timed out "
+                       "(no free connections for %u secs): %s",
+                       db->driver->name,
+                       (unsigned int)(ioloop_time - request->created),
+                       request->query != NULL ? request->query :
+                       "<transaction>");
+               sqlpool_request_abort(&request);
+       }
+
+       if (db->requests_head == NULL)
+               timeout_remove(&db->request_to);
+}
+
+static void
+driver_sqlpool_prepend_request(struct sqlpool_db *db,
+                              struct sqlpool_request *request)
+{
+       DLLIST2_PREPEND(&db->requests_head, &db->requests_tail, request);
+       if (db->request_to == NULL) {
+               db->request_to = timeout_add(SQL_QUERY_TIMEOUT_SECS * 1000,
+                                            driver_sqlpool_timeout, db);
+       }
+}
+
+static void
+driver_sqlpool_append_request(struct sqlpool_db *db,
+                             struct sqlpool_request *request)
+{
+       DLLIST2_APPEND(&db->requests_head, &db->requests_tail, request);
+       if (db->request_to == NULL) {
+               db->request_to = timeout_add(SQL_QUERY_TIMEOUT_SECS * 1000,
+                                            driver_sqlpool_timeout, db);
+       }
+}
+
+static void
+driver_sqlpool_query_callback(struct sql_result *result,
+                             struct sqlpool_request *request)
+{
+       struct sqlpool_db *db = request->db;
+       const struct sqlpool_connection *conn;
+       struct sql_db *conndb;
+
+       if (result->failed_try_retry && !request->retried) {
+               i_error("%s: Query failed, retrying: %s",
+                       db->driver->name, sql_result_get_error(result));
+               request->retried = TRUE;
+               driver_sqlpool_prepend_request(db, request);
+
+               if (driver_sqlpool_get_connection(request->db,
+                                                 request->host_idx, &conn)) {
+                       request->host_idx = conn->host_idx;
+                       sqlpool_request_send_next(db, conn->db);
+               }
+       } else {
+               if (result->failed) {
+                       i_error("%s: Query failed, aborting: %s",
+                               db->driver->name, request->query);
+               }
+               conndb = result->db;
+
+               if (request->callback != NULL)
+                       request->callback(result, request->context);
+               sqlpool_request_free(&request);
+
+               sqlpool_request_send_next(db, conndb);
+       }
+}
+
+static void driver_sqlpool_query(struct sql_db *_db, const char *query,
+                                sql_query_callback_t *callback, void *context)
+{
+        struct sqlpool_db *db = (struct sqlpool_db *)_db;
+       struct sqlpool_request *request;
+       const struct sqlpool_connection *conn;
+
+       request = sqlpool_request_new(db, query);
+       request->callback = callback;
+       request->context = context;
+
+       if (!driver_sqlpool_get_connection(db, -1U, &conn))
+               driver_sqlpool_append_request(db, request);
+       else {
+               request->host_idx = conn->host_idx;
+               sql_query(conn->db, query, driver_sqlpool_query_callback,
+                         request);
+       }
+}
+
+static void driver_sqlpool_exec(struct sql_db *_db, const char *query)
+{
+       driver_sqlpool_query(_db, query, NULL, NULL);
+}
+
+static struct sql_result *
+driver_sqlpool_query_s(struct sql_db *_db, const char *query)
+{
+        struct sqlpool_db *db = (struct sqlpool_db *)_db;
+       const struct sqlpool_connection *conn;
+       struct sql_result *result;
+
+       if (!driver_sqlpool_get_sync_connection(db, &conn)) {
+               sql_not_connected_result.refcount++;
+               return &sql_not_connected_result;
+       }
+
+       result = sql_query_s(conn->db, query);
+       if (result->failed_try_retry) {
+               if (!driver_sqlpool_get_sync_connection(db, &conn))
+                       return result;
+
+               sql_result_unref(result);
+               result = sql_query_s(conn->db, query);
+       }
+       return result;
+}
+
+static struct sql_transaction_context *
+driver_sqlpool_transaction_begin(struct sql_db *_db)
+{
+        struct sqlpool_db *db = (struct sqlpool_db *)_db;
+       struct sqlpool_transaction_context *ctx;
+       const struct sqlpool_connection *conn;
+
+       ctx = i_new(struct sqlpool_transaction_context, 1);
+       ctx->ctx.db = _db;
+
+       if (driver_sqlpool_get_connection(db, -1U, &conn))
+               ctx->conn_trans = sql_transaction_begin(conn->db);
+       else {
+               /* queue changes until we get a connection */
+               ctx->commit_request = sqlpool_request_new(db, NULL);
+               ctx->commit_request->trans = ctx;
+               ctx->query_pool = pool_alloconly_create("sqlpool transaction",
+                                                       1024);
+               driver_sqlpool_append_request(db, ctx->commit_request);
+       }
+       return &ctx->ctx;
+}
+
+static void
+driver_sqlpool_transaction_free(struct sqlpool_transaction_context *ctx)
+{
+       i_assert(ctx->conn_trans == NULL);
+
+       if (ctx->commit_request != NULL)
+               sqlpool_request_abort(&ctx->commit_request);
+       if (ctx->query_pool != NULL)
+               pool_unref(&ctx->query_pool);
+       i_free(ctx);
+}
+
+static void
+driver_sqlpool_commit_callback(const char *error,
+                              struct sqlpool_transaction_context *ctx)
+{
+       ctx->callback(error, ctx->context);
+       driver_sqlpool_transaction_free(ctx);
+}
+
+static void
+driver_sqlpool_transaction_commit(struct sql_transaction_context *_ctx,
+                                 sql_commit_callback_t *callback,
+                                 void *context)
+{
+       struct sqlpool_transaction_context *ctx =
+               (struct sqlpool_transaction_context *)_ctx;
+
+       ctx->callback = callback;
+       ctx->context = context;
+
+       if (ctx->conn_trans != NULL) {
+               sql_transaction_commit(&ctx->conn_trans,
+                                      driver_sqlpool_commit_callback, ctx);
+       }
+}
+
+static int
+driver_sqlpool_transaction_commit_s(struct sql_transaction_context *_ctx,
+                                   const char **error_r)
+{
+       struct sqlpool_transaction_context *ctx =
+               (struct sqlpool_transaction_context *)_ctx;
+        struct sqlpool_db *db = (struct sqlpool_db *)_ctx->db;
+       const struct sqlpool_connection *conn;
+       int ret;
+
+       *error_r = NULL;
+
+       if (ctx->conn_trans == NULL) {
+               if (driver_sqlpool_get_sync_connection(db, &conn))
+                       driver_sqlpool_new_conn_trans(ctx, conn->db);
+               else {
+                       *error_r = SQL_ERRSTR_NOT_CONNECTED;
+                       driver_sqlpool_transaction_free(ctx);
+                       return -1;
+               }
+               sqlpool_request_abort(&ctx->commit_request);
+       }
+
+       ret = sql_transaction_commit_s(&ctx->conn_trans, error_r);
+       driver_sqlpool_transaction_free(ctx);
+       return ret;
+}
+
+static void
+driver_sqlpool_transaction_rollback(struct sql_transaction_context *_ctx)
+{
+       struct sqlpool_transaction_context *ctx =
+               (struct sqlpool_transaction_context *)_ctx;
+
+       if (ctx->conn_trans != NULL)
+               sql_transaction_rollback(&ctx->conn_trans);
+       driver_sqlpool_transaction_free(ctx);
+}
+
+static void
+driver_sqlpool_update(struct sql_transaction_context *_ctx, const char *query,
+                     unsigned int *affected_rows)
+{
+       struct sqlpool_transaction_context *ctx =
+               (struct sqlpool_transaction_context *)_ctx;
+
+       if (ctx->conn_trans != NULL && ctx->ctx.head == NULL)
+               sql_update_get_rows(ctx->conn_trans, query, affected_rows);
+       else {
+               /* we didn't get a connection for transaction immediately.
+                  queue updates until commit transfers all of these */
+               sql_transaction_add_query(&ctx->ctx, ctx->query_pool,
+                                         query, affected_rows);
+       }
+}
+
+struct sql_db driver_sqlpool_db = {
+       "",
+
+       .v = {
+               NULL,
+               driver_sqlpool_deinit,
+               driver_sqlpool_connect,
+               driver_sqlpool_disconnect,
+               driver_sqlpool_escape_string,
+               driver_sqlpool_exec,
+               driver_sqlpool_query,
+               driver_sqlpool_query_s,
+
+               driver_sqlpool_transaction_begin,
+               driver_sqlpool_transaction_commit,
+               driver_sqlpool_transaction_commit_s,
+               driver_sqlpool_transaction_rollback,
+
+               driver_sqlpool_update
+       }
+};
index 3c5b92afb11884defbe7d24319eba3eb7585ab55..59db2b3112e94d72c68999cc55ea66ccd27f736f 100644 (file)
@@ -4,6 +4,37 @@
 #include "sql-api.h"
 #include "module-context.h"
 
+enum sql_db_state {
+       /* not connected to database */
+       SQL_DB_STATE_DISCONNECTED,
+       /* waiting for connection attempt to succeed or fail */
+       SQL_DB_STATE_CONNECTING,
+       /* connected, allowing more queries */
+       SQL_DB_STATE_IDLE,
+       /* connected, no more queries allowed */
+       SQL_DB_STATE_BUSY
+};
+
+/* Minimum delay between reconnecting to same server */
+#define SQL_CONNECT_MIN_DELAY 1
+/* Maximum time to avoiding reconnecting to same server */
+#define SQL_CONNECT_MAX_DELAY (60*30)
+/* If no servers are connected but a query is requested, try reconnecting to
+   next server which has been disconnected longer than this (with a single
+   server setup this is really the "max delay" and the SQL_CONNECT_MAX_DELAY
+   is never used). */
+#define SQL_CONNECT_RESET_DELAY 15
+/* Abort connect() if it can't connect within this time. */
+#define SQL_CONNECT_TIMEOUT_SECS 10
+/* Abort queries after this many seconds */
+#define SQL_QUERY_TIMEOUT_SECS 60
+/* Default max. number of connections to create per host */
+#define SQL_DEFAULT_CONNECTION_LIMIT 5
+
+#define SQL_DB_IS_READY(db) \
+       ((db)->state == SQL_DB_STATE_IDLE)
+#define SQL_ERRSTR_NOT_CONNECTED "Not connected to database"
+
 struct sql_db_module_register {
        unsigned int id;
 };
@@ -14,14 +45,22 @@ union sql_db_module_context {
 
 extern struct sql_db_module_register sql_db_module_register;
 
+struct sql_transaction_query {
+       struct sql_transaction_query *next;
+       struct sql_transaction_context *trans;
+
+       const char *query;
+       unsigned int *affected_rows;
+};
+
 struct sql_db_vfuncs {
        struct sql_db *(*init)(const char *connect_string);
        void (*deinit)(struct sql_db *db);
 
-       enum sql_db_flags (*get_flags)(struct sql_db *db);
-
        int (*connect)(struct sql_db *db);
+       void (*disconnect)(struct sql_db *db);
        const char *(*escape_string)(struct sql_db *db, const char *string);
+
        void (*exec)(struct sql_db *db, const char *query);
        void (*query)(struct sql_db *db, const char *query,
                      sql_query_callback_t *callback, void *context);
@@ -41,8 +80,23 @@ struct sql_db_vfuncs {
 
 struct sql_db {
        const char *name;
+       enum sql_db_flags flags;
+
        struct sql_db_vfuncs v;
        ARRAY_DEFINE(module_contexts, union sql_db_module_context *);
+
+       void (*state_change_callback)(struct sql_db *db,
+                                     enum sql_db_state prev_state,
+                                     void *context);
+       void *state_change_context;
+
+       enum sql_db_state state;
+       /* last time we started connecting to this server
+          (which may or may not have succeeded) */
+       time_t last_connect_try;
+       unsigned int connect_delay;
+       unsigned int connect_failure_count;
+       struct timeout *to_reconnect;
 };
 
 struct sql_result_vfuncs {
@@ -84,11 +138,16 @@ struct sql_result {
        void *fetch_dest;
        size_t fetch_dest_size;
 
+       unsigned int failed:1;
+       unsigned int failed_try_retry:1;
        unsigned int callback:1;
 };
 
 struct sql_transaction_context {
        struct sql_db *db;
+
+       /* commit() must use this query list if head is non-NULL. */
+       struct sql_transaction_query *head, *tail;
 };
 
 ARRAY_DEFINE_TYPE(sql_drivers, const struct sql_db *);
@@ -96,4 +155,12 @@ ARRAY_DEFINE_TYPE(sql_drivers, const struct sql_db *);
 extern ARRAY_TYPE(sql_drivers) sql_drivers;
 extern struct sql_result sql_not_connected_result;
 
+struct sql_db *
+driver_sqlpool_init(const char *connect_string, const struct sql_db *driver);
+
+void sql_db_set_state(struct sql_db *db, enum sql_db_state state);
+
+void sql_transaction_add_query(struct sql_transaction_context *ctx, pool_t pool,
+                              const char *query, unsigned int *affected_rows);
+
 #endif
index 33be4db20d3201af9513116f64c4d083dc173997..15bf5eca3e9a0dff396a8c304cfd26d645ca0927 100644 (file)
@@ -2,9 +2,11 @@
 
 #include "lib.h"
 #include "array.h"
+#include "ioloop.h"
 #include "sql-api-private.h"
 
 #include <stdlib.h>
+#include <time.h>
 
 struct sql_db_module_register sql_db_module_register = { 0 };
 ARRAY_TYPE(sql_drivers) sql_drivers;
@@ -38,23 +40,36 @@ void sql_driver_unregister(const struct sql_db *driver)
        }
 }
 
-struct sql_db *sql_init(const char *db_driver,
-                       const char *connect_string ATTR_UNUSED)
+static const struct sql_db *sql_find_driver(const char *name)
 {
        const struct sql_db *const *drivers;
        unsigned int i, count;
-       struct sql_db *db;
 
        drivers = array_get(&sql_drivers, &count);
        for (i = 0; i < count; i++) {
-               if (strcmp(db_driver, drivers[i]->name) == 0) {
-                       db = drivers[i]->v.init(connect_string);
-                       i_array_init(&db->module_contexts, 5);
-                       return db;
-               }
+               if (strcmp(drivers[i]->name, name) == 0)
+                       return drivers[i];
        }
+       return NULL;
+}
+
+struct sql_db *sql_init(const char *db_driver, const char *connect_string)
+{
+       const struct sql_db *driver;
+       struct sql_db *db;
+
+       i_assert(connect_string != NULL);
 
-       i_fatal("Unknown database driver '%s'", db_driver);
+       driver = sql_find_driver(db_driver);
+       if (driver == NULL)
+               i_fatal("Unknown database driver '%s'", db_driver);
+
+       if ((driver->flags & SQL_DB_FLAG_POOLED) == 0)
+               db = driver->v.init(connect_string);
+       else
+               db = driver_sqlpool_init(connect_string, driver);
+       i_array_init(&db->module_contexts, 5);
+       return db;
 }
 
 void sql_deinit(struct sql_db **_db)
@@ -62,19 +77,46 @@ void sql_deinit(struct sql_db **_db)
        struct sql_db *db = *_db;
 
        *_db = NULL;
+
+       if (db->to_reconnect != NULL)
+               timeout_remove(&db->to_reconnect);
        db->v.deinit(db);
 }
 
 enum sql_db_flags sql_get_flags(struct sql_db *db)
 {
-       return db->v.get_flags(db);
+       return db->flags;
 }
 
 int sql_connect(struct sql_db *db)
 {
+       time_t now;
+
+       switch (db->state) {
+       case SQL_DB_STATE_DISCONNECTED:
+               break;
+       case SQL_DB_STATE_CONNECTING:
+               return 0;
+       default:
+               return 1;
+       }
+
+       /* don't try reconnecting more than once a second */
+       now = time(NULL);
+       if (db->last_connect_try + (time_t)db->connect_delay > now)
+               return -1;
+       db->last_connect_try = now;
+
        return db->v.connect(db);
 }
 
+void sql_disconnect(struct sql_db *db)
+{
+       if (db->to_reconnect != NULL)
+               timeout_remove(&db->to_reconnect);
+       db->v.disconnect(db);
+}
+
 const char *sql_escape_string(struct sql_db *db, const char *string)
 {
        return db->v.escape_string(db, string);
@@ -286,7 +328,7 @@ sql_result_not_connected_next_row(struct sql_result *result ATTR_UNUSED)
 static const char *
 sql_result_not_connected_get_error(struct sql_result *result ATTR_UNUSED)
 {
-       return "Not connected to database";
+       return SQL_ERRSTR_NOT_CONNECTED;
 }
 
 struct sql_transaction_context *sql_transaction_begin(struct sql_db *db)
@@ -332,11 +374,43 @@ void sql_update_get_rows(struct sql_transaction_context *ctx, const char *query,
        ctx->db->v.update(ctx, query, affected_rows);
 }
 
+void sql_db_set_state(struct sql_db *db, enum sql_db_state state)
+{
+       enum sql_db_state old_state = db->state;
+
+       if (db->state == state)
+               return;
+
+       db->state = state;
+       if (db->state_change_callback != NULL) {
+               db->state_change_callback(db, old_state,
+                                         db->state_change_context);
+       }
+}
+
+void sql_transaction_add_query(struct sql_transaction_context *ctx, pool_t pool,
+                              const char *query, unsigned int *affected_rows)
+{
+       struct sql_transaction_query *tquery;
+
+       tquery = p_new(pool, struct sql_transaction_query, 1);
+       tquery->trans = ctx;
+       tquery->query = p_strdup(pool, query);
+       tquery->affected_rows = affected_rows;
+
+       if (ctx->head == NULL)
+               ctx->head = tquery;
+       else
+               ctx->tail->next = tquery;
+       ctx->tail = tquery;
+}
+
 struct sql_result sql_not_connected_result = {
        .v = {
                sql_result_not_connected_free,
                sql_result_not_connected_next_row,
                NULL, NULL, NULL, NULL, NULL, NULL, NULL,
                sql_result_not_connected_get_error
-       }
+       },
+       .failed_try_retry = TRUE
 };
index 9853ae1ca44cf9846c636844d7971f7a848082f5..bf8bedb61a4265668cfa3668ba75aecb2cd56757 100644 (file)
@@ -6,7 +6,9 @@
 
 enum sql_db_flags {
        /* Set if queries are not executed asynchronously */
-       SQL_DB_FLAG_BLOCKING            = 0x01
+       SQL_DB_FLAG_BLOCKING            = 0x01,
+       /* Set if database wants to use connection pooling */
+       SQL_DB_FLAG_POOLED              = 0x02
 };
 
 enum sql_field_type {
@@ -63,6 +65,8 @@ enum sql_db_flags sql_get_flags(struct sql_db *db);
    though. Returns -1 if we're not connected, 0 if we started connecting or
    1 if we are fully connected now. */
 int sql_connect(struct sql_db *db);
+/* Explicitly disconnect from database. */
+void sql_disconnect(struct sql_db *db);
 
 /* Escape the given string if needed and return it. */
 const char *sql_escape_string(struct sql_db *db, const char *string);