]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
Redesigned auth request queuing to auth worker processes.
authorTimo Sirainen <tss@iki.fi>
Tue, 16 Dec 2008 05:50:44 +0000 (07:50 +0200)
committerTimo Sirainen <tss@iki.fi>
Tue, 16 Dec 2008 05:50:44 +0000 (07:50 +0200)
Only a single request is now pending for a worker at a time. If a request
was queued for more than 3 seconds, log a warning (but no often than once in
5 minutes).

--HG--
branch : HEAD

src/auth/auth-worker-server.c

index b02004e2eb49f66019acd9a90d75b2531696f7c9..c04031685999447e02a40b73d83e6abee3dc89e6 100644 (file)
@@ -1,8 +1,9 @@
 /* Copyright (c) 2005-2008 Dovecot authors, see the included COPYING file */
 
 #include "common.h"
-#include "array.h"
 #include "ioloop.h"
+#include "array.h"
+#include "aqueue.h"
 #include "network.h"
 #include "istream.h"
 #include "ostream.h"
 #include <stdlib.h>
 #include <unistd.h>
 
-#define AUTH_WORKER_MAX_OUTBUF_SIZE 10240
 #define AUTH_WORKER_LOOKUP_TIMEOUT_SECS 60
 #define AUTH_WORKER_MAX_IDLE_SECS (60*30)
+#define AUTH_WORKER_DELAY_WARN_SECS 3
+#define AUTH_WORKER_DELAY_WARN_MIN_INTERVAL_SECS 300
 
 struct auth_worker_request {
        unsigned int id;
+       time_t created;
+       const char *data_str;
        struct auth_request *auth_request;
         auth_worker_callback_t *callback;
 };
@@ -32,9 +36,8 @@ struct auth_worker_connection {
        struct timeout *to;
 
        unsigned int id_counter;
-        ARRAY_DEFINE(requests, struct auth_worker_request);
+        struct auth_worker_request *request;
 
-       unsigned int request_count;
        unsigned int requests_left;
 };
 
@@ -43,22 +46,84 @@ static unsigned int idle_count;
 static unsigned int auth_workers_max;
 static unsigned int auth_workers_max_request_count;
 
+static ARRAY_DEFINE(worker_request_array, struct auth_worker_request *);
+static struct aqueue *worker_request_queue;
+static time_t auth_worker_last_warn;
+
 static char *worker_socket_path;
 
 static void worker_input(struct auth_worker_connection *conn);
-static void auth_worker_destroy(struct auth_worker_connection *conn,
-                               const char *reason);
+static void auth_worker_destroy(struct auth_worker_connection **conn,
+                               const char *reason, bool restart);
 
 static void auth_worker_idle_timeout(struct auth_worker_connection *conn)
 {
-       i_assert(conn->request_count == 0);
+       i_assert(conn->request == NULL);
 
        if (idle_count > 1)
-               auth_worker_destroy(conn, NULL);
+               auth_worker_destroy(&conn, NULL, FALSE);
        else
                timeout_reset(conn->to);
 }
 
+static void auth_worker_call_timeout(struct auth_worker_connection *conn)
+{
+       i_assert(conn->request != NULL);
+
+       auth_worker_destroy(&conn, "Lookup timed out", TRUE);
+}
+
+static void auth_worker_request_send(struct auth_worker_connection *conn,
+                                    struct auth_worker_request *request)
+{
+       struct const_iovec iov[3];
+
+       i_assert(conn->requests_left > 0);
+
+       if (ioloop_time - request->created > AUTH_WORKER_DELAY_WARN_SECS &&
+           ioloop_time - auth_worker_last_warn >
+           AUTH_WORKER_DELAY_WARN_MIN_INTERVAL_SECS) {
+               auth_worker_last_warn = ioloop_time;
+               i_warning("auth workers: Auth request was queued for %d "
+                         "seconds, %d left in queue",
+                         (int)(ioloop_time - request->created),
+                         aqueue_count(worker_request_queue));
+       }
+
+       request->id = ++conn->id_counter;
+
+       iov[0].iov_base = t_strdup_printf("%d\t", request->id);
+       iov[0].iov_len = strlen(iov[0].iov_base);
+       iov[1].iov_base = request->data_str;
+       iov[1].iov_len = strlen(request->data_str);
+       iov[2].iov_base = "\n";
+       iov[2].iov_len = 1;
+
+       o_stream_sendv(conn->output, iov, 3);
+
+       conn->request = request;
+       conn->requests_left--;
+
+       timeout_remove(&conn->to);
+       conn->to = timeout_add(AUTH_WORKER_LOOKUP_TIMEOUT_SECS * 1000,
+                              auth_worker_call_timeout, conn);
+       idle_count--;
+}
+
+static void auth_worker_request_send_next(struct auth_worker_connection *conn)
+{
+       struct auth_worker_request *request, *const *requestp;
+
+       if (aqueue_count(worker_request_queue) == 0)
+               return;
+
+       requestp = array_idx(&worker_request_array,
+                            aqueue_idx(worker_request_queue, 0));
+       request = *requestp;
+       aqueue_delete_tail(worker_request_queue);
+       auth_worker_request_send(conn, request);
+}
+
 static struct auth_worker_connection *auth_worker_create(void)
 {
        struct auth_worker_connection *conn;
@@ -81,14 +146,15 @@ static struct auth_worker_connection *auth_worker_create(void)
                                worker_socket_path);
                }
 
