From 57d86b6fd46cb6d37bfc28f67ae4be80296ad35a Mon Sep 17 00:00:00 2001 From: Stephan Bosch Date: Fri, 16 Sep 2016 01:35:09 +0200 Subject: [PATCH] lib-http: client: Link peers to queue earlier: during connection attempts. This makes sure that queues keep track of which peers are doing stuff on its behalf. This is important to be able to manage the active peers when a new host name lookup was performed; if a peer is no longer listed in the returned IPs it should be dropped. --- src/lib-http/http-client-private.h | 3 + src/lib-http/http-client-queue.c | 156 +++++++++++++++++++++++------ 2 files changed, 127 insertions(+), 32 deletions(-) diff --git a/src/lib-http/http-client-private.h b/src/lib-http/http-client-private.h index f01820d5dd..0a5c58b0dd 100644 --- a/src/lib-http/http-client-private.h +++ b/src/lib-http/http-client-private.h @@ -236,6 +236,9 @@ struct http_client_queue { this can be more than one when soft connect timeouts are enabled */ ARRAY_TYPE(http_client_peer) pending_peers; + /* currently active peer */ + struct http_client_peer *cur_peer; + /* all requests associated to this queue (ordered by earliest timeout first) */ ARRAY_TYPE(http_client_request) requests; diff --git a/src/lib-http/http-client-queue.c b/src/lib-http/http-client-queue.c index 56e6499c98..3aaa7a3416 100644 --- a/src/lib-http/http-client-queue.c +++ b/src/lib-http/http-client-queue.c @@ -122,18 +122,41 @@ http_client_queue_create(struct http_client_host *host, void http_client_queue_free(struct http_client_queue *queue) { + struct http_client_peer *const *peer_idx; + ARRAY_TYPE(http_client_peer) peers; + + http_client_queue_debug(queue, "Destroy"); + + /* unlink all peers */ + if (queue->cur_peer != NULL) { + struct http_client_peer *peer = queue->cur_peer; + queue->cur_peer = NULL; + http_client_peer_unlink_queue(peer, queue); + } + if (array_is_created(&queue->pending_peers)) { + t_array_init(&peers, array_count(&queue->pending_peers)); + array_copy(&peers.arr, 0, &queue->pending_peers.arr, 0, + array_count(&queue->pending_peers)); + array_foreach(&peers, peer_idx) + http_client_peer_unlink_queue(*peer_idx, queue); + array_free(&queue->pending_peers); + } + + /* abort all requests */ http_client_queue_fail (queue, HTTP_CLIENT_REQUEST_ERROR_ABORTED, "Aborted"); - if (array_is_created(&queue->pending_peers)) - array_free(&queue->pending_peers); array_free(&queue->requests); array_free(&queue->queued_requests); array_free(&queue->queued_urgent_requests); array_free(&queue->delayed_requests); + + /* cancel timeouts */ if (queue->to_connect != NULL) timeout_remove(&queue->to_connect); if (queue->to_delayed != NULL) timeout_remove(&queue->to_delayed); + + /* free */ i_free(queue->addr_name); i_free(queue->name); i_free(queue); @@ -226,18 +249,19 @@ http_client_queue_soft_connect_timeout(struct http_client_queue *queue) http_client_queue_connection_setup(queue); } -void http_client_queue_connection_setup(struct http_client_queue *queue) +static struct http_client_peer * +http_client_queue_connection_attempt(struct http_client_queue *queue) { struct http_client_host *host = queue->host; - struct http_client_peer *peer = NULL; - const struct http_client_peer_addr *addr = &queue->addr; + struct http_client_peer *peer; + struct http_client_peer_addr *addr = &queue->addr; unsigned int num_requests = array_count(&queue->queued_requests) + array_count(&queue->queued_urgent_requests); const char *ssl = ""; if (num_requests == 0) - return; + return NULL; /* update our peer address */ if (queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX) { @@ -247,31 +271,77 @@ void http_client_queue_connection_setup(struct http_client_queue *queue) ssl = (ssl == NULL ? "" : t_strdup_printf(" (SSL=%s)", ssl)); } + /* already got a peer? */ + peer = NULL; + if (queue->cur_peer != NULL) { + i_assert(!array_is_created(&queue->pending_peers) || + array_count(&queue->pending_peers) == 0); + + /* is it still the one we want? */ + if (http_client_peer_addr_cmp(addr, &queue->cur_peer->addr) == 0) { + /* is it still connected? */ + if (http_client_peer_is_connected(queue->cur_peer)) { + /* yes */ + http_client_queue_debug(queue, + "Using existing connection to %s%s " + "(%u requests pending)", + http_client_peer_addr2str(addr), ssl, num_requests); + + /* handle requests; */ + http_client_peer_trigger_request_handler(queue->cur_peer); + return queue->cur_peer; + } + /* no */ + peer = queue->cur_peer; + } else { + /* peer is not relevant to this queue anymore */ + http_client_peer_unlink_queue(queue->cur_peer, queue); + } + + queue->cur_peer = NULL; + } + + if (peer == NULL) + peer = http_client_peer_get(queue->client, addr); + http_client_queue_debug(queue, "Setting up connection to %s%s " "(%u requests pending)", http_client_peer_addr2str(addr), ssl, num_requests); - - /* create/get peer */ - peer = http_client_peer_get(queue->client, addr); + /* create provisional link between queue and peer */ http_client_peer_link_queue(peer, queue); /* handle requests; creates new connections when needed/possible */ http_client_peer_trigger_request_handler(peer); - if (!http_client_peer_is_connected(peer)) { + if (http_client_peer_is_connected(peer)) { + /* drop any pending peers */ + if (array_is_created(&queue->pending_peers) && + array_count(&queue->pending_peers) > 0) { + struct http_client_peer *const *peer_idx; + + array_foreach(&queue->pending_peers, peer_idx) { + i_assert(http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) != 0); + http_client_peer_unlink_queue(*peer_idx, queue); + } + array_clear(&queue->pending_peers); + } + queue->cur_peer = peer; + + } else { unsigned int msecs; bool new_peer = TRUE; /* not already connected, wait for connections */ - if (!array_is_created(&queue->pending_peers)) + if (!array_is_created(&queue->pending_peers)) { i_array_init(&queue->pending_peers, 8); - else { + } else { struct http_client_peer *const *peer_idx; /* we may be waiting for this peer already */ array_foreach(&queue->pending_peers, peer_idx) { if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) { + i_assert(*peer_idx == peer); new_peer = FALSE; break; } @@ -296,6 +366,13 @@ void http_client_queue_connection_setup(struct http_client_queue *queue) } } } + + return peer; +} + +void http_client_queue_connection_setup(struct http_client_queue *queue) +{ + (void)http_client_queue_connection_attempt(queue); } void @@ -328,6 +405,8 @@ http_client_queue_connection_success(struct http_client_queue *queue, connected peer, even if some of the connections are pending. they may be intended for urgent requests. */ + i_assert(queue->cur_peer == NULL); + queue->cur_peer = *peer_idx; continue; } /* unlink this queue from the peer; if this was the last/only queue, the @@ -336,6 +415,7 @@ http_client_queue_connection_success(struct http_client_queue *queue, http_client_peer_unlink_queue(*peer_idx, queue); } array_clear(&queue->pending_peers); + i_assert(queue->cur_peer != NULL); } } @@ -347,6 +427,12 @@ http_client_queue_connection_failure(struct http_client_queue *queue, &queue->client->set; const char *https_name = http_client_peer_addr_get_https_name(addr); struct http_client_host *host = queue->host; + struct http_client_peer *failed_peer; + struct http_client_peer *const *peer_idx; + + i_assert(queue->cur_peer == NULL); + i_assert(array_is_created(&queue->pending_peers) && + array_count(&queue->pending_peers) > 0); http_client_queue_debug(queue, "Failed to set up connection to %s%s: %s " @@ -358,27 +444,26 @@ http_client_queue_connection_failure(struct http_client_queue *queue, array_count(&queue->pending_peers): 0), array_count(&queue->requests)); - if (array_is_created(&queue->pending_peers) && - array_count(&queue->pending_peers) > 0) { - struct http_client_peer *const *peer_idx; - - /* we're still doing the initial connections to this hport. if - we're also doing parallel connections with soft timeouts - (pending_peer_count>1), wait for them to finish - first. */ - array_foreach(&queue->pending_peers, peer_idx) { - if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) { - array_delete(&queue->pending_peers, - array_foreach_idx(&queue->pending_peers, peer_idx), 1); - break; - } - } - if (array_count(&queue->pending_peers) > 0) { - http_client_queue_debug(queue, - "Waiting for remaining pending peers."); - return; + /* we're still doing the initial connections to this hport. if + we're also doing parallel connections with soft timeouts + (pending_peer_count>1), wait for them to finish + first. */ + failed_peer = NULL; + array_foreach(&queue->pending_peers, peer_idx) { + if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) { + failed_peer = *peer_idx; + array_delete(&queue->pending_peers, + array_foreach_idx(&queue->pending_peers, peer_idx), 1); + break; } } + i_assert(failed_peer != NULL); + if (array_count(&queue->pending_peers) > 0) { + http_client_queue_debug(queue, + "Waiting for remaining pending peers."); + http_client_peer_unlink_queue(failed_peer, queue); + return; + } /* one of the connections failed. if we're not using soft timeouts, we need to try to connect to the next IP. if we are using soft @@ -410,6 +495,7 @@ http_client_queue_connection_failure(struct http_client_queue *queue, total_msecs/1000, total_msecs%1000); } queue->connect_attempts = 0; + http_client_peer_unlink_queue(failed_peer, queue); http_client_queue_fail(queue, HTTP_CLIENT_REQUEST_ERROR_CONNECT_FAILED, reason); return; @@ -418,7 +504,8 @@ http_client_queue_connection_failure(struct http_client_queue *queue, queue->ips_connect_idx = (queue->ips_connect_idx + 1) % host->ips_count; } - http_client_queue_connection_setup(queue); + if (http_client_queue_connection_attempt(queue) != failed_peer) + http_client_peer_unlink_queue(failed_peer, queue); return; } @@ -428,6 +515,11 @@ http_client_queue_peer_disconnected(struct http_client_queue *queue, { struct http_client_peer *const *peer_idx; + if (queue->cur_peer == peer) { + queue->cur_peer = NULL; + return; + } + if (!array_is_created(&queue->pending_peers)) return; -- 2.47.3