timeout_remove(&pclient->to);
io_remove(&pclient->io);
+ iostream_pump_destroy(&pclient->pump_in);
iostream_pump_destroy(&pclient->pump_out);
if (pclient->fd_in != -1 && close(pclient->fd_in) < 0)
timeout_remove(&pclient->to);
io_remove(&pclient->io);
+ iostream_pump_destroy(&pclient->pump_in);
iostream_pump_destroy(&pclient->pump_out);
if ((ret = program_client_close_output(pclient)) < 0)
struct program_client_extra_fd *efds = NULL;
unsigned int count, i;
- if (pclient->pump_out != NULL)
+ if (pclient->pump_in != NULL || pclient->pump_out != NULL)
return TRUE;
if (pclient->program_output != NULL &&
o_stream_set_flush_pending(pclient->program_output, TRUE);
}
-void program_client_program_input(struct program_client *pclient)
+static void
+program_client_input_finished(struct program_client *pclient)
+{
+ /* check whether program i/o is finished */
+ if (program_client_input_pending(pclient))
+ return;
+
+ /* finished */
+ program_client_disconnect(pclient, FALSE);
+}
+
+static void
+program_client_input_finish(struct program_client *pclient)
{
struct istream *input = pclient->program_input;
- struct ostream *output = pclient->output;
- enum ostream_send_istream_result res;
const unsigned char *data;
size_t size;
- int ret = 0;
-
- if (input != NULL) {
- /* transfer input from program to provided output stream */
- if (output != NULL) {
- res = o_stream_send_istream(output, input);
- switch (res) {
- case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
- /* return to raw program input */
- if (pclient->program_input ==
- pclient->raw_program_input)
- break;
- i_stream_unref(&pclient->program_input);
- input = pclient->program_input =
- pclient->raw_program_input;
- i_stream_ref(pclient->program_input);
- break;
- case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
- case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
- return;
- case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
- i_error("read(%s) failed: %s",
- i_stream_get_name(input),
- i_stream_get_error(input));
- program_client_fail(pclient,
- PROGRAM_CLIENT_ERROR_IO);
- return;
- case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
- i_error("write(%s) failed: %s",
- o_stream_get_name(output),
- o_stream_get_error(output));
- program_client_fail(pclient,
- PROGRAM_CLIENT_ERROR_IO);
- return;
- }
- }
+ int ret;
- /* read (the remainder of) the raw program input */
- while ((ret=i_stream_read_more(input, &data, &size)) > 0)
- i_stream_skip(input, size);
- if (ret == 0)
+ /* read (the remainder of) the raw program input */
+ while ((ret=i_stream_read_more(input, &data, &size)) > 0)
+ i_stream_skip(input, size);
+ if (ret == 0)
+ return;
+ if (ret < 0) {
+ if (input->stream_errno != 0) {
+ i_error("read(%s) failed: %s",
+ i_stream_get_name(input),
+ i_stream_get_error(input));
+ program_client_fail(pclient,
+ PROGRAM_CLIENT_ERROR_IO);
return;
- if (ret < 0) {
- if (input->stream_errno != 0) {
- i_error("read(%s) failed: %s",
- i_stream_get_name(input),
- i_stream_get_error(input));
- program_client_fail(pclient,
- PROGRAM_CLIENT_ERROR_IO);
- return;
- }
}
+ }
- /* flush output stream to make sure all is sent */
- if (output != NULL) {
- if ((ret=o_stream_flush(output)) < 0) {
- i_error("write(%s) failed: %s",
- o_stream_get_name(output),
- o_stream_get_error(output));
- program_client_fail(pclient,
- PROGRAM_CLIENT_ERROR_IO);
- return;
- }
- if (ret == 0)
- return;
- }
+ if (pclient->program_input != pclient->raw_program_input) {
+ /* return to raw program input */
+ i_stream_unref(&pclient->program_input);
+ pclient->program_input = pclient->raw_program_input;
+ i_stream_ref(pclient->program_input);
+
+ io_remove(&pclient->io);
+ pclient->io = io_add_istream(pclient->program_input,
+ program_client_input_finish,
+ pclient);
+ io_set_pending(pclient->io);
+ }
- /* check whether program i/o is finished */
- if (program_client_input_pending(pclient))
- return;
+ program_client_input_finished(pclient);
+}
+
+static void
+program_client_input_pump_finished(enum iostream_pump_status status,
+ struct program_client *pclient)
+{
+ struct istream *input = pclient->program_input;
+ struct ostream *output = pclient->output;
+
+ i_assert(input != NULL);
+ i_assert(output != NULL);
+
+ switch (status) {
+ case IOSTREAM_PUMP_STATUS_INPUT_EOF:
+ break;
+ case IOSTREAM_PUMP_STATUS_INPUT_ERROR:
+ i_error("read(%s) failed: %s",
+ i_stream_get_name(input),
+ i_stream_get_error(input));
+ program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
+ return;
+ case IOSTREAM_PUMP_STATUS_OUTPUT_ERROR:
+ i_error("write(%s) failed: %s",
+ o_stream_get_name(output),
+ o_stream_get_error(output));
+ program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
+ return;
}
- /* finished */
- program_client_disconnect(pclient, FALSE);
+ iostream_pump_destroy(&pclient->pump_in);
+
+ if (pclient->program_input != pclient->raw_program_input) {
+ /* return to raw program input */
+ i_stream_unref(&pclient->program_input);
+ pclient->program_input = pclient->raw_program_input;
+ i_stream_ref(pclient->program_input);
+ }
+
+ i_assert(pclient->io == NULL);
+ pclient->io = io_add_istream(pclient->program_input,
+ program_client_input_finish, pclient);
+ io_set_pending(pclient->io);
}
static void
program_client_timeout, pclient);
}
+ /* run program input */
+ if (pclient->program_input == NULL) {
+ /* nothing */
+ } else if (pclient->output == NULL) {
+ i_assert(pclient->io == NULL);
+ pclient->io = io_add_istream(pclient->program_input,
+ program_client_input_finish,
+ pclient);
+ io_set_pending(pclient->io);
+ } else {
+ pclient->pump_in =
+ iostream_pump_create(pclient->program_input,
+ pclient->output);
+ iostream_pump_set_completion_callback(pclient->pump_in,
+ program_client_input_pump_finished, pclient);
+ iostream_pump_start(pclient->pump_in);
+ }
+
/* run program output */
if (pclient->program_output == NULL) {
/* nothing */
program_input = i_stream_create_fd(pclient->fd_in, (size_t)-1);
i_stream_set_name(program_input, "program stdout");
- pclient->io = io_add(pclient->fd_in, IO_READ,
- program_client_program_input, pclient);
pclient->raw_program_input = program_input;
}
i_stream_unref(&pclient->raw_program_input);
o_stream_unref(&pclient->raw_program_output);
- io_remove(&pclient->io);
-
if (pclient->destroy != NULL)
pclient->destroy(pclient);
o_stream_switch_ioloop(pclient->program_output);
if (pclient->to != NULL)
pclient->to = io_loop_move_timeout(&pclient->to);
+ if (pclient->pump_in != NULL)
+ iostream_pump_switch_ioloop(pclient->pump_in);
if (pclient->pump_out != NULL)
iostream_pump_switch_ioloop(pclient->pump_out);
if (pclient->io != NULL)