]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib-program-client: Use iostream-pump for streaming data towards program.
authorStephan Bosch <stephan.bosch@dovecot.fi>
Sun, 25 Feb 2018 16:21:47 +0000 (17:21 +0100)
committerAki Tuomi <aki.tuomi@open-xchange.com>
Tue, 25 Jun 2019 09:22:31 +0000 (12:22 +0300)
src/lib-program-client/program-client-private.h
src/lib-program-client/program-client.c

index 7abad7ed4b8e153f5ab644ec408988c2ad7b937e..9873c33286a8703f215474159d31018a9f9e7a00 100644 (file)
@@ -44,6 +44,8 @@ struct program_client {
        struct istream *input, *program_input, *raw_program_input;
        struct ostream *output, *program_output, *raw_program_output;
 
+       struct iostream_pump *pump_out;
+
        ARRAY(struct program_client_extra_fd) extra_fds;
 
        program_client_callback_t *callback;
index d977dfc7b5fe9da58cd93695a301c8488ed6943c..a3e61c806f61dbcb6726f230d2b3748eb8e069dd 100644 (file)
@@ -9,6 +9,7 @@
 #include "ostream-dot.h"
 #include "istream-dot.h"
 #include "ostream.h"
+#include "iostream-pump.h"
 #include "iostream-temp.h"
 #include "lib-signals.h"
 
@@ -65,9 +66,9 @@ program_client_close_output(struct program_client *pclient)
        int ret;
 
        o_stream_destroy(&pclient->program_output);
+       o_stream_destroy(&pclient->raw_program_output);
        if ((ret = pclient->close_output(pclient)) < 0)
                return -1;
-       pclient->program_output = NULL;
 
        return ret;
 }
@@ -98,6 +99,7 @@ void program_client_disconnected(struct program_client *pclient)
        o_stream_destroy(&pclient->raw_program_output);
 
        io_remove(&pclient->io);
+       iostream_pump_destroy(&pclient->pump_out);
 
        if (pclient->fd_in != -1 && close(pclient->fd_in) < 0)
                i_error("close(%s) failed: %m", pclient->path);
@@ -130,6 +132,7 @@ program_client_disconnect(struct program_client *pclient, bool force)
 
        timeout_remove(&pclient->to);
        io_remove(&pclient->io);
+       iostream_pump_destroy(&pclient->pump_out);
 
        if ((ret = program_client_close_output(pclient)) < 0)
                pclient->other_error = TRUE;
@@ -155,6 +158,14 @@ program_client_input_pending(struct program_client *pclient)
        struct program_client_extra_fd *efds = NULL;
        unsigned int count, i;
 
+       if (pclient->pump_out != NULL)
+               return TRUE;
+
+       if (pclient->program_output != NULL &&
+           !pclient->program_output->closed &&
+           o_stream_get_buffer_used_size(pclient->program_output) > 0) {
+               return TRUE;
+       }
        if (pclient->program_input != NULL &&
            !pclient->program_input->closed &&
            i_stream_have_bytes_left(pclient->program_input)) {
@@ -175,81 +186,71 @@ program_client_input_pending(struct program_client *pclient)
        return FALSE;
 }
 
+static void
+program_client_output_finished(struct program_client *pclient)
+{
+       /* check whether program i/o is finished */
+       if (!program_client_input_pending(pclient)) {
+               /* finished */
+               program_client_disconnect(pclient, FALSE);
+       /* close output towards program, so that it reads EOF */
+       } else if (program_client_close_output(pclient) < 0) {
+               program_client_fail(pclient,
+                                   PROGRAM_CLIENT_ERROR_OTHER);
+       }
+}
+
 static int
