indexer_queue_request_status_int(queue, request, percentage);
}
+void indexer_queue_move_head_to_tail(struct indexer_queue *queue)
+{
+ struct indexer_request *request = queue->head;
+
+ indexer_queue_request_remove(queue);
+ DLLIST2_APPEND(&queue->head, &queue->tail, request);
+}
+
void indexer_queue_request_work(struct indexer_request *request)
{
request->working = TRUE;
void indexer_queue_request_status(struct indexer_queue *queue,
struct indexer_request *request,
int percentage);
+/* Move the next request to the end of the queue. */
+void indexer_queue_move_head_to_tail(struct indexer_queue *queue);
/* Start working on a request */
void indexer_queue_request_work(struct indexer_request *request);
/* Finish the request and free its memory. */
static void queue_try_send_more(struct indexer_queue *queue)
{
struct worker_connection *conn;
- struct indexer_request *request;
+ struct indexer_request *request, *first_moved_request = NULL;
timeout_remove(&to_send_more);
conn = worker_pool_find_username_connection(worker_pool,
request->username);
if (conn != NULL) {
- /* there is already a worker handling this user.
- it must be the one doing the indexing. use the same
- connection for sending this next request. */
- } else {
- /* try to find an empty worker */
- if (!worker_pool_get_connection(worker_pool, &conn))
+ /* There is already a connection handling a request
+ * for this user. Move the request to the back of the
+ * queue and handle requests from other users.
+ * Terminate if we went through all requests. */
+ if (request == first_moved_request) {
+ /* all requests are waiting for existing users
+ to finish. */
break;
+ }
+ if (first_moved_request == NULL)
+ first_moved_request = request;
+ indexer_queue_move_head_to_tail(queue);
+ continue;
}
+
+ /* create a new connection to a worker */
+ if (!worker_pool_get_connection(worker_pool, &conn))
+ break;
+
indexer_queue_request_remove(queue);
worker_send_request(conn, request);
}
static void worker_status_callback(int percentage, void *context)
{
- struct worker_request *request = context;
+ struct worker_connection *conn = context;
+ struct indexer_request *request = worker_connection_get_request(conn);
if (percentage >= 0 && percentage < 100) {
- indexer_queue_request_status(queue, request->request,
+ indexer_queue_request_status(queue, request,
percentage);
return;
}
- indexer_queue_request_finish(queue, &request->request,
+ indexer_queue_request_finish(queue, &request,
percentage == 100);
if (worker_pool != NULL) /* not in deinit */
- worker_pool_release_connection(worker_pool, request->conn);
- i_free(request);
+ worker_pool_release_connection(worker_pool, conn);
/* if this was the last request for the connection, we can send more
through it. delay it a bit, since we may be coming here from
struct ostream *output;
char *request_username;
- ARRAY(void *) request_contexts;
- struct aqueue *request_queue;
+ struct indexer_request *request;
unsigned int process_limit;
bool version_received:1;
conn->socket_path = i_strdup(socket_path);
conn->callback = callback;
conn->fd = -1;
- i_array_init(&conn->request_contexts, 32);
- conn->request_queue = aqueue_init(&conn->request_contexts.arr);
return conn;
}
if (--conn->refcount > 0)
return;
- aqueue_deinit(&conn->request_queue);
- array_free(&conn->request_contexts);
i_free(conn->socket_path);
i_free(conn);
}
static void worker_connection_disconnect(struct worker_connection *conn)
{
- unsigned int i, count = aqueue_count(conn->request_queue);
-
if (conn->fd != -1) {
io_remove(&conn->io);
i_stream_destroy(&conn->input);
conn->fd = -1;
}
- /* cancel any pending requests */
- if (count > 0) {
- i_error("Indexer worker disconnected, "
- "discarding %u requests for %s",
- count, conn->request_username);
- }
-
/* conn->callback() can try to destroy us */
conn->refcount++;
- for (i = 0; i < count; i++) {
- void *const *contextp =
- array_idx(&conn->request_contexts,
- aqueue_idx(conn->request_queue, 0));
- void *context = *contextp;
-
- aqueue_delete_tail(conn->request_queue);
- conn->callback(-1, context);
- }
i_free_and_null(conn->request_username);
worker_connection_unref(conn);
}
static int
worker_connection_input_line(struct worker_connection *conn, const char *line)
{
- void *const *contextp, *context;
int percentage;
/* return -1 -> error
0 -> request completed (100%)
*/
int ret = 1;
- if (aqueue_count(conn->request_queue) == 0) {
- i_error("Input from worker without pending requests: %s", line);
- return -1;
- }
-
if (str_to_int(line, &percentage) < 0 ||
percentage < -1 || percentage > 100) {
i_error("Invalid input from worker: %s", line);
return -1;
}
- contextp = array_idx(&conn->request_contexts,
- aqueue_idx(conn->request_queue, 0));
- context = *contextp;
- if (percentage < 0 || percentage == 100) {
- /* the request is finished */
- aqueue_delete_tail(conn->request_queue);
- if (aqueue_count(conn->request_queue) == 0)
- i_free_and_null(conn->request_username);
- if (percentage < 0)
- ret = -1;
- else
- ret = 0;
- }
+ /* is request finished */
+ if (percentage < 0)
+ ret = -1;
+ else if (percentage == 100)
+ ret = 0;
- conn->callback(percentage, context);
+ conn->callback(percentage, conn);
return ret;
}
}
void worker_connection_request(struct worker_connection *conn,
- const struct indexer_request *request,
+ struct indexer_request *request,
void *context)
{
i_assert(worker_connection_is_connected(conn));
request->username) == 0);
}
- aqueue_append(conn->request_queue, &context);
+ conn->request = request;
T_BEGIN {
string_t *str = t_str_new(128);
bool worker_connection_is_busy(struct worker_connection *conn)
{
- return aqueue_count(conn->request_queue) > 0;
+ return conn->request != NULL;
}
const char *worker_connection_get_username(struct worker_connection *conn)
{
return conn->request_username;
}
+
+struct indexer_request *
+worker_connection_get_request(struct worker_connection *conn)
+{
+ return conn->request;
+}
called as necessary with the given context. Requests can be queued, but
only for the same username. */
void worker_connection_request(struct worker_connection *conn,
- const struct indexer_request *request,
+ struct indexer_request *request,
void *context);
/* Returns TRUE if a request is being handled. */
bool worker_connection_is_busy(struct worker_connection *conn);
or NULL if there are none. */
const char *worker_connection_get_username(struct worker_connection *conn);
+struct indexer_request *
+worker_connection_get_request(struct worker_connection *conn);
+
#endif