]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
o_stream_send_istream() API changed again
authorTimo Sirainen <timo.sirainen@dovecot.fi>
Fri, 20 May 2016 12:16:41 +0000 (15:16 +0300)
committerGitLab <gitlab@git.dovecot.net>
Mon, 30 May 2016 18:31:18 +0000 (21:31 +0300)
The new API enforces the caller to correctly handle all the possible
situations. It also makes in unambiguous whether to wait for input or
output stream.

23 files changed:
src/doveadm/server-connection.c
src/imap/cmd-getmetadata.c
src/imap/cmd-urlfetch.c
src/imap/imap-fetch-body.c
src/lib-fs/fs-api.c
src/lib-fs/ostream-metawrap.c
src/lib-http/http-client-request.c
src/lib-http/http-server-response.c
src/lib-http/test-http-payload.c
src/lib-http/test-http-request-parser.c
src/lib-http/test-http-response-parser.c
src/lib-http/test-http-transfer.c
src/lib-imap-client/imapc-connection.c
src/lib-storage/index/index-storage.c
src/lib/file-copy.c
src/lib/iostream-temp.c
src/lib/ostream-file.c
src/lib/ostream-private.h
src/lib/ostream.c
src/lib/ostream.h
src/lib/test-iostream-temp.c
src/lib/test-ostream-file.c
src/plugins/mail-filter/istream-ext-filter.c

