It now supports both blocking and non-blocking behavior.
http-client-queue.c \
http-client-host.c \
http-client.c \
+ http-server-ostream.c \
http-server-response.c \
http-server-request.c \
http-server-connection.c \
--- /dev/null
+/* Copyright (c) 2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "dns-lookup.h"
+#include "ostream-wrapper.h"
+
+#include "http-server-private.h"
+
+/*
+ * Payload output stream
+ */
+
+struct http_server_ostream {
+ struct wrapper_ostream wostream;
+
+ struct http_server_connection *conn;
+ struct http_server_response *resp;
+
+ bool response_destroyed:1;
+};
+
+static void http_server_ostream_output_error(struct wrapper_ostream *wostream)
+{
+ struct http_server_ostream *hsostream =
+ (struct http_server_ostream *)wostream;
+ struct http_server_connection *conn = hsostream->conn;
+
+ if (hsostream->response_destroyed)
+ return;
+
+ i_assert(hsostream->resp != NULL);
+ http_server_connection_handle_output_error(conn);
+}
+
+static void http_server_ostream_output_start(struct wrapper_ostream *wostream)
+{
+ struct http_server_ostream *hsostream =
+ (struct http_server_ostream *)wostream;
+ struct http_server_response *resp = hsostream->resp;
+
+ i_assert(hsostream->response_destroyed || resp != NULL);
+
+ if (!hsostream->response_destroyed &&
+ resp->request->state <= HTTP_SERVER_REQUEST_STATE_PROCESSING) {
+ /* implicitly submit the request */
+ http_server_response_submit(resp);
+ }
+}
+
+void http_server_ostream_output_available(
+ struct http_server_ostream *hsostream)
+{
+ struct http_server_response *resp = hsostream->resp;
+
+ i_assert(resp != NULL);
+ i_assert(!hsostream->response_destroyed);
+ wrapper_ostream_output_available(&hsostream->wostream,
+ resp->payload_output);
+}
+
+static bool http_server_ostream_output_ready(struct wrapper_ostream *wostream)
+{
+ struct http_server_ostream *hsostream =
+ (struct http_server_ostream *)wostream;
+ struct http_server_response *resp = hsostream->resp;
+
+ i_assert(resp != NULL);
+ i_assert(!hsostream->response_destroyed);
+ return (resp->request->state >= HTTP_SERVER_REQUEST_STATE_PAYLOAD_OUT);
+}
+
+static int http_server_ostream_output_finish(struct wrapper_ostream *wostream)
+{
+ struct http_server_ostream *hsostream =
+ (struct http_server_ostream *)wostream;
+ struct http_server_response *resp = hsostream->resp;
+
+ i_assert(resp != NULL);
+ i_assert(!hsostream->response_destroyed);
+
+ e_debug(wostream->event, "Finished response payload stream");
+
+ /* finished sending payload */
+ return http_server_response_finish_payload_out(resp);
+}
+
+static void http_server_ostream_output_halt(struct wrapper_ostream *wostream)
+{
+ struct http_server_ostream *hsostream =
+ (struct http_server_ostream *)wostream;
+ struct http_server_connection *conn = hsostream->conn;
+ struct http_server_response *resp = hsostream->resp;
+
+ i_assert(hsostream->response_destroyed || resp != NULL);
+
+ if (hsostream->response_destroyed ||
+ resp->request->state < HTTP_SERVER_REQUEST_STATE_PAYLOAD_OUT)
+ return;
+
+ http_server_connection_output_halt(conn);
+}
+
+static void http_server_ostream_output_resume(struct wrapper_ostream *wostream)
+{
+ struct http_server_ostream *hsostream =
+ (struct http_server_ostream *)wostream;
+ struct http_server_connection *conn = hsostream->conn;
+
+ if (hsostream->response_destroyed)
+ return;
+ i_assert(hsostream->resp != NULL);
+
+ http_server_connection_output_resume(conn);
+}
+
+static void
+http_server_ostream_output_update_timeouts(struct wrapper_ostream *wostream,
+ bool sender_blocking)
+{
+ struct http_server_ostream *hsostream =
+ (struct http_server_ostream *)wostream;
+ struct http_server_connection *conn = hsostream->conn;
+
+ if (hsostream->response_destroyed)
+ return;
+ i_assert(hsostream->resp != NULL);
+
+ if (sender_blocking) {
+ http_server_connection_stop_idle_timeout(conn);
+ return;
+ }
+
+ http_server_connection_start_idle_timeout(conn);
+}
+
+static void http_server_ostream_close(struct wrapper_ostream *wostream)
+{
+ struct http_server_ostream *hsostream =
+ (struct http_server_ostream *)wostream;
+ struct http_server_response *resp = hsostream->resp;
+
+ e_debug(wostream->event, "Response payload stream closed");
+
+ if (hsostream->response_destroyed) {
+ http_server_response_unref(&hsostream->resp);
+ return;
+ }
+ hsostream->response_destroyed = TRUE;
+
+ i_assert(resp != NULL);
+ (void)http_server_response_finish_payload_out(resp);
+ resp->payload_stream = NULL;
+ http_server_response_unref(&hsostream->resp);
+}
+
+static void http_server_ostream_destroy(struct wrapper_ostream *wostream)
+{
+ struct http_server_ostream *hsostream =
+ (struct http_server_ostream *)wostream;
+ struct http_server_response *resp = hsostream->resp;
+ struct http_server_request *req;
+
+ e_debug(wostream->event, "Response payload stream destroyed");
+
+ if (hsostream->response_destroyed) {
+ http_server_response_unref(&hsostream->resp);
+ return;
+ }
+ hsostream->response_destroyed = TRUE;
+ i_assert(resp != NULL);
+
+ req = resp->request;
+ resp->payload_stream = NULL;
+ http_server_request_abort(
+ &req, "Response output stream destroyed prematurely");
+}
+
+static struct ioloop *
+http_server_ostream_wait_begin(struct wrapper_ostream *wostream,
+ struct ioloop *ioloop)
+{
+ struct http_server_ostream *hsostream =
+ (struct http_server_ostream *)wostream;
+ struct http_server_connection *conn = hsostream->conn;
+ struct ioloop *prev_ioloop;
+
+ i_assert(hsostream->resp != NULL);
+ i_assert(!hsostream->response_destroyed);
+ http_server_connection_ref(conn);
+ prev_ioloop = http_server_connection_switch_ioloop_to(conn, ioloop);
+ return prev_ioloop;
+}
+
+static void
+http_server_ostream_wait_end(struct wrapper_ostream *wostream,
+ struct ioloop *prev_ioloop)
+{
+ struct http_server_ostream *hsostream =
+ (struct http_server_ostream *)wostream;
+ struct http_server_connection *conn = hsostream->conn;
+
+ (void)http_server_connection_switch_ioloop_to(conn, prev_ioloop);
+ http_server_connection_unref(&conn);
+}
+
+void http_server_ostream_continue(struct http_server_ostream *hsostream)
+{
+ struct wrapper_ostream *wostream = &hsostream->wostream;
+ struct http_server_response *resp = hsostream->resp;
+
+ i_assert(hsostream->response_destroyed || resp != NULL);
+
+ i_assert(hsostream->response_destroyed ||
+ resp->request->state >= HTTP_SERVER_REQUEST_STATE_PAYLOAD_OUT);
+
+ wrapper_ostream_continue(wostream);
+}
+
+bool http_server_ostream_get_size(struct http_server_ostream *hsostream,
+ uoff_t *size_r)
+{
+ return wrapper_ostream_get_buffered_size(&hsostream->wostream, size_r);
+}
+
+static void
+http_server_ostream_switch_ioloop_to(struct wrapper_ostream *wostream,
+ struct ioloop *ioloop)
+{
+ struct http_server_ostream *hsostream =
+ (struct http_server_ostream *)wostream;
+ struct http_server_connection *conn = hsostream->conn;
+
+ if (hsostream->response_destroyed)
+ return;
+ i_assert(hsostream->resp != NULL);
+
+ http_server_connection_switch_ioloop_to(conn, ioloop);
+}
+
+struct ostream *
+http_server_ostream_create(struct http_server_response *resp,
+ size_t max_buffer_size, bool blocking)
+{
+ struct http_server_ostream *hsostream;
+
+ i_assert(resp->payload_stream == NULL);
+
+ hsostream = i_new(struct http_server_ostream, 1);
+
+ resp->payload_stream = hsostream;
+ http_server_response_ref(resp);
+ hsostream->conn = resp->request->conn;
+ hsostream->resp = resp;
+
+ hsostream->wostream.output_start = http_server_ostream_output_start;
+ hsostream->wostream.output_ready = http_server_ostream_output_ready;
+ hsostream->wostream.output_error = http_server_ostream_output_error;
+ hsostream->wostream.output_finish = http_server_ostream_output_finish;
+ hsostream->wostream.output_halt = http_server_ostream_output_halt;
+ hsostream->wostream.output_resume = http_server_ostream_output_resume;
+ hsostream->wostream.output_update_timeouts =
+ http_server_ostream_output_update_timeouts;
+
+ hsostream->wostream.wait_begin = http_server_ostream_wait_begin;
+ hsostream->wostream.wait_end = http_server_ostream_wait_end;
+
+ hsostream->wostream.switch_ioloop_to =
+ http_server_ostream_switch_ioloop_to;
+
+ hsostream->wostream.close = http_server_ostream_close;
+ hsostream->wostream.destroy = http_server_ostream_destroy;
+
+ return wrapper_ostream_create(&hsostream->wostream, max_buffer_size,
+ blocking, resp->event);
+}
+
+void http_server_ostream_response_destroyed(
+ struct http_server_ostream *hsostream)
+{
+ i_assert(hsostream->resp != NULL);
+ hsostream->resp->payload_stream = NULL;
+
+ e_debug(hsostream->wostream.event,
+ "Response payload parent stream lost");
+
+ hsostream->response_destroyed = TRUE;
+ wrapper_ostream_output_destroyed(&hsostream->wostream);
+ wrapper_ostream_notify_error(&hsostream->wostream);
+}
+
+struct ostream *
+http_server_ostream_get_output(struct http_server_ostream *hsostream)
+{
+ return &hsostream->wostream.ostream.ostream;
+}
+
+void http_server_ostream_set_error(struct http_server_ostream *hsostream,
+ int stream_errno, const char *stream_error)
+{
+ wrapper_ostream_set_error(&hsostream->wostream, stream_errno,
+ stream_error);
+}
#include "http-server.h"
#include "llist.h"
+struct http_server_ostream;
struct http_server_payload_handler;
struct http_server_request;
struct http_server_connection;
struct istream *payload_input;
uoff_t payload_size, payload_offset;
struct ostream *payload_output;
-
- struct ostream *blocking_output;
+ struct http_server_ostream *payload_stream;
http_server_tunnel_callback_t tunnel_callback;
void *tunnel_context;
bool shutting_down:1; /* shutting down server */
};
+/*
+ * Response output stream
+ */
+
+struct ostream *
+http_server_ostream_create(struct http_server_response *resp,
+ size_t max_buffer_size, bool blocking);
+bool http_server_ostream_get_size(struct http_server_ostream *hsostream,
+ uoff_t *size_r);
+void http_server_ostream_continue(struct http_server_ostream *hsostream);
+
+void http_server_ostream_output_available(
+ struct http_server_ostream *hsostream);
+void http_server_ostream_response_destroyed(
+ struct http_server_ostream *hsostream);
+
+struct ostream *
+http_server_ostream_get_output(struct http_server_ostream *hsostream);
+
+void http_server_ostream_set_error(struct http_server_ostream *hsostream,
+ int stream_errno, const char *stream_error);
+
/*
* Response
*/
void http_server_response_request_free(struct http_server_response *resp);
void http_server_response_request_destroy(struct http_server_response *resp);
+void http_server_response_request_abort(struct http_server_response *resp,
+ const char *reason);
int http_server_response_send(struct http_server_response *resp);
int http_server_response_send_more(struct http_server_response *resp);
else
e_debug(req->event, "Abort: %s", reason);
+ if (req->response != NULL)
+ http_server_response_request_abort(req->response, reason);
+
req->conn = NULL;
if (req->state < HTTP_SERVER_REQUEST_STATE_FINISHED) {
if (conn != NULL) {
req->state = HTTP_SERVER_REQUEST_STATE_ABORTED;
}
- if (req->response != NULL && !req->response->payload_blocking) {
- http_server_response_request_free(req->response);
- req->response = NULL;
- }
-
http_server_request_destroy(_req);
}
e_debug(resp->event, "Free");
i_assert(!resp->payload_blocking);
+ /* Cannot be destroyed while payload output stream still exists */
+ i_assert(resp->payload_stream == NULL);
i_stream_unref(&resp->payload_input);
o_stream_unref(&resp->payload_output);
void http_server_response_request_destroy(struct http_server_response *resp)
{
e_debug(resp->event, "Destroy");
+
+ if (resp->payload_stream != NULL)
+ http_server_ostream_response_destroyed(resp->payload_stream);
+}
+
+void http_server_response_request_abort(struct http_server_response *resp,
+ const char *reason)
+{
+ if (reason == NULL)
+ e_debug(resp->event, "Abort");
+ else
+ e_debug(resp->event, "Abort: %s", reason);
+
+ if (resp->payload_stream != NULL) {
+ http_server_ostream_set_error(resp->payload_stream,
+ EPIPE, reason);
+ }
}
void http_server_response_ref(struct http_server_response *resp)
int ret;
i_assert(!resp->submitted);
- i_assert(resp->blocking_output == NULL);
i_assert(resp->payload_input == NULL);
+ i_assert(resp->payload_stream == NULL);
i_stream_ref(input);
resp->payload_input = input;
struct istream *input;
unsigned char *payload_data;
+ i_assert(!resp->submitted);
+ i_assert(resp->payload_input == NULL);
+ i_assert(resp->payload_stream == NULL);
+
if (size == 0)
return;
i_stream_unref(&input);
}
+struct ostream *
+http_server_response_get_payload_output(struct http_server_response *resp,
+ size_t max_buffer_size, bool blocking)
+{
+ struct http_server_request *req = resp->request;
+ struct http_server_connection *conn = req->conn;
+ struct ostream *output;
+
+ i_assert(conn != NULL);
+ i_assert(!resp->submitted);
+ i_assert(resp->payload_input == NULL);
+ i_assert(resp->payload_stream == NULL);
+
+ output = http_server_ostream_create(resp, max_buffer_size, blocking);
+ o_stream_set_name(output,
+ t_strdup_printf("(conn %s: request %s: %u response payload)",
+ conn->conn.label,
+ http_server_request_label(req), resp->status));
+ return output;
+}
+
void http_server_response_add_auth(struct http_server_response *resp,
const struct http_auth_challenge *chlng)
{
int http_server_response_finish_payload_out(struct http_server_response *resp)
{
- struct http_server_connection *conn = resp->request->conn;
+ struct http_server_request *req = resp->request;
+ struct http_server_connection *conn = req->conn;
int ret;
+ if (req->state >= HTTP_SERVER_REQUEST_STATE_FINISHED)
+ return 1;
+
resp->payload_finished = TRUE;
if (resp->payload_output != NULL) {
struct const_iovec iov;
int ret;
- i_assert(resp->blocking_output == NULL);
+ i_assert(resp->payload_stream == NULL);
resp->payload_corked = TRUE;
struct http_server_response *resp = *_resp;
int ret;
- i_assert(resp->blocking_output == NULL);
+ i_assert(resp->payload_stream == NULL);
*_resp = NULL;
ret = http_server_response_output_payload(&resp, NULL, 0);
enum ostream_send_istream_result res;
int ret = 0;
- i_assert(!resp->payload_blocking);
- i_assert(resp->payload_input != NULL);
i_assert(resp->payload_output != NULL);
- if (resp->payload_finished)
+ if (resp->payload_finished) {
+ e_debug(resp->event, "Finish sending payload (more)");
return http_server_response_finish_payload_out(resp);
+ }
+
+ if (resp->payload_stream != NULL) {
+ conn->output_locked = TRUE;
+ http_server_ostream_continue(resp->payload_stream);
+ return 0;
+ }
+ i_assert(resp->payload_input != NULL);
io_remove(&conn->io_resp_payload);
/* Chunked ostream needs to write to the parent stream's buffer */
if (ret != 0) {
/* Finished sending payload (or error) */
+ e_debug(resp->event, "Finish sending payload");
if (http_server_response_finish_payload_out(resp) < 0)
return -1;
}
content_length = resp->payload_size;
send_content_length = TRUE;
}
+ } else if (resp->payload_stream != NULL) {
+ /* HTTP payload output stream */
+ if (!http_server_ostream_get_size(resp->payload_stream,
+ &content_length)) {
+ /* size not known at this point */
+ chunked = TRUE;
+ } else {
+ /* output stream already finished, so data is
+ pre-buffered */
+ send_content_length = TRUE;
+ }
} else if (resp->tunnel_callback == NULL && resp->status / 100 != 1 &&
resp->status != 204 && resp->status != 304 && !is_head) {
/* RFC 7230, Section 3.3: Message Body
if (is_head) {
e_debug(resp->event, "A HEAD response has no payload");
} else if (chunked) {
- i_assert(resp->payload_input != NULL || resp->payload_direct);
+ i_assert(resp->payload_input != NULL || resp->payload_direct ||
+ resp->payload_stream != NULL);
e_debug(resp->event, "Will send payload in chunks");
http_transfer_chunked_ostream_create(conn->conn.output);
} else if (send_content_length) {
i_assert(resp->payload_input != NULL || content_length == 0 ||
- resp->payload_direct);
+ resp->payload_stream != NULL || resp->payload_direct);
e_debug(resp->event,
"Will send payload with explicit size %"PRIuUOFF_T,
e_debug(resp->event, "Sent header");
+ if (resp->payload_stream != NULL)
+ http_server_ostream_output_available(resp->payload_stream);
if (resp->payload_blocking) {
/* Blocking payload */
conn->output_locked = TRUE;
return -1;
} else {
/* No payload to send */
+ e_debug(resp->event, "No payload to send");
+ if (resp->payload_stream != NULL)
+ http_server_ostream_continue(resp->payload_stream);
conn->output_locked = FALSE;
http_server_response_finish_payload_out(resp);
}
return ret;
}
-/*
- * Payload output stream
- */
-
-struct http_server_ostream {
- struct ostream_private ostream;
-
- struct http_server_response *resp;
-};
-
-static ssize_t
-http_server_ostream_sendv(struct ostream_private *stream,
- const struct const_iovec *iov, unsigned int iov_count)
-{
- struct http_server_ostream *hsostream =
- (struct http_server_ostream *)stream;
- unsigned int i;
- ssize_t ret;
-
- if (http_server_response_output_payload(&hsostream->resp,
- iov, iov_count) < 0) {
- if (stream->parent->stream_errno != 0) {
- o_stream_copy_error_from_parent(stream);
- } else {
- io_stream_set_error(
- &stream->iostream,
- "HTTP connection broke while sending payload");
- stream->ostream.stream_errno = EIO;
- }
- return -1;
- }
-
- ret = 0;
- for (i = 0; i < iov_count; i++)
- ret += iov[i].iov_len;
- stream->ostream.offset += ret;
- return ret;
-}
-
-static void
-http_server_ostream_close(struct iostream_private *stream,
- bool close_parent ATTR_UNUSED)
-{
- struct http_server_ostream *hsostream =
- (struct http_server_ostream *)stream;
- struct ostream_private *ostream = &hsostream->ostream;
-
- if (hsostream->resp == NULL)
- return;
- hsostream->resp->blocking_output = NULL;
-
- if (http_server_response_output_payload(
- &hsostream->resp, NULL, 0) < 0) {
- if (ostream->parent->stream_errno != 0) {
- o_stream_copy_error_from_parent(ostream);
- } else {
- io_stream_set_error(
- &ostream->iostream,
- "HTTP connection broke while sending payload");
- ostream->ostream.stream_errno = EIO;
- }
- }
- hsostream->resp = NULL;
-}
-
-static void http_server_ostream_destroy(struct iostream_private *stream)
-{
- struct http_server_ostream *hsostream =
- (struct http_server_ostream *)stream;
-
- if (hsostream->resp != NULL) {
- hsostream->resp->blocking_output = NULL;
- http_server_response_abort_payload(&hsostream->resp);
- }
-}
-
-struct ostream *
-http_server_response_get_payload_output(struct http_server_response *resp,
- bool blocking)
-{
- struct http_server_connection *conn = resp->request->conn;
- struct http_server_ostream *hsostream;
-
- i_assert(resp->payload_input == NULL);
- i_assert(resp->blocking_output == NULL);
-
- i_assert(blocking == TRUE); // FIXME: support non-blocking
-
- hsostream = i_new(struct http_server_ostream, 1);
- hsostream->ostream.sendv = http_server_ostream_sendv;
- hsostream->ostream.iostream.close = http_server_ostream_close;
- hsostream->ostream.iostream.destroy = http_server_ostream_destroy;
- hsostream->resp = resp;
-
- resp->blocking_output = o_stream_create(&hsostream->ostream,
- conn->conn.output, -1);
- return resp->blocking_output;
-}
-
void http_server_response_get_status(struct http_server_response *resp,
int *status_r, const char **reason_r)
{
const unsigned char *data,
size_t size);
-/* Obtain an output stream for the response payload. This is an alternative to
- using http_server_response_set_payload(). Currently, this can only return a
- blocking output stream. The request is submitted implicitly once the output
- stream is written to. Closing the stream concludes the payload. Destroying
- the stream before that aborts the response and closes the connection.
- */
+/* Get an output stream for the outgoing payload of this response. The output
+ stream operates asynchronously when blocking is FALSE. In that case the
+ flush callback is called once more data can be sent. When blocking is TRUE,
+ writing to the stream will block until all data is sent. In every respect,
+ it operates very similar to a normal file output stream. The response is
+ submitted implicitly when the stream is first used; e.g., when it is written,
+ flushed, or o_stream_set_flush_pending(ostream, TRUE) is called. */
struct ostream *
http_server_response_get_payload_output(struct http_server_response *resp,
- bool blocking);
+ size_t max_buffer_size, bool blocking);
/* Get the status code and reason string currently set for this response. */
void http_server_response_get_status(struct http_server_response *resp,
http_server_response_add_header(resp, "Content-Type", "text/plain");
if (tset.server_blocking) {
- output = http_server_response_get_payload_output(resp, TRUE);
+ output = http_server_response_get_payload_output(
+ resp, IO_BLOCK_SIZE, TRUE);
ret = 0;
switch (o_stream_send_istream(output, fstream)) {
struct ostream *payload_output;
int ret;
- payload_output = http_server_response_get_payload_output(resp, TRUE);
+ payload_output = http_server_response_get_payload_output(
+ resp, IO_BLOCK_SIZE, TRUE);
ret = 0;
switch (o_stream_send_istream(payload_output, input)) {