struct client {
int fd;
struct io *io_client, *io_server;
- struct istream *input;
+ struct istream *input, *stdin_input;
struct ostream *output;
bool compressed;
+ bool compress_waiting;
};
+static bool client_input_is_compress_command(const char *line)
+{
+ /* skip tag */
+ while (*line != ' ' && *line != '\0')
+ line++;
+ return strcasecmp(line, " COMPRESS DEFLATE") == 0;
+}
+
+static bool client_input_uncompressed(struct client *client)
+{
+ const char *line;
+
+ if (client->compress_waiting) {
+ /* just read all the pipelined input for now */
+ (void)i_stream_read(client->stdin_input);
+ return TRUE;
+ }
+
+ while ((line = i_stream_read_next_line(client->stdin_input)) != NULL) {
+ o_stream_nsend_str(client->output, line);
+ o_stream_nsend(client->output, "\n", 1);
+ if (client_input_is_compress_command(line))
+ return TRUE;
+ }
+ return FALSE;
+}
+
static void client_input(struct client *client)
{
- struct istream *input;
- struct ostream *output;
- unsigned char buf[1024];
- ssize_t ret;
-
- ret = read(STDIN_FILENO, buf, sizeof(buf));
- if (ret == 0) {
- if (client->compressed) {
- master_service_stop(master_service);
- return;
- }
- /* start compression */
- i_info("<Compression started>");
- input = i_stream_create_deflate(client->input);
- output = o_stream_create_deflate(client->output, 6);
- i_stream_unref(&client->input);
- o_stream_unref(&client->output);
- client->input = input;
- client->output = output;
- client->compressed = TRUE;
+ const unsigned char *data;
+ size_t size;
+
+ if (!client->compressed &&
+ client_input_uncompressed(client)) {
+ /* stop until server has sent reply to COMPRESS command. */
+ client->compress_waiting = TRUE;
return;
}
- if (ret < 0)
- i_fatal("read(stdin) failed: %m");
+ if (client->compressed) {
+ if (i_stream_read_more(client->stdin_input, &data, &size) > 0) {
+ o_stream_nsend(client->output, data, size);
+ i_stream_skip(client->stdin_input, size);
+ }
+ }
+ if (client->stdin_input->eof) {
+ if (client->stdin_input->stream_errno != 0) {
+ i_fatal("read(stdin) failed: %s",
+ i_stream_get_error(client->stdin_input));
+ }
+ master_service_stop(master_service);
+ }
+}
+
+static bool server_input_is_compress_reply(const char *line)
+{
+ /* skip tag */
+ while (*line != ' ' && *line != '\0')
+ line++;
+ return str_begins(line, " OK Begin compression");
+}
+
+static bool server_input_uncompressed(struct client *client)
+{
+ const char *line;
- o_stream_nsend(client->output, buf, ret);
+ while ((line = i_stream_read_next_line(client->input)) != NULL) {
+ if (write(STDOUT_FILENO, line, strlen(line)) < 0)
+ i_fatal("write(stdout) failed: %m");
+ if (write(STDOUT_FILENO, "\n", 1) < 0)
+ i_fatal("write(stdout) failed: %m");
+ if (server_input_is_compress_reply(line))
+ return TRUE;
+ }
+ return FALSE;
}
static void server_input(struct client *client)
return;
}
+ if (!client->compressed && server_input_uncompressed(client)) {
+ /* start compression */
+ struct istream *input;
+ struct ostream *output;
+
+ i_info("<Compression started>");
+ input = i_stream_create_deflate(client->input);
+ output = o_stream_create_deflate(client->output, 6);
+ i_stream_unref(&client->input);
+ o_stream_unref(&client->output);
+ client->input = input;
+ client->output = output;
+ client->compressed = TRUE;
+ client->compress_waiting = FALSE;
+ i_stream_set_input_pending(client->stdin_input, TRUE);
+ }
+
data = i_stream_get_data(client->input, &size);
if (write(STDOUT_FILENO, data, size) < 0)
i_fatal("write(stdout) failed: %m");
if ((fd = net_connect_ip(&ips[0], port, NULL)) == -1)
i_fatal("connect(%s, %u) failed: %m", argv[1], port);
- i_info("Connected to %s port %u. Ctrl-D starts compression",
- net_ip2addr(&ips[0]), port);
+ i_info("Connected to %s port %u.", net_ip2addr(&ips[0]), port);
i_zero(&client);
client.fd = fd;
+ fd_set_nonblock(STDIN_FILENO, TRUE);
+ client.stdin_input = i_stream_create_fd(STDIN_FILENO, SIZE_MAX);
client.input = i_stream_create_fd(fd, SIZE_MAX);
client.output = o_stream_create_fd(fd, 0);
o_stream_set_no_error_handling(client.output, TRUE);
- client.io_client = io_add(STDIN_FILENO, IO_READ, client_input, &client);
- client.io_server = io_add(fd, IO_READ, server_input, &client);
+ client.io_client = io_add_istream(client.stdin_input, client_input, &client);
+ client.io_server = io_add_istream(client.input, server_input, &client);
master_service_run(master_service, NULL);
io_remove(&client.io_client);
io_remove(&client.io_server);
+ i_stream_unref(&client.stdin_input);
i_stream_unref(&client.input);
o_stream_unref(&client.output);
if (close(fd) < 0)