static const struct master_service_settings *set;
static struct indexer_queue *queue;
static struct worker_pool *worker_pool;
-static struct timeout *to_send_more;
void indexer_refresh_proctitle(void)
{
struct connection *conn;
struct indexer_request *request, *first_moved_request = NULL;
- timeout_remove(&to_send_more);
-
while ((request = indexer_queue_request_peek(queue)) != NULL) {
conn = worker_pool_find_username_connection(worker_pool,
request->username);
indexer_queue_request_finish(queue, &request,
percentage == 100);
+}
- /* if this was the last request for the connection, we can send more
- through it. delay it a bit, since we may be coming here from
- worker_connection_disconnect() and we want to finish it up. */
- if (to_send_more == NULL)
- to_send_more = timeout_add_short(0, queue_try_send_more, queue);
+static void worker_avail_callback(void)
+{
+ /* A new worker became available. Try to shrink the queue. */
+ queue_try_send_more(queue);
}
int main(int argc, char *argv[])
queue = indexer_queue_init(indexer_client_status_callback);
indexer_queue_set_listen_callback(queue, queue_listen_callback);
worker_pool = worker_pool_init("indexer-worker",
- worker_status_callback);
+ worker_status_callback,
+ worker_avail_callback);
master_service_init_finish(master_service);
master_service_run(master_service, client_connected);
indexer_clients_destroy_all();
worker_pool_deinit(&worker_pool);
indexer_queue_deinit(&queue);
- timeout_remove(&to_send_more);
master_service_deinit(&master_service);
return 0;
struct connection conn;
indexer_status_callback_t *callback;
+ worker_available_callback_t *avail_callback;
char *request_username;
struct indexer_request *request;
worker->request = NULL;
i_free_and_null(worker->request_username);
connection_deinit(conn);
+
+ worker->avail_callback();
i_free(conn);
}
struct connection *
worker_connection_create(const char *socket_path,
indexer_status_callback_t *callback,
+ worker_available_callback_t *avail_callback,
struct connection_list *list)
{
struct worker_connection *conn;
conn = i_new(struct worker_connection, 1);
conn->callback = callback;
+ conn->avail_callback = avail_callback;
connection_init_client_unix(list, &conn->conn, socket_path);
return &conn->conn;
struct indexer_request;
struct connection_list;
+typedef void worker_available_callback_t(void);
+
struct connection *
worker_connection_create(const char *socket_path,
indexer_status_callback_t *callback,
+ worker_available_callback_t *avail_callback,
struct connection_list *list);
void worker_connection_destroy(struct connection *conn);
struct worker_pool {
char *socket_path;
indexer_status_callback_t *callback;
+ worker_available_callback_t *avail_callback;
struct connection_list *connection_list;
};
struct worker_pool *
-worker_pool_init(const char *socket_path, indexer_status_callback_t *callback)
+worker_pool_init(const char *socket_path, indexer_status_callback_t *callback,
+ worker_available_callback_t *avail_callback)
{
struct worker_pool *pool;
pool = i_new(struct worker_pool, 1);
pool->socket_path = i_strdup(socket_path);
pool->callback = callback;
+ pool->avail_callback = avail_callback;
pool->connection_list = worker_connection_list_create();
return pool;
}
struct connection *conn;
conn = worker_connection_create(pool->socket_path, pool->callback,
+ pool->avail_callback,
pool->connection_list);
if (connection_client_connect(conn) < 0) {
worker_connection_destroy(conn);
#define WORKER_POOL_H
#include "indexer.h"
+#include "worker-connection.h"
struct connection;
struct worker_pool *
-worker_pool_init(const char *socket_path, indexer_status_callback_t *callback);
+worker_pool_init(const char *socket_path, indexer_status_callback_t *callback,
+ worker_available_callback_t *avail_callback);
void worker_pool_deinit(struct worker_pool **pool);
bool worker_pool_have_busy_connections(struct worker_pool *pool);