#include "lib.h"
#include "array.h"
#include "aqueue.h"
+#include "connection.h"
#include "ioloop.h"
#include "istream.h"
#include "ostream.h"
#define INDEXER_WORKER_NAME "indexer-worker-master"
struct worker_connection {
+ struct connection conn;
+
int refcount;
- char *socket_path;
indexer_status_callback_t *callback;
- int fd;
- struct io *io;
- struct istream *input;
- struct ostream *output;
-
char *request_username;
struct indexer_request *request;
unsigned int process_limit;
- bool version_received:1;
};
static void worker_connection_unref(struct worker_connection *conn)
if (--conn->refcount > 0)
return;
- i_free(conn->socket_path);
+ i_free(conn->conn.base_name);
i_free(conn);
}
-static void worker_connection_disconnect(struct worker_connection *conn)
+static void worker_connection_disconnect(struct connection *conn)
{
- if (conn->fd != -1) {
+ struct worker_connection *worker =
+ container_of(conn, struct worker_connection, conn);
+ if (conn->fd_in != -1) {
io_remove(&conn->io);
i_stream_destroy(&conn->input);
o_stream_destroy(&conn->output);
- if (close(conn->fd) < 0)
- i_error("close(%s) failed: %m", conn->socket_path);
- conn->fd = -1;
+ if (close(conn->fd_in) < 0)
+ i_error("close(%s) failed: %m", conn->base_name);
+ conn->fd_in = -1;
}
/* conn->callback() can try to destroy us */
- conn->refcount++;
- i_free_and_null(conn->request_username);
- worker_connection_unref(conn);
+ worker->refcount++;
+ i_free_and_null(worker->request_username);
+ worker_connection_unref(worker);
}
-void worker_connection_destroy(struct worker_connection **_conn)
+void worker_connection_destroy(struct connection **_conn)
{
- struct worker_connection *conn = *_conn;
+ struct connection *conn = *_conn;
+ struct worker_connection *worker =
+ container_of(conn, struct worker_connection, conn);
*_conn = NULL;
worker_connection_disconnect(conn);
- worker_connection_unref(conn);
+ worker_connection_unref(worker);
}
static int
-worker_connection_input_line(struct worker_connection *conn, const char *line)
+worker_connection_input_line(struct connection *conn, const char *line)
{
+ struct worker_connection *worker =
+ container_of(conn, struct worker_connection, conn);
int percentage;
/* return -1 -> error
0 -> request completed (100%)
else if (percentage == 100)
ret = 0;
- conn->callback(percentage, conn);
+ worker->callback(percentage, conn);
return ret;
}
-static void worker_connection_input(struct worker_connection *conn)
+static void worker_connection_input(struct connection *conn)
{
+ struct worker_connection *worker =
+ container_of(conn, struct worker_connection, conn);
const char *line;
if (i_stream_read(conn->input) < 0) {
}
conn->version_received = TRUE;
}
- if (conn->process_limit == 0) {
+ if (worker->process_limit == 0) {
if ((line = i_stream_next_line(conn->input)) == NULL)
return;
- if (str_to_uint(line, &conn->process_limit) < 0 ||
- conn->process_limit == 0) {
+ if (str_to_uint(line, &worker->process_limit) < 0 ||
+ worker->process_limit == 0) {
i_error("Indexer worker sent invalid handshake: %s",
line);
worker_connection_disconnect(conn);
}
}
-int worker_connection_connect(struct worker_connection *conn)
+int worker_connection_connect(struct connection *conn)
{
- i_assert(conn->fd == -1);
+ i_assert(conn->fd_in == -1);
- conn->fd = net_connect_unix(conn->socket_path);
- if (conn->fd == -1) {
- i_error("connect(%s) failed: %m", conn->socket_path);
+ conn->fd_in = net_connect_unix(conn->base_name);
+ if (conn->fd_in == -1) {
+ i_error("connect(%s) failed: %m", conn->base_name);
return -1;
}
- conn->io = io_add(conn->fd, IO_READ, worker_connection_input, conn);
- conn->input = i_stream_create_fd(conn->fd, SIZE_MAX);
- conn->output = o_stream_create_fd(conn->fd, SIZE_MAX);
+ conn->io = io_add(conn->fd_in, IO_READ, worker_connection_input, conn);
+ conn->input = i_stream_create_fd(conn->fd_in, SIZE_MAX);
+ conn->output = o_stream_create_fd(conn->fd_in, SIZE_MAX);
o_stream_set_no_error_handling(conn->output, TRUE);
o_stream_nsend_str(conn->output, INDEXER_MASTER_HANDSHAKE);
return 0;
}
-bool worker_connection_is_connected(struct worker_connection *conn)
+bool worker_connection_is_connected(struct connection *conn)
{
- return conn->fd != -1;
+ return conn->fd_in != -1;
}
-bool worker_connection_get_process_limit(struct worker_connection *conn,
+bool worker_connection_get_process_limit(struct connection *conn,
unsigned int *limit_r)
{
- if (conn->process_limit == 0)
+ struct worker_connection *worker =
+ container_of(conn, struct worker_connection, conn);
+
+ if (worker->process_limit == 0)
return FALSE;
- *limit_r = conn->process_limit;
+ *limit_r = worker->process_limit;
return TRUE;
}
-void worker_connection_request(struct worker_connection *conn,
+void worker_connection_request(struct connection *conn,
struct indexer_request *request,
void *context)
{
+ struct worker_connection *worker =
+ container_of(conn, struct worker_connection, conn);
+
i_assert(worker_connection_is_connected(conn));
i_assert(context != NULL);
i_assert(request->index || request->optimize);
- if (conn->request_username == NULL)
- conn->request_username = i_strdup(request->username);
+ if (worker->request_username == NULL)
+ worker->request_username = i_strdup(request->username);
else {
- i_assert(strcmp(conn->request_username,
+ i_assert(strcmp(worker->request_username,
request->username) == 0);
}
- conn->request = request;
+ worker->request = request;
T_BEGIN {
string_t *str = t_str_new(128);
} T_END;
}
-bool worker_connection_is_busy(struct worker_connection *conn)
+bool worker_connection_is_busy(struct connection *conn)
{
- return conn->request != NULL;
+ struct worker_connection *worker =
+ container_of(conn, struct worker_connection, conn);
+ return worker->request != NULL;
}
-const char *worker_connection_get_username(struct worker_connection *conn)
+const char *worker_connection_get_username(struct connection *conn)
{
- return conn->request_username;
+ struct worker_connection *worker =
+ container_of(conn, struct worker_connection, conn);
+ return worker->request_username;
}
struct indexer_request *
-worker_connection_get_request(struct worker_connection *conn)
+worker_connection_get_request(struct connection *conn)
{
- return conn->request;
+ struct worker_connection *worker =
+ container_of(conn, struct worker_connection, conn);
+ return worker->request;
}
-struct worker_connection *
+struct connection *
worker_connection_create(const char *socket_path,
indexer_status_callback_t *callback)
{
struct worker_connection *conn;
conn = i_new(struct worker_connection, 1);
-
conn->refcount = 1;
- conn->socket_path = i_strdup(socket_path);
+ conn->conn.base_name = i_strdup(socket_path);
conn->callback = callback;
- conn->fd = -1;
- return conn;
+ conn->conn.fd_in = -1;
+ return &conn->conn;
}
struct indexer_request;
-struct worker_connection *
+struct connection *
worker_connection_create(const char *socket_path,
indexer_status_callback_t *callback);
-void worker_connection_destroy(struct worker_connection **conn);
+void worker_connection_destroy(struct connection **conn);
-int worker_connection_connect(struct worker_connection *conn);
+int worker_connection_connect(struct connection *conn);
/* Returns TRUE if worker is connected to (not necessarily handshaked yet) */
-bool worker_connection_is_connected(struct worker_connection *conn);
+bool worker_connection_is_connected(struct connection *conn);
/* After initial handshake the worker process tells how many of its kind
can be at maximum. This returns the value, of FALSE if handshake isn't
finished yet. */
-bool worker_connection_get_process_limit(struct worker_connection *conn,
+bool worker_connection_get_process_limit(struct connection *conn,
unsigned int *limit_r);
/* Send a new indexing request for username+mailbox. The status callback is
called as necessary with the given context. Requests can be queued, but
only for the same username. */
-void worker_connection_request(struct worker_connection *conn,
+void worker_connection_request(struct connection *conn,
struct indexer_request *request,
void *context);
/* Returns TRUE if a request is being handled. */
-bool worker_connection_is_busy(struct worker_connection *conn);
+bool worker_connection_is_busy(struct connection *conn);
/* Returns username of the currently pending requests,
or NULL if there are none. */
-const char *worker_connection_get_username(struct worker_connection *conn);
+const char *worker_connection_get_username(struct connection *conn);
struct indexer_request *
-worker_connection_get_request(struct worker_connection *conn);
+worker_connection_get_request(struct connection *conn);
#endif