#define TIMEOUT_CMP_MARGIN_USECS 2000
+static void
+http_client_queue_set_delay_timer(struct http_client_queue *queue,
+ struct timeval time);
+static void
+http_client_queue_set_request_timer(struct http_client_queue *queue,
+ const struct timeval *time);
+
+
/*
* Logging
*/
}
/*
- * Queue
+ * Queue object
*/
-static void
-http_client_queue_set_delay_timer(struct http_client_queue *queue,
- struct timeval time);
-
static struct http_client_queue *
http_client_queue_find(struct http_client_host *host,
const struct http_client_peer_addr *addr)
queue->addr.https_name = queue->https_name;
queue->name = name;
queue->ips_connect_idx = 0;
- i_array_init(&queue->request_queue, 16);
- i_array_init(&queue->delayed_request_queue, 4);
+ i_array_init(&queue->requests, 16);
+ i_array_init(&queue->queued_requests, 16);
+ i_array_init(&queue->queued_urgent_requests, 16);
+ i_array_init(&queue->delayed_requests, 4);
array_append(&host->queues, &queue, 1);
}
i_free(queue->https_name);
if (array_is_created(&queue->pending_peers))
array_free(&queue->pending_peers);
- array_free(&queue->request_queue);
- array_free(&queue->delayed_request_queue);
+ array_free(&queue->requests);
+ array_free(&queue->queued_requests);
+ array_free(&queue->queued_urgent_requests);
+ array_free(&queue->delayed_requests);
if (queue->to_connect != NULL)
timeout_remove(&queue->to_connect);
if (queue->to_delayed != NULL)
i_free(queue);
}
+/*
+ * Error handling
+ */
+
void http_client_queue_fail(struct http_client_queue *queue,
unsigned int status, const char *error)
{
struct http_client_request **req_idx;
/* abort all pending requests */
- req_arr = &queue->request_queue;
+ req_arr = &queue->requests;
t_array_init(&treqs, array_count(req_arr));
array_copy(&treqs.arr, 0, &req_arr->arr, 0, array_count(req_arr));
array_foreach_modifiable(&treqs, req_idx) {
}
array_clear(req_arr);
- /* abort all delayed requests */
- req_arr = &queue->delayed_request_queue;
- array_clear(&treqs);
- array_copy(&treqs.arr, 0, &req_arr->arr, 0, array_count(req_arr));
- array_foreach_modifiable(&treqs, req_idx) {
- http_client_request_error(*req_idx, status, error);
- }
- array_clear(req_arr);
+ /* all queues must be empty now */
+ i_assert(array_count(&queue->delayed_requests) == 0);
+ i_assert(array_count(&queue->queued_requests) == 0);
+ i_assert(array_count(&queue->queued_urgent_requests) == 0);
}
-void
-http_client_queue_drop_request(struct http_client_queue *queue,
- struct http_client_request *req)
-{
- ARRAY_TYPE(http_client_request) *req_arr;
- struct http_client_request **req_idx;
-
- /* remove from main queue */
- req_arr = &queue->request_queue;
- array_foreach_modifiable(req_arr, req_idx) {
- if (*req_idx == req) {
- array_delete(req_arr, array_foreach_idx(req_arr, req_idx), 1);
- break;
- }
- }
-
- /* remove from delay queue */
- if (req->release_time.tv_sec > 0) {
- req_arr = &queue->delayed_request_queue;
- array_foreach_modifiable(req_arr, req_idx) {
- if (*req_idx == req) {
- array_delete(req_arr, array_foreach_idx(req_arr, req_idx), 1);
- break;
- }
- }
- }
-}
+/*
+ * Connection management
+ */
static bool
http_client_queue_is_last_connect_ip(struct http_client_queue *queue)
struct http_client_host *host = queue->host;
struct http_client_peer *peer = NULL;
const struct http_client_peer_addr *addr = &queue->addr;
- unsigned int num_requests = array_count(&queue->request_queue);
+ unsigned int num_requests =
+ array_count(&queue->queued_requests) +
+ array_count(&queue->queued_urgent_requests);
if (num_requests == 0)
return;
(addr->https_name == NULL ? "" :
t_strdup_printf(" (SSL=%s)", addr->https_name)), num_requests);
+
/* create/get peer */
peer = http_client_peer_get(queue->client, addr);
http_client_peer_link_queue(peer, queue);
/* start soft connect time-out (but only if we have another IP left) */
msecs = host->client->set.soft_connect_timeout_msecs;
if (!http_client_queue_is_last_connect_ip(queue) && msecs > 0 &&
- queue->to_connect == NULL) {
+ queue->to_connect == NULL) {
queue->to_connect =
timeout_add(msecs, http_client_queue_soft_connect_timeout, queue);
}
t_strdup_printf(" (SSL=%s)", addr->https_name)), reason,
(array_is_created(&queue->pending_peers) ?
array_count(&queue->pending_peers): 0),
- array_count(&queue->request_queue));
+ array_count(&queue->requests));
if (array_is_created(&queue->pending_peers) &&
array_count(&queue->pending_peers) > 0) {
struct http_client_peer *const *peer_idx;
return TRUE;
}
+/*
+ * Main request queue
+ */
+
+void
+http_client_queue_drop_request(struct http_client_queue *queue,
+ struct http_client_request *req)
+{
+ struct http_client_request **reqs;
+ unsigned int count, i;
+
+ http_client_queue_debug(queue,
+ "Dropping request %s", http_client_request_label(req));
+
+ /* drop from queue */
+ if (req->urgent) {
+ reqs = array_get_modifiable(&queue->queued_urgent_requests, &count);
+ for (i = 0; i < count; i++) {
+ if (reqs[i] == req) {
+ array_delete(&queue->queued_urgent_requests, i, 1);
+ break;
+ }
+ }
+ } else {
+ reqs = array_get_modifiable(&queue->queued_requests, &count);
+ for (i = 0; i < count; i++) {
+ if (reqs[i] == req) {
+ array_delete(&queue->queued_requests, i, 1);
+ break;
+ }
+ }
+ }
+
+ /* drop from delay queue */
+ if (req->release_time.tv_sec > 0) {
+ reqs = array_get_modifiable(&queue->delayed_requests, &count);
+ for (i = 0; i < count; i++) {
+ if (reqs[i] == req)
+ break;
+ }
+ if (i < count) {
+ if (i == 0) {
+ if (queue->to_delayed != NULL) {
+ timeout_remove(&queue->to_delayed);
+ if (count > 1) {
+ i_assert(reqs[1]->release_time.tv_sec > 0);
+ http_client_queue_set_request_timer(queue, &reqs[1]->release_time);
+ }
+ }
+ }
+ array_delete(&queue->delayed_requests, i, 1);
+ }
+ }
+
+ /* drop from main request list */
+ reqs = array_get_modifiable(&queue->requests, &count);
+ for (i = 0; i < count; i++) {
+ if (reqs[i] == req)
+ break;
+ }
+ i_assert(i < count);
+
+ if (i == 0) {
+ if (queue->to_request != NULL) {
+ timeout_remove(&queue->to_request);
+ if (count > 1 && reqs[1]->timeout_time.tv_sec > 0)
+ http_client_queue_set_request_timer(queue, &reqs[1]->timeout_time);
+ }
+ }
+ req->queue = NULL;
+ array_delete(&queue->requests, i, 1);
+ return;
+}
+
+static void
+http_client_queue_request_timeout(struct http_client_queue *queue)
+{
+ struct http_client_request *const *reqs;
+ ARRAY_TYPE(http_client_request) failed_requests;
+ struct timeval new_to = { 0, 0 };
+ unsigned int count, i;
+
+ http_client_queue_debug(queue, "Timeout (now: %s.%03lu)",
+ t_strflocaltime("%Y-%m-%d %H:%M:%S", ioloop_timeval.tv_sec),
+ ((unsigned long)ioloop_timeval.tv_usec)/1000);
+
+ if (queue->to_request != NULL)
+ timeout_remove(&queue->to_request);
+
+ /* collect failed requests */
+ reqs = array_get(&queue->requests, &count);
+ i_assert(count > 0);
+ t_array_init(&failed_requests, count);
+ for (i = 0; i < count; i++) {
+ if (reqs[i]->timeout_time.tv_sec > 0 &&
+ timeval_cmp_margin(&reqs[i]->timeout_time,
+ &ioloop_timeval, TIMEOUT_CMP_MARGIN_USECS) > 0) {
+ break;
+ }
+ array_append(&failed_requests, &reqs[i], 1);
+ }
+
+ /* update timout */
+ if (i < count)
+ new_to = reqs[i]->timeout_time;
+
+ /* abort all failed request */
+ reqs = array_get(&failed_requests, &count);
+ i_assert(count > 0); /* at least one request timed out */
+ for (i = 0; i < count; i++) {
+ struct http_client_request *req = reqs[i];
+
+ http_client_queue_debug(queue,
+ "Request %s timed out", http_client_request_label(req));
+ http_client_request_error(req,
+ HTTP_CLIENT_REQUEST_ERROR_TIMED_OUT,
+ "Timed out");
+ }
+
+ if (new_to.tv_sec > 0) {
+ http_client_queue_debug(queue, "New timeout");
+ http_client_queue_set_request_timer(queue, &new_to);
+ }
+}
+
+static void
+http_client_queue_set_request_timer(struct http_client_queue *queue,
+ const struct timeval *time)
+{
+ i_assert(time->tv_sec > 0);
+ if (queue->to_request != NULL)
+ timeout_remove(&queue->to_request);
+
+ if (queue->client->set.debug) {
+ http_client_queue_debug(queue,
+ "Set request timeout to %s.%03lu (now: %s.%03lu)",
+ t_strflocaltime("%Y-%m-%d %H:%M:%S", time->tv_sec),
+ ((unsigned long)time->tv_usec)/1000,
+ t_strflocaltime("%Y-%m-%d %H:%M:%S", ioloop_timeval.tv_sec),
+ ((unsigned long)ioloop_timeval.tv_usec)/1000);
+ }
+
+ /* set timer */
+ queue->to_request = timeout_add_absolute
+ (time, http_client_queue_request_timeout, queue);
+}
+
+static int
+http_client_queue_request_timeout_cmp(struct http_client_request *const *req1,
+ struct http_client_request *const *req2)
+{
+ int ret;
+
+ /* 0 means no timeout */
+ if ((*req1)->timeout_time.tv_sec == 0) {
+ if ((*req2)->timeout_time.tv_sec == 0) {
+ /* sort by age */
+ if ((ret=timeval_cmp(&(*req1)->submit_time, &(*req2)->submit_time)) != 0)
+ return ret;
+
+ } else {
+ return 1;
+ }
+ } else if ((*req2)->timeout_time.tv_sec == 0) {
+ return -1;
+
+ /* sort by timeout */
+ } else if
+ ((ret=timeval_cmp(&(*req1)->timeout_time, &(*req2)->timeout_time)) != 0) {
+ return ret;
+ }
+
+ /* sort by minumum attempts for fairness */
+ return ((*req2)->attempts - (*req1)->attempts);
+}
+
static void http_client_queue_submit_now(struct http_client_queue *queue,
struct http_client_request *req)
{
+ ARRAY_TYPE(http_client_request) *req_queue;
+
req->release_time.tv_sec = 0;
req->release_time.tv_usec = 0;
if (req->urgent)
- array_insert(&queue->request_queue, 0, &req, 1);
+ req_queue = &queue->queued_urgent_requests;
else
- array_append(&queue->request_queue, &req, 1);
+ req_queue = &queue->queued_requests;
+
+ /* enqueue */
+ if (req->timeout_time.tv_sec == 0) {
+ /* no timeout; enqueue at end */
+ array_append(req_queue, &req, 1);
+
+ } else if (timeval_diff_msecs(&req->timeout_time, &ioloop_timeval) <= 1) {
+ /* pretty much already timed out; don't bother */
+
+ } else {
+ unsigned int insert_idx;
+
+ /* keep transmission queue sorted earliest timeout first */
+ (void)array_bsearch_insert_pos(req_queue,
+ &req, http_client_queue_request_timeout_cmp, &insert_idx);
+ array_insert(req_queue, insert_idx, &req, 1);
+ }
}
+/*
+ * Delayed request queue
+ */
+
static void
http_client_queue_delay_timeout(struct http_client_queue *queue)
{
io_loop_time_refresh();
finished = 0;
- reqs = array_get(&queue->delayed_request_queue, &count);
+ reqs = array_get(&queue->delayed_requests, &count);
for (i = 0; i < count; i++) {
if (timeval_cmp_margin(&reqs[i]->release_time,
&ioloop_timeval, TIMEOUT_CMP_MARGIN_USECS) > 0) {
if (i < count) {
http_client_queue_set_delay_timer(queue, reqs[i]->release_time);
}
- array_delete(&queue->delayed_request_queue, 0, finished);
+ array_delete(&queue->delayed_requests, 0, finished);
http_client_queue_connection_setup(queue);
}
return timeval_cmp(&(*req1)->release_time, &(*req2)->release_time);
}
+/*
+ * Request submission
+ */
+
void http_client_queue_submit_request(struct http_client_queue *queue,
struct http_client_request *req)
{
unsigned int insert_idx;
+ if (req->queue != NULL)
+ http_client_queue_drop_request(req->queue, req);
req->queue = queue;
+ /* check delay vs timeout */
+ if (req->release_time.tv_sec > 0 && req->timeout_time.tv_sec > 0 &&
+ timeval_cmp_margin(&req->release_time,
+ &req->timeout_time, TIMEOUT_CMP_MARGIN_USECS) >= 0) {
+ /* release time is later than absolute timeout */
+ req->release_time.tv_sec = 0;
+ req->release_time.tv_usec = 0;
+
+ /* timeout rightaway */
+ req->timeout_time = ioloop_timeval;
+
+ http_client_queue_debug(queue,
+ "Delayed request %s%s already timed out",
+ http_client_request_label(req),
+ (req->urgent ? " (urgent)" : ""));
+ }
+
+ /* add to main request list */
+ if (req->timeout_time.tv_sec == 0) {
+ /* no timeout; just append */
+ array_append(&queue->requests, &req, 1);
+
+ } else {
+ unsigned int insert_idx;
+
+ /* keep main request list sorted earliest timeout first */
+ (void)array_bsearch_insert_pos(&queue->requests,
+ &req, http_client_queue_request_timeout_cmp, &insert_idx);
+ array_insert(&queue->requests, insert_idx, &req, 1);
+
+ /* now first in queue; update timer */
+ if (insert_idx == 0)
+ http_client_queue_set_request_timer(queue, &req->timeout_time);
+ }
+
+ /* handle delay */
if (req->release_time.tv_sec > 0) {
io_loop_time_refresh();
if (timeval_cmp_margin(&req->release_time,
&ioloop_timeval, TIMEOUT_CMP_MARGIN_USECS) > 0) {
- (void)array_bsearch_insert_pos(&queue->delayed_request_queue,
+ (void)array_bsearch_insert_pos(&queue->delayed_requests,
&req, http_client_queue_delayed_cmp, &insert_idx);
- array_insert(&queue->delayed_request_queue, insert_idx, &req, 1);
+ array_insert(&queue->delayed_requests, insert_idx, &req, 1);
if (insert_idx == 0)
http_client_queue_set_delay_timer(queue, req->release_time);
return;
http_client_queue_submit_now(queue, req);
}
+/*
+ * Request retrieval
+ */
+
struct http_client_request *
http_client_queue_claim_request(struct http_client_queue *queue,
const struct http_client_peer_addr *addr, bool no_urgent)
struct http_client_request *req;
unsigned int i, count;
- requests = array_get(&queue->request_queue, &count);
+ count = 0;
+ if (!no_urgent)
+ requests = array_get(&queue->queued_urgent_requests, &count);
+
+ if (count == 0)
+ requests = array_get(&queue->queued_requests, &count);
if (count == 0)
return NULL;
i = 0;
- if (requests[0]->urgent && no_urgent) {
- for (; requests[i]->urgent; i++) {
- if (i == count)
- return NULL;
- }
- }
req = requests[i];
- array_delete(&queue->request_queue, i, 1);
+ if (req->urgent)
+ array_delete(&queue->queued_urgent_requests, i, 1);
+ else
+ array_delete(&queue->queued_requests, i, 1);
http_client_queue_debug(queue,
"Connection to peer %s claimed request %s %s",
http_client_queue_requests_pending(struct http_client_queue *queue,
unsigned int *num_urgent_r)
{
- struct http_client_request *const *requests;
- unsigned int count, i;
-
- *num_urgent_r = 0;
+ unsigned int urg_count = array_count(&queue->queued_urgent_requests);
- requests = array_get(&queue->request_queue, &count);
- for (i = 0; i < count; i++) {
- if (requests[i]->urgent)
- (*num_urgent_r)++;
- else
- break;
- }
- return count;
+ if (num_urgent_r != NULL)
+ *num_urgent_r = urg_count;
+ return array_count(&queue->queued_requests) + urg_count;
}
+/*
+ * ioloop
+ */
+
void http_client_queue_switch_ioloop(struct http_client_queue *queue)
{
if (queue->to_connect != NULL)
queue->to_connect = io_loop_move_timeout(&queue->to_connect);
+ if (queue->to_request != NULL)
+ queue->to_request = io_loop_move_timeout(&queue->to_request);
if (queue->to_delayed != NULL)
queue->to_delayed = io_loop_move_timeout(&queue->to_delayed);
}
if (--req->refcount > 0)
return;
+ /* cannot be destroyed while it is still pending */
+ i_assert(req->conn == NULL || req->conn->pending_request == NULL);
+
+ if (req->queue != NULL)
+ http_client_queue_drop_request(req->queue, req);
+
if (req->destroy_callback != NULL) {
req->destroy_callback(req->destroy_context);
req->destroy_callback = NULL;
req->payload_sync = TRUE;
}
+void http_client_request_set_timeout_msecs(struct http_client_request *req,
+ unsigned int msecs)
+{
+ i_assert(req->state == HTTP_REQUEST_STATE_NEW ||
+ req->state == HTTP_REQUEST_STATE_GOT_RESPONSE);
+
+ req->timeout_msecs = msecs;
+}
+
+void http_client_request_set_timeout(struct http_client_request *req,
+ const struct timeval *time)
+{
+ i_assert(req->state == HTTP_REQUEST_STATE_NEW ||
+ req->state == HTTP_REQUEST_STATE_GOT_RESPONSE);
+
+ req->timeout_time = *time;
+ req->timeout_msecs = 0;
+}
+
void http_client_request_delay_until(struct http_client_request *req,
time_t time)
{
req->urgent = TRUE;
}
+ if (req->timeout_time.tv_sec == 0) {
+ if (req->timeout_msecs > 0) {
+ req->timeout_time = ioloop_timeval;
+ timeval_add_msecs(&req->timeout_time, req->timeout_msecs);
+ } else if ( client->set.request_absolute_timeout_msecs > 0) {
+ req->timeout_time = ioloop_timeval;
+ timeval_add_msecs(&req->timeout_time, client->set.request_absolute_timeout_msecs);
+ }
+ }
+
host = http_client_host_get(req->client, req->host_url);
req->state = HTTP_REQUEST_STATE_QUEUED;
unsigned int status, const char *error)
{
http_client_request_callback_t *callback;
+ bool sending = (req->state == HTTP_REQUEST_STATE_PAYLOAD_OUT);
if (req->state >= HTTP_REQUEST_STATE_FINISHED)
return;
http_response_init(&response, status, error);
(void)callback(&response, req->context);
- /* release payload early (prevents server/client in proxy) */
- if (req->payload_input != NULL)
+ /* release payload early (prevents server/client deadlock in proxy) */
+ if (!sending && req->payload_input != NULL)
i_stream_unref(&req->payload_input);
}
}
{
struct http_client_request *req = *_req;
+ if (req->state >= HTTP_REQUEST_STATE_FINISHED)
+ return;
+
i_assert(req->delayed_error != NULL && req->delayed_error_status != 0);
http_client_request_send_error(req, req->delayed_error_status,
req->delayed_error);
+ if (req->queue != NULL)
+ http_client_queue_drop_request(req->queue, req);
http_client_request_unref(_req);
}
void http_client_request_error(struct http_client_request *req,
unsigned int status, const char *error)
{
- if (!req->submitted && req->state < HTTP_REQUEST_STATE_FINISHED) {
+ if (req->state >= HTTP_REQUEST_STATE_FINISHED)
+ return;
+
+ if (!req->submitted) {
/* we're still in http_client_request_submit(). delay
reporting the error, so the caller doesn't have to handle
immediate callbacks. */
http_client_host_delay_request_error(req->host, req);
} else {
http_client_request_send_error(req, status, error);
+ if (req->queue != NULL)
+ http_client_queue_drop_request(req->queue, req);
http_client_request_unref(&req);
}
}
void http_client_request_abort(struct http_client_request **_req)
{
struct http_client_request *req = *_req;
+ bool sending = (req->state == HTTP_REQUEST_STATE_PAYLOAD_OUT);
if (req->state >= HTTP_REQUEST_STATE_FINISHED)
return;
+
req->callback = NULL;
req->state = HTTP_REQUEST_STATE_ABORTED;
+
+ /* release payload early (prevents server/client deadlock in proxy) */
+ if (!sending && req->payload_input != NULL)
+ i_stream_unref(&req->payload_input);
+
if (req->queue != NULL)
http_client_queue_drop_request(req->queue, req);
http_client_request_unref(_req);
req->callback = NULL;
req->state = HTTP_REQUEST_STATE_FINISHED;
+ if (req->queue != NULL)
+ http_client_queue_drop_request(req->queue, req);
if (req->payload_wait && req->client->ioloop != NULL)
io_loop_stop(req->client->ioloop);
http_client_request_unref(_req);
}
req->host = NULL;
- req->queue = NULL;
req->conn = NULL;
origin_url = http_url_create(&req->origin_url);