-               if (try == 5) {
-                       i_fatal("net_connect_unix(%s) "
-                               "failed after %d tries: %m",
-                               worker_socket_path, try);
+               if (try == 50) {
+                       i_error("net_connect_unix(%s) "
+                               "failed after %d secs: %m",
+                               worker_socket_path, try/10);
+                       return NULL;
                }
 
-               /* not created yet? try again */
-               sleep(1);
+               /* wait and try again */
+               usleep(100000);
        }
 
        conn = i_new(struct auth_worker_connection, 1);
@@ -97,7 +163,6 @@ static struct auth_worker_connection *auth_worker_create(void)
                                         FALSE);
        conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
        conn->io = io_add(fd, IO_READ, worker_input, conn);
-       i_array_init(&conn->requests, 16);
        conn->requests_left = auth_workers_max_request_count;
        conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000,
                               auth_worker_idle_timeout, conn);
@@ -108,13 +173,14 @@ static struct auth_worker_connection *auth_worker_create(void)
        return conn;
 }
 
-static void auth_worker_destroy(struct auth_worker_connection *conn,
-                               const char *reason)
+static void auth_worker_destroy(struct auth_worker_connection **_conn,
+                               const char *reason, bool restart)
 {
+       struct auth_worker_connection *conn = *_conn;
        struct auth_worker_connection **connp;
-       struct auth_worker_request *requests;
        unsigned int i, count;
-       const char *reply;
+
+       *_conn = NULL;
 
        connp = array_get_modifiable(&connections, &count);
        for (i = 0; i < count; i++) {
@@ -124,28 +190,19 @@ static void auth_worker_destroy(struct auth_worker_connection *conn,
                }
        }
 
-       if (conn->request_count == 0)
+       if (conn->request == NULL)
                idle_count--;
 
-       /* abort all pending requests */
-       reply = t_strdup_printf("FAIL\t%d", PASSDB_RESULT_INTERNAL_FAILURE);
-
-       requests = array_get_modifiable(&conn->requests, &count);
-       for (i = 0; i < count; i++) {
-               if (requests[i].id != 0) {
-                       auth_request_log_error(requests[i].auth_request,
-                                              "worker-server",
-                                              "Aborted: %s", reason);
-                       T_BEGIN {
-                               requests[i].callback(requests[i].auth_request,
-                                                    reply);
-                       } T_END;
-                       auth_request_unref(&requests[i].auth_request);
-               }
-       }
+       if (conn->request != NULL) T_BEGIN {
+               struct auth_request *auth_request = conn->request->auth_request;
 
+               auth_request_log_error(auth_request, "worker-server",
+                                      "Aborted: %s", reason);
+               conn->request->callback(auth_request, t_strdup_printf(
+                               "FAIL\t%d", PASSDB_RESULT_INTERNAL_FAILURE));
+               auth_request_unref(&conn->request->auth_request);
+       } T_END;
 
-       array_free(&conn->requests);
        io_remove(&conn->io);
        i_stream_destroy(&conn->input);
        o_stream_destroy(&conn->output);
@@ -154,78 +211,47 @@ static void auth_worker_destroy(struct auth_worker_connection *conn,
        if (close(conn->fd) < 0)
                i_error("close(auth worker) failed: %m");
        i_free(conn);
-}
 
-static struct auth_worker_request *
-auth_worker_request_lookup(struct auth_worker_connection *conn,
-                          unsigned int id)
-{
-       struct auth_worker_request *requests;
-       unsigned int i, count;
-
-       requests = array_get_modifiable(&conn->requests, &count);
-       for (i = 0; i < count; i++) {
-               if (requests[i].id == id)
-                       return &requests[i];
+       if (idle_count == 0 && restart) {
+               conn = auth_worker_create();
+               if (conn != NULL)
+                       auth_worker_request_send_next(conn);
        }
-       return NULL;
 }
 
 static struct auth_worker_connection *auth_worker_find_free(void)
 {
-       struct auth_worker_connection **conn, *best;
+       struct auth_worker_connection **conns;
        unsigned int i, count;
-       size_t outbuf_size, best_size;
-
-       conn = array_get_modifiable(&connections, &count);
-       if (idle_count > 0) {
-               /* there exists at least one idle connection, use it */
-               for (i = 0; i < count; i++) {
-                       if (conn[i]->request_count == 0)
-                               return conn[i];
-               }
-               i_unreached();
-       }
 
-       /* first the connection with least data in output buffer */
-       best = NULL;
-       best_size = (size_t)-1;
+       if (idle_count == 0)
+               return NULL;
+
+       conns = array_get_modifiable(&connections, &count);
        for (i = 0; i < count; i++) {
-               outbuf_size = o_stream_get_buffer_used_size(conn[i]->output);
-               if (outbuf_size < best_size && conn[i]->requests_left > 0) {
-                       best = conn[i];
-                       best_size = outbuf_size;
-               }
+               if (conns[i]->request == NULL)
+                       return conns[i];
        }
-
-       return best;
+       i_unreached();
+       return NULL;
 }
 
-static void auth_worker_handle_request(struct auth_worker_connection *conn,
+static void auth_worker_request_handle(struct auth_worker_connection *conn,
                                       struct auth_worker_request *request,
                                       const char *line)
 {
+       conn->request = NULL;
+       timeout_remove(&conn->to);
+       conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000,
+                              auth_worker_idle_timeout, conn);
+       idle_count++;
+
        request->callback(request->auth_request, line);
        auth_request_unref(&request->auth_request);
-
-       /* mark the record empty so it can be used for future requests */
-       memset(request, 0, sizeof(*request));
-
-       /* update counters */
-       conn->request_count--;
-       if (conn->request_count > 0)
-               timeout_reset(conn->to);
-       else {
-               timeout_remove(&conn->to);
-               conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000,
-                                      auth_worker_idle_timeout, conn);
-               idle_count++;
-       }
 }
 
 static void worker_input(struct auth_worker_connection *conn)
 {
-       struct auth_worker_request *request;
        const char *line, *id_str;
        unsigned int id;
 
@@ -234,13 +260,14 @@ static void worker_input(struct auth_worker_connection *conn)
                return;
        case -1:
                /* disconnected */
-               auth_worker_destroy(conn, "Worker process died unexpectedly");
+               auth_worker_destroy(&conn, "Worker process died unexpectedly",
+                                   TRUE);
                return;
        case -2:
                /* buffer full */
                i_error("BUG: Auth worker sent us more than %d bytes",
                        (int)AUTH_WORKER_MAX_LINE_LENGTH);
-               auth_worker_destroy(conn, "Worker is buggy");
+               auth_worker_destroy(&conn, "Worker is buggy", TRUE);
                return;
        }
 
@@ -250,37 +277,28 @@ static void worker_input(struct auth_worker_connection *conn)
                if (line == NULL)
                        continue;
 
-               T_BEGIN {
-                       id = (unsigned int)strtoul(t_strcut(id_str, '\t'),
-                                                  NULL, 10);
-                       request = auth_worker_request_lookup(conn, id);
-               } T_END;
-
-               if (request != NULL)
-                       auth_worker_handle_request(conn, request, line + 1);
-       }
-
-       if (conn->requests_left == 0 && conn->request_count == 0) {
-               auth_worker_destroy(conn, "Max requests limit");
-               if (idle_count == 0)
-                       auth_worker_create();
+               id = (unsigned int)strtoul(t_strcut(id_str, '\t'),
+                                          NULL, 10);
+               if (conn->request != NULL && id == conn->request->id) {
+                       auth_worker_request_handle(conn, conn->request,
+                                                  line + 1);
+               } else {
+                       if (conn->request != NULL) {
+                               i_error("BUG: Worker sent reply with id %u, "
+                                       "expected %u", id, conn->request->id);
+                       } else {
+                               i_error("BUG: Worker sent reply with id %u, "
+                                       "none was expected", id);
+                       }
+                       auth_worker_destroy(&conn, "Worker is buggy", TRUE);
+                       return;
+               }
        }
