worker_connection_request(conn, request, wrequest);
}
-static void queue_handle_existing_user_requests(struct indexer_queue *queue)
+static void queue_try_send_more(struct indexer_queue *queue)
{
struct worker_connection *conn;
struct indexer_request *request;
while ((request = indexer_queue_request_peek(queue)) != NULL) {
conn = worker_pool_find_username_connection(worker_pool,
request->username);
- if (conn == NULL)
- break;
-
+ if (conn != NULL) {
+ /* there is already a worker handling this user.
+ it must be the one doing the indexing. use the same
+ connection for sending this next request. */
+ } else {
+ /* try to find an empty worker */
+ if (!worker_pool_get_connection(worker_pool, &conn))
+ break;
+ }
indexer_queue_request_remove(queue);
- /* there is already a worker handling this user.
- it must be the one doing the indexing. */
worker_send_request(conn, request);
}
}
-static void queue_try_send_more(struct indexer_queue *queue)
-{
- struct worker_connection *conn;
- struct indexer_request *request;
-
- queue_handle_existing_user_requests(queue);
-
- request = indexer_queue_request_peek(queue);
- if (request == NULL)
- return;
-
- /* okay, we have a request for a new user. */
- if (!worker_pool_get_connection(worker_pool, &conn))
- return;
-
- indexer_queue_request_remove(queue);
- worker_send_request(conn, request);
-}
-
static void queue_listen_callback(struct indexer_queue *queue)
{
queue_try_send_more(queue);