From: Stephan Bosch Date: Sun, 25 Feb 2018 23:07:10 +0000 (+0100) Subject: lib-program-client: Use iostream-pump for streaming data from program. X-Git-Tag: 2.3.9~2081 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=33065ed599071b974b867c3a84bc44850f48d8d5;p=thirdparty%2Fdovecot%2Fcore.git lib-program-client: Use iostream-pump for streaming data from program. --- diff --git a/src/lib-program-client/program-client-private.h b/src/lib-program-client/program-client-private.h index d2f6806fbc..ff5750744a 100644 --- a/src/lib-program-client/program-client-private.h +++ b/src/lib-program-client/program-client-private.h @@ -44,7 +44,7 @@ struct program_client { struct istream *input, *program_input, *raw_program_input; struct ostream *output, *program_output, *raw_program_output; - struct iostream_pump *pump_out; + struct iostream_pump *pump_in, *pump_out; ARRAY(struct program_client_extra_fd) extra_fds; diff --git a/src/lib-program-client/program-client.c b/src/lib-program-client/program-client.c index 4df036c494..7f7592d8e2 100644 --- a/src/lib-program-client/program-client.c +++ b/src/lib-program-client/program-client.c @@ -100,6 +100,7 @@ void program_client_disconnected(struct program_client *pclient) timeout_remove(&pclient->to); io_remove(&pclient->io); + iostream_pump_destroy(&pclient->pump_in); iostream_pump_destroy(&pclient->pump_out); if (pclient->fd_in != -1 && close(pclient->fd_in) < 0) @@ -133,6 +134,7 @@ program_client_disconnect(struct program_client *pclient, bool force) timeout_remove(&pclient->to); io_remove(&pclient->io); + iostream_pump_destroy(&pclient->pump_in); iostream_pump_destroy(&pclient->pump_out); if ((ret = program_client_close_output(pclient)) < 0) @@ -159,7 +161,7 @@ program_client_input_pending(struct program_client *pclient) struct program_client_extra_fd *efds = NULL; unsigned int count, i; - if (pclient->pump_out != NULL) + if (pclient->pump_in != NULL || pclient->pump_out != NULL) return TRUE; if (pclient->program_output != NULL && @@ -254,87 +256,97 @@ program_client_output_pump_finished(enum iostream_pump_status status, o_stream_set_flush_pending(pclient->program_output, TRUE); } -void program_client_program_input(struct program_client *pclient) +static void +program_client_input_finished(struct program_client *pclient) +{ + /* check whether program i/o is finished */ + if (program_client_input_pending(pclient)) + return; + + /* finished */ + program_client_disconnect(pclient, FALSE); +} + +static void +program_client_input_finish(struct program_client *pclient) { struct istream *input = pclient->program_input; - struct ostream *output = pclient->output; - enum ostream_send_istream_result res; const unsigned char *data; size_t size; - int ret = 0; - - if (input != NULL) { - /* transfer input from program to provided output stream */ - if (output != NULL) { - res = o_stream_send_istream(output, input); - switch (res) { - case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: - /* return to raw program input */ - if (pclient->program_input == - pclient->raw_program_input) - break; - i_stream_unref(&pclient->program_input); - input = pclient->program_input = - pclient->raw_program_input; - i_stream_ref(pclient->program_input); - break; - case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: - case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: - return; - case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: - i_error("read(%s) failed: %s", - i_stream_get_name(input), - i_stream_get_error(input)); - program_client_fail(pclient, - PROGRAM_CLIENT_ERROR_IO); - return; - case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: - i_error("write(%s) failed: %s", - o_stream_get_name(output), - o_stream_get_error(output)); - program_client_fail(pclient, - PROGRAM_CLIENT_ERROR_IO); - return; - } - } + int ret; - /* read (the remainder of) the raw program input */ - while ((ret=i_stream_read_more(input, &data, &size)) > 0) - i_stream_skip(input, size); - if (ret == 0) + /* read (the remainder of) the raw program input */ + while ((ret=i_stream_read_more(input, &data, &size)) > 0) + i_stream_skip(input, size); + if (ret == 0) + return; + if (ret < 0) { + if (input->stream_errno != 0) { + i_error("read(%s) failed: %s", + i_stream_get_name(input), + i_stream_get_error(input)); + program_client_fail(pclient, + PROGRAM_CLIENT_ERROR_IO); return; - if (ret < 0) { - if (input->stream_errno != 0) { - i_error("read(%s) failed: %s", - i_stream_get_name(input), - i_stream_get_error(input)); - program_client_fail(pclient, - PROGRAM_CLIENT_ERROR_IO); - return; - } } + } - /* flush output stream to make sure all is sent */ - if (output != NULL) { - if ((ret=o_stream_flush(output)) < 0) { - i_error("write(%s) failed: %s", - o_stream_get_name(output), - o_stream_get_error(output)); - program_client_fail(pclient, - PROGRAM_CLIENT_ERROR_IO); - return; - } - if (ret == 0) - return; - } + if (pclient->program_input != pclient->raw_program_input) { + /* return to raw program input */ + i_stream_unref(&pclient->program_input); + pclient->program_input = pclient->raw_program_input; + i_stream_ref(pclient->program_input); + + io_remove(&pclient->io); + pclient->io = io_add_istream(pclient->program_input, + program_client_input_finish, + pclient); + io_set_pending(pclient->io); + } - /* check whether program i/o is finished */ - if (program_client_input_pending(pclient)) - return; + program_client_input_finished(pclient); +} + +static void +program_client_input_pump_finished(enum iostream_pump_status status, + struct program_client *pclient) +{ + struct istream *input = pclient->program_input; + struct ostream *output = pclient->output; + + i_assert(input != NULL); + i_assert(output != NULL); + + switch (status) { + case IOSTREAM_PUMP_STATUS_INPUT_EOF: + break; + case IOSTREAM_PUMP_STATUS_INPUT_ERROR: + i_error("read(%s) failed: %s", + i_stream_get_name(input), + i_stream_get_error(input)); + program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO); + return; + case IOSTREAM_PUMP_STATUS_OUTPUT_ERROR: + i_error("write(%s) failed: %s", + o_stream_get_name(output), + o_stream_get_error(output)); + program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO); + return; } - /* finished */ - program_client_disconnect(pclient, FALSE); + iostream_pump_destroy(&pclient->pump_in); + + if (pclient->program_input != pclient->raw_program_input) { + /* return to raw program input */ + i_stream_unref(&pclient->program_input); + pclient->program_input = pclient->raw_program_input; + i_stream_ref(pclient->program_input); + } + + i_assert(pclient->io == NULL); + pclient->io = io_add_istream(pclient->program_input, + program_client_input_finish, pclient); + io_set_pending(pclient->io); } static void @@ -384,6 +396,24 @@ void program_client_connected(struct program_client *pclient) program_client_timeout, pclient); } + /* run program input */ + if (pclient->program_input == NULL) { + /* nothing */ + } else if (pclient->output == NULL) { + i_assert(pclient->io == NULL); + pclient->io = io_add_istream(pclient->program_input, + program_client_input_finish, + pclient); + io_set_pending(pclient->io); + } else { + pclient->pump_in = + iostream_pump_create(pclient->program_input, + pclient->output); + iostream_pump_set_completion_callback(pclient->pump_in, + program_client_input_pump_finished, pclient); + iostream_pump_start(pclient->pump_in); + } + /* run program output */ if (pclient->program_output == NULL) { /* nothing */ @@ -512,8 +542,6 @@ void program_client_init_streams(struct program_client *pclient) program_input = i_stream_create_fd(pclient->fd_in, (size_t)-1); i_stream_set_name(program_input, "program stdout"); - pclient->io = io_add(pclient->fd_in, IO_READ, - program_client_program_input, pclient); pclient->raw_program_input = program_input; } @@ -558,8 +586,6 @@ void program_client_destroy(struct program_client **_pclient) i_stream_unref(&pclient->raw_program_input); o_stream_unref(&pclient->raw_program_output); - io_remove(&pclient->io); - if (pclient->destroy != NULL) pclient->destroy(pclient); @@ -578,6 +604,8 @@ void program_client_switch_ioloop(struct program_client *pclient) o_stream_switch_ioloop(pclient->program_output); if (pclient->to != NULL) pclient->to = io_loop_move_timeout(&pclient->to); + if (pclient->pump_in != NULL) + iostream_pump_switch_ioloop(pclient->pump_in); if (pclient->pump_out != NULL) iostream_pump_switch_ioloop(pclient->pump_out); if (pclient->io != NULL)