#include "ostream-dot.h"
#include "istream-dot.h"
#include "ostream.h"
+#include "iostream-pump.h"
#include "iostream-temp.h"
#include "lib-signals.h"
int ret;
o_stream_destroy(&pclient->program_output);
+ o_stream_destroy(&pclient->raw_program_output);
if ((ret = pclient->close_output(pclient)) < 0)
return -1;
- pclient->program_output = NULL;
return ret;
}
o_stream_destroy(&pclient->raw_program_output);
io_remove(&pclient->io);
+ iostream_pump_destroy(&pclient->pump_out);
if (pclient->fd_in != -1 && close(pclient->fd_in) < 0)
i_error("close(%s) failed: %m", pclient->path);
timeout_remove(&pclient->to);
io_remove(&pclient->io);
+ iostream_pump_destroy(&pclient->pump_out);
if ((ret = program_client_close_output(pclient)) < 0)
pclient->other_error = TRUE;
struct program_client_extra_fd *efds = NULL;
unsigned int count, i;
+ if (pclient->pump_out != NULL)
+ return TRUE;
+
+ if (pclient->program_output != NULL &&
+ !pclient->program_output->closed &&
+ o_stream_get_buffer_used_size(pclient->program_output) > 0) {
+ return TRUE;
+ }
if (pclient->program_input != NULL &&
!pclient->program_input->closed &&
i_stream_have_bytes_left(pclient->program_input)) {
return FALSE;
}
+static void
+program_client_output_finished(struct program_client *pclient)
+{
+ /* check whether program i/o is finished */
+ if (!program_client_input_pending(pclient)) {
+ /* finished */
+ program_client_disconnect(pclient, FALSE);
+ /* close output towards program, so that it reads EOF */
+ } else if (program_client_close_output(pclient) < 0) {
+ program_client_fail(pclient,
+ PROGRAM_CLIENT_ERROR_OTHER);
+ }
+}
+
static int
-program_client_program_output(struct program_client *pclient)
+program_client_output_finish(struct program_client *pclient)
{
- struct istream *input = pclient->input;
- struct ostream *output = pclient->program_output;
- enum ostream_send_istream_result res;
+ struct ostream *output = pclient->program_output;
int ret = 0;
- /* flush the output first, before writing more */
- if ((ret = o_stream_flush(output)) <= 0) {
- if (ret < 0) {
- i_error("write(%s) failed: %s",
- o_stream_get_name(output),
- o_stream_get_error(output));
- program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
- }
-
- return ret;
+ /* flush the output */
+ if ((ret=o_stream_finish(output)) < 0) {
+ i_error("write(%s) failed: %s",
+ o_stream_get_name(output),
+ o_stream_get_error(output));
+ program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
+ return -1;
}
+ if (ret > 0)
+ program_client_output_finished(pclient);
+ return ret;
+}
- if (input != NULL && output != NULL) {
- /* transfer provided input stream to output towards program */
- res = o_stream_send_istream(output, input);
- switch (res) {
- case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
- i_stream_unref(&pclient->input);
- input = NULL;
- break;
- case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
- return 1;
- case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
- return 0;
- case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
- i_error("read(%s) failed: %s",
- i_stream_get_name(input),
- i_stream_get_error(input));
- program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
- return -1;
- case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
- i_error("write(%s) failed: %s",
- o_stream_get_name(output),
- o_stream_get_error(output));
- program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
- return -1;
- }
- }
+static void
+program_client_output_pump_finished(enum iostream_pump_status status,
+ struct program_client *pclient)
+{
+ struct istream *input = pclient->input;
+ struct ostream *output = pclient->program_output;
- if (input == NULL && output != NULL) {
- /* finish and flush program output */
- if ((ret=o_stream_finish(output)) < 0) {
- i_error("write(%s) failed: %s",
- o_stream_get_name(output),
- o_stream_get_error(output));
- program_client_fail(pclient,
- PROGRAM_CLIENT_ERROR_IO);
- return -1;
- }
- if (ret == 0)
- return 0;
- o_stream_unref(&pclient->program_output);
- o_stream_unref(&pclient->raw_program_output);
- }
+ i_assert(input != NULL);
+ i_assert(output != NULL);
- if (input == NULL) {
- /* check whether program i/o is finished */
- if (!program_client_input_pending(pclient)) {
- /* finished */
- program_client_disconnect(pclient, FALSE);
- /* close output towards program, so that it reads EOF */
- } else if (program_client_close_output(pclient) < 0) {
- program_client_fail(pclient,
- PROGRAM_CLIENT_ERROR_OTHER);
- }
+ switch (status) {
+ case IOSTREAM_PUMP_STATUS_INPUT_EOF:
+ break;
+ case IOSTREAM_PUMP_STATUS_INPUT_ERROR:
+ i_error("read(%s) failed: %s",
+ i_stream_get_name(input),
+ i_stream_get_error(input));
+ program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
+ return;
+ case IOSTREAM_PUMP_STATUS_OUTPUT_ERROR:
+ i_error("write(%s) failed: %s",
+ o_stream_get_name(output),
+ o_stream_get_error(output));
+ program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
+ return;
}
- return 1;
+
+ iostream_pump_destroy(&pclient->pump_out);
+
+ o_stream_set_flush_callback(pclient->program_output,
+ program_client_output_finish, pclient);
+ o_stream_set_flush_pending(pclient->program_output, TRUE);
}
void program_client_program_input(struct program_client *pclient)
program_client_timeout, pclient);
}
- /* run output */
- if (pclient->program_output != NULL &&
- (ret = program_client_program_output(pclient)) == 0) {
- if (pclient->program_output != NULL) {
- o_stream_set_flush_callback(pclient->program_output,
- program_client_program_output, pclient);
- }
+ /* run program output */
+ if (pclient->program_output == NULL) {
+ /* nothing */
+ } else if (pclient->input == NULL) {
+ o_stream_set_flush_callback(pclient->program_output,
+ program_client_output_finish, pclient);
+ o_stream_set_flush_pending(pclient->program_output, TRUE);
+ } else {
+ pclient->pump_out =
+ iostream_pump_create(pclient->input,
+ pclient->program_output);
+ iostream_pump_set_completion_callback(pclient->pump_out,
+ program_client_output_pump_finished, pclient);
+ iostream_pump_start(pclient->pump_out);
}
return ret;
o_stream_switch_ioloop(pclient->program_output);
if (pclient->to != NULL)
pclient->to = io_loop_move_timeout(&pclient->to);
+ if (pclient->pump_out != NULL)
+ iostream_pump_switch_ioloop(pclient->pump_out);
if (pclient->io != NULL)
pclient->io = io_loop_move_io(&pclient->io);
pclient->switch_ioloop(pclient);