-program_client_program_output(struct program_client *pclient)
+program_client_output_finish(struct program_client *pclient)
 {
-       struct istream *input = pclient->input;
-       struct ostream *output = pclient->program_output;
-       enum ostream_send_istream_result res;
+        struct ostream *output = pclient->program_output;
        int ret = 0;
 
-       /* flush the output first, before writing more */
-       if ((ret = o_stream_flush(output)) <= 0) {
-               if (ret < 0) {
-                       i_error("write(%s) failed: %s",
-                               o_stream_get_name(output),
-                               o_stream_get_error(output));
-                       program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
-               }
-
-               return ret;
+       /* flush the output */
+       if ((ret=o_stream_finish(output)) < 0) {
+               i_error("write(%s) failed: %s",
+                       o_stream_get_name(output),
+                       o_stream_get_error(output));
+               program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
+               return -1;
        }
+       if (ret > 0)
+               program_client_output_finished(pclient);
+       return ret;
+}
 
-       if (input != NULL && output != NULL) {
-               /* transfer provided input stream to output towards program */
-               res = o_stream_send_istream(output, input);
-               switch (res) {
-               case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
-                       i_stream_unref(&pclient->input);
-                       input = NULL;
-                       break;
-               case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
-                       return 1;
-               case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
-                       return 0;
-               case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
-                       i_error("read(%s) failed: %s",
-                               i_stream_get_name(input),
-                               i_stream_get_error(input));
-                       program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
-                       return -1;
-               case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
-                       i_error("write(%s) failed: %s",
-                               o_stream_get_name(output),
-                               o_stream_get_error(output));
-                       program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
-                       return -1;
-               }
-       }
+static void
+program_client_output_pump_finished(enum iostream_pump_status status,
+                                   struct program_client *pclient)
+{
+       struct istream *input = pclient->input;
+       struct ostream *output = pclient->program_output;
 
-       if (input == NULL && output != NULL) {
-               /* finish and flush program output */
-               if ((ret=o_stream_finish(output)) < 0) {
-                       i_error("write(%s) failed: %s",
-                               o_stream_get_name(output),
-                               o_stream_get_error(output));
-                       program_client_fail(pclient,
-                                           PROGRAM_CLIENT_ERROR_IO);
-                       return -1;
-               }
-               if (ret == 0)
-                       return 0;
-               o_stream_unref(&pclient->program_output);
-               o_stream_unref(&pclient->raw_program_output);
-       }
+       i_assert(input != NULL);
+       i_assert(output != NULL);
 
-       if (input == NULL) {
-               /* check whether program i/o is finished */
-               if (!program_client_input_pending(pclient)) {
-                       /* finished */
-                       program_client_disconnect(pclient, FALSE);
-               /* close output towards program, so that it reads EOF */
-               } else if (program_client_close_output(pclient) < 0) {
-                       program_client_fail(pclient,
-                                           PROGRAM_CLIENT_ERROR_OTHER);
-               }
+       switch (status) {
+       case IOSTREAM_PUMP_STATUS_INPUT_EOF:
+               break;
+       case IOSTREAM_PUMP_STATUS_INPUT_ERROR:
+               i_error("read(%s) failed: %s",
+                       i_stream_get_name(input),
+                       i_stream_get_error(input));
+               program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
+               return;
+       case IOSTREAM_PUMP_STATUS_OUTPUT_ERROR:
+               i_error("write(%s) failed: %s",
+                       o_stream_get_name(output),
+                       o_stream_get_error(output));
+               program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
+               return;
        }
-       return 1;
+
+       iostream_pump_destroy(&pclient->pump_out);
+
+       o_stream_set_flush_callback(pclient->program_output,
+               program_client_output_finish, pclient);
+       o_stream_set_flush_pending(pclient->program_output, TRUE);
 }
 
 void program_client_program_input(struct program_client *pclient)
@@ -384,13 +385,20 @@ int program_client_connected(struct program_client *pclient)
                                    program_client_timeout, pclient);
        }
 
-       /* run output */
-       if (pclient->program_output != NULL &&
-           (ret = program_client_program_output(pclient)) == 0) {
-               if (pclient->program_output != NULL) {
-                       o_stream_set_flush_callback(pclient->program_output,
-                                program_client_program_output, pclient);
-               }
+       /* run program output */
+       if (pclient->program_output == NULL) {
+               /* nothing */
+       } else if (pclient->input == NULL) {
+               o_stream_set_flush_callback(pclient->program_output,
+                       program_client_output_finish, pclient);
+               o_stream_set_flush_pending(pclient->program_output, TRUE);
+       } else {
+               pclient->pump_out =
+                       iostream_pump_create(pclient->input,
+                                            pclient->program_output);
+               iostream_pump_set_completion_callback(pclient->pump_out,
+                       program_client_output_pump_finished, pclient);
+               iostream_pump_start(pclient->pump_out);
        }
 
        return ret;
@@ -573,6 +581,8 @@ void program_client_switch_ioloop(struct program_client *pclient)
                o_stream_switch_ioloop(pclient->program_output);
        if (pclient->to != NULL)
                pclient->to = io_loop_move_timeout(&pclient->to);
+       if (pclient->pump_out != NULL)
+               iostream_pump_switch_ioloop(pclient->pump_out);
        if (pclient->io != NULL)
                pclient->io = io_loop_move_io(&pclient->io);
        pclient->switch_ioloop(pclient);