]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib-http: http-client: Fixed request scheduling and connection management.
authorStephan Bosch <stephan@rename-it.nl>
Sun, 15 Sep 2013 00:31:28 +0000 (03:31 +0300)
committerStephan Bosch <stephan@rename-it.nl>
Sun, 15 Sep 2013 00:31:28 +0000 (03:31 +0300)
src/lib-http/http-client-connection.c
src/lib-http/http-client-host.c
src/lib-http/http-client-peer.c
src/lib-http/http-client-private.h
src/lib-http/http-client-request.c
src/lib-http/http-client.c
src/lib-http/http-client.h

index 78c0212c07622d45c6fbfd54822a8a857143a501..be90822b63aedc0c9f620de1eb4ca4cd05fc69b2 100644 (file)
@@ -167,8 +167,7 @@ http_client_connection_idle_timeout(struct http_client_connection *conn)
        http_client_connection_unref(&conn);
 }
 
-static void
-http_client_connection_check_idle(struct http_client_connection *conn)
+void http_client_connection_check_idle(struct http_client_connection *conn)
 {
        unsigned int timeout, count;
 
@@ -254,7 +253,7 @@ http_client_connection_continue_timeout(struct http_client_connection *conn)
        }
 }
 
-bool http_client_connection_next_request(struct http_client_connection *conn)
+int http_client_connection_next_request(struct http_client_connection *conn)
 {
        struct http_client_request *req = NULL;
        const char *error;
@@ -262,17 +261,15 @@ bool http_client_connection_next_request(struct http_client_connection *conn)
 
        if (!http_client_connection_is_ready(conn)) {
                http_client_connection_debug(conn, "Not ready for next request");
-               return FALSE;
+               return 0;
        }
 
        /* claim request, but no urgent request can be second in line */
        have_pending_requests = array_count(&conn->request_wait_list) > 0 ||
                conn->pending_request != NULL;
        req = http_client_peer_claim_request(conn->peer, have_pending_requests);
-       if (req == NULL) {
-               http_client_connection_check_idle(conn);
-               return FALSE;   
-       }
+       if (req == NULL)
+               return 0;       
 
        if (conn->to_idle != NULL)
                timeout_remove(&conn->to_idle);
@@ -300,7 +297,7 @@ bool http_client_connection_next_request(struct http_client_connection *conn)
                http_client_connection_abort_temp_error(&conn,
                        HTTP_CLIENT_REQUEST_ERROR_CONNECTION_LOST,
                        t_strdup_printf("Failed to send request: %s", error));
-               return FALSE;
+               return -1;
        }
 
        /* https://tools.ietf.org/html/draft-ietf-httpbis-p2-semantics-21;
@@ -321,7 +318,7 @@ bool http_client_connection_next_request(struct http_client_connection *conn)
                        http_client_connection_continue_timeout, conn);
        }
 
-       return TRUE;
+       return 1;
 }
 
 static void http_client_connection_destroy(struct connection *_conn)
@@ -595,7 +592,7 @@ static void http_client_connection_input(struct connection *_conn)
                                conn->output_locked = FALSE;
                                conn->peer->no_payload_sync = TRUE;
                                http_client_request_retry(req, response->status, response->reason);
-                               return;
+       
                        } else if (response->status / 100 == 3 && response->status != 304 &&
                                response->location != NULL) {
                                /* redirect */
@@ -648,9 +645,13 @@ static void http_client_connection_input(struct connection *_conn)
        }
 
        if (finished > 0) {
+               /* connection still alive after (at least one) request;
+                  we can pipeline -> mark for subsequent connections */
+               conn->peer->allows_pipelining = TRUE;
+
                /* room for new requests */
-               http_client_peer_handle_requests(conn->peer);
-               http_client_connection_check_idle(conn);
+               if (http_client_connection_is_ready(conn))
+                       http_client_peer_trigger_request_handler(conn->peer);
        }
 }
 
@@ -688,8 +689,8 @@ static int http_client_connection_output(struct http_client_connection *conn)
                        }
                        if (!conn->output_locked) {
                                /* room for new requests */
-                               http_client_peer_handle_requests(conn->peer);
-                               http_client_connection_check_idle(conn);
+                               if (http_client_connection_is_ready(conn))
+                                       http_client_peer_trigger_request_handler(conn->peer);
                        }
                }
        }
@@ -701,27 +702,28 @@ http_client_connection_ready(struct http_client_connection *conn)
 {
        struct stat st;
 
+       /* connected */
        conn->connected = TRUE;
-       conn->connect_succeeded = TRUE;
        if (conn->to_connect != NULL &&
            (conn->ssl_iostream == NULL ||
             ssl_iostream_is_handshaked(conn->ssl_iostream)))
                timeout_remove(&conn->to_connect);
 
+       /* indicate connection success */
+       conn->connect_succeeded = TRUE;
        http_client_peer_connection_success(conn->peer);
 
+       /* start raw log */
        if (conn->client->set.rawlog_dir != NULL &&
                stat(conn->client->set.rawlog_dir, &st) == 0) {
                iostream_rawlog_create(conn->client->set.rawlog_dir,
                                       &conn->conn.input, &conn->conn.output);
        }
 
+       /* start protocol I/O */
        conn->http_parser = http_response_parser_init(conn->conn.input);
        o_stream_set_flush_callback(conn->conn.output,
     http_client_connection_output, conn);
-
-       /* we never pipeline before the first response */
-       (void)http_client_connection_next_request(conn);
 }
 
 static int