index 9f048f6f5a0c026440fb87f1d39b7e67563691fa..0b44b5177d2bfb0d51293d2086b2932b362ecc3e 100644 (file)
@@ -83,26 +83,40 @@ static void print_connection_released(void)
 
 static int server_connection_send_cmd_input_more(struct server_connection *conn)
 {
-       int ret;
+       enum ostream_send_istream_result res;
+       int ret = -1;
 
        /* ostream-dot writes only up to max buffer size, so keep it non-zero */
        o_stream_set_max_buffer_size(conn->cmd_output, IO_BLOCK_SIZE);
-       ret = o_stream_send_istream(conn->cmd_output, conn->cmd_input);
+       res = o_stream_send_istream(conn->cmd_output, conn->cmd_input);
        o_stream_set_max_buffer_size(conn->cmd_output, (size_t)-1);
 
-       if (ret == 0) {
-               o_stream_set_flush_pending(conn->cmd_output, TRUE);
+       switch (res) {
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+               return 1;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
                return 0;
-       }
-       if (conn->cmd_input->stream_errno != 0) {
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
                i_error("read(%s) failed: %s",
                        i_stream_get_name(conn->cmd_input),
                        i_stream_get_error(conn->cmd_input));
-       } else if (conn->cmd_output->stream_errno != 0 ||
-                  o_stream_flush(conn->cmd_output) < 0) {
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
                i_error("write(%s) failed: %s",
                        o_stream_get_name(conn->cmd_output),
                        o_stream_get_error(conn->cmd_output));
+               break;
+       }
+       if (res == OSTREAM_SEND_ISTREAM_RESULT_FINISHED) {
+               if ((ret = o_stream_flush(conn->cmd_output)) == 0)
+                       return 0;
+               else if (ret < 0) {
+                       i_error("write(%s) failed: %s",
+                               o_stream_get_name(conn->cmd_output),
+                               o_stream_get_error(conn->cmd_output));
+               }
        }
 
        i_stream_destroy(&conn->cmd_input);
index 891e08f04b62527ff58cb5fe680766efb4337cdf..eb350e12c09364d8f2555445f6dcf07c9d3e71b0 100644 (file)
@@ -211,25 +211,31 @@ static void cmd_getmetadata_send_entry(struct imap_getmetadata_context *ctx,
 static bool
 cmd_getmetadata_stream_continue(struct imap_getmetadata_context *ctx)
 {
-       int ret;
+       enum ostream_send_istream_result res;
 
        o_stream_set_max_buffer_size(ctx->cmd->client->output, 0);
-       ret = o_stream_send_istream(ctx->cmd->client->output, ctx->cur_stream);
+       res = o_stream_send_istream(ctx->cmd->client->output, ctx->cur_stream);
        o_stream_set_max_buffer_size(ctx->cmd->client->output, (size_t)-1);
 
-       if (ret > 0) {
-               /* finished */
+       switch (res) {
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
                return TRUE;
-       } else if (ret < 0) {
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+               i_unreached();
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+               return FALSE;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
                i_error("read(%s) failed: %s",
                        i_stream_get_name(ctx->cur_stream),
                        i_stream_get_error(ctx->cur_stream));
                client_disconnect(ctx->cmd->client,
                                  "Internal GETMETADATA failure");
                return TRUE;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+               /* client disconnected */
+               return TRUE;
        }
-       o_stream_set_flush_pending(ctx->cmd->client->output, TRUE);
-       return FALSE;
+       i_unreached();
 }
 
 static int
index 611892388d676480d0307ba0d1157119c7fd0314..50299f2478d24a3272cbb90a9f4bdd408c5548f1 100644 (file)
@@ -83,6 +83,7 @@ static int cmd_urlfetch_transfer_literal(struct client_command_context *cmd)
        struct client *client = cmd->client;
        struct cmd_urlfetch_context *ctx =
                (struct cmd_urlfetch_context *)cmd->context;
+       enum ostream_send_istream_result res;
        int ret;
 
        /* are we in the middle of an urlfetch literal? */
@@ -98,27 +99,28 @@ static int cmd_urlfetch_transfer_literal(struct client_command_context *cmd)
 
        /* transfer literal to client */
        o_stream_set_max_buffer_size(client->output, 0);
-       ret = o_stream_send_istream(client->output, ctx->input);
+       res = o_stream_send_istream(client->output, ctx->input);
        o_stream_set_max_buffer_size(client->output, (size_t)-1);
 
-       if (ret > 0) {
-               /* finished successfully */
+       switch (res) {
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
                i_stream_unref(&ctx->input);
                return 1;
-       }
-       if (client->output->closed) {
-               /* client disconnected */
-               return -1;
-       }
-       if (ctx->input->stream_errno != 0) {
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+               i_unreached();
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+               return 0;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
                i_error("read(%s) failed: %s (URLFETCH)",
                        i_stream_get_name(ctx->input),
                        i_stream_get_error(ctx->input));
                client_disconnect(client, "URLFETCH failed");
                return -1;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+               /* client disconnected */
+               return -1;
        }
-       o_stream_set_flush_pending(client->output, TRUE);
-       return 0;
+       i_unreached();
 }
 
 static bool cmd_urlfetch_continue(struct client_command_context *cmd)
index 0a8aafef1712c6e6f6d64efabac4481f5aa2070b..0cb0bd2b6705faa8564e9020d5042d7c7a44852a 100644 (file)
@@ -93,10 +93,10 @@ static int fetch_stream_continue(struct imap_fetch_context *ctx)
        struct imap_fetch_state *state = &ctx->state;
        const char *disconnect_reason;
        uoff_t orig_input_offset = state->cur_input->v_offset;
-       int ret;
+       enum ostream_send_istream_result res;
 
        o_stream_set_max_buffer_size(ctx->client->output, 0);
-       ret = o_stream_send_istream(ctx->client->output, state->cur_input);
+       res = o_stream_send_istream(ctx->client->output, state->cur_input);
        o_stream_set_max_buffer_size(ctx->client->output, (size_t)-1);
 
        if (ctx->state.cur_stats_sizep != NULL) {
@@ -104,14 +104,9 @@ static int fetch_stream_continue(struct imap_fetch_context *ctx)
                        state->cur_input->v_offset - orig_input_offset;
        }
 
-       if (state->cur_input->v_offset != state->cur_size) {
-               /* unfinished */
-               if (state->cur_input->stream_errno != 0) {
-                       fetch_read_error(ctx, &disconnect_reason);
-                       client_disconnect(ctx->client, disconnect_reason);
-                       return -1;
-               }
-               if (!i_stream_have_bytes_left(state->cur_input)) {
+       switch (res) {
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
+               if (state->cur_input->v_offset != state->cur_size) {
                        /* Input stream gave less data than expected */
                        mail_set_cache_corrupted_reason(state->cur_mail,
                                state->cur_size_field, t_strdup_printf(
@@ -123,15 +118,20 @@ static int fetch_stream_continue(struct imap_fetch_context *ctx)
                        client_disconnect(ctx->client, "FETCH failed");
                        return -1;
                }
-               if (ret < 0) {
-                       /* client probably disconnected */
-                       return -1;
-               }
-
-               o_stream_set_flush_pending(ctx->client->output, TRUE);
+               return 1;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+               i_unreached();
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
                return 0;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
+               fetch_read_error(ctx, &disconnect_reason);
+               client_disconnect(ctx->client, disconnect_reason);
+               return -1;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+               /* client disconnected */
+               return -1;
        }
-       return 1;
+       i_unreached();
 }
 
 static const char *
index 785a32212a415eed946d17d2f311f969183d0ce8..f124b2b1026c3e2e260dfe2efe9560336c6fa2c2 100644 (file)
@@ -831,11 +831,14 @@ int fs_default_copy(struct fs_file *src, struct fs_file *dest)
                dest->copy_input = fs_read_stream(src, IO_BLOCK_SIZE);
                dest->copy_output = fs_write_stream(dest);
        }
-       if (o_stream_send_istream(dest->copy_output, dest->copy_input) == 0) {
+       switch (o_stream_send_istream(dest->copy_output, dest->copy_input)) {
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
                fs_set_error_async(dest->fs);
                return -1;
-       }
-       if (dest->copy_input->stream_errno != 0) {
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
                errno = dest->copy_input->stream_errno;
                fs_set_error(dest->fs, "read(%s) failed: %s",
                             i_stream_get_name(dest->copy_input),
@@ -843,8 +846,7 @@ int fs_default_copy(struct fs_file *src, struct fs_file *dest)
                i_stream_unref(&dest->copy_input);
                fs_write_stream_abort(dest, &dest->copy_output);
                return -1;
-       }
-       if (dest->copy_output->stream_errno != 0) {
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
                errno = dest->copy_output->stream_errno;
                fs_set_error(dest->fs, "write(%s) failed: %s",
                             o_stream_get_name(dest->copy_output),
index 08cd4caed34dd03dcad117bbbbcfdcaedda52076..1cde1b983a4e791a4f75663a2f568ddd20355c55 100644 (file)
@@ -38,20 +38,20 @@ o_stream_metawrap_sendv(struct ostream_private *stream,
        return ret;
 }
 
-static int
+static enum ostream_send_istream_result
 o_stream_metawrap_send_istream(struct ostream_private *_outstream,
                               struct istream *instream)
 {
        struct metawrap_ostream *outstream =
                (struct metawrap_ostream *)_outstream;
        uoff_t orig_instream_offset = instream->v_offset;
-       int ret;
+       enum ostream_send_istream_result res;
 
        o_stream_metawrap_call_callback(outstream);
-       if ((ret = o_stream_send_istream(_outstream->parent, instream)) < 0)
+       if ((res = o_stream_send_istream(_outstream->parent, instream)) == OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT)
                o_stream_copy_error_from_parent(_outstream);
        _outstream->ostream.offset += instream->v_offset - orig_instream_offset;
-       return ret;
+       return res;
 }
 
 struct ostream *
index 25bc3d6a7ab6a8e2dcc7f6d7ea5c24eeb4987523..5f2b4cf67e101df17894543b9f5c5f69cf557701 100644 (file)
@@ -833,7 +833,7 @@ int http_client_request_send_more(struct http_client_request *req,
 {
        struct http_client_connection *conn = req->conn;
        struct ostream *output = req->payload_output;
-       int ret;
+       enum ostream_send_istream_result res;
 
        i_assert(req->payload_input != NULL);
        i_assert(req->payload_output != NULL);
@@ -843,31 +843,11 @@ int http_client_request_send_more(struct http_client_request *req,
 
        /* chunked ostream needs to write to the parent stream's buffer */
        o_stream_set_max_buffer_size(output, IO_BLOCK_SIZE);
-       ret = o_stream_send_istream(output, req->payload_input);
+       res = o_stream_send_istream(output, req->payload_input);
        o_stream_set_max_buffer_size(output, (size_t)-1);
 
-       if (req->payload_input->stream_errno != 0) {
-               /* we're in the middle of sending a request, so the connection
-                  will also have to be aborted */
-               *error_r = t_strdup_printf("read(%s) failed: %s",
-                                          i_stream_get_name(req->payload_input),
-                                          i_stream_get_error(req->payload_input));
-               
-               /* the payload stream assigned to this request is broken,
-                  fail this the request immediately */
-               http_client_request_error(&req,
-                       HTTP_CLIENT_REQUEST_ERROR_BROKEN_PAYLOAD,
-                       "Broken payload stream");
-               return -1;
-       } else if (output->stream_errno != 0) {
-               /* failed to send request */
-               *error_r = t_strdup_printf("write(%s) failed: %s",
-                                          o_stream_get_name(output),
-                                          o_stream_get_error(output));
-               return -1;
-       }
-
-       if (ret > 0) {
+       switch (res) {
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
                /* finished sending */
                if (!req->payload_chunked &&
                    req->payload_input->v_offset - req->payload_offset != req->payload_size) {
@@ -891,22 +871,43 @@ int http_client_request_send_more(struct http_client_request *req,
                        /* finished sending payload */
                        http_client_request_finish_payload_out(req);
                }
-       } else if (i_stream_have_bytes_left(req->payload_input)) {
-               /* output is blocking (server needs to act; enable timeout) */
-               conn->output_locked = TRUE;
-               if (!pipelined)
-                       http_client_connection_start_request_timeout(conn);
-               o_stream_set_flush_pending(output, TRUE);
-               http_client_request_debug(req, "Partially sent payload");
-       } else {
+               return 0;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
                /* input is blocking (client needs to act; disable timeout) */
                conn->output_locked = TRUE;     
                if (!pipelined)
                        http_client_connection_stop_request_timeout(conn);
                conn->io_req_payload = io_add_istream(req->payload_input,
                        http_client_request_payload_input, req);
+               return 0;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+               /* output is blocking (server needs to act; enable timeout) */
+               conn->output_locked = TRUE;
+               if (!pipelined)
+                       http_client_connection_start_request_timeout(conn);
+               http_client_request_debug(req, "Partially sent payload");
+               return 0;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
+               /* we're in the middle of sending a request, so the connection
+                  will also have to be aborted */
+               *error_r = t_strdup_printf("read(%s) failed: %s",
+                                          i_stream_get_name(req->payload_input),
+                                          i_stream_get_error(req->payload_input));
+
+               /* the payload stream assigned to this request is broken,
+                  fail this the request immediately */
+               http_client_request_error(&req,
+                       HTTP_CLIENT_REQUEST_ERROR_BROKEN_PAYLOAD,
+                       "Broken payload stream");
+               return -1;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+               /* failed to send request */
+               *error_r = t_strdup_printf("write(%s) failed: %s",
+                                          o_stream_get_name(output),
+                                          o_stream_get_error(output));
+               return -1;
        }
-       return 0;
+       i_unreached();
 }
 
 static int http_client_request_send_real(struct http_client_request *req,
index d9ffe98e5373aecf8d96064545125be46dab737d..d05eb3bc32de045ce14f1dbefbcbcfb4c7a8e83a 100644 (file)
@@ -481,7 +481,8 @@ int http_server_response_send_more(struct http_server_response *resp,
 {
        struct http_server_connection *conn = resp->request->conn;
        struct ostream *output = resp->payload_output;
-       off_t ret;
+       enum ostream_send_istream_result res;
+       int ret = 0;
 
        *error_r = NULL;
 
@@ -494,17 +495,44 @@ int http_server_response_send_more(struct http_server_response *resp,
 
        /* chunked ostream needs to write to the parent stream's buffer */
        o_stream_set_max_buffer_size(output, IO_BLOCK_SIZE);
-       ret = o_stream_send_istream(output, resp->payload_input);
+       res = o_stream_send_istream(output, resp->payload_input);
        o_stream_set_max_buffer_size(output, (size_t)-1);
 
-       if (resp->payload_input->stream_errno != 0) {
+       switch (res) {
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
+               /* finished sending */
+               if (!resp->payload_chunked &&
+                   resp->payload_input->v_offset - resp->payload_offset !=
+                               resp->payload_size) {
+                       *error_r = t_strdup_printf(
+                               "Input stream %s size changed unexpectedly",
+                               i_stream_get_name(resp->payload_input));
+                       ret = -1;
+               } else {
+                       ret = 1;
+               }
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+               /* input is blocking */
+               conn->output_locked = TRUE;     
+               conn->io_resp_payload = io_add_istream(resp->payload_input,
+                       http_server_response_payload_input, resp);
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+               /* output is blocking */
+               conn->output_locked = TRUE;
+               o_stream_set_flush_pending(output, TRUE);
+               //http_server_response_debug(resp, "Partially sent payload");
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
                /* we're in the middle of sending a response, so the connection
                   will also have to be aborted */
                *error_r = t_strdup_printf("read(%s) failed: %s",
                        i_stream_get_name(resp->payload_input),
                        i_stream_get_error(resp->payload_input));
                ret = -1;
-       } else if (output->stream_errno != 0) {
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
                /* failed to send response */
                if (output->stream_errno != EPIPE &&
                    output->stream_errno != ECONNRESET) {
@@ -512,30 +540,12 @@ int http_server_response_send_more(struct http_server_response *resp,
                                o_stream_get_name(output), o_stream_get_error(output));
                }
                ret = -1;
+               break;
        }
 
        if (ret != 0) {
-               /* finished sending */
-               if (ret > 0 && !resp->payload_chunked &&
-                       resp->payload_input->v_offset - resp->payload_offset !=
-                               resp->payload_size) {
-                       *error_r = t_strdup_printf(
-                               "Input stream %s size changed unexpectedly",
-                               i_stream_get_name(resp->payload_input));
-                       ret = -1;
-               }
-               /* finished sending payload */
+               /* finished sending payload (or error) */
                http_server_response_finish_payload_out(resp);
-       } else if (i_stream_have_bytes_left(resp->payload_input)) {
-               /* output is blocking */
-               conn->output_locked = TRUE;
-               o_stream_set_flush_pending(output, TRUE);
-               //http_server_response_debug(resp, "Partially sent payload");
-       } else {
-               /* input is blocking */
-               conn->output_locked = TRUE;     
-               conn->io_resp_payload = io_add_istream(resp->payload_input,
-                       http_server_response_payload_input, resp);
        }
        return ret < 0 ? -1 : 0;
 }
index 01bf55b048cbbad2a016ffa2b4681d6273dd9239..82546a97937459f62ab9ca56a5d2838e8d1dfb1a 100644 (file)
@@ -218,7 +218,6 @@ client_handle_download_request(
        struct istream *fstream;
        struct ostream *output;
        unsigned int status;
-       int ret;
 
        if (strcmp(hreq->method, "GET") != 0) {
                http_server_request_fail(req,
@@ -244,8 +243,7 @@ client_handle_download_request(
 
        if (blocking) {
                output = http_server_response_get_payload_output(resp, TRUE);
-               ret=o_stream_send_istream(output, fstream);
-               if (ret < 0) {
+               if (o_stream_send_istream(output, fstream) != OSTREAM_SEND_ISTREAM_RESULT_FINISHED) {
                        i_fatal("test server: download: "
                                "failed to send blocking file payload");
                }
@@ -273,24 +271,25 @@ client_request_read_echo_more(struct client_request *creq)
 {
        struct http_server_response *resp;
        struct istream *payload_input;
-       int ret;
+       enum ostream_send_istream_result res;
 
        o_stream_set_max_buffer_size(creq->payload_output, IO_BLOCK_SIZE);
-       ret = o_stream_send_istream(creq->payload_output, creq->payload_input);
+       res = o_stream_send_istream(creq->payload_output, creq->payload_input);
        o_stream_set_max_buffer_size(creq->payload_output, (size_t)-1);
-       if (ret < 0) {
-               if (creq->payload_output->stream_errno != 0) {
-                       i_fatal("test server: echo: "
-                               "Failed to write all echo payload [%s]", creq->path);
-               }
-               if (creq->payload_input->stream_errno != 0) {
-                       i_fatal("test server: echo: "
-                               "Failed to read all echo payload [%s]", creq->path);
-               }
-               i_unreached();
-       }
-       if (i_stream_have_bytes_left(creq->payload_input))
+
+       switch (res) {
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
                return;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
+               i_fatal("test server: echo: "
+                       "Failed to read all echo payload [%s]", creq->path);
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+               i_fatal("test server: echo: "
+                       "Failed to write all echo payload [%s]", creq->path);
+       }
 
        io_remove(&creq->io);
        i_stream_unref(&creq->payload_input);
@@ -321,7 +320,6 @@ client_handle_echo_request(struct client_request *creq,
        struct http_server_response *resp;
        struct ostream *payload_output;
        uoff_t size;
-       int ret;
 
        creq->path = p_strdup
                (http_server_request_get_pool(req), path);
@@ -358,8 +356,7 @@ client_handle_echo_request(struct client_request *creq,
                        payload_input = partial;
                }
 
-               ret = o_stream_send_istream(payload_output, payload_input);
-               if (ret < 0) {
+               if (o_stream_send_istream(payload_output, payload_input) != OSTREAM_SEND_ISTREAM_RESULT_FINISHED) {
                        i_fatal("test server: echo: "
                                "failed to receive blocking echo payload");
                }
@@ -376,8 +373,7 @@ client_handle_echo_request(struct client_request *creq,
                http_server_response_add_header(resp, "Content-Type", "text/plain");
 
                payload_output = http_server_response_get_payload_output(resp, TRUE);
-               ret = o_stream_send_istream(payload_output, payload_input);
-               if (ret < 0) {
+               if (o_stream_send_istream(payload_output, payload_input) != OSTREAM_SEND_ISTREAM_RESULT_FINISHED) {
                        i_fatal("test server: echo: "
                                "failed to send blocking echo payload");
                }
index 3011db55220cedaa7a0fb690aa69b00ac7f0bba2..029aae68e0a9f072e6e8365b0b34452978a54cd4 100644 (file)
@@ -201,7 +201,7 @@ static void test_http_request_parse_valid(void)
                                buffer_set_used_size(payload_buffer, 0);
                                output = o_stream_create_buffer(payload_buffer);
                                test_out("payload receive", 
-                                       o_stream_send_istream(output, request.payload));
+                                       o_stream_send_istream(output, request.payload) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED);
                                o_stream_destroy(&output);
                                payload = str_c(payload_buffer);
                        } else {
index 7d2c11dbd3b1afa89f133165b7b059f2c050f3ba..5105bfc9b2c7b77c08d0269e6f5b9867c7cf2c1a 100644 (file)
@@ -126,7 +126,7 @@ static void test_http_response_parse_valid(void)
                                buffer_set_used_size(payload_buffer, 0);
                                output = o_stream_create_buffer(payload_buffer);
                                test_out("payload receive", 
-                                       o_stream_send_istream(output, response.payload));
+                                       o_stream_send_istream(output, response.payload) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED);
                                o_stream_destroy(&output);
                                payload = str_c(payload_buffer);
                        } else {
index 95b2e4cc46b66077fee1b8af4e3ba9f8282708de..2a54af0f9cae66b87ec29a8e20ebd885a6e7a3b2 100644 (file)
@@ -104,7 +104,7 @@ static void test_http_transfer_chunked_input_valid(void)
 
                buffer_set_used_size(payload_buffer, 0);
                output = o_stream_create_buffer(payload_buffer);
-               test_out("payload read", o_stream_send_istream(output, chunked) > 0
+               test_out("payload read", o_stream_send_istream(output, chunked) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED
                        && chunked->stream_errno == 0);
                o_stream_destroy(&output);
                i_stream_unref(&chunked);
@@ -315,8 +315,8 @@ static void test_http_transfer_chunked_output_valid(void)
                /* read back chunk */
                buffer_set_used_size(plain_buffer, 0);
                output = o_stream_create_buffer(plain_buffer);
-               ret = o_stream_send_istream(output, ichunked);
-               test_out("payload unchunk", ret >= 0
+               test_out("payload unchunk",
+                       o_stream_send_istream(output, ichunked) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED
                        && ichunked->stream_errno == 0);
                o_stream_destroy(&output);
                i_stream_destroy(&ichunked);
index f0d372378f4b2ff3287bd0f5b895e1973dc0a00c..8bf5b7d8ea486b235d9a742e9a958e3357995128 100644 (file)
@@ -1888,21 +1888,33 @@ static int imapc_command_try_send_stream(struct imapc_connection *conn,
                                         struct imapc_command *cmd)
 {
        struct imapc_command_stream *stream;
-       int ret;
+       enum ostream_send_istream_result res;
 
        stream = imapc_command_get_sending_stream(cmd);
        if (stream == NULL)
-               return -1;
+               return -2;
 
        /* we're sending the stream now */
        o_stream_set_max_buffer_size(conn->output, 0);
-       ret = o_stream_send_istream(conn->output, stream->input);
+       res = o_stream_send_istream(conn->output, stream->input);
        o_stream_set_max_buffer_size(conn->output, (size_t)-1);
 
-       if (ret == 0) {
-               o_stream_set_flush_pending(conn->output, TRUE);
+       switch (res) {
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+               i_unreached();
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
                i_assert(stream->input->v_offset < stream->size);
                return 0;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
+               i_error("imapc: read(%s) failed: %s",
+                       i_stream_get_name(stream->input),
+                       i_stream_get_error(stream->input));
+               return -1;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+               /* disconnected */
+               return -1;
        }
        i_assert(stream->input->v_offset == stream->size);
 
@@ -2018,9 +2030,19 @@ static void imapc_command_send_more(struct imapc_connection *conn)
        timeout_reset(conn->to_output);
        if ((ret = imapc_command_try_send_stream(conn, cmd)) == 0)
                return;
+       if (ret == -1) {
+               memset(&reply, 0, sizeof(reply));
+               reply.text_without_resp = reply.text_full = "Mailbox not open";
+               reply.state = IMAPC_COMMAND_STATE_DISCONNECTED;
+
+               array_delete(&conn->cmd_send_queue, 0, 1);
+               imapc_command_reply_free(cmd, &reply);
+               imapc_command_send_more(conn);
+               return;
+       }
 
        seek_pos = cmd->send_pos;
-       if (seek_pos != 0 && ret < 0) {
+       if (seek_pos != 0 && ret == -2) {
                /* skip over the literal. we can also get here from
                   AUTHENTICATE command, which doesn't use a literal */
                if (parse_sync_literal(cmd->data->data, seek_pos, &size)) {
index c0a93b6ac9fa59f70dab79d456457b9b792f201f..8ea8142e36a9ae2ea014f94585e54a89109952f1 100644 (file)
@@ -1025,9 +1025,17 @@ int index_storage_save_continue(struct mail_save_context *ctx,
        struct mail_storage *storage = ctx->transaction->box->storage;
 
        do {
-               if (o_stream_send_istream(ctx->data.output, input) < 0) {
-                       if (input->stream_errno != 0)
-                               break;
+               switch (o_stream_send_istream(ctx->data.output, input)) {
+               case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
+                       break;
+               case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+                       break;
+               case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+                       i_unreached();
+               case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
+                       /* handle below */
+                       break;
+               case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
                        if (!mail_storage_set_error_from_errno(storage)) {
                                mail_storage_set_critical(storage,
                                        "save: write(%s) failed: %s",
index aab1f756e5e693063417b0887560ec364ade4d6e..9e6058f1c7920ac31b5630d11c41fd37d8ccba57 100644 (file)
@@ -70,11 +70,21 @@ static int file_copy_to_tmp(const char *srcpath, const char *tmppath,
        input = i_stream_create_fd(fd_in, IO_BLOCK_SIZE, FALSE);
        output = o_stream_create_fd_file(fd_out, 0, FALSE);
 
-       ret = o_stream_send_istream(output, input);
-       if (ret < 0)
-               i_error("write(%s) failed: %m", tmppath);
-       else
-               i_assert(ret != 0);
+       switch (o_stream_send_istream(output, input)) {
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+               i_unreached();
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
+               i_error("read(%s) failed: %s", srcpath,
+                       i_stream_get_error(input));
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+               i_error("write(%s) failed: %s", tmppath,
+                       o_stream_get_error(output));
+               break;
+       }
 
        i_stream_destroy(&input);
        o_stream_destroy(&output);
index af29eba06b83e22ec9bcd8d20e195b351db6b8da..baefc597d37d00aea30642cfdce0c7c6cc3b1937 100644 (file)
@@ -30,7 +30,8 @@ struct temp_ostream {
        uoff_t fd_size;
 };
 
-static int o_stream_temp_dup_cancel(struct temp_ostream *tstream);
+static bool o_stream_temp_dup_cancel(struct temp_ostream *tstream,
+                                    enum ostream_send_istream_result *res_r);
 
 static void
 o_stream_temp_close(struct iostream_private *stream,
@@ -104,10 +105,12 @@ o_stream_temp_sendv(struct ostream_private *stream,
        struct temp_ostream *tstream = (struct temp_ostream *)stream;
        ssize_t ret = 0;
        unsigned int i;
+       enum ostream_send_istream_result res;
+
 
        tstream->flags &= ~IOSTREAM_TEMP_FLAG_TRY_FD_DUP;
        if (tstream->dupstream != NULL) {
-               if (o_stream_temp_dup_cancel(tstream) < 0)
+               if (o_stream_temp_dup_cancel(tstream, &res))
                        return -1;
        }
 
@@ -129,12 +132,13 @@ o_stream_temp_sendv(struct ostream_private *stream,
        return ret;
 }
 
-static int o_stream_temp_dup_cancel(struct temp_ostream *tstream)
+static bool o_stream_temp_dup_cancel(struct temp_ostream *tstream,
+                                    enum ostream_send_istream_result *res_r)
 {
        struct istream *input;
        uoff_t size = tstream->dupstream_offset -
                tstream->dupstream_start_offset;
-       int ret = -1;
+       bool ret = TRUE; /* use res_r to return error */
 
        i_stream_seek(tstream->dupstream, tstream->dupstream_start_offset);
        tstream->ostream.ostream.offset = 0;
@@ -142,29 +146,43 @@ static int o_stream_temp_dup_cancel(struct temp_ostream *tstream)
        input = i_stream_create_limit(tstream->dupstream, size);
        i_stream_unref(&tstream->dupstream);
 
-       if (io_stream_copy(&tstream->ostream.ostream, input) > 0) {
+       *res_r = io_stream_copy(&tstream->ostream.ostream, input);
+       switch (*res_r) {
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
                /* everything copied */
-               ret = 0;
-       } else if (tstream->ostream.ostream.stream_errno == 0) {
-               i_assert(input->stream_errno != 0);
+               ret = FALSE;
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+               i_unreached();
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
                tstream->ostream.ostream.stream_errno = input->stream_errno;
+               io_stream_set_error(&tstream->ostream.iostream,
+                       "iostream-temp: read(%s) failed: %s",
+                       i_stream_get_name(input),
+                       i_stream_get_error(input));
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+               break;
        }
        i_stream_destroy(&input);
        return ret;
 }
 
-static int o_stream_temp_dup_istream(struct temp_ostream *outstream,
-                                    struct istream *instream)
+static bool
+o_stream_temp_dup_istream(struct temp_ostream *outstream,
+                         struct istream *instream,
+                         enum ostream_send_istream_result *res_r)
 {
        uoff_t in_size;
 
        if (!instream->readable_fd || i_stream_get_fd(instream) == -1)
-               return 0;
+               return FALSE;
 
        if (i_stream_get_size(instream, TRUE, &in_size) <= 0) {
                if (outstream->dupstream != NULL)
-                       return o_stream_temp_dup_cancel(outstream);
-               return 0;
+                       return o_stream_temp_dup_cancel(outstream, res_r);
+               return FALSE;
        }
 
        if (outstream->dupstream == NULL) {
@@ -175,7 +193,7 @@ static int o_stream_temp_dup_istream(struct temp_ostream *outstream,
                if (outstream->dupstream != instream ||
                    outstream->dupstream_offset != instream->v_offset ||
                    outstream->dupstream_offset > in_size)
-                       return o_stream_temp_dup_cancel(outstream);
+                       return o_stream_temp_dup_cancel(outstream, res_r);
        }
        i_stream_seek(instream, in_size);
        /* we should be at EOF now. o_stream_send_istream() asserts if
@@ -184,18 +202,20 @@ static int o_stream_temp_dup_istream(struct temp_ostream *outstream,
        outstream->dupstream_offset = instream->v_offset;
        outstream->ostream.ostream.offset =
                outstream->dupstream_offset - outstream->dupstream_start_offset;
-       return 1;
+       *res_r = OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
+       return TRUE;
 }
 
-static int o_stream_temp_send_istream(struct ostream_private *_outstream,
-                                     struct istream *instream)
+static enum ostream_send_istream_result
+o_stream_temp_send_istream(struct ostream_private *_outstream,
+                          struct istream *instream)
 {
        struct temp_ostream *outstream = (struct temp_ostream *)_outstream;
-       int ret;
+       enum ostream_send_istream_result res;
 
        if ((outstream->flags & IOSTREAM_TEMP_FLAG_TRY_FD_DUP) != 0) {
-               if ((ret = o_stream_temp_dup_istream(outstream, instream)) != 0)
-                       return ret;
+               if (!o_stream_temp_dup_istream(outstream, instream, &res))
+                       return res;
                outstream->flags &= ~IOSTREAM_TEMP_FLAG_TRY_FD_DUP;
        }
        return io_stream_copy(&outstream->ostream.ostream, instream);
index 51658c97dc83e6a628b77a15eea560739aa03543..4ccc1740ddbc18849cb65d7b3e7ca4db459076ed 100644 (file)
@@ -687,31 +687,40 @@ o_stream_file_write_at(struct ostream_private *stream,
        return 0;
 }
 
-static int io_stream_sendfile(struct ostream_private *outstream,
-                             struct istream *instream, int in_fd,
-                             bool *sendfile_not_supported_r)
+static bool
+io_stream_sendfile(struct ostream_private *outstream,
+                  struct istream *instream, int in_fd,
+                  enum ostream_send_istream_result *res_r)
 {
        struct file_ostream *foutstream = (struct file_ostream *)outstream;
        uoff_t in_size, offset, send_size, v_offset, abs_start_offset;
        ssize_t ret;
+       bool sendfile_not_supported = FALSE;
 
-       *sendfile_not_supported_r = FALSE;
-
-       if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0)
-               return -1;
+       if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0) {
+               *res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
+               return TRUE;
+       }
        if (ret == 0) {
-               *sendfile_not_supported_r = TRUE;
-               return -1;
+               /* size unknown. we can't use sendfile(). */
+               return FALSE;
        }
 
        o_stream_socket_cork(foutstream);
 
        /* flush out any data in buffer */
-       if ((ret = buffer_flush(foutstream)) <= 0)
-               return ret;
+       if ((ret = buffer_flush(foutstream)) < 0) {
+               *res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
+               return TRUE;
+       } else if (ret == 0) {
+               *res_r = OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
+               return TRUE;
+       }
 
-       if (o_stream_lseek(foutstream) < 0)
-               return -1;
+       if (o_stream_lseek(foutstream) < 0) {
+               *res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
+               return TRUE;
+       }
 
        v_offset = instream->v_offset;
        abs_start_offset = i_stream_get_absolute_offset(instream) - v_offset;
@@ -736,7 +745,7 @@ static int io_stream_sendfile(struct ostream_private *outstream,
                                }
                        }
                        if (errno == EINVAL)
-                               *sendfile_not_supported_r = TRUE;
+                               sendfile_not_supported = TRUE;
                        else {
                                io_stream_set_error(&outstream->iostream,
                                                    "sendfile() failed: %m");
@@ -757,14 +766,22 @@ static int io_stream_sendfile(struct ostream_private *outstream,
        i_stream_seek(instream, v_offset);
        if (v_offset == in_size) {
                instream->eof = TRUE;
-               return 1;
+               *res_r = OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
+               return TRUE;
        }
        i_assert(ret <= 0);
-       return ret;
+       if (sendfile_not_supported)
+               return FALSE;
+       if (ret < 0)
+               *res_r = OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
+       else
+               *res_r = OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
+       return TRUE;
 }
 
-static int io_stream_copy_backwards(struct ostream_private *outstream,
-                                   struct istream *instream, uoff_t in_size)
+static enum ostream_send_istream_result
+io_stream_copy_backwards(struct ostream_private *outstream,
+                        struct istream *instream, uoff_t in_size)
 {
        struct file_ostream *foutstream = (struct file_ostream *)outstream;
        uoff_t in_start_offset, in_offset, in_limit, out_offset;
@@ -804,6 +821,7 @@ static int io_stream_copy_backwards(struct ostream_private *outstream,
                        i_stream_seek(instream, in_offset);
                        read_size = in_limit - in_offset;
 
+                       /* FIXME: something's wrong here */
                        if (i_stream_read_bytes(instream, &data, &size,
                                                read_size) == 0)
                                i_unreached();
@@ -832,7 +850,7 @@ static int io_stream_copy_backwards(struct ostream_private *outstream,
                if (ret < 0) {
                        /* error */
                        outstream->ostream.stream_errno = errno;
-                       return -1;
+                       return OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
                }
                i_stream_skip(instream, size);
        }
@@ -841,11 +859,12 @@ static int io_stream_copy_backwards(struct ostream_private *outstream,
        instream->eof = TRUE;
 
        outstream->ostream.offset += in_size - in_start_offset;
-       return 1;
+       return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
 }
 
-static int io_stream_copy_same_stream(struct ostream_private *outstream,
-                                     struct istream *instream)
+static enum ostream_send_istream_result
+io_stream_copy_same_stream(struct ostream_private *outstream,
+                          struct istream *instream)
 {
        uoff_t in_size;
        off_t in_abs_offset, ret = 0;
@@ -853,7 +872,7 @@ static int io_stream_copy_same_stream(struct ostream_private *outstream,
        /* copying data within same fd. we'll have to be careful with
           seeks and overlapping writes. */
        if ((ret = i_stream_get_size(instream, TRUE, &in_size)) < 0)
-               return -1;
+               return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
        if (ret == 0) {
                /* if we couldn't find out the size, it means that instream
                   isn't a regular file_istream. we can be reasonably sure that
@@ -868,7 +887,7 @@ static int io_stream_copy_same_stream(struct ostream_private *outstream,
        if (ret == 0) {
                /* copying data over itself. we don't really
                   need to do that, just fake it. */
-               return 1;
+               return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
        }
        if (ret > 0 && in_size > (uoff_t)ret) {
                /* overlapping */
@@ -880,21 +899,20 @@ static int io_stream_copy_same_stream(struct ostream_private *outstream,
        }
 }
 
-static int o_stream_file_send_istream(struct ostream_private *outstream,
-                                     struct istream *instream)
+static enum ostream_send_istream_result
+o_stream_file_send_istream(struct ostream_private *outstream,
+                          struct istream *instream)
 {
        struct file_ostream *foutstream = (struct file_ostream *)outstream;
        bool same_stream;
-       int in_fd, ret;
-       bool sendfile_not_supported;
+       int in_fd;
+       enum ostream_send_istream_result res;
 
        in_fd = !instream->readable_fd ? -1 : i_stream_get_fd(instream);
        if (!foutstream->no_sendfile && in_fd != -1 &&
            in_fd != foutstream->fd && instream->seekable) {
-               ret = io_stream_sendfile(outstream, instream, in_fd,
-                                        &sendfile_not_supported);
-               if (ret >= 0 || !sendfile_not_supported)
-                       return ret;
+               if (io_stream_sendfile(outstream, instream, in_fd, &res))
+                       return res;
 
                /* sendfile() not supported (with this fd), fallback to
                   regular sending. */
index 7f6da788e59bfa439b454d5afe7f75fbd3fb039e..c792e87dd873472ecab1c32115ca8fb22e59e44e 100644 (file)
@@ -22,8 +22,9 @@ struct ostream_private {
                         unsigned int iov_count);
        int (*write_at)(struct ostream_private *stream,
                        const void *data, size_t size, uoff_t offset);
-       int (*send_istream)(struct ostream_private *outstream,
-                           struct istream *instream);
+       enum ostream_send_istream_result
+               (*send_istream)(struct ostream_private *outstream,
+                               struct istream *instream);
        void (*switch_ioloop)(struct ostream_private *stream);
 
 /* data: */
@@ -47,7 +48,8 @@ struct ostream *
 o_stream_create(struct ostream_private *_stream, struct ostream *parent, int fd)
        ATTR_NULL(2);
 
-int io_stream_copy(struct ostream *outstream, struct istream *instream);
+enum ostream_send_istream_result
+io_stream_copy(struct ostream *outstream, struct istream *instream);
 
 void o_stream_copy_error_from_parent(struct ostream_private *_stream);
 /* This should be called before sending data to parent stream. It makes sure
index 557d0700d4fe5ee99cba7bf13aa18fafa5b0a8cf..d5df233cd7f3a2d563c5cbb3d7430686ac0f8e70 100644 (file)
@@ -341,57 +341,71 @@ void o_stream_set_no_error_handling(struct ostream *stream, bool set)
        stream->real_stream->error_handling_disabled = set;
 }
 
-int o_stream_send_istream(struct ostream *outstream, struct istream *instream)
+enum ostream_send_istream_result
+o_stream_send_istream(struct ostream *outstream, struct istream *instream)
 {
        struct ostream_private *_outstream = outstream->real_stream;
        uoff_t old_outstream_offset = outstream->offset;
        uoff_t old_instream_offset = instream->v_offset;
-       int ret;
-
-       if (unlikely(outstream->closed || instream->closed ||
-                    outstream->stream_errno != 0)) {
-               errno = outstream->stream_errno;
-               return -1;
-       }
+       enum ostream_send_istream_result res;
 
-       ret = _outstream->send_istream(_outstream, instream);
-       if (instream->stream_errno != 0) {
+       if (unlikely(instream->closed || instream->stream_errno != 0)) {
                errno = instream->stream_errno;
-               return -1;
-       } else if (outstream->stream_errno != 0) {
+               return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
+       }
+       if (unlikely(outstream->closed || outstream->stream_errno != 0)) {
                errno = outstream->stream_errno;
-               return -1;
+               return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
        }
-       if (ret == 0) {
-               /* partial send */
-               i_assert(!outstream->blocking || !instream->blocking);
-       } else {
-               /* fully sent everything */
-               i_assert(ret == 1);
+
+       res = _outstream->send_istream(_outstream, instream);
+       switch (res) {
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
+               i_assert(instream->stream_errno == 0);
+               i_assert(outstream->stream_errno == 0);
                i_assert(!i_stream_have_bytes_left(instream));
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+               i_assert(!instream->blocking);
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+               i_assert(!outstream->blocking);
+               o_stream_set_flush_pending(outstream, TRUE);
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
+               i_assert(instream->stream_errno != 0);
+               return res;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+               i_assert(outstream->stream_errno != 0);
+               return res;
        }
+       /* non-failure - make sure stream offsets match */
        i_assert((outstream->offset - old_outstream_offset) ==
                 (instream->v_offset - old_instream_offset));
-       return ret;
+       return res;
 }
 
 void o_stream_nsend_istream(struct ostream *outstream, struct istream *instream)
 {
+       i_assert(instream->blocking);
+
        switch (o_stream_send_istream(outstream, instream)) {
-       case 1:
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
                break;
-       case 0:
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+               i_unreached();
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
                outstream->real_stream->noverflow = TRUE;
                break;
-       default:
-               if (outstream->stream_errno != 0)
-                       break;
-               i_assert(instream->stream_errno != 0);
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
                outstream->stream_errno = instream->stream_errno;
                io_stream_set_error(&outstream->real_stream->iostream,
                        "nsend-istream: read(%s) failed: %s",
                        i_stream_get_name(instream),
-                       o_stream_get_name(outstream));
+                       i_stream_get_error(instream));
+               break;
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+               break;
        }
        outstream->real_stream->last_errors_not_checked = TRUE;
 }
@@ -415,7 +429,8 @@ int o_stream_pwrite(struct ostream *stream, const void *data, size_t size,
        return ret;
 }
 
-int io_stream_copy(struct ostream *outstream, struct istream *instream)
+enum ostream_send_istream_result
+io_stream_copy(struct ostream *outstream, struct istream *instream)
 {
        struct const_iovec iov;
        const unsigned char *data;
@@ -423,14 +438,18 @@ int io_stream_copy(struct ostream *outstream, struct istream *instream)
 
        while (i_stream_read_more(instream, &data, &iov.iov_len) > 0) {
                iov.iov_base = data;
-               if ((ret = o_stream_sendv(outstream, &iov, 1)) <= 0)
-                       return ret;
+               if ((ret = o_stream_sendv(outstream, &iov, 1)) < 0)
+                       return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
+               else if (ret == 0)
+                       return OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
                i_stream_skip(instream, ret);
        }
 
        if (instream->stream_errno != 0)
-               return -1;
-       return i_stream_have_bytes_left(instream) ? 0 : 1;
+               return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
+       if (i_stream_have_bytes_left(instream))
+               return OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT;
+       return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
 }
 
 void o_stream_switch_ioloop(struct ostream *stream)
@@ -581,8 +600,9 @@ o_stream_default_write_at(struct ostream_private *_stream,
        return -1;
 }
 
-static int o_stream_default_send_istream(struct ostream_private *outstream,
-                                        struct istream *instream)
+static enum ostream_send_istream_result
+o_stream_default_send_istream(struct ostream_private *outstream,
+                             struct istream *instream)
 {
        return io_stream_copy(&outstream->ostream, instream);
 }
index 4808f05c1043513a5faf13a0611ef10df0382f47..2b9afe18745af51efd8f237910b8dcddf3d1b923 100644 (file)
@@ -3,6 +3,20 @@
 
 #include "ioloop.h"
 
+enum ostream_send_istream_result {
+       /* All of the istream was successfully sent to ostream. */
+       OSTREAM_SEND_ISTREAM_RESULT_FINISHED,
+       /* Caller needs to wait for more input from non-blocking istream. */
+       OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT,
+       /* Caller needs to wait for output to non-blocking ostream.
+          o_stream_set_flush_pending() is automatically called. */
+       OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT,
+       /* Read from istream failed. See istream->stream_errno. */
+       OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT,
+       /* Write to ostream failed. See ostream->stream_errno. */
+       OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT
+};
+
 struct ostream {
        /* Number of bytes sent via o_stream_send*() and similar functions.
           This is counting the input data. For example with a compressed
@@ -151,24 +165,17 @@ void o_stream_ignore_last_errors(struct ostream *stream);
    When creating wrapper streams, they copy this behavior from the parent
    stream. */
 void o_stream_set_no_error_handling(struct ostream *stream, bool set);
-/* Send data from input stream. Returns 1 if the entire instream was sent
-   without errors, 0 if either instream or outstream is nonblocking and not
-   everything was sent, or -1 if either instream or outstream failed (see their
-   stream_errno for which one).
+/* Send all of the instream to outstream.
 
    On non-failure instream is skips over all data written to outstream.
    This means that the number of bytes written to outstream is always equal to
    the number of bytes skipped in instream.
 
-   For non-blocking outstreams: Note that this function may not add anything to
-   the output buffer, so if you want the flush callback to be called when more
-   data can be written, you'll need to call o_stream_set_flush_pending()
-   explicitly.
-
    It's also possible to use this function to copy data within same file
    descriptor, even if the source and destination overlaps. If the file must
    be grown, you have to do it manually before calling this function. */
-int o_stream_send_istream(struct ostream *outstream, struct istream *instream);
+enum ostream_send_istream_result
+o_stream_send_istream(struct ostream *outstream, struct istream *instream);
 /* Same as o_stream_send_istream(), but assume that reads and writes will
    succeed. If not, o_stream_nfinish() will fail with the correct error
    message (even istream's). */
index d4157ba3c5d93739cb59cb1595bfc3627e5756a0..0a3bdf7b1b8c2a7de15d989f434e04f85b150b98 100644 (file)
@@ -61,7 +61,7 @@ static void test_iostream_temp_istream(void)
        /* a working fd-dup */
        output = iostream_temp_create_sized(".nonexistent/",
                IOSTREAM_TEMP_FLAG_TRY_FD_DUP, "test", 1);
-       test_assert(o_stream_send_istream(output, input) > 0);
+       test_assert(o_stream_send_istream(output, input) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED);
        temp_input = iostream_temp_finish(&output, 128);
        test_assert(i_stream_read(temp_input) == 6);
        i_stream_destroy(&temp_input);
@@ -72,7 +72,7 @@ static void test_iostream_temp_istream(void)
                IOSTREAM_TEMP_FLAG_TRY_FD_DUP, "test", 4);
        test_assert(o_stream_send(output, "1234", 4) == 4);
        test_expect_errors(1);
-       test_assert(o_stream_send_istream(output, input) > 0);
+       test_assert(o_stream_send_istream(output, input) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED);
        test_expect_no_more_errors();
        o_stream_destroy(&output);
 
@@ -80,7 +80,7 @@ static void test_iostream_temp_istream(void)
        i_stream_seek(input, 0);
        output = iostream_temp_create_sized(".intentional-nonexistent-error/",
                IOSTREAM_TEMP_FLAG_TRY_FD_DUP, "test", 4);
-       test_assert(o_stream_send_istream(output, input) > 0);
+       test_assert(o_stream_send_istream(output, input) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED);
        test_expect_errors(1);
        test_assert(o_stream_send(output, "1", 1) == 1);
        test_expect_no_more_errors();
@@ -91,9 +91,9 @@ static void test_iostream_temp_istream(void)
        input2 = i_stream_create_limit(input, (uoff_t)-1);
        output = iostream_temp_create_sized(".intentional-nonexistent-error/",
                IOSTREAM_TEMP_FLAG_TRY_FD_DUP, "test", 4);
-       test_assert(o_stream_send_istream(output, input) > 0);
+       test_assert(o_stream_send_istream(output, input) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED);
        test_expect_errors(1);
-       test_assert(o_stream_send_istream(output, input2) > 0);
+       test_assert(o_stream_send_istream(output, input2) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED);
        test_expect_no_more_errors();
        o_stream_destroy(&output);
        i_stream_unref(&input2);
index d7573063fba74e6b68a65c928bb630e05a21a9e6..281e701c9dcd0dfaeab95b374c7a8f3547fb0951 100644 (file)
@@ -95,7 +95,7 @@ static void test_ostream_file_send_istream_file(void)
        /* test that writing works between two files */
        i_stream_seek(input, 3);
        input2 = i_stream_create_limit(input, 4);
-       test_assert(o_stream_send_istream(output, input2) > 0);
+       test_assert(o_stream_send_istream(output, input2) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED);
        test_assert(output->offset == 4);
        test_assert(pread(fd, buf, sizeof(buf), 0) == 4 &&
                    memcmp(buf, "4567", 4) == 0);
@@ -109,7 +109,7 @@ static void test_ostream_file_send_istream_file(void)
        o_stream_seek(output, 1);
        i_stream_seek(input, 2);
        input2 = i_stream_create_limit(input, 2);
-       test_assert(o_stream_send_istream(output, input2) > 0);
+       test_assert(o_stream_send_istream(output, input2) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED);
        test_assert(output->offset == 3);
        test_assert(pread(fd, buf, sizeof(buf), 0) == 4 &&
                    memcmp(buf, "4677", 4) == 0);
@@ -121,7 +121,7 @@ static void test_ostream_file_send_istream_file(void)
        test_assert(pwrite(fd, buf, 4, 0) == 4);
        input = i_stream_create_fd(fd, 1024, FALSE);
        o_stream_seek(output, 1);
-       test_assert(o_stream_send_istream(output, input) > 0);
+       test_assert(o_stream_send_istream(output, input) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED);
        test_assert(output->offset == 5);
        test_assert(pread(fd, buf, sizeof(buf), 0) == 5 &&
                    memcmp(buf, "11234", 5) == 0);
@@ -158,7 +158,7 @@ static void test_ostream_file_send_istream_sendfile(void)
        /* test that sendfile() works */
        i_stream_seek(input, 3);
        input2 = i_stream_create_limit(input, 4);
-       test_assert(o_stream_send_istream(output, input2) > 0);
+       test_assert(o_stream_send_istream(output, input2) == OSTREAM_SEND_ISTREAM_RESULT_FINISHED);
        test_assert(output->offset == 4);
        test_assert(read(sock_fd[1], buf, sizeof(buf)) == 4 &&
                    memcmp(buf, "defg", 4) == 0);
index 93112ba582dc42242d2e78a31cb0fd57adee73eb..0c8ac22702ebdefe013160913d442ddd60ba1f39 100644 (file)
@@ -80,13 +80,8 @@ i_stream_mail_filter_read_once(struct mail_filter_istream *mstream)
 
        if (mstream->ext_out != NULL) {
                /* we haven't sent everything yet */
-               ret = o_stream_send_istream(mstream->ext_out, stream->parent);
-               if (mstream->ext_out->stream_errno != 0) {
-                       stream->istream.stream_errno =
-                               mstream->ext_out->stream_errno;
-                       return -1;
-               }
-               if (ret > 0) {
+               switch (o_stream_send_istream(mstream->ext_out, stream->parent)) {
+               case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
                        o_stream_destroy(&mstream->ext_out);
                        /* if we wanted to be a blocking stream,
                           from now on the rest of the reads are */
@@ -94,6 +89,19 @@ i_stream_mail_filter_read_once(struct mail_filter_istream *mstream)
                                net_set_nonblock(mstream->fd, FALSE);
                        if (shutdown(mstream->fd, SHUT_WR) < 0)
                                i_error("ext-filter: shutdown() failed: %m");
+                       break;
+               case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+               case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+               case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
+                       break;
+               case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+                       stream->istream.stream_errno =
+                               mstream->ext_out->stream_errno;
+                       io_stream_set_error(&stream->iostream,
+                               "write(%s) failed: %s",
+                               o_stream_get_name(mstream->ext_out),
+                               o_stream_get_error(mstream->ext_out));
+                       return -1;
                }
        }