From: Stephan Bosch Date: Sun, 15 Sep 2013 00:35:04 +0000 (+0300) Subject: lib-http: Added support for asynchronous payload for requests. X-Git-Tag: 2.2.6~85 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=a4e186e3ef267fc7a6b592788067c8c9c87d0785;p=thirdparty%2Fdovecot%2Fcore.git lib-http: Added support for asynchronous payload for requests. This means that the payload stream passed to the request can be a non-blocking socket stream from some other connection (e.g. proxy client connection). --- diff --git a/src/lib-http/http-client-connection.c b/src/lib-http/http-client-connection.c index be90822b63..8f4ad1707b 100644 --- a/src/lib-http/http-client-connection.c +++ b/src/lib-http/http-client-connection.c @@ -655,7 +655,7 @@ static void http_client_connection_input(struct connection *_conn) } } -static int http_client_connection_output(struct http_client_connection *conn) +int http_client_connection_output(struct http_client_connection *conn) { struct http_client_request *const *req_idx, *req; struct ostream *output = conn->conn.output; @@ -944,6 +944,8 @@ void http_client_connection_unref(struct http_client_connection **_conn) ssl_iostream_unref(&conn->ssl_iostream); connection_deinit(&conn->conn); + if (conn->io_req_payload != NULL) + io_remove(&conn->io_req_payload); if (conn->to_requests != NULL) timeout_remove(&conn->to_requests); if (conn->to_connect != NULL) @@ -972,6 +974,8 @@ void http_client_connection_unref(struct http_client_connection **_conn) void http_client_connection_switch_ioloop(struct http_client_connection *conn) { + if (conn->io_req_payload != NULL) + conn->io_req_payload = io_loop_move_io(&conn->io_req_payload); if (conn->to_requests != NULL) conn->to_requests = io_loop_move_timeout(&conn->to_requests); if (conn->to_connect != NULL) diff --git a/src/lib-http/http-client-private.h b/src/lib-http/http-client-private.h index 23e942f383..af08416f52 100644 --- a/src/lib-http/http-client-private.h +++ b/src/lib-http/http-client-private.h @@ -161,6 +161,7 @@ struct http_client_connection { struct http_client_request *pending_request; struct istream *incoming_payload; + struct io *io_req_payload; /* requests that have been sent, waiting for response */ ARRAY_TYPE(http_client_request) request_wait_list; @@ -244,6 +245,7 @@ struct http_client_connection * http_client_connection_create(struct http_client_peer *peer); void http_client_connection_ref(struct http_client_connection *conn); void http_client_connection_unref(struct http_client_connection **_conn); +int http_client_connection_output(struct http_client_connection *conn); unsigned int http_client_connection_count_pending(struct http_client_connection *conn); bool http_client_connection_is_ready(struct http_client_connection *conn); diff --git a/src/lib-http/http-client-request.c b/src/lib-http/http-client-request.c index 6bee4b66ee..bcc1decf19 100644 --- a/src/lib-http/http-client-request.c +++ b/src/lib-http/http-client-request.c @@ -232,7 +232,8 @@ http_client_request_finish_payload_out(struct http_client_request *req) } req->state = HTTP_REQUEST_STATE_WAITING; req->conn->output_locked = FALSE; - http_client_request_debug(req, "Sent all payload"); + + http_client_request_debug(req, "Finished sending payload"); } static int @@ -321,16 +322,30 @@ int http_client_request_finish_payload(struct http_client_request **_req) return http_client_request_continue_payload(_req, NULL, 0); } +static void http_client_request_payload_input(struct http_client_request *req) +{ + struct http_client_connection *conn = req->conn; + + if (conn->io_req_payload != NULL) + io_remove(&conn->io_req_payload); + + (void)http_client_connection_output(conn); +} + int http_client_request_send_more(struct http_client_request *req, const char **error_r) { struct http_client_connection *conn = req->conn; struct ostream *output = req->payload_output; off_t ret; + int fd; i_assert(req->payload_input != NULL); i_assert(req->payload_output != NULL); + if (conn->io_req_payload != NULL) + io_remove(&conn->io_req_payload); + /* chunked ostream needs to write to the parent stream's buffer */ o_stream_set_max_buffer_size(output, IO_BLOCK_SIZE); ret = o_stream_send_istream(output, req->payload_input); @@ -340,15 +355,17 @@ int http_client_request_send_more(struct http_client_request *req, errno = req->payload_input->stream_errno; *error_r = t_strdup_printf("read(%s) failed: %m", i_stream_get_name(req->payload_input)); + ret = -1; } else if (output->stream_errno != 0) { errno = output->stream_errno; *error_r = t_strdup_printf("write(%s) failed: %m", o_stream_get_name(output)); + ret = -1; } else { i_assert(ret >= 0); } - if (!i_stream_have_bytes_left(req->payload_input)) { + if (ret < 0 || i_stream_is_eof(req->payload_input)) { if (!req->payload_chunked && req->payload_input->v_offset - req->payload_offset != req->payload_size) { i_error("stream input size changed"); //FIXME @@ -362,11 +379,18 @@ int http_client_request_send_more(struct http_client_request *req, } else { http_client_request_finish_payload_out(req); } - - } else { + } else if (i_stream_get_data_size(req->payload_input) > 0) { + /* output is blocking */ conn->output_locked = TRUE; o_stream_set_flush_pending(output, TRUE); http_client_request_debug(req, "Partially sent payload"); + } else { + /* input is blocking */ + fd = i_stream_get_fd(req->payload_input); + conn->output_locked = TRUE; + i_assert(fd >= 0); + conn->io_req_payload = io_add + (fd, IO_READ, http_client_request_payload_input, req); } return ret < 0 ? -1 : 0; }