From: Stephan Bosch Date: Sun, 25 Feb 2018 16:21:47 +0000 (+0100) Subject: lib-program-client: Use iostream-pump for streaming data towards program. X-Git-Tag: 2.3.9~2086 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=82fc7bdfc52a093990d370a2da87285af1b0fc4f;p=thirdparty%2Fdovecot%2Fcore.git lib-program-client: Use iostream-pump for streaming data towards program. --- diff --git a/src/lib-program-client/program-client-private.h b/src/lib-program-client/program-client-private.h index 7abad7ed4b..9873c33286 100644 --- a/src/lib-program-client/program-client-private.h +++ b/src/lib-program-client/program-client-private.h @@ -44,6 +44,8 @@ struct program_client { struct istream *input, *program_input, *raw_program_input; struct ostream *output, *program_output, *raw_program_output; + struct iostream_pump *pump_out; + ARRAY(struct program_client_extra_fd) extra_fds; program_client_callback_t *callback; diff --git a/src/lib-program-client/program-client.c b/src/lib-program-client/program-client.c index 06acec091a..d7d33c9a65 100644 --- a/src/lib-program-client/program-client.c +++ b/src/lib-program-client/program-client.c @@ -9,6 +9,7 @@ #include "ostream-dot.h" #include "istream-dot.h" #include "ostream.h" +#include "iostream-pump.h" #include "iostream-temp.h" #include "lib-signals.h" @@ -65,9 +66,9 @@ program_client_close_output(struct program_client *pclient) int ret; o_stream_destroy(&pclient->program_output); + o_stream_destroy(&pclient->raw_program_output); if ((ret = pclient->close_output(pclient)) < 0) return -1; - pclient->program_output = NULL; return ret; } @@ -98,6 +99,7 @@ void program_client_disconnected(struct program_client *pclient) o_stream_destroy(&pclient->raw_program_output); io_remove(&pclient->io); + iostream_pump_destroy(&pclient->pump_out); if (pclient->fd_in != -1 && close(pclient->fd_in) < 0) i_error("close(%s) failed: %m", pclient->path); @@ -130,6 +132,7 @@ program_client_disconnect(struct program_client *pclient, bool force) timeout_remove(&pclient->to); io_remove(&pclient->io); + iostream_pump_destroy(&pclient->pump_out); if ((ret = program_client_close_output(pclient)) < 0) pclient->other_error = TRUE; @@ -155,6 +158,14 @@ program_client_input_pending(struct program_client *pclient) struct program_client_extra_fd *efds = NULL; unsigned int count, i; + if (pclient->pump_out != NULL) + return TRUE; + + if (pclient->program_output != NULL && + !pclient->program_output->closed && + o_stream_get_buffer_used_size(pclient->program_output) > 0) { + return TRUE; + } if (pclient->program_input != NULL && !pclient->program_input->closed && i_stream_have_bytes_left(pclient->program_input)) { @@ -175,81 +186,71 @@ program_client_input_pending(struct program_client *pclient) return FALSE; } +static void +program_client_output_finished(struct program_client *pclient) +{ + /* check whether program i/o is finished */ + if (!program_client_input_pending(pclient)) { + /* finished */ + program_client_disconnect(pclient, FALSE); + /* close output towards program, so that it reads EOF */ + } else if (program_client_close_output(pclient) < 0) { + program_client_fail(pclient, + PROGRAM_CLIENT_ERROR_OTHER); + } +} + static int -program_client_program_output(struct program_client *pclient) +program_client_output_finish(struct program_client *pclient) { - struct istream *input = pclient->input; - struct ostream *output = pclient->program_output; - enum ostream_send_istream_result res; + struct ostream *output = pclient->program_output; int ret = 0; - /* flush the output first, before writing more */ - if ((ret = o_stream_flush(output)) <= 0) { - if (ret < 0) { - i_error("write(%s) failed: %s", - o_stream_get_name(output), - o_stream_get_error(output)); - program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO); - } - - return ret; + /* flush the output */ + if ((ret=o_stream_finish(output)) < 0) { + i_error("write(%s) failed: %s", + o_stream_get_name(output), + o_stream_get_error(output)); + program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO); + return -1; } + if (ret > 0) + program_client_output_finished(pclient); + return ret; +} - if (input != NULL && output != NULL) { - /* transfer provided input stream to output towards program */ - res = o_stream_send_istream(output, input); - switch (res) { - case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: - i_stream_unref(&pclient->input); - input = NULL; - break; - case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: - return 1; - case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: - return 0; - case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: - i_error("read(%s) failed: %s", - i_stream_get_name(input), - i_stream_get_error(input)); - program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO); - return -1; - case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: - i_error("write(%s) failed: %s", - o_stream_get_name(output), - o_stream_get_error(output)); - program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO); - return -1; - } - } +static void +program_client_output_pump_finished(enum iostream_pump_status status, + struct program_client *pclient) +{ + struct istream *input = pclient->input; + struct ostream *output = pclient->program_output; - if (input == NULL && output != NULL) { - /* finish and flush program output */ - if ((ret=o_stream_finish(output)) < 0) { - i_error("write(%s) failed: %s", - o_stream_get_name(output), - o_stream_get_error(output)); - program_client_fail(pclient, - PROGRAM_CLIENT_ERROR_IO); - return -1; - } - if (ret == 0) - return 0; - o_stream_unref(&pclient->program_output); - o_stream_unref(&pclient->raw_program_output); - } + i_assert(input != NULL); + i_assert(output != NULL); - if (input == NULL) { - /* check whether program i/o is finished */ - if (!program_client_input_pending(pclient)) { - /* finished */ - program_client_disconnect(pclient, FALSE); - /* close output towards program, so that it reads EOF */ - } else if (program_client_close_output(pclient) < 0) { - program_client_fail(pclient, - PROGRAM_CLIENT_ERROR_OTHER); - } + switch (status) { + case IOSTREAM_PUMP_STATUS_INPUT_EOF: + break; + case IOSTREAM_PUMP_STATUS_INPUT_ERROR: + i_error("read(%s) failed: %s", + i_stream_get_name(input), + i_stream_get_error(input)); + program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO); + return; + case IOSTREAM_PUMP_STATUS_OUTPUT_ERROR: + i_error("write(%s) failed: %s", + o_stream_get_name(output), + o_stream_get_error(output)); + program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO); + return; } - return 1; + + iostream_pump_destroy(&pclient->pump_out); + + o_stream_set_flush_callback(pclient->program_output, + program_client_output_finish, pclient); + o_stream_set_flush_pending(pclient->program_output, TRUE); } void program_client_program_input(struct program_client *pclient) @@ -384,13 +385,20 @@ int program_client_connected(struct program_client *pclient) program_client_timeout, pclient); } - /* run output */ - if (pclient->program_output != NULL && - (ret = program_client_program_output(pclient)) == 0) { - if (pclient->program_output != NULL) { - o_stream_set_flush_callback(pclient->program_output, - program_client_program_output, pclient); - } + /* run program output */ + if (pclient->program_output == NULL) { + /* nothing */ + } else if (pclient->input == NULL) { + o_stream_set_flush_callback(pclient->program_output, + program_client_output_finish, pclient); + o_stream_set_flush_pending(pclient->program_output, TRUE); + } else { + pclient->pump_out = + iostream_pump_create(pclient->input, + pclient->program_output); + iostream_pump_set_completion_callback(pclient->pump_out, + program_client_output_pump_finished, pclient); + iostream_pump_start(pclient->pump_out); } return ret; @@ -573,6 +581,8 @@ void program_client_switch_ioloop(struct program_client *pclient) o_stream_switch_ioloop(pclient->program_output); if (pclient->to != NULL) pclient->to = io_loop_move_timeout(&pclient->to); + if (pclient->pump_out != NULL) + iostream_pump_switch_ioloop(pclient->pump_out); if (pclient->io != NULL) pclient->io = io_loop_move_io(&pclient->io); pclient->switch_ioloop(pclient);