]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib-program-client: Use iostream-pump for streaming data from program.
authorStephan Bosch <stephan.bosch@dovecot.fi>
Sun, 25 Feb 2018 23:07:10 +0000 (00:07 +0100)
committerAki Tuomi <aki.tuomi@dovecot.fi>
Sun, 18 Mar 2018 10:53:18 +0000 (12:53 +0200)
src/lib-program-client/program-client-private.h
src/lib-program-client/program-client.c

index d2f6806fbcf8867b7724570ce2cf61a59d6c48fd..ff5750744aea3aebdf342f8cd7f1c7ff27d09c12 100644 (file)
@@ -44,7 +44,7 @@ struct program_client {
        struct istream *input, *program_input, *raw_program_input;
        struct ostream *output, *program_output, *raw_program_output;
 
-       struct iostream_pump *pump_out;
+       struct iostream_pump *pump_in, *pump_out;
 
        ARRAY(struct program_client_extra_fd) extra_fds;
 
index 4df036c4944feee24ed78b3e21077a5a31a23067..7f7592d8e262c3e9b87bd1ae5fdaa56f912db939 100644 (file)
@@ -100,6 +100,7 @@ void program_client_disconnected(struct program_client *pclient)
 
        timeout_remove(&pclient->to);
        io_remove(&pclient->io);
+       iostream_pump_destroy(&pclient->pump_in);
        iostream_pump_destroy(&pclient->pump_out);
 
        if (pclient->fd_in != -1 && close(pclient->fd_in) < 0)
@@ -133,6 +134,7 @@ program_client_disconnect(struct program_client *pclient, bool force)
 
        timeout_remove(&pclient->to);
        io_remove(&pclient->io);
+       iostream_pump_destroy(&pclient->pump_in);
        iostream_pump_destroy(&pclient->pump_out);
 
        if ((ret = program_client_close_output(pclient)) < 0)
@@ -159,7 +161,7 @@ program_client_input_pending(struct program_client *pclient)
        struct program_client_extra_fd *efds = NULL;
        unsigned int count, i;
 
-       if (pclient->pump_out != NULL)
+       if (pclient->pump_in != NULL || pclient->pump_out != NULL)
                return TRUE;
 
        if (pclient->program_output != NULL &&
@@ -254,87 +256,97 @@ program_client_output_pump_finished(enum iostream_pump_status status,
        o_stream_set_flush_pending(pclient->program_output, TRUE);
 }
 
-void program_client_program_input(struct program_client *pclient)
+static void
+program_client_input_finished(struct program_client *pclient)
+{
+       /* check whether program i/o is finished */
+       if (program_client_input_pending(pclient))
+               return;
+
+       /* finished */
+       program_client_disconnect(pclient, FALSE);
+}
+
+static void
+program_client_input_finish(struct program_client *pclient)
 {
        struct istream *input = pclient->program_input;
-       struct ostream *output = pclient->output;
-       enum ostream_send_istream_result res;
        const unsigned char *data;
        size_t size;
-       int ret = 0;
-
-       if (input != NULL) {
-               /* transfer input from program to provided output stream */
-               if (output != NULL) {
-                       res = o_stream_send_istream(output, input);
-                       switch (res) {
-                       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
-                               /* return to raw program input */
-                               if (pclient->program_input ==
-                                       pclient->raw_program_input)
-                                       break;
-                               i_stream_unref(&pclient->program_input);
-                               input = pclient->program_input =
-                                       pclient->raw_program_input;
-                               i_stream_ref(pclient->program_input);
-                               break;
-                       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
-                       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
-                               return;
-                       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
-                               i_error("read(%s) failed: %s",
-                                       i_stream_get_name(input),
-                                       i_stream_get_error(input));
-                               program_client_fail(pclient,
-                                                   PROGRAM_CLIENT_ERROR_IO);
-                               return;
-                       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
-                               i_error("write(%s) failed: %s",
-                                       o_stream_get_name(output),
-                                       o_stream_get_error(output));
-                               program_client_fail(pclient,
-                                                   PROGRAM_CLIENT_ERROR_IO);
-                               return;
-                       }
-               }
+       int ret;
 
-               /* read (the remainder of) the raw program input */
-               while ((ret=i_stream_read_more(input, &data, &size)) > 0)
-                       i_stream_skip(input, size);
-               if (ret == 0)
+       /* read (the remainder of) the raw program input */
+       while ((ret=i_stream_read_more(input, &data, &size)) > 0)
+               i_stream_skip(input, size);
+       if (ret == 0)
+               return;
+       if (ret < 0) {
+               if (input->stream_errno != 0) {
+                       i_error("read(%s) failed: %s",
+                               i_stream_get_name(input),
+                               i_stream_get_error(input));
+                       program_client_fail(pclient,
+                                           PROGRAM_CLIENT_ERROR_IO);
                        return;
-               if (ret < 0) {
-                       if (input->stream_errno != 0) {
-                               i_error("read(%s) failed: %s",
-                                       i_stream_get_name(input),
-                                       i_stream_get_error(input));
-                               program_client_fail(pclient,
-                                                   PROGRAM_CLIENT_ERROR_IO);
-                               return;
-                       }
                }
+       }
 
-               /* flush output stream to make sure all is sent */
-               if (output != NULL) {
-                       if ((ret=o_stream_flush(output)) < 0) {
-                               i_error("write(%s) failed: %s",
-                                       o_stream_get_name(output),
-                                       o_stream_get_error(output));
-                               program_client_fail(pclient,
-                                                   PROGRAM_CLIENT_ERROR_IO);
-                               return;
-                       }
-                       if (ret == 0)
-                               return;
-               }
+       if (pclient->program_input != pclient->raw_program_input) {
+               /* return to raw program input */
+               i_stream_unref(&pclient->program_input);
+               pclient->program_input = pclient->raw_program_input;
+               i_stream_ref(pclient->program_input);
+
+               io_remove(&pclient->io);
+               pclient->io = io_add_istream(pclient->program_input,
+                                            program_client_input_finish,
+                                            pclient);
+               io_set_pending(pclient->io);
+       }
 
-               /* check whether program i/o is finished */
-               if (program_client_input_pending(pclient))
-                       return;
+       program_client_input_finished(pclient);
+}
+
+static void
+program_client_input_pump_finished(enum iostream_pump_status status,
+                                  struct program_client *pclient)
+{
+       struct istream *input = pclient->program_input;
+       struct ostream *output = pclient->output;
+
+       i_assert(input != NULL);
+       i_assert(output != NULL);
+
+       switch (status) {
+       case IOSTREAM_PUMP_STATUS_INPUT_EOF:
+               break;
+       case IOSTREAM_PUMP_STATUS_INPUT_ERROR:
+               i_error("read(%s) failed: %s",
+                       i_stream_get_name(input),
+                       i_stream_get_error(input));
+               program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
+               return;
+       case IOSTREAM_PUMP_STATUS_OUTPUT_ERROR:
+               i_error("write(%s) failed: %s",
+                       o_stream_get_name(output),
+                       o_stream_get_error(output));
+               program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
+               return;
        }
 
-       /* finished */
-       program_client_disconnect(pclient, FALSE);
+       iostream_pump_destroy(&pclient->pump_in);
+
+       if (pclient->program_input != pclient->raw_program_input) {
+               /* return to raw program input */
+               i_stream_unref(&pclient->program_input);
+               pclient->program_input = pclient->raw_program_input;
+               i_stream_ref(pclient->program_input);
+       }
+
+       i_assert(pclient->io == NULL);
+       pclient->io = io_add_istream(pclient->program_input,
+                                    program_client_input_finish, pclient);
+       io_set_pending(pclient->io);
 }
 
 static void
@@ -384,6 +396,24 @@ void program_client_connected(struct program_client *pclient)
                                    program_client_timeout, pclient);
        }
 
+       /* run program input */
+       if (pclient->program_input == NULL) {
+               /* nothing */
+       } else if (pclient->output == NULL) {
+               i_assert(pclient->io == NULL);
+               pclient->io = io_add_istream(pclient->program_input,
+                                            program_client_input_finish,
+                                            pclient);
+               io_set_pending(pclient->io);
+       } else {
+               pclient->pump_in =
+                       iostream_pump_create(pclient->program_input,
+                                            pclient->output);
+               iostream_pump_set_completion_callback(pclient->pump_in,
+                       program_client_input_pump_finished, pclient);
+               iostream_pump_start(pclient->pump_in);
+       }
+
        /* run program output */
        if (pclient->program_output == NULL) {
                /* nothing */
@@ -512,8 +542,6 @@ void program_client_init_streams(struct program_client *pclient)
 
                program_input = i_stream_create_fd(pclient->fd_in, (size_t)-1);
                i_stream_set_name(program_input, "program stdout");
-               pclient->io = io_add(pclient->fd_in, IO_READ,
-                                    program_client_program_input, pclient);
                pclient->raw_program_input = program_input;
        }
 
@@ -558,8 +586,6 @@ void program_client_destroy(struct program_client **_pclient)
        i_stream_unref(&pclient->raw_program_input);
        o_stream_unref(&pclient->raw_program_output);
 
-       io_remove(&pclient->io);
-
        if (pclient->destroy != NULL)
                pclient->destroy(pclient);
 
@@ -578,6 +604,8 @@ void program_client_switch_ioloop(struct program_client *pclient)
                o_stream_switch_ioloop(pclient->program_output);
        if (pclient->to != NULL)
                pclient->to = io_loop_move_timeout(&pclient->to);
+       if (pclient->pump_in != NULL)
+               iostream_pump_switch_ioloop(pclient->pump_in);
        if (pclient->pump_out != NULL)
                iostream_pump_switch_ioloop(pclient->pump_out);
        if (pclient->io != NULL)