-}
-
-static struct auth_worker_request *
-auth_worker_request_get(struct auth_worker_connection *conn)
-{
-        struct auth_worker_request *request;
 
-       request = auth_worker_request_lookup(conn, 0);
-       return request != NULL ? request : array_append_space(&conn->requests);
-}
-
-static void auth_worker_call_timeout(struct auth_worker_connection *conn)
-{
-       i_assert(conn->request_count > 0);
-
-       auth_worker_destroy(conn, "Lookup timed out");
+       if (conn->requests_left == 0)
+               auth_worker_destroy(&conn, "Max requests limit", TRUE);
+       else
+               auth_worker_request_send_next(conn);
 }
 
 void auth_worker_call(struct auth_request *auth_request,
@@ -289,66 +307,32 @@ void auth_worker_call(struct auth_request *auth_request,
 {
        struct auth_worker_connection *conn;
        struct auth_worker_request *request;
-       const char *reply, *data_str;
-       struct const_iovec iov[3];
-
-       conn = auth_worker_find_free();
-       if (conn == NULL) {
-               /* no connections currently. can happen if all have been
-                  idle for last 10 minutes. create a new one. */
-               conn = auth_worker_create();
-               if (conn == NULL) {
-                       auth_request_log_error(auth_request, "worker-server",
-                               "Couldn't create new auth worker");
-                       reply = t_strdup_printf("FAIL\t%d",
-                                               PASSDB_RESULT_INTERNAL_FAILURE);
-                       callback(auth_request, reply);
-                       return;
-               }
-       }
 
-       i_assert(conn->requests_left > 0);
-
-       data_str = auth_stream_reply_export(data);
-       iov[0].iov_base = t_strdup_printf("%d\t", ++conn->id_counter);
-       iov[0].iov_len = strlen(iov[0].iov_base);
-       iov[1].iov_base = data_str;
-       iov[1].iov_len = strlen(data_str);
-       iov[2].iov_base = "\n";
-       iov[2].iov_len = 1;
-
-       if (o_stream_get_buffer_used_size(conn->output) +
-           iov[0].iov_len + iov[1].iov_len + 1 > AUTH_WORKER_MAX_OUTBUF_SIZE) {
-               auth_request_log_error(auth_request, "worker-server",
-                                      "All auth workers are busy");
-               reply = t_strdup_printf("FAIL\t%d",
-                                       PASSDB_RESULT_INTERNAL_FAILURE);
-               callback(auth_request, reply);
-               return;
-       }
-
-       /* find an empty request */
-       request = auth_worker_request_get(conn);
-       request->id = conn->id_counter;
+       request = p_new(auth_request->pool, struct auth_worker_request, 1);
+       request->created = ioloop_time;
+       request->data_str = p_strdup(auth_request->pool,
+                                    auth_stream_reply_export(data));
        request->auth_request = auth_request;
        request->callback = callback;
        auth_request_ref(auth_request);
 
-       o_stream_sendv(conn->output, iov, 3);
-
-       if (idle_count == 0) {
-               /* this request was queued, we need new workers */
-               auth_worker_create();
+       if (aqueue_count(worker_request_queue) > 0) {
+               /* requests are already being queued, no chance of
+                  finding/creating a worker */
+               conn = NULL;
+       } else {
+               conn = auth_worker_find_free();
+               if (conn == NULL) {
+                       /* no free connections, create a new one */
+                       conn = auth_worker_create();
+               }
        }
-
-       if (conn->request_count == 0) {
-               timeout_remove(&conn->to);
-               conn->to = timeout_add(AUTH_WORKER_LOOKUP_TIMEOUT_SECS * 1000,
-                                      auth_worker_call_timeout, conn);
-               idle_count--;
+       if (conn != NULL)
+               auth_worker_request_send(conn, request);
+       else {
+               /* reached the limit, queue the request */
+               aqueue_append(worker_request_queue, &request);
        }
-       conn->request_count++;
-       conn->requests_left--;
 }
 
 void auth_worker_server_init(void)
@@ -377,22 +361,28 @@ void auth_worker_server_init(void)
        if (auth_workers_max_request_count == 0)
                auth_workers_max_request_count = (unsigned int)-1;
 
+       i_array_init(&worker_request_array, 128);
+       worker_request_queue = aqueue_init(&worker_request_array.arr);
+
        i_array_init(&connections, 16);
-       auth_worker_create();
+       (void)auth_worker_create();
 }
 
 void auth_worker_server_deinit(void)
 {
-       struct auth_worker_connection **connp;
+       struct auth_worker_connection **connp, *conn;
 
        if (!array_is_created(&connections))
                return;
 
        while (array_count(&connections) > 0) {
                connp = array_idx_modifiable(&connections, 0);
-               auth_worker_destroy(*connp, "Shutting down");
+               conn = *connp;
+               auth_worker_destroy(&conn, "Shutting down", FALSE);
        }
        array_free(&connections);
 
+       aqueue_deinit(&worker_request_queue);
+       array_free(&worker_request_array);
        i_free(worker_socket_path);
 }