/* Copyright (c) 2005-2008 Dovecot authors, see the included COPYING file */
#include "common.h"
-#include "array.h"
#include "ioloop.h"
+#include "array.h"
+#include "aqueue.h"
#include "network.h"
#include "istream.h"
#include "ostream.h"
#include <stdlib.h>
#include <unistd.h>
-#define AUTH_WORKER_MAX_OUTBUF_SIZE 10240
#define AUTH_WORKER_LOOKUP_TIMEOUT_SECS 60
#define AUTH_WORKER_MAX_IDLE_SECS (60*30)
+#define AUTH_WORKER_DELAY_WARN_SECS 3
+#define AUTH_WORKER_DELAY_WARN_MIN_INTERVAL_SECS 300
struct auth_worker_request {
unsigned int id;
+ time_t created; /* ioloop_time when auth_worker_call() created the request */
+ const char *data_str; /* exported request data; written to the worker after the id */
struct auth_request *auth_request;
auth_worker_callback_t *callback;
};
struct timeout *to;
unsigned int id_counter;
- ARRAY_DEFINE(requests, struct auth_worker_request);
+ struct auth_worker_request *request;
- unsigned int request_count;
unsigned int requests_left;
};
static unsigned int auth_workers_max;
static unsigned int auth_workers_max_request_count;
+static ARRAY_DEFINE(worker_request_array, struct auth_worker_request *); /* storage handed to aqueue_init() for worker_request_queue */
+static struct aqueue *worker_request_queue; /* requests waiting until a worker connection can take them */
+static time_t auth_worker_last_warn; /* ioloop_time of the last "request was queued" warning */
+
static char *worker_socket_path;
static void worker_input(struct auth_worker_connection *conn);
-static void auth_worker_destroy(struct auth_worker_connection *conn,
- const char *reason);
+static void auth_worker_destroy(struct auth_worker_connection **conn,
+ const char *reason, bool restart);
static void auth_worker_idle_timeout(struct auth_worker_connection *conn)
{
- i_assert(conn->request_count == 0);
+ i_assert(conn->request == NULL); /* the idle timeout is replaced by the lookup timeout while a request is in flight */
if (idle_count > 1)
- auth_worker_destroy(conn, NULL);
+ auth_worker_destroy(&conn, NULL, FALSE); /* other idle workers remain: drop this one, no restart */
else
timeout_reset(conn->to);
}
+static void auth_worker_call_timeout(struct auth_worker_connection *conn)
+{
+ i_assert(conn->request != NULL);
+ /* the lookup timeout armed by auth_worker_request_send() fired:
+    destroy the connection, which fails the pending request, and
+    allow a replacement worker to be created */
+ auth_worker_destroy(&conn, "Lookup timed out", TRUE);
+}
+
+/* Send a request to a worker connection and mark the connection busy.
+   Assigns the request the connection's next id, writes "<id>\t<data>\n"
+   to the worker, stores the request in conn->request and swaps the
+   connection's timeout for the lookup timeout. The connection is expected
+   to be idle (idle_count is decremented) and must have requests_left > 0.
+   Logs a rate-limited warning when the request has waited longer than
+   AUTH_WORKER_DELAY_WARN_SECS since it was created. */
+static void auth_worker_request_send(struct auth_worker_connection *conn,
+ struct auth_worker_request *request)
+{
+ struct const_iovec iov[3];
+
+ i_assert(conn->requests_left > 0);
+
+ if (ioloop_time - request->created > AUTH_WORKER_DELAY_WARN_SECS &&
+ ioloop_time - auth_worker_last_warn >
+ AUTH_WORKER_DELAY_WARN_MIN_INTERVAL_SECS) {
+ /* remember when we warned so the log is not flooded */
+ auth_worker_last_warn = ioloop_time;
+ i_warning("auth workers: Auth request was queued for %d "
+ "seconds, %d left in queue",
+ (int)(ioloop_time - request->created),
+ aqueue_count(worker_request_queue));
+ }
+
+ request->id = ++conn->id_counter;
+
+ /* the id is unsigned int: format it with %u, matching how
+    worker_input() parses (strtoul) and reports (%u) reply ids */
+ iov[0].iov_base = t_strdup_printf("%u\t", request->id);
+ iov[0].iov_len = strlen(iov[0].iov_base);
+ iov[1].iov_base = request->data_str;
+ iov[1].iov_len = strlen(request->data_str);
+ iov[2].iov_base = "\n";
+ iov[2].iov_len = 1;
+
+ o_stream_sendv(conn->output, iov, 3);
+
+ conn->request = request;
+ conn->requests_left--;
+
+ /* replace the idle timeout with the per-lookup timeout */
+ timeout_remove(&conn->to);
+ conn->to = timeout_add(AUTH_WORKER_LOOKUP_TIMEOUT_SECS * 1000,
+ auth_worker_call_timeout, conn);
+ idle_count--;
+}
+
+/* Hand the next queued request, if any, to the given connection.
+   Does nothing when worker_request_queue is empty. The entry at queue
+   position 0 is read and then removed with aqueue_delete_tail()
+   (presumably the oldest queued request - confirm against the aqueue
+   API) before being sent via auth_worker_request_send(). */
+static void auth_worker_request_send_next(struct auth_worker_connection *conn)
+{
+ struct auth_worker_request *const *slot;
+ struct auth_worker_request *next;
+ unsigned int pos;
+
+ if (aqueue_count(worker_request_queue) > 0) {
+ pos = aqueue_idx(worker_request_queue, 0);
+ slot = array_idx(&worker_request_array, pos);
+ /* copy the pointer out before the queue entry is released */
+ next = *slot;
+ aqueue_delete_tail(worker_request_queue);
+ auth_worker_request_send(conn, next);
+ }
+}
+
static struct auth_worker_connection *auth_worker_create(void)
{
struct auth_worker_connection *conn;
worker_socket_path);
}
- if (try == 5) {
- i_fatal("net_connect_unix(%s) "
- "failed after %d tries: %m",
- worker_socket_path, try);
+ if (try == 50) { /* 50 tries, 100 ms apart (see usleep below) */
+ i_error("net_connect_unix(%s) "
+ "failed after %d secs: %m",
+ worker_socket_path, try/10);
+ return NULL; /* connect failure is logged and reported to the caller instead of being fatal */
}
- /* not created yet? try again */
- sleep(1);
+ /* wait 100 ms and try again */
+ usleep(100000);
}
conn = i_new(struct auth_worker_connection, 1);
FALSE);
conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
conn->io = io_add(fd, IO_READ, worker_input, conn);
- i_array_init(&conn->requests, 16);
conn->requests_left = auth_workers_max_request_count;
conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000,
auth_worker_idle_timeout, conn);
return conn;
}
-static void auth_worker_destroy(struct auth_worker_connection *conn,
- const char *reason)
+static void auth_worker_destroy(struct auth_worker_connection **_conn,
+ const char *reason, bool restart)
{
+ struct auth_worker_connection *conn = *_conn;
struct auth_worker_connection **connp;
- struct auth_worker_request *requests;
unsigned int i, count;
- const char *reply;
+
+ *_conn = NULL; /* conn is freed below; the caller's pointer must not be used afterwards */
connp = array_get_modifiable(&connections, &count);
for (i = 0; i < count; i++) {
}
}
- if (conn->request_count == 0)
+ if (conn->request == NULL)
idle_count--;
- /* abort all pending requests */
- reply = t_strdup_printf("FAIL\t%d", PASSDB_RESULT_INTERNAL_FAILURE);
-
- requests = array_get_modifiable(&conn->requests, &count);
- for (i = 0; i < count; i++) {
- if (requests[i].id != 0) {
- auth_request_log_error(requests[i].auth_request,
- "worker-server",
- "Aborted: %s", reason);
- T_BEGIN {
- requests[i].callback(requests[i].auth_request,
- reply);
- } T_END;
- auth_request_unref(&requests[i].auth_request);
- }
- }
+ if (conn->request != NULL) T_BEGIN { /* fail the single in-flight request with an internal-failure reply */
+ struct auth_request *auth_request = conn->request->auth_request;
+ auth_request_log_error(auth_request, "worker-server",
+ "Aborted: %s", reason);
+ conn->request->callback(auth_request, t_strdup_printf(
+ "FAIL\t%d", PASSDB_RESULT_INTERNAL_FAILURE));
+ auth_request_unref(&conn->request->auth_request);
+ } T_END;
- array_free(&conn->requests);
io_remove(&conn->io);
i_stream_destroy(&conn->input);
o_stream_destroy(&conn->output);
if (close(conn->fd) < 0)
i_error("close(auth worker) failed: %m");
i_free(conn);
-}
-static struct auth_worker_request *
-auth_worker_request_lookup(struct auth_worker_connection *conn,
- unsigned int id)
-{
- struct auth_worker_request *requests;
- unsigned int i, count;
-
- requests = array_get_modifiable(&conn->requests, &count);
- for (i = 0; i < count; i++) {
- if (requests[i].id == id)
- return &requests[i];
+ if (idle_count == 0 && restart) { /* no idle workers left: create a replacement and resume the queue */
+ conn = auth_worker_create();
+ if (conn != NULL)
+ auth_worker_request_send_next(conn);
}
- return NULL;
}
static struct auth_worker_connection *auth_worker_find_free(void)
{
- struct auth_worker_connection **conn, *best;
+ struct auth_worker_connection **conns;
unsigned int i, count;
- size_t outbuf_size, best_size;
-
- conn = array_get_modifiable(&connections, &count);
- if (idle_count > 0) {
- /* there exists at least one idle connection, use it */
- for (i = 0; i < count; i++) {
- if (conn[i]->request_count == 0)
- return conn[i];
- }
- i_unreached();
- }
- /* first the connection with least data in output buffer */
- best = NULL;
- best_size = (size_t)-1;
+ if (idle_count == 0) /* every connection has a request in flight */
+ return NULL;
+
+ conns = array_get_modifiable(&connections, &count);
for (i = 0; i < count; i++) {
- outbuf_size = o_stream_get_buffer_used_size(conn[i]->output);
- if (outbuf_size < best_size && conn[i]->requests_left > 0) {
- best = conn[i];
- best_size = outbuf_size;
- }
+ if (conns[i]->request == NULL) /* idle connection: use it */
+ return conns[i];
}
-
- return best;
+ i_unreached(); /* idle_count > 0 means the loop above should have found an idle connection */
+ return NULL;
}
-static void auth_worker_handle_request(struct auth_worker_connection *conn,
+static void auth_worker_request_handle(struct auth_worker_connection *conn,
struct auth_worker_request *request,
const char *line)
{
+ conn->request = NULL; /* the connection is marked idle before the callback runs */
+ timeout_remove(&conn->to); /* swap the lookup timeout back to the idle timeout */
+ conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000,
+ auth_worker_idle_timeout, conn);
+ idle_count++;
+ /* deliver the worker's reply, then drop the reference taken in auth_worker_call() */
request->callback(request->auth_request, line);
auth_request_unref(&request->auth_request);
-
- /* mark the record empty so it can be used for future requests */
- memset(request, 0, sizeof(*request));
-
- /* update counters */
- conn->request_count--;
- if (conn->request_count > 0)
- timeout_reset(conn->to);
- else {
- timeout_remove(&conn->to);
- conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000,
- auth_worker_idle_timeout, conn);
- idle_count++;
- }
}
+/* Input handler for a worker connection. Completes the in-flight request
+   when a reply line carries its id, and destroys the connection on
+   disconnect, on an over-long line or on an unexpected reply id. Once no
+   reply is pending, a worker that has used up its request quota is
+   destroyed (with restart), otherwise it is given the next queued
+   request. */
static void worker_input(struct auth_worker_connection *conn)
{
- struct auth_worker_request *request;
const char *line, *id_str;
unsigned int id;
return;
case -1:
/* disconnected */
- auth_worker_destroy(conn, "Worker process died unexpectedly");
+ auth_worker_destroy(&conn, "Worker process died unexpectedly",
+ TRUE);
return;
case -2:
/* buffer full */
i_error("BUG: Auth worker sent us more than %d bytes",
(int)AUTH_WORKER_MAX_LINE_LENGTH);
- auth_worker_destroy(conn, "Worker is buggy");
+ auth_worker_destroy(&conn, "Worker is buggy", TRUE);
return;
}
if (line == NULL)
continue;
- T_BEGIN {
- id = (unsigned int)strtoul(t_strcut(id_str, '\t'),
- NULL, 10);
- request = auth_worker_request_lookup(conn, id);
- } T_END;
-
- if (request != NULL)
- auth_worker_handle_request(conn, request, line + 1);
- }
-
- if (conn->requests_left == 0 && conn->request_count == 0) {
- auth_worker_destroy(conn, "Max requests limit");
- if (idle_count == 0)
- auth_worker_create();
+ id = (unsigned int)strtoul(t_strcut(id_str, '\t'),
+ NULL, 10);
+ if (conn->request != NULL && id == conn->request->id) {
+ auth_worker_request_handle(conn, conn->request,
+ line + 1);
+ } else {
+ if (conn->request != NULL) {
+ i_error("BUG: Worker sent reply with id %u, "
+ "expected %u", id, conn->request->id);
+ } else {
+ i_error("BUG: Worker sent reply with id %u, "
+ "none was expected", id);
+ }
+ auth_worker_destroy(&conn, "Worker is buggy", TRUE);
+ return;
+ }
}
-}
-
-static struct auth_worker_request *
-auth_worker_request_get(struct auth_worker_connection *conn)
-{
- struct auth_worker_request *request;
- request = auth_worker_request_lookup(conn, 0);
- return request != NULL ? request : array_append_space(&conn->requests);
-}
-
-static void auth_worker_call_timeout(struct auth_worker_connection *conn)
-{
- i_assert(conn->request_count > 0);
-
- auth_worker_destroy(conn, "Lookup timed out");
+ if (conn->request != NULL) {
+ /* no complete reply for the in-flight request yet (only a
+    partial line arrived, or a line without a tab was skipped):
+    keep waiting. Destroying the worker here would fail the
+    request prematurely, and sending another request would
+    overwrite conn->request and decrement idle_count twice. */
+ return;
+ }
+ if (conn->requests_left == 0)
+ auth_worker_destroy(&conn, "Max requests limit", TRUE);
+ else
+ auth_worker_request_send_next(conn);
}
void auth_worker_call(struct auth_request *auth_request,
{
struct auth_worker_connection *conn;
struct auth_worker_request *request;
- const char *reply, *data_str;
- struct const_iovec iov[3];
-
- conn = auth_worker_find_free();
- if (conn == NULL) {
- /* no connections currently. can happen if all have been
- idle for last 10 minutes. create a new one. */
- conn = auth_worker_create();
- if (conn == NULL) {
- auth_request_log_error(auth_request, "worker-server",
- "Couldn't create new auth worker");
- reply = t_strdup_printf("FAIL\t%d",
- PASSDB_RESULT_INTERNAL_FAILURE);
- callback(auth_request, reply);
- return;
- }
- }
- i_assert(conn->requests_left > 0);
-
- data_str = auth_stream_reply_export(data);
- iov[0].iov_base = t_strdup_printf("%d\t", ++conn->id_counter);
- iov[0].iov_len = strlen(iov[0].iov_base);
- iov[1].iov_base = data_str;
- iov[1].iov_len = strlen(data_str);
- iov[2].iov_base = "\n";
- iov[2].iov_len = 1;
-
- if (o_stream_get_buffer_used_size(conn->output) +
- iov[0].iov_len + iov[1].iov_len + 1 > AUTH_WORKER_MAX_OUTBUF_SIZE) {
- auth_request_log_error(auth_request, "worker-server",
- "All auth workers are busy");
- reply = t_strdup_printf("FAIL\t%d",
- PASSDB_RESULT_INTERNAL_FAILURE);
- callback(auth_request, reply);
- return;
- }
-
- /* find an empty request */
- request = auth_worker_request_get(conn);
- request->id = conn->id_counter;
+ request = p_new(auth_request->pool, struct auth_worker_request, 1); /* allocated from the auth_request's own pool */
+ request->created = ioloop_time; /* timestamp used for the queue-delay warning */
+ request->data_str = p_strdup(auth_request->pool,
+ auth_stream_reply_export(data));
request->auth_request = auth_request;
request->callback = callback;
auth_request_ref(auth_request);
- o_stream_sendv(conn->output, iov, 3);
-
- if (idle_count == 0) {
- /* this request was queued, we need new workers */
- auth_worker_create();
+ if (aqueue_count(worker_request_queue) > 0) {
+ /* requests are already being queued, no chance of
+ finding/creating a worker */
+ conn = NULL;
+ } else {
+ conn = auth_worker_find_free();
+ if (conn == NULL) {
+ /* no free connections, create a new one */
+ conn = auth_worker_create();
+ }
}
-
- if (conn->request_count == 0) {
- timeout_remove(&conn->to);
- conn->to = timeout_add(AUTH_WORKER_LOOKUP_TIMEOUT_SECS * 1000,
- auth_worker_call_timeout, conn);
- idle_count--;
+ if (conn != NULL)
+ auth_worker_request_send(conn, request);
+ else {
+ /* no worker could take it (the queue is already non-empty, or
+    auth_worker_create() returned NULL): queue the request; it is
+    sent later through auth_worker_request_send_next() */
+ aqueue_append(worker_request_queue, &request);
}
- conn->request_count++;
- conn->requests_left--;
}
void auth_worker_server_init(void)
if (auth_workers_max_request_count == 0)
auth_workers_max_request_count = (unsigned int)-1;
+ i_array_init(&worker_request_array, 128);
+ worker_request_queue = aqueue_init(&worker_request_array.arr); /* the queue is built on worker_request_array */
+
i_array_init(&connections, 16);
- auth_worker_create();
+ (void)auth_worker_create(); /* start one worker up front; a NULL result is deliberately ignored here */
}
void auth_worker_server_deinit(void)
{
- struct auth_worker_connection **connp;
+ struct auth_worker_connection **connp, *conn;
if (!array_is_created(&connections))
return;
while (array_count(&connections) > 0) {
connp = array_idx_modifiable(&connections, 0);
- auth_worker_destroy(*connp, "Shutting down");
+ conn = *connp; /* local copy: auth_worker_destroy() sets the pointer it is given to NULL */
+ auth_worker_destroy(&conn, "Shutting down", FALSE); /* FALSE: do not start replacement workers during shutdown */
}
array_free(&connections);
+ aqueue_deinit(&worker_request_queue); /* NOTE(review): requests still queued here never get their callback or unref - confirm this is acceptable at shutdown */
+ array_free(&worker_request_array);
i_free(worker_socket_path);
}