@@ -872,23 +874,24 @@ http_client_connection_create(struct http_client_peer *peer)
 {
        struct http_client_connection *conn;
        static unsigned int id = 0;
+       const struct http_client_peer_addr *addr = &peer->addr;
 
        conn = i_new(struct http_client_connection, 1);
        conn->refcount = 1;
        conn->client = peer->client;
+       conn->id = id++;
        conn->peer = peer;
        i_array_init(&conn->request_wait_list, 16);
 
        connection_init_client_ip
-               (peer->client->conn_list, &conn->conn, &peer->addr.ip, peer->addr.port);
+               (peer->client->conn_list, &conn->conn, &addr->ip, addr->port);
        http_client_connection_connect(conn);
 
-       conn->id = id++;
        array_append(&peer->conns, &conn, 1);
 
        http_client_connection_debug(conn,
-               "Connection created (%d parallel connections exist)",
-               array_count(&peer->conns));
+               "Connection created (%d parallel connections exist)%s",
+               array_count(&peer->conns), (conn->to_input == NULL ? "" : " [broken]"));
        return conn;
 }
 
index 520e820ed0d666e57d457808a96836fcee2b44f5..ed4363f330f8bc98fb5b5a0cec533d93b8a758a4 100644 (file)
@@ -46,13 +46,13 @@ http_client_host_port_connection_setup(struct http_client_host_port *hport);
 
 static struct http_client_host_port *
 http_client_host_port_find(struct http_client_host *host,
-       unsigned int port, const char *https_name)
+       in_port_t port, const char *https_name)
 {
        struct http_client_host_port *hport;
 
        array_foreach_modifiable(&host->ports, hport) {
-               if (hport->port == port &&
-                   null_strcmp(hport->https_name, https_name) == 0)
+               if (hport->addr.port == port &&
+                   null_strcmp(hport->addr.https_name, https_name) == 0)
                        return hport;
        }
 
@@ -61,7 +61,7 @@ http_client_host_port_find(struct http_client_host *host,
 
 static struct http_client_host_port *
 http_client_host_port_init(struct http_client_host *host,
-       unsigned int port, const char *https_name)
+       in_port_t port, const char *https_name)
 {
        struct http_client_host_port *hport;
 
@@ -69,8 +69,8 @@ http_client_host_port_init(struct http_client_host *host,
        if (hport == NULL) {
                hport = array_append_space(&host->ports);
                hport->host = host;
-               hport->port = port;
-               hport->https_name = i_strdup(https_name);
+               hport->addr.port = port;
+               hport->addr.https_name = i_strdup(https_name);
                hport->ips_connect_idx = 0;
                i_array_init(&hport->request_queue, 16);
        }
@@ -94,7 +94,9 @@ static void http_client_host_port_deinit(struct http_client_host_port *hport)
 {
        http_client_host_port_error
                (hport, HTTP_CLIENT_REQUEST_ERROR_ABORTED, "Aborted");
-       i_free(hport->https_name);
+       i_free(hport->addr.https_name);
+       if (array_is_created(&hport->pending_peers))
+               array_free(&hport->pending_peers);
        array_free(&hport->request_queue);
 }
 
@@ -136,20 +138,25 @@ http_client_host_port_soft_connect_timeout(struct http_client_host_port *hport)
        if (hport->to_connect != NULL)
                timeout_remove(&hport->to_connect);
 
-       if (http_client_hport_is_last_connect_ip(hport))
+       if (http_client_hport_is_last_connect_ip(hport)) {
+               /* no more IPs to try */
                return;
+       }
 
        /* if our our previous connection attempt takes longer than the
-          soft_connect_timeout we start a connection attempt to the next IP in
+          soft_connect_timeout, we start a connection attempt to the next IP in
           parallel */
 
        http_client_host_debug(host, "Connection to %s:%u%s is taking a long time; "
                "starting parallel connection attempt to next IP",
-               net_ip2addr(&host->ips[hport->ips_connect_idx]), hport->port,
-               hport->https_name == NULL ? "" :
-                       t_strdup_printf(" (SSL=%s)", hport->https_name));
+               net_ip2addr(&hport->addr.ip), hport->addr.port,
+               hport->addr.https_name == NULL ? "" :
+                       t_strdup_printf(" (SSL=%s)", hport->addr.https_name));
 
+       /* next IP */
        hport->ips_connect_idx = (hport->ips_connect_idx + 1) % host->ips_count;
+
+       /* setup connection to new peer (can start new soft timeout) */
        http_client_host_port_connection_setup(hport);
 }
 
@@ -158,59 +165,41 @@ http_client_host_port_connection_setup(struct http_client_host_port *hport)
 {
        struct http_client_host *host = hport->host;
        struct http_client_peer *peer = NULL;
-       struct http_client_peer_addr addr;
-       unsigned int msecs;
+       const struct http_client_peer_addr *addr = &hport->addr;
+       unsigned int num_requests = array_count(&hport->request_queue);
 
-       addr.ip = host->ips[hport->ips_connect_idx];
-       addr.port = hport->port;
-       addr.https_name = hport->https_name;
+       if (num_requests == 0)
+               return;
+
+       /* update our peer address */
+       hport->addr.ip = host->ips[hport->ips_connect_idx];
 
-       http_client_host_debug(host, "Setting up connection to %s:%u%s",
-               net_ip2addr(&addr.ip), addr.port, addr.https_name == NULL ? "" :
-               t_strdup_printf(" (SSL=%s)", addr.https_name));
+       http_client_host_debug(host, "Setting up connection to %s:%u%s "
+               "(%u requests pending)", net_ip2addr(&addr->ip), addr->port,
+               addr->https_name == NULL ? "" :
+                       t_strdup_printf(" (SSL=%s)", addr->https_name), num_requests);
 
-       peer = http_client_peer_get(host->client, &addr);
+       /* create/get peer */
+       peer = http_client_peer_get(host->client, addr);
        http_client_peer_add_host(peer, host);
-       if (http_client_peer_handle_requests(peer))
-               hport->pending_connection_count++;
-
-       /* start soft connect time-out (but only if we have another IP left) */
-       msecs = host->client->set.soft_connect_timeout_msecs;
-       if (!http_client_hport_is_last_connect_ip(hport) && msecs > 0 &&
-           hport->to_connect == NULL) {
-               hport->to_connect =
-                       timeout_add(msecs, http_client_host_port_soft_connect_timeout, hport);
-       }
-}
 
-static void
-http_client_host_drop_pending_connections(struct http_client_host_port *hport,
-                                         const struct http_client_peer_addr *addr)
-{
-       struct http_client_peer *peer;
-       struct http_client_connection *const *conns, *conn;
-       unsigned int i, count;
+       /* handle requests; creates new connections when needed/possible */
+       http_client_peer_trigger_request_handler(peer);
 
-       for (peer = hport->host->client->peers_list; peer != NULL; peer = peer->next) {
-               if (http_client_peer_addr_cmp(&peer->addr, addr) == 0) {
-                       /* don't drop any connections to the successfully
-                          connected peer, even if some of the connections
-                          are pending. they may be intended for urgent
-                          requests. */
-                       continue;
-               }
-               if (!http_client_peer_have_host(peer, hport->host))
-                       continue;
-
-               conns = array_get(&peer->conns, &count);
-               for (i = count; i > 0; i--) {
-                       conn = conns[i-1];
-                       if (!conn->connected) {
-                               i_assert(conn->refcount == 1);
-                               /* avoid recreating the connection */
-                               peer->last_connect_failed = TRUE;
-                               http_client_connection_unref(&conn);
-                       }
+       if (!http_client_peer_is_connected(peer)) {
+               unsigned int msecs;
+
+               /* not already connected, wait for connections */
+               if (!array_is_created(&hport->pending_peers))
+                       i_array_init(&hport->pending_peers, 8);
+               array_append(&hport->pending_peers, &peer, 1);                  
+
+               /* start soft connect time-out (but only if we have another IP left) */
+               msecs = host->client->set.soft_connect_timeout_msecs;
+               if (!http_client_hport_is_last_connect_ip(hport) && msecs > 0 &&
+                   hport->to_connect == NULL) {
+                       hport->to_connect =
+                               timeout_add(msecs, http_client_host_port_soft_connect_timeout, hport);
                }
        }
 }
@@ -241,28 +230,51 @@ http_client_host_port_connection_success(struct http_client_host_port *hport,
                timeout_remove(&hport->to_connect);
 
        /* drop all other attempts to the hport. note that we get here whenever
-          a connection is successfully created, so pending_connection_count
-          may be 0. */
-       if (hport->pending_connection_count > 1)
-               http_client_host_drop_pending_connections(hport, addr);
-       /* since this hport is now successfully connected, we won't be
-          getting any connection failures to it anymore. so we need
-          to reset the pending_connection_count count here. */
-       hport->pending_connection_count = 0;
+          a connection is successfully created, so pending_peers array
+          may be empty. */
+       if (array_is_created(&hport->pending_peers) &&
+               array_count(&hport->pending_peers) > 0) {
+               struct http_client_peer *const *peer_idx;
+
+               array_foreach(&hport->pending_peers, peer_idx) {
+                       if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) {
+                               /* don't drop any connections to the successfully
+                                  connected peer, even if some of the connections
+                                  are pending. they may be intended for urgent
+                                  requests. */
+                               continue;
+                       }
+                       /* remove this host from the peer; if this was the last/only host, the
+                          peer will be freed, closing all connections.
+                        */
+                       http_client_peer_remove_host(*peer_idx, hport->host);
+               }
+               array_clear(&hport->pending_peers);
+       }
 }
 
 static bool
 http_client_host_port_connection_failure(struct http_client_host_port *hport,
-       const char *reason)
+       const struct http_client_peer_addr *addr, const char *reason)
 {
        struct http_client_host *host = hport->host;
 
-       if (hport->pending_connection_count > 0) {
+       if (array_is_created(&hport->pending_peers) &&
+               array_count(&hport->pending_peers) > 0) {
+               struct http_client_peer *const *peer_idx;
+
                /* we're still doing the initial connections to this hport. if
                   we're also doing parallel connections with soft timeouts
-                  (pending_connection_count>1), wait for them to finish
+                  (pending_peer_count>1), wait for them to finish
                   first. */
-               if (--hport->pending_connection_count > 0)
+               array_foreach(&hport->pending_peers, peer_idx) {
+                       if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) {
+                               array_delete(&hport->pending_peers,
+                                       array_foreach_idx(&hport->pending_peers, peer_idx), 1);
+                               break;
+                       }
+               }
+               if (array_count(&hport->pending_peers) > 0)
                        return TRUE;
        }
 
@@ -282,7 +294,6 @@ http_client_host_port_connection_failure(struct http_client_host_port *hport,
                return FALSE;
        }
        hport->ips_connect_idx = (hport->ips_connect_idx + 1) % host->ips_count;
-
        http_client_host_port_connection_setup(hport);
        return TRUE;
 }
@@ -318,7 +329,7 @@ void http_client_host_connection_failure(struct http_client_host *host,
        if (hport == NULL)
                return;
 
-       if (!http_client_host_port_connection_failure(hport, reason)) {
+       if (!http_client_host_port_connection_failure(hport, addr, reason)) {
                /* failed definitively for currently queued requests */
                if (host->client->ioloop != NULL)
                        io_loop_stop(host->client->ioloop);
@@ -359,8 +370,8 @@ http_client_host_dns_callback(const struct dns_lookup_result *result,
        host->ips_count = result->ips_count;
        host->ips = i_new(struct ip_addr, host->ips_count);
        memcpy(host->ips, result->ips, sizeof(*host->ips) * host->ips_count);
-
-       // FIXME: make DNS result expire 
+       
+       /* FIXME: make DNS result expire */
 
        /* make connections to requested ports */
        array_foreach_modifiable(&host->ports, hport) {
@@ -488,7 +499,7 @@ http_client_host_claim_request(struct http_client_host *host,
        if (hport == NULL)
                return NULL;
 
-       requests = array_get(&hport->request_queue, &count);
+       requests = array_get(&hport->request_queue, &count);
        if (count == 0)
                return NULL;
        i = 0;
index 4531733522962d75c95541641fbfd93862648179..c06b98c6fbf7492d08f9f77bcb7f771e32f6cd4f 100644 (file)
@@ -75,6 +75,27 @@ http_client_peer_connect(struct http_client_peer *peer, unsigned int count)
        }
 }
 
+bool http_client_peer_is_connected(struct http_client_peer *peer)
+{
+       struct http_client_connection *const *conn_idx;
+
+       array_foreach(&peer->conns, conn_idx) {
+               if ((*conn_idx)->connected)
+                       return TRUE;
+       }
+
+       return FALSE;
+}
+
+static void http_client_peer_check_idle(struct http_client_peer *peer)
+{
+       struct http_client_connection *const *conn_idx;
+
+       array_foreach(&peer->conns, conn_idx) {
+               http_client_connection_check_idle(*conn_idx);
+       }
+}
+
 static unsigned int
 http_client_peer_requests_pending(struct http_client_peer *peer,
                                  unsigned int *num_urgent_r)
@@ -92,91 +113,220 @@ http_client_peer_requests_pending(struct http_client_peer *peer,
        return num_requests;
 }
 
-static bool
-http_client_peer_next_request(struct http_client_peer *peer,
-                             bool *created_connections)
+static void
+http_client_peer_handle_requests_real(struct http_client_peer *peer)
 {
+       struct _conn_available {
+               struct http_client_connection *conn;
+               unsigned int pending_requests;
+       };
        struct http_client_connection *const *conn_idx;
-       struct http_client_connection *conn = NULL;
-       unsigned int connecting = 0, closing = 0, min_waiting = UINT_MAX;
-       unsigned int num_urgent, new_connections, working_conn_count;
-
-       if (http_client_peer_requests_pending(peer, &num_urgent) == 0)
-               return FALSE;
+       ARRAY(struct _conn_available) conns_avail;
+       struct _conn_available *conn_avail_idx;
+       unsigned int connecting, closing, idle;
+       unsigned int num_pending, num_urgent, new_connections,  working_conn_count;
+       bool statistics_dirty = TRUE;
+
+       /* FIXME: limit the number of requests handled in one run to prevent
+          I/O starvation. */
+
+       /* don't do anything unless we have pending requests */
+       num_pending = http_client_peer_requests_pending(peer, &num_urgent);
+       if (num_pending == 0) {
+               http_client_peer_check_idle(peer);
+               return;
+       }
 
-       /* find the least busy connection */
-       array_foreach(&peer->conns, conn_idx) {
-               if (http_client_connection_is_ready(*conn_idx)) {
-                       unsigned int waiting = http_client_connection_count_pending(*conn_idx);
-
-                       if (waiting < min_waiting) {
-                               min_waiting = waiting;
-                               conn = *conn_idx;
-                               if (min_waiting == 0) {
-                                       /* found idle connection, use it now */
-                                       break;
+       t_array_init(&conns_avail, array_count(&peer->conns));
+       do {
+               array_clear(&conns_avail);
+               connecting = closing = idle = 0;
+
+               /* gather connection statistics */
+               array_foreach(&peer->conns, conn_idx) {
+                       if (http_client_connection_is_ready(*conn_idx)) {                       
+                               struct _conn_available *conn_avail;
+                               unsigned int insert_idx, pending_requests;
+
+                               /* compile sorted availability list */
+                               pending_requests = http_client_connection_count_pending(*conn_idx);
+                               if (array_count(&conns_avail) == 0) {
+                                       insert_idx = 0;
+                               } else {
+                                       insert_idx = array_count(&conns_avail);
+                                       array_foreach_modifiable(&conns_avail, conn_avail_idx) {
+                                               if (conn_avail_idx->pending_requests > pending_requests) {
+                                                       insert_idx = array_foreach_idx(&conns_avail, conn_avail_idx);
+                                                       break;
+                                               }
+                                       }
                                }
+                               conn_avail = array_insert_space(&conns_avail, insert_idx);
+                               conn_avail->conn = *conn_idx;
+                               conn_avail->pending_requests = pending_requests;
+                               if (pending_requests == 0)
+                                       idle++;
                        }
+                       /* count the number of connecting and closing connections */
+                       if ((*conn_idx)->closing)
+                               closing++;
+                       else if (!(*conn_idx)->connected)
+                               connecting++;
                }
-               /* count the number of connecting and closing connections */
-               if ((*conn_idx)->closing)
-                       closing++;
-               else if (!(*conn_idx)->connected)
-                       connecting++;
-       }
-       working_conn_count = array_count(&peer->conns) - closing;
 
-       /* did we find an idle connection? */
-       if (conn != NULL && min_waiting == 0) {
-               /* yes, use it */
-               return http_client_connection_next_request(conn);
-       }
+               working_conn_count = array_count(&peer->conns) - closing;
+               statistics_dirty = FALSE;
 
-       /* no, but can we create a new connection? */           
-       if (num_urgent == 0 &&
-           working_conn_count >= peer->client->set.max_parallel_connections) {
-               /* no */
-               if (conn == NULL) {
+               /* use idle connections right away */
+               if (idle > 0) {
                        http_client_peer_debug(peer,
-                               "Only non-urgent requests, and we already have "
-                               "%u pending connections", working_conn_count);
-                       return FALSE;
+                               "Using %u idle connections to handle %u requests "
+                               "(%u total connections ready)",
+                               idle, num_pending > idle ? idle : num_pending,
+                               array_count(&conns_avail));
+
+                       array_foreach_modifiable(&conns_avail, conn_avail_idx) {
+                               if (num_pending == 0 || conn_avail_idx->pending_requests > 0)
+                                       break;
+                               idle--;
+                               if (http_client_connection_next_request(conn_avail_idx->conn) <= 0) {
+                                       /* no longer available (probably connection error/closed) */
+                                       statistics_dirty = TRUE;
+                                       conn_avail_idx->conn = NULL;
+                               } else {
+                                       /* update statistics */
+                                       conn_avail_idx->pending_requests++;
+                                       if (num_urgent > 0)
+                                               num_urgent--;
+                                       num_pending--;
+                               }
+                       }
                }
-               /* pipeline it */
-               return http_client_connection_next_request(conn);
-       }
+       
+               /* don't continue unless we have more pending requests */
+               num_pending = http_client_peer_requests_pending(peer, &num_urgent);
+               if (num_pending == 0) {
+                       http_client_peer_check_idle(peer);
+                       return;
+               }
+       } while (statistics_dirty);
+
+       i_assert(idle == 0);
 
-       /* yes, determine how many connections to set up */
+       /* determine how many new connections we can set up */
        if (peer->last_connect_failed && working_conn_count > 0 &&
            working_conn_count == connecting) {
                /* don't create new connections until the existing ones have
                   finished connecting successfully. */
                new_connections = 0;
-       } else if (num_urgent == 0) {
-               new_connections = connecting == 0 ? 1 : 0;
        } else {
-               new_connections = (num_urgent > connecting ? num_urgent - connecting : 0);
+               if (working_conn_count - connecting + num_urgent >=
+                       peer->client->set.max_parallel_connections) {
+                       /* only create connections for urgent requests */
+                       new_connections = (num_urgent > connecting ? num_urgent - connecting : 0);
+               } else if (num_pending <= connecting) {
+                       /* there are already enough connections being made */
+                       new_connections = 0;
+               } else if (working_conn_count == connecting) {
+                       /* no connections succeeded so far, don't hammer the server with more
+                          than one connection attempt unless its urgent */
+                       if (num_urgent > 0) {
+                               new_connections =
+                                       (num_urgent > connecting ? num_urgent - connecting : 0);
+                       } else {
+                               new_connections = (connecting == 0 ? 1 : 0);
+                       }
+               } else if (num_pending - connecting >
+                       peer->client->set.max_parallel_connections - working_conn_count) {
+                       /* create maximum allowed connections */
+                       new_connections =
+                               peer->client->set.max_parallel_connections - working_conn_count;
+               } else {
+                       /* create as many connections as we need */
+                       new_connections = num_pending - connecting;
+               }
        }
-       http_client_peer_debug(peer,
-               "Creating %u new connections to handle requests "
-               "(already %u usable, connecting to %u, closing %u)",
-               new_connections, working_conn_count - connecting,
-               connecting, closing);
+
+       /* create connections */
        if (new_connections > 0) {
-               *created_connections = TRUE;
+               http_client_peer_debug(peer,
+                       "Creating %u new connections to handle requests "
+                       "(already %u usable, connecting to %u, closing %u)",
+                       new_connections, working_conn_count - connecting,
+                       connecting, closing);
                http_client_peer_connect(peer, new_connections);
+               return;
        }
 
-       /* now we wait until it is connected */
-       return FALSE;
+       /* cannot create new connections for normal request; attempt pipelining */
+       if (working_conn_count - connecting >=
+               peer->client->set.max_parallel_connections) {
+               unsigned int pipeline_level = 0, total_handled = 0, handled;
+
+               if (!peer->allows_pipelining) {
+                       http_client_peer_debug(peer,
+                               "Will not pipeline until peer has shown support");
+                       return;
+               }
+
+               /* fill pipelines */
+               do {
+                       handled = 0;
+                       /* fill smallest pipelines first,
+                          until all pipelines are filled to the same level */
+                       array_foreach_modifiable(&conns_avail, conn_avail_idx) {
+                               if (conn_avail_idx->conn == NULL)
+                                       continue;
+                               if (pipeline_level == 0) {
+                                       pipeline_level = conn_avail_idx->pending_requests;
+                               } else if (conn_avail_idx->pending_requests > pipeline_level) {
+                                       pipeline_level = conn_avail_idx->pending_requests;
+                                       break; /* restart from least busy connection */
+                               }
+                               /* pipeline it */
+                               if (http_client_connection_next_request(conn_avail_idx->conn) <= 0) {
+                                       /* connection now unavailable */
+                                       conn_avail_idx->conn = NULL;
+                               } else {
+                                       /* successfully pipelined */
+                                       conn_avail_idx->pending_requests++;
+                                       num_pending--;
+                                       handled++;
+                               }
+                       }
+                       
+                       total_handled += handled;
+               } while (num_pending > num_urgent && handled > 0);
+
+               http_client_peer_debug(peer,
+                       "Pipelined %u requests (filled pipelines up to %u requests)",
+                       total_handled, pipeline_level);
+               return;
+       }
+
+       /* still waiting for connections to finish */
+       http_client_peer_debug(peer,
+               "No request handled; waiting for new connections");
+       return;
 }
 
-bool http_client_peer_handle_requests(struct http_client_peer *peer)
+static void http_client_peer_handle_requests(struct http_client_peer *peer)
 {
-       bool created_connections = FALSE;
+       if (peer->to_req_handling != NULL)
+               timeout_remove(&peer->to_req_handling);
+       
+       T_BEGIN {
+               http_client_peer_handle_requests_real(peer);
+       } T_END;
+}
 
-       while (http_client_peer_next_request(peer, &created_connections)) ;
-       return created_connections;
+void http_client_peer_trigger_request_handler(struct http_client_peer *peer)
+{
+       /* trigger request handling through timeout */
+       if (peer->to_req_handling == NULL) {
+               peer->to_req_handling =
+                       timeout_add_short(0, http_client_peer_handle_requests, peer);
+       }
 }
 
 static struct http_client_peer *
@@ -215,6 +365,9 @@ void http_client_peer_free(struct http_client_peer **_peer)
 
        http_client_peer_debug(peer, "Peer destroy");
 
+       if (peer->to_req_handling != NULL)
+               timeout_remove(&peer->to_req_handling);
+
        /* make a copy of the connection array; freed connections modify it */
        t_array_init(&conns, array_count(&peer->conns));
        array_copy(&conns.arr, 0, &peer->conns.arr, 0, array_count(&peer->conns));
@@ -268,6 +421,21 @@ void http_client_peer_add_host(struct http_client_peer *peer,
                array_append(&peer->hosts, &host, 1);
 }
 
+void http_client_peer_remove_host(struct http_client_peer *peer,
+                               struct http_client_host *host)
+{
+       struct http_client_host *const *host_idx;
+
+       array_foreach(&peer->hosts, host_idx) {
+               if (*host_idx == host) {
+                       array_delete(&peer->hosts, array_foreach_idx(&peer->hosts, host_idx), 1);
+                       if (array_count(&peer->hosts) == 0)
+                               http_client_peer_free(&peer);
+                       return;
+               }
+       }
+}
+
 struct http_client_request *
 http_client_peer_claim_request(struct http_client_peer *peer, bool no_urgent)
 {
@@ -294,6 +462,8 @@ void http_client_peer_connection_success(struct http_client_peer *peer)
        array_foreach(&peer->hosts, host) {
                http_client_host_connection_success(*host, &peer->addr);
        }
+
+       http_client_peer_trigger_request_handler(peer);
 }
 
 void http_client_peer_connection_failure(struct http_client_peer *peer,
@@ -315,7 +485,8 @@ void http_client_peer_connection_failure(struct http_client_peer *peer,
        } else {
                /* this was the only/last connection and connecting to it
                   failed. a second connect will probably also fail, so just
-                  abort all requests. */
+                  try another IP for the hosts(s) or abort all requests if this
+                  was the only/last option. */
                array_foreach(&peer->hosts, host) {
                        http_client_host_connection_failure(*host, &peer->addr, reason);
                }
@@ -341,7 +512,7 @@ void http_client_peer_connection_lost(struct http_client_peer *peer)
 
        /* if there are pending requests for this peer, create a new connection
           for them. */
-       http_client_peer_handle_requests(peer);
+       http_client_peer_trigger_request_handler(peer);
 
        if (array_count(&peer->conns) == 0 &&
            http_client_peer_requests_pending(peer, &num_urgent) == 0)
@@ -350,15 +521,23 @@ void http_client_peer_connection_lost(struct http_client_peer *peer)
 
 unsigned int http_client_peer_idle_connections(struct http_client_peer *peer)
 {
-    struct http_client_connection *const *conn_idx;
-    unsigned int idle = 0;
+       struct http_client_connection *const *conn_idx;
+       unsigned int idle = 0;
 
        /* find idle connections */
-    array_foreach(&peer->conns, conn_idx) {
-        if (http_client_connection_is_idle(*conn_idx))
+       array_foreach(&peer->conns, conn_idx) {
+               if (http_client_connection_is_idle(*conn_idx))
                        idle++;
-    }
+       }
 
        return idle;
 }
 
+void http_client_peer_switch_ioloop(struct http_client_peer *peer)
+{
+       if (peer->to_req_handling != NULL) {
+               peer->to_req_handling =
+                       io_loop_move_timeout(&peer->to_req_handling);
+       }
+}
+
index 810f3a23f4f3830a4542e03572620164ce258916..43a19dcc47a64bad59a28626503184dcc51e19ad 100644 (file)
@@ -20,6 +20,7 @@ struct http_client_connection;
 
 ARRAY_DEFINE_TYPE(http_client_host, struct http_client_host *);
 ARRAY_DEFINE_TYPE(http_client_host_port, struct http_client_host_port);
+ARRAY_DEFINE_TYPE(http_client_peer, struct http_client_peer *);
 ARRAY_DEFINE_TYPE(http_client_connection, struct http_client_connection *);
 ARRAY_DEFINE_TYPE(http_client_request, struct http_client_request *);
 
@@ -28,6 +29,12 @@ HASH_TABLE_DEFINE_TYPE(http_client_host, const char *,
 HASH_TABLE_DEFINE_TYPE(http_client_peer, const struct http_client_peer_addr *,
        struct http_client_peer *);
 
+struct http_client_peer_addr {
+       char *https_name; /* TLS SNI */
+       struct ip_addr ip;
+       in_port_t port;
+};
+
 struct http_client_request {
        pool_t pool;
        unsigned int refcount;
@@ -35,7 +42,7 @@ struct http_client_request {
        struct http_client_request *prev, *next;
 
        const char *method, *hostname, *target;
-       unsigned int port;
+       in_port_t port;
 
        struct http_client *client;
        struct http_client_host *host;
@@ -73,8 +80,7 @@ struct http_client_request {
 struct http_client_host_port {
        struct http_client_host *host;
 
-       unsigned int port;
-       char *https_name;
+       struct http_client_peer_addr addr;
 
        /* current index in host->ips */
        unsigned int ips_connect_idx;
@@ -82,8 +88,10 @@ struct http_client_host_port {
           initially 0, and later set to the ip index of the last successful
           connected IP */
        unsigned int ips_connect_start_idx;
-       /* number of connections trying to connect for this host+port */
-       unsigned int pending_connection_count;
+
+       /* peers we are trying to connect to;
+          this can be more than one when soft connect timeouts are enabled */
+       ARRAY_TYPE(http_client_peer) pending_peers;
 
        /* requests pending in queue to be picked up by connections */
        ARRAY_TYPE(http_client_request) request_queue;
@@ -111,12 +119,6 @@ struct http_client_host {
        struct dns_lookup *dns_lookup;
 };
 
-struct http_client_peer_addr {
-       char *https_name; /* TLS SNI */
-       struct ip_addr ip;
-       unsigned int port;
-};
-
 struct http_client_peer {
        struct http_client_peer_addr addr;
        struct http_client *client;
@@ -128,10 +130,15 @@ struct http_client_peer {
        /* active connections to this peer */
        ARRAY_TYPE(http_client_connection) conns;
 
+       /* zero time-out for consolidating request handling */
+       struct timeout *to_req_handling;
+
        unsigned int destroyed:1;        /* peer is being destroyed */
        unsigned int no_payload_sync:1;  /* expect: 100-continue failed before */
        unsigned int seen_100_response:1;/* expect: 100-continue succeeded before */
        unsigned int last_connect_failed:1;
+       unsigned int allows_pipelining:1;/* peer is known to allow persistent
+                                            connections */
 };
 
 struct http_client_connection {
@@ -227,7 +234,8 @@ unsigned int
 http_client_connection_count_pending(struct http_client_connection *conn);
 bool http_client_connection_is_ready(struct http_client_connection *conn);
 bool http_client_connection_is_idle(struct http_client_connection *conn);
-bool http_client_connection_next_request(struct http_client_connection *conn);
+int http_client_connection_next_request(struct http_client_connection *conn);
+void http_client_connection_check_idle(struct http_client_connection *conn);
 void http_client_connection_switch_ioloop(struct http_client_connection *conn);
 
 unsigned int http_client_peer_addr_hash
@@ -244,15 +252,19 @@ bool http_client_peer_have_host(struct http_client_peer *peer,
                                struct http_client_host *host);
 void http_client_peer_add_host(struct http_client_peer *peer,
        struct http_client_host *host);
+void http_client_peer_remove_host(struct http_client_peer *peer,
+                               struct http_client_host *host);
 struct http_client_request *
        http_client_peer_claim_request(struct http_client_peer *peer,
                bool no_urgent);
-bool http_client_peer_handle_requests(struct http_client_peer *peer);
+void http_client_peer_trigger_request_handler(struct http_client_peer *peer);
 void http_client_peer_connection_success(struct http_client_peer *peer);
 void http_client_peer_connection_failure(struct http_client_peer *peer,
                                         const char *reason);
 void http_client_peer_connection_lost(struct http_client_peer *peer);
+bool http_client_peer_is_connected(struct http_client_peer *peer);
 unsigned int http_client_peer_idle_connections(struct http_client_peer *peer);
+void http_client_peer_switch_ioloop(struct http_client_peer *peer);
 
 struct http_client_host *
        http_client_host_get(struct http_client *client, const char *hostname);
index 080f5b7ad285573a4357ce47d6ac0085c5260eee..09eb3e3da20934e935e65feb432ba76476dc138e 100644 (file)
@@ -119,7 +119,7 @@ void http_client_request_unref(struct http_client_request **_req)
 }
 
 void http_client_request_set_port(struct http_client_request *req,
-       unsigned int port)
+       in_port_t port)
 {
        i_assert(req->state == HTTP_REQUEST_STATE_NEW);
        req->port = port;
index 73f9ca9427f569439e390aaf2668c84735384047..66ca2b3c41c7396df819aa47142f44166b42d67e 100644 (file)
@@ -142,6 +142,7 @@ void http_client_switch_ioloop(struct http_client *client)
 {
        struct connection *_conn = client->conn_list->connections;
        struct http_client_host *host;
+       struct http_client_peer *peer;
 
        /* move connections */
        /* FIXME: we wouldn't necessarily need to switch all of them
@@ -154,6 +155,10 @@ void http_client_switch_ioloop(struct http_client *client)
                http_client_connection_switch_ioloop(conn);
        }
 
+       /* move peers */
+       for (peer = client->peers_list; peer != NULL; peer = peer->next)
+               http_client_peer_switch_ioloop(peer);
+
        /* move dns lookups and delayed requests */
        for (host = client->hosts_list; host != NULL; host = host->next)
                http_client_host_switch_ioloop(host);
index fc2c0a50e9faa6c57bb4eb1b7f452fa3132d7acd..4ea566e500169b6572262ce2f6a17545ac673b44 100644 (file)
@@ -1,6 +1,8 @@
 #ifndef HTTP_CLIENT_H
 #define HTTP_CLIENT_H
 
+#include "net.h"
+
 #include "http-response.h"
 
 struct http_response;
@@ -89,7 +91,7 @@ http_client_request(struct http_client *client,
                (http_client_request_callback_t *)callback, context)
 
 void http_client_request_set_port(struct http_client_request *req,
-       unsigned int port);
+       in_port_t port);
 void http_client_request_set_ssl(struct http_client_request *req,
        bool ssl);
 void http_client_request_set_urgent(struct http_client_request *req);