]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
Added support for a simplified IPC infrastructure.
authorTimo Sirainen <tss@iki.fi>
Fri, 20 May 2011 15:45:29 +0000 (18:45 +0300)
committerTimo Sirainen <tss@iki.fi>
Fri, 20 May 2011 15:45:29 +0000 (18:45 +0300)
The idea is that you have one "ipc" proxy process, where all server
processes connect to. IPC clients can then connect to the proxy and ask it
to forward commands to either a specific server or all servers. The proxy
does this, and forwards back any replies from the server.

15 files changed:
src/Makefile.am
src/ipc/Makefile.am [new file with mode: 0644]
src/ipc/client.c [new file with mode: 0644]
src/ipc/client.h [new file with mode: 0644]
src/ipc/ipc-connection.c [new file with mode: 0644]
src/ipc/ipc-connection.h [new file with mode: 0644]
src/ipc/ipc-group.c [new file with mode: 0644]
src/ipc/ipc-group.h [new file with mode: 0644]
src/ipc/ipc-settings.c [new file with mode: 0644]
src/ipc/main.c [new file with mode: 0644]
src/lib-master/Makefile.am
src/lib-master/ipc-client.c [new file with mode: 0644]
src/lib-master/ipc-client.h [new file with mode: 0644]
src/lib-master/ipc-server.c [new file with mode: 0644]
src/lib-master/ipc-server.h [new file with mode: 0644]

index 91b5dd8c9471380d4622fc4a6c56beca17ad2607..272d83c520601fbf5dea9ed08c738c59e923336c 100644 (file)
@@ -24,6 +24,7 @@ SUBDIRS = \
        auth \
        dict \
        dns \
+       ipc \
        master \
        login-common \
        imap-login \
diff --git a/src/ipc/Makefile.am b/src/ipc/Makefile.am
new file mode 100644 (file)
index 0000000..ef1789f
--- /dev/null
@@ -0,0 +1,25 @@
+pkglibexecdir = $(libexecdir)/dovecot
+
+pkglibexec_PROGRAMS = ipc
+
+AM_CPPFLAGS = \
+       -I$(top_srcdir)/src/lib \
+       -I$(top_srcdir)/src/lib-settings \
+       -I$(top_srcdir)/src/lib-master
+
+ipc_LDADD = \
+       $(LIBDOVECOT) \
+       $(MODULE_LIBS)
+ipc_DEPENDENCIES = $(LIBDOVECOT_DEPS)
+
+ipc_SOURCES = \
+       client.c \
+       main.c \
+       ipc-connection.c \
+       ipc-group.c \
+       ipc-settings.c
+
+noinst_HEADERS = \
+       client.h \
+       ipc-connection.h \
+       ipc-group.h
diff --git a/src/ipc/client.c b/src/ipc/client.c
new file mode 100644 (file)
index 0000000..22bdb5a
--- /dev/null
@@ -0,0 +1,150 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "llist.h"
+#include "ioloop.h"
+#include "istream.h"
+#include "ostream.h"
+#include "master-service.h"
+#include "ipc-group.h"
+#include "ipc-connection.h"
+#include "client.h"
+
+#include <unistd.h>
+
+struct client {
+       struct client *prev, *next;
+
+       int fd;
+       struct io *io;
+       struct istream *input;
+       struct ostream *output;
+};
+
+static struct client *clients;
+
+static void client_input(struct client *client);
+
+static void
+client_cmd_input(enum ipc_cmd_status status, const char *line, void *context)
+{
+       struct client *client = context;
+       char chr = '\0';
+
+       switch (status) {
+       case IPC_CMD_STATUS_REPLY:
+               chr = ':';
+               break;
+       case IPC_CMD_STATUS_OK:
+               chr = '+';
+               break;
+       case IPC_CMD_STATUS_ERROR:
+               chr = '-';
+               break;
+       }
+
+       T_BEGIN {
+               o_stream_send_str(client->output,
+                                 t_strdup_printf("%c%s\n", chr, line));
+       } T_END;
+
+       if (status != IPC_CMD_STATUS_REPLY) {
+               i_assert(client->io == NULL);
+               client->io = io_add(client->fd, IO_READ, client_input, client);
+               client_input(client);
+       }
+}
+
+static void client_input(struct client *client)
+{
+       struct ipc_group *group;
+       struct ipc_connection *conn;
+       char *line, *id, *data;
+       unsigned int id_num;
+
+       while ((line = i_stream_read_next_line(client->input)) != NULL) {
+               /* <ipc name> *|<id> <command> */
+               id = strchr(line, '\t');
+               if (id == NULL)
+                       data = NULL;
+               else {
+                       *id++ = '\0';
+                       data = strchr(id, '\t');
+               }
+               if (data == NULL || data[1] == '\0') {
+                       o_stream_send_str(client->output, "-Invalid input\n");
+                       continue;
+               }
+               *data++ = '\0';
+
+               group = ipc_group_lookup_name(line);
+               if (group == NULL) {
+                       o_stream_send_str(client->output,
+                               t_strdup_printf("-Unknown IPC group: %s\n", line));
+                       continue;
+               }
+
+               if (strcmp(id, "*") == 0) {
+                       /* send to everyone */
+                       ipc_group_cmd(group, data, client_cmd_input, client);
+               } else if (str_to_uint(id, &id_num) < 0) {
+                       o_stream_send_str(client->output,
+                               t_strdup_printf("-Invalid IPC connection id: %s\n", id));
+                       continue;
+               } else if ((conn = ipc_connection_lookup_id(group, id_num)) == NULL) {
+                       o_stream_send_str(client->output,
+                               t_strdup_printf("-Unknown IPC connection id: %u\n", id_num));
+                       continue;
+               } else {
+                       ipc_connection_cmd(conn, data, client_cmd_input, client);
+               }
+
+               /* we'll handle commands one at a time. stop reading input
+                  until this command is finished. */
+               io_remove(&client->io);
+               break;
+       }
+       if (client->input->eof || client->input->stream_errno != 0)
+               client_destroy(&client);
+}
+
+struct client *client_create(int fd)
+{
+       struct client *client;
+
+       client = i_new(struct client, 1);
+       client->fd = fd;
+       client->io = io_add(fd, IO_READ, client_input, client);
+       client->input = i_stream_create_fd(fd, (size_t)-1, FALSE);
+       client->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
+
+       DLLIST_PREPEND(&clients, client);
+       return client;
+}
+
+void client_destroy(struct client **_client)
+{
+       struct client *client = *_client;
+
+       *_client = NULL;
+
+       DLLIST_REMOVE(&clients, client);
+       if (client->io != NULL)
+               io_remove(&client->io);
+       i_stream_destroy(&client->input);
+       o_stream_destroy(&client->output);
+       if (close(client->fd) < 0)
+               i_error("close(client) failed: %m");
+       i_free(client);
+
+       master_service_client_connection_destroyed(master_service);
+}
+
+void clients_destroy_all(void)
+{
+       while (clients != NULL) {
+               struct client *client = clients;
+
+               client_destroy(&client);
+       }
+}
diff --git a/src/ipc/client.h b/src/ipc/client.h
new file mode 100644 (file)
index 0000000..a5936a6
--- /dev/null
@@ -0,0 +1,9 @@
+#ifndef CLIENT_H
+#define CLIENT_H
+
+struct client *client_create(int fd);
+void client_destroy(struct client **client);
+
+void clients_destroy_all(void);
+
+#endif
diff --git a/src/ipc/ipc-connection.c b/src/ipc/ipc-connection.c
new file mode 100644 (file)
index 0000000..2649450
--- /dev/null
@@ -0,0 +1,242 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "ioloop.h"
+#include "istream.h"
+#include "ostream.h"
+#include "llist.h"
+#include "master-service.h"
+#include "ipc-group.h"
+#include "ipc-connection.h"
+
+#include <unistd.h>
+
+#define IPC_SERVER_PROTOCOL_MAJOR_VERSION 1
+#define IPC_SERVER_PROTOCOL_MINOR_VERSION 0
+
+#define IPC_SERVER_HANDSHAKE "VERSION\tipc-proxy\t1\t0\n"
+
+static unsigned int connection_id_counter;
+static void ipc_connection_cmd_free(struct ipc_connection_cmd **cmd);
+
+static void ipc_connection_cmd_free(struct ipc_connection_cmd **_cmd)
+{
+       struct ipc_connection_cmd *cmd = *_cmd;
+       struct ipc_connection_cmd **cmds;
+       unsigned int i, count;
+
+       *_cmd = NULL;
+
+       cmds = array_get_modifiable(&cmd->conn->cmds, &count);
+       for (i = 0; i < count; i++) {
+               if (cmds[i] == cmd) {
+                       array_delete(&cmd->conn->cmds, i, 1);
+                       break;
+               }
+       }
+       if (cmd->callback != NULL) {
+               cmd->callback(IPC_CMD_STATUS_ERROR,
+                             "Connection to server died", cmd->context);
+       }
+       i_free(cmd);
+}
+
+static struct ipc_connection_cmd *
+ipc_connection_cmd_find(struct ipc_connection *conn, unsigned int tag)
+{
+       struct ipc_connection_cmd *const *cmdp;
+
+       array_foreach(&conn->cmds, cmdp) {
+               if ((*cmdp)->tag == tag)
+                       return *cmdp;
+       }
+       return NULL;
+}
+
+static int
+ipc_connection_input_line(struct ipc_connection *conn, char *line)
+{
+       struct ipc_connection_cmd *cmd;
+       unsigned int tag;
+       enum ipc_cmd_status status;
+       char *data;
+
+       /* <tag> [:+-]<data> */
+       data = strchr(line, '\t');
+       if (data == NULL)
+               return -1;
+
+       *data++ = '\0';
+       if (str_to_uint(line, &tag) < 0)
+               return -1;
+
+       switch (data[0]) {
+       case ':':
+               status = IPC_CMD_STATUS_REPLY;
+               break;
+       case '+':
+               status = IPC_CMD_STATUS_OK;
+               break;
+       case '-':
+               status = IPC_CMD_STATUS_ERROR;
+               break;
+       default:
+               return -1;
+       }
+       data++;
+
+       cmd = ipc_connection_cmd_find(conn, tag);
+       if (cmd == NULL) {
+               i_error("IPC server: Input for unexpected command tag %u", tag);
+               return 0;
+       }
+       cmd->callback(status, data, cmd->context);
+       if (status != IPC_CMD_STATUS_REPLY) {
+               cmd->callback = NULL;
+               ipc_connection_cmd_free(&cmd);
+       }
+       return 0;
+}
+
+static void ipc_connection_input(struct ipc_connection *conn)
+{
+       const char *const *args;
+       char *line;
+       int ret;
+
+       if (i_stream_read(conn->input) < 0) {
+               ipc_connection_destroy(&conn);
+               return;
+       }
+
+       if (!conn->version_received) {
+               if ((line = i_stream_next_line(conn->input)) == NULL)
+                       return;
+
+               if (!version_string_verify(line, "ipc-server",
+                               IPC_SERVER_PROTOCOL_MAJOR_VERSION)) {
+                       i_error("IPC server not compatible with this server "
+                               "(mixed old and new binaries?)");
+                       ipc_connection_destroy(&conn);
+                       return;
+               }
+               conn->version_received = TRUE;
+       }
+       if (!conn->handshake_received) {
+               if ((line = i_stream_next_line(conn->input)) == NULL)
+                       return;
+
+               args = t_strsplit(line, "\t");
+               if (str_array_length(args) < 3 ||
+                   strcmp(args[0], "HANDSHAKE") != 0) {
+                       i_error("IPC server sent invalid handshake");
+                       ipc_connection_destroy(&conn);
+                       return;
+               }
+               if (ipc_group_update_name(conn->group, args[1]) < 0) {
+                       i_error("IPC server named itself unexpectedly: %s "
+                               "(existing ones were %s)",
+                               args[1], conn->group->name);
+                       ipc_connection_destroy(&conn);
+                       return;
+               }
+               if (str_to_pid(args[2], &conn->pid) < 0) {
+                       i_error("IPC server gave broken PID: %s", args[2]);
+                       ipc_connection_destroy(&conn);
+                       return;
+               }
+               conn->handshake_received = TRUE;
+       }
+
+       while ((line = i_stream_next_line(conn->input)) != NULL) {
+               T_BEGIN {
+                       ret = ipc_connection_input_line(conn, line);
+               } T_END;
+               if (ret < 0) {
+                       i_error("Invalid input from IPC server '%s': %s",
+                               conn->group->name, line);
+                       ipc_connection_destroy(&conn);
+                       break;
+               }
+       }
+}
+
+struct ipc_connection *ipc_connection_create(int listen_fd, int fd)
+{
+       struct ipc_connection *conn;
+
+       conn = i_new(struct ipc_connection, 1);
+       conn->group = ipc_group_lookup_listen_fd(listen_fd);
+       if (conn->group == NULL)
+               conn->group = ipc_group_alloc(listen_fd);
+       conn->id = ++connection_id_counter;
+       if (conn->id == 0)
+               conn->id = ++connection_id_counter;
+       conn->fd = fd;
+       conn->io = io_add(fd, IO_READ, ipc_connection_input, conn);
+       conn->input = i_stream_create_fd(fd, (size_t)-1, FALSE);
+       conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
+       i_array_init(&conn->cmds, 8);
+       o_stream_send_str(conn->output, IPC_SERVER_HANDSHAKE);
+
+       DLLIST_PREPEND(&conn->group->connections, conn);
+       return conn;
+}
+
+void ipc_connection_destroy(struct ipc_connection **_conn)
+{
+       struct ipc_connection *conn = *_conn;
+       struct ipc_connection_cmd *const *cmdp, *cmd;
+
+       *_conn = NULL;
+
+       DLLIST_REMOVE(&conn->group->connections, conn);
+
+       while (array_count(&conn->cmds) > 0) {
+               cmdp = array_idx(&conn->cmds, 0);
+               cmd = *cmdp;
+
+               ipc_connection_cmd_free(&cmd);
+       }
+       array_free(&conn->cmds);
+
+       io_remove(&conn->io);
+       i_stream_destroy(&conn->input);
+       o_stream_destroy(&conn->output);
+       if (close(conn->fd) < 0)
+               i_error("close(ipc connection) failed: %m");
+       i_free(conn);
+
+       master_service_client_connection_destroyed(master_service);
+}
+
+struct ipc_connection *
+ipc_connection_lookup_id(struct ipc_group *group, unsigned int id)
+{
+       struct ipc_connection *conn;
+
+       for (conn = group->connections; conn != NULL; conn = conn->next) {
+               if (conn->id == id)
+                       return conn;
+       }
+       return NULL;
+}
+
+void ipc_connection_cmd(struct ipc_connection *conn, const char *cmd,
+                       ipc_cmd_callback_t *callback, void *context)
+{
+       struct ipc_connection_cmd *ipc_cmd;
+
+       ipc_cmd = i_new(struct ipc_connection_cmd, 1);
+       ipc_cmd->tag = ++conn->cmd_tag_counter;
+       ipc_cmd->conn = conn;
+       ipc_cmd->callback = callback;
+       ipc_cmd->context = context;
+       array_append(&conn->cmds, &ipc_cmd, 1);
+
+       T_BEGIN {
+               o_stream_send_str(conn->output,
+                       t_strdup_printf("%u\t%s\n", ipc_cmd->tag, cmd));
+       } T_END;
+}
diff --git a/src/ipc/ipc-connection.h b/src/ipc/ipc-connection.h
new file mode 100644 (file)
index 0000000..6c33d48
--- /dev/null
@@ -0,0 +1,45 @@
+#ifndef IPC_CONNECTION_H
+#define IPC_CONNECTION_H
+
+#include "ipc-group.h"
+
+struct ipc_connection_cmd {
+       unsigned int tag;
+       struct ipc_connection *conn;
+
+       ipc_cmd_callback_t *callback;
+       void *context;
+};
+
+struct ipc_connection {
+       struct ipc_group *group;
+       /* prev/next within group */
+       struct ipc_connection *prev, *next;
+
+       unsigned int id;
+       pid_t pid;
+
+       int fd;
+       struct io *io;
+       struct istream *input;
+       struct ostream *output;
+
+       unsigned int cmd_tag_counter;
+
+       /* running commands */
+       ARRAY_DEFINE(cmds, struct ipc_connection_cmd *);
+
+       unsigned int version_received:1;
+       unsigned int handshake_received:1;
+};
+
+struct ipc_connection *ipc_connection_create(int listen_fd, int fd);
+void ipc_connection_destroy(struct ipc_connection **conn);
+
+struct ipc_connection *
+ipc_connection_lookup_id(struct ipc_group *group, unsigned int id);
+
+void ipc_connection_cmd(struct ipc_connection *conn, const char *cmd,
+                       ipc_cmd_callback_t *callback, void *context);
+
+#endif
diff --git a/src/ipc/ipc-group.c b/src/ipc/ipc-group.c
new file mode 100644 (file)
index 0000000..cbfeb14
--- /dev/null
@@ -0,0 +1,160 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "ipc-connection.h"
+#include "ipc-group.h"
+
+struct ipc_group_cmd {
+       ipc_cmd_callback_t *callback;
+       void *context;
+
+       int refcount;
+       char *first_error;
+};
+
+static ARRAY_DEFINE(ipc_groups, struct ipc_group *);
+
+struct ipc_group *ipc_group_alloc(int listen_fd)
+{
+       struct ipc_group *group;
+
+       i_assert(ipc_group_lookup_listen_fd(listen_fd) == NULL);
+
+       group = i_new(struct ipc_group, 1);
+       group->listen_fd = listen_fd;
+       array_append(&ipc_groups, &group, 1);
+       return group;
+}
+
+void ipc_group_free(struct ipc_group **_group)
+{
+       struct ipc_group *const *groups, *group = *_group;
+       unsigned int i, count;
+
+       i_assert(group->connections == NULL);
+
+       *_group = NULL;
+       groups = array_get(&ipc_groups, &count);
+       for (i = 0; i < count; i++) {
+               if (groups[i] == group) {
+                       array_delete(&ipc_groups, i, 1);
+                       break;
+               }
+       }
+       i_free(group->name);
+       i_free(group);
+}
+
+struct ipc_group *ipc_group_lookup_listen_fd(int listen_fd)
+{
+       struct ipc_group *const *groupp;
+
+       array_foreach(&ipc_groups, groupp) {
+               if ((*groupp)->listen_fd == listen_fd)
+                       return *groupp;
+       }
+       return NULL;
+}
+
+struct ipc_group *ipc_group_lookup_name(const char *name)
+{
+       struct ipc_group *const *groupp;
+
+       array_foreach(&ipc_groups, groupp) {
+               if ((*groupp)->name != NULL &&
+                   strcmp((*groupp)->name, name) == 0)
+                       return *groupp;
+       }
+       return NULL;
+}
+
+int ipc_group_update_name(struct ipc_group *group, const char *name)
+{
+       if (group->name == NULL)
+               group->name = i_strdup(name);
+       else if (strcmp(group->name, name) != 0)
+               return -1;
+       return 0;
+}
+
+static void ipc_group_cmd_callback(enum ipc_cmd_status status,
+                                  const char *line, void *context)
+{
+       struct ipc_group_cmd *group_cmd = context;
+
+       i_assert(group_cmd->refcount > 0);
+
+       switch (status) {
+       case IPC_CMD_STATUS_REPLY:
+               group_cmd->callback(IPC_CMD_STATUS_REPLY, line,
+                                   group_cmd->context);
+               break;
+       case IPC_CMD_STATUS_ERROR:
+               if (group_cmd->first_error == NULL)
+                       group_cmd->first_error = i_strdup(line);
+               /* fall through */
+       case IPC_CMD_STATUS_OK:
+               if (--group_cmd->refcount > 0)
+                       break;
+
+               if (group_cmd->first_error == NULL) {
+                       group_cmd->callback(IPC_CMD_STATUS_OK, line,
+                                           group_cmd->context);
+               } else {
+                       group_cmd->callback(IPC_CMD_STATUS_ERROR,
+                                           group_cmd->first_error,
+                                           group_cmd->context);
+                       i_free(group_cmd->first_error);
+               }
+               i_free(group_cmd);
+               break;
+       }
+
+}
+
+void ipc_group_cmd(struct ipc_group *group, const char *cmd,
+                  ipc_cmd_callback_t *callback, void *context)
+{
+       struct ipc_connection *conn, *next;
+       struct ipc_group_cmd *group_cmd;
+
+       if (group->connections == NULL) {
+               callback(IPC_CMD_STATUS_OK, NULL, context);
+               return;
+       }
+
+       group_cmd = i_new(struct ipc_group_cmd, 1);
+       group_cmd->callback = callback;
+       group_cmd->context = context;
+
+       for (conn = group->connections; conn != NULL; conn = next) {
+               next = conn->next;
+
+               group_cmd->refcount++;
+               ipc_connection_cmd(conn, cmd,
+                                  ipc_group_cmd_callback, group_cmd);
+       }
+}
+
+void ipc_groups_init(void)
+{
+       i_array_init(&ipc_groups, 16);
+}
+
+void ipc_groups_deinit(void)
+{
+       struct ipc_group *const *groupp, *group;
+
+       while (array_count(&ipc_groups) > 0) {
+               groupp = array_idx(&ipc_groups, 0);
+               group = *groupp;
+
+               while ((*groupp)->connections != NULL) {
+                       struct ipc_connection *conn = (*groupp)->connections;
+                       ipc_connection_destroy(&conn);
+               }
+               ipc_group_free(&group);
+       }
+       array_free(&ipc_groups);
+}
diff --git a/src/ipc/ipc-group.h b/src/ipc/ipc-group.h
new file mode 100644 (file)
index 0000000..56d41d8
--- /dev/null
@@ -0,0 +1,43 @@
+#ifndef IPC_GROUP_H
+#define IPC_GROUP_H
+
+enum ipc_cmd_status {
+       /* Command received a reply line */
+       IPC_CMD_STATUS_REPLY,
+       /* Command finished successfully */
+       IPC_CMD_STATUS_OK,
+       /* Command finished with errors */
+       IPC_CMD_STATUS_ERROR
+};
+
+struct ipc_group {
+       int listen_fd;
+       char *name;
+
+       /* connections list also acts as a refcount */
+       struct ipc_connection *connections;
+};
+
+/* line is non-NULL only with IPC_CMD_STATUS_REPLY.
+   Each line begins with the connection ID and TAB. */
+typedef void ipc_cmd_callback_t(enum ipc_cmd_status status,
+                               const char *line, void *context);
+
+struct ipc_group *ipc_group_alloc(int listen_fd);
+void ipc_group_free(struct ipc_group **group);
+
+struct ipc_group *ipc_group_lookup_listen_fd(int listen_fd);
+struct ipc_group *ipc_group_lookup_name(const char *name);
+
+/* Returns 0 if name is ok, -1 if name doesn't match the existing one. */
+int ipc_group_update_name(struct ipc_group *group, const char *name);
+
+/* Send a command to all connections in a group. All connections are expected
+   to answer something. All replies are  */
+void ipc_group_cmd(struct ipc_group *group, const char *cmd,
+                  ipc_cmd_callback_t *callback, void *context);
+
+void ipc_groups_init(void);
+void ipc_groups_deinit(void);
+
+#endif
diff --git a/src/ipc/ipc-settings.c b/src/ipc/ipc-settings.c
new file mode 100644 (file)
index 0000000..7b32786
--- /dev/null
@@ -0,0 +1,48 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "buffer.h"
+#include "settings-parser.h"
+#include "service-settings.h"
+
+#include <stddef.h>
+
+/* <settings checks> */
+static struct file_listener_settings ipc_unix_listeners_array[] = {
+       { "ipc", 0600, "", "" },
+       { "login/ipc-proxy", 0600, "$default_login_user", "" }
+};
+static struct file_listener_settings *ipc_unix_listeners[] = {
+       &ipc_unix_listeners_array[0],
+       &ipc_unix_listeners_array[1]
+};
+static buffer_t ipc_unix_listeners_buf = {
+       ipc_unix_listeners, sizeof(ipc_unix_listeners), { 0, }
+};
+/* </settings checks> */
+
+struct service_settings ipc_service_settings = {
+       .name = "ipc",
+       .protocol = "",
+       .type = "",
+       .executable = "ipc",
+       .user = "$default_internal_user",
+       .group = "",
+       .privileged_group = "",
+       .extra_groups = "",
+       .chroot = "empty",
+
+       .drop_priv_before_exec = FALSE,
+
+       .process_min_avail = 0,
+       .process_limit = 1,
+       .client_limit = 0,
+       .service_count = 0,
+       .idle_kill = 0,
+       .vsz_limit = (uoff_t)-1,
+
+       .unix_listeners = { { &ipc_unix_listeners_buf,
+                             sizeof(ipc_unix_listeners[0]) } },
+       .fifo_listeners = ARRAY_INIT,
+       .inet_listeners = ARRAY_INIT
+};
diff --git a/src/ipc/main.c b/src/ipc/main.c
new file mode 100644 (file)
index 0000000..6d1d166
--- /dev/null
@@ -0,0 +1,66 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "restrict-access.h"
+#include "master-service.h"
+#include "master-service-settings.h"
+#include "ipc-group.h"
+#include "ipc-connection.h"
+#include "client.h"
+
+static bool ipc_socket_is_client(int listen_fd)
+{
+       const char *path, *name;
+
+       if (net_getunixname(listen_fd, &path) < 0) {
+               if (errno != ENOTSOCK)
+                       i_fatal("getunixname(%d) failed: %m", listen_fd);
+               /* not a UNIX socket. let's just assume it's a client. */
+               return TRUE;
+       }
+
+       name = strrchr(path, '/');
+       if (name == NULL)
+               name = path;
+       else
+               name++;
+       return strcmp(name, "ipc") == 0;
+}
+
+static void client_connected(struct master_service_connection *conn)
+{
+       master_service_client_connection_accept(conn);
+
+       if (ipc_socket_is_client(conn->listen_fd))
+               (void)client_create(conn->fd);
+       else
+               (void)ipc_connection_create(conn->listen_fd, conn->fd);
+}
+
+int main(int argc, char *argv[])
+{
+       const enum master_service_flags service_flags =
+               MASTER_SERVICE_FLAG_UPDATE_PROCTITLE;
+       const char *error;
+
+       master_service = master_service_init("ipc", service_flags,
+                                            &argc, &argv, NULL);
+       if (master_getopt(master_service) > 0)
+               return FATAL_DEFAULT;
+       if (master_service_settings_read_simple(master_service,
+                                               NULL, &error) < 0)
+               i_fatal("Error reading configuration: %s", error);
+       master_service_init_log(master_service, "ipc: ");
+
+       restrict_access_by_env(NULL, FALSE);
+       restrict_access_allow_coredumps(TRUE);
+       master_service_init_finish(master_service);
+       ipc_groups_init();
+
+       master_service_run(master_service, client_connected);
+
+       clients_destroy_all();
+       ipc_groups_deinit();
+       master_service_deinit(&master_service);
+        return 0;
+}
index 588e1e017f2de9866c635302dbb1ac683b1f23c5..a41968836d055faf8149ab4e5f9318f04c230e30 100644 (file)
@@ -11,6 +11,8 @@ AM_CPPFLAGS = \
 
 libmaster_la_SOURCES = \
        anvil-client.c \
+       ipc-client.c \
+       ipc-server.c \
        master-auth.c \
        master-login.c \
        master-login-auth.c \
@@ -21,6 +23,8 @@ libmaster_la_SOURCES = \
 
 headers = \
        anvil-client.h \
+       ipc-client.h \
+       ipc-server.h \
        master-auth.h \
        master-interface.h \
        master-login.h \
diff --git a/src/lib-master/ipc-client.c b/src/lib-master/ipc-client.c
new file mode 100644 (file)
index 0000000..71b200e
--- /dev/null
@@ -0,0 +1,163 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "ioloop.h"
+#include "network.h"
+#include "istream.h"
+#include "ostream.h"
+#include "hostpid.h"
+#include "master-service.h"
+#include "ipc-client.h"
+
+#include <unistd.h>
+
+struct ipc_client_cmd {
+       ipc_client_callback_t *callback;
+       void *context;
+};
+
+struct ipc_client {
+       char *path;
+       ipc_client_callback_t *callback;
+
+       int fd;
+       struct io *io;
+       struct timeout *to;
+       struct istream *input;
+       struct ostream *output;
+       ARRAY_DEFINE(cmds, struct ipc_client_cmd);
+};
+
+static void ipc_client_disconnect(struct ipc_client *client);
+
+static void ipc_client_input_line(struct ipc_client *client, const char *line)
+{
+       const struct ipc_client_cmd *cmds;
+       unsigned int count;
+       enum ipc_client_cmd_state state;
+
+       cmds = array_get(&client->cmds, &count);
+       if (count == 0) {
+               i_error("IPC proxy sent unexpected input: %s", line);
+               return;
+       }
+
+       switch (*line++) {
+       case ':':
+               state = IPC_CLIENT_CMD_STATE_REPLY;
+               break;
+       case '+':
+               state = IPC_CLIENT_CMD_STATE_OK;
+               break;
+       case '-':
+               state = IPC_CLIENT_CMD_STATE_ERROR;
+               break;
+       default:
+               i_error("IPC proxy sent invalid input: %s", line);
+               line = "Invalid input";
+               ipc_client_disconnect(client);
+               state = IPC_CLIENT_CMD_STATE_ERROR;
+               break;
+       }
+
+       cmds[0].callback(state, line, cmds[0].context);
+       if (state != IPC_CLIENT_CMD_STATE_REPLY)
+               array_delete(&client->cmds, 0, 1);
+}
+
+static void ipc_client_input(struct ipc_client *client)
+{
+       const char *line;
+
+       if (i_stream_read(client->input) < 0) {
+               ipc_client_disconnect(client);
+               return;
+       }
+       while ((line = i_stream_next_line(client->input)) != NULL)
+               ipc_client_input_line(client, line);
+}
+
+static int ipc_client_connect(struct ipc_client *client)
+{
+       if (client->fd != -1)
+               return 0;
+
+       client->fd = net_connect_unix(client->path);
+       if (client->fd == -1) {
+               i_error("connect(%s) failed: %m", client->path);
+               return -1;
+       }
+
+       client->io = io_add(client->fd, IO_READ, ipc_client_input, client);
+       client->input = i_stream_create_fd(client->fd, (size_t)-1, FALSE);
+       client->output = o_stream_create_fd(client->fd, (size_t)-1, FALSE);
+       return 0;
+}
+
+static void ipc_client_disconnect(struct ipc_client *client)
+{
+       const struct ipc_client_cmd *cmd;
+
+       if (client->fd == -1)
+               return;
+
+       array_foreach(&client->cmds, cmd) {
+               cmd->callback(IPC_CLIENT_CMD_STATE_ERROR,
+                             "Disconnected", cmd->context);
+       }
+       array_clear(&client->cmds);
+
+       io_remove(&client->io);
+       i_stream_destroy(&client->input);
+       o_stream_destroy(&client->output);
+       if (close(client->fd) < 0)
+               i_error("close(%s) failed: %m", client->path);
+}
+
+struct ipc_client *
+ipc_client_init(const char *ipc_socket_path)
+{
+       struct ipc_client *client;
+
+       client = i_new(struct ipc_client, 1);
+       client->path = i_strdup(ipc_socket_path);
+       client->fd = -1;
+       i_array_init(&client->cmds, 8);
+       return client;
+}
+
+void ipc_client_deinit(struct ipc_client **_client)
+{
+       struct ipc_client *client = *_client;
+
+       *_client = NULL;
+
+       ipc_client_disconnect(client);
+       array_free(&client->cmds);
+       i_free(client->path);
+       i_free(client);
+}
+
+void ipc_client_cmd(struct ipc_client *client, const char *cmd,
+                   ipc_client_callback_t *callback, void *context)
+{
+       struct ipc_client_cmd *ipc_cmd;
+       struct const_iovec iov[2];
+
+       if (ipc_client_connect(client) < 0) {
+               callback(IPC_CLIENT_CMD_STATE_ERROR,
+                        "ipc connect failed", context);
+               return;
+       }
+
+       iov[0].iov_base = cmd;
+       iov[0].iov_len = strlen(cmd);
+       iov[1].iov_base = "\n";
+       iov[1].iov_len = 1;
+       o_stream_sendv(client->output, iov, N_ELEMENTS(iov));
+
+       ipc_cmd = array_append_space(&client->cmds);
+       ipc_cmd->callback = callback;
+       ipc_cmd->context = context;
+}
diff --git a/src/lib-master/ipc-client.h b/src/lib-master/ipc-client.h
new file mode 100644 (file)
index 0000000..34cb8fb
--- /dev/null
@@ -0,0 +1,20 @@
+#ifndef IPC_CLIENT_H
+#define IPC_CLIENT_H
+
+enum ipc_client_cmd_state {
+       IPC_CLIENT_CMD_STATE_REPLY,
+       IPC_CLIENT_CMD_STATE_OK,
+       IPC_CLIENT_CMD_STATE_ERROR
+};
+
+typedef void ipc_client_callback_t(enum ipc_client_cmd_state state,
+                                  const char *data, void *context);
+
+struct ipc_client *
+ipc_client_init(const char *ipc_socket_path);
+void ipc_client_deinit(struct ipc_client **client);
+
+void ipc_client_cmd(struct ipc_client *client, const char *cmd,
+                   ipc_client_callback_t *callback, void *context);
+
+#endif
diff --git a/src/lib-master/ipc-server.c b/src/lib-master/ipc-server.c
new file mode 100644 (file)
index 0000000..cf2269e
--- /dev/null
@@ -0,0 +1,197 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "network.h"
+#include "istream.h"
+#include "ostream.h"
+#include "hostpid.h"
+#include "master-service.h"
+#include "ipc-server.h"
+
+#include <unistd.h>
+
+#define IPC_SERVER_RECONNECT_MSECS (60*1000)
+#define IPC_SERVER_PROTOCOL_MAJOR_VERSION 1
+#define IPC_SERVER_PROTOCOL_MINOR_VERSION 0
+#define IPC_SERVER_HANDSHAKE "VERSION\tipc-server\t1\t0\nHANDSHAKE\t%s\t%s\n"
+
+struct ipc_cmd {
+       struct ipc_server *server;
+       unsigned int tag;
+};
+
+struct ipc_server {
+       char *name, *path;
+       ipc_command_callback_t *callback;
+
+       int ipc_cmd_refcount;
+
+       int fd;
+       struct io *io;
+       struct timeout *to;
+       struct istream *input;
+       struct ostream *output;
+
+       unsigned int version_received:1;
+};
+
+static void ipc_server_disconnect(struct ipc_server *server);
+
+static void ipc_server_input_line(struct ipc_server *server, char *line)
+{
+       struct ipc_cmd *cmd;
+       unsigned int tag = 0;
+       char *p;
+
+       /* tag cmd */
+       p = strchr(line, '\t');
+       if (p != NULL) {
+               *p++ = '\0';
+               if (str_to_uint(line, &tag) < 0)
+                       p = NULL;
+       }
+       if (p == NULL || *p == '\0') {
+               i_error("IPC proxy sent invalid input: %s", line);
+               return;
+       }
+
+       cmd = i_new(struct ipc_cmd, 1);
+       cmd->server = server;
+       cmd->tag = tag;
+
+       server->ipc_cmd_refcount++;
+       T_BEGIN {
+               server->callback(cmd, p);
+       } T_END;
+}
+
+static void ipc_server_input(struct ipc_server *server)
+{
+       char *line;
+
+       if (i_stream_read(server->input) < 0) {
+               ipc_server_disconnect(server);
+               return;
+       }
+
+       if (!server->version_received) {
+               if ((line = i_stream_next_line(server->input)) == NULL)
+                       return;
+
+               if (!version_string_verify(line, "ipc-proxy",
+                               IPC_SERVER_PROTOCOL_MAJOR_VERSION)) {
+                       i_error("IPC proxy not compatible with this server "
+                               "(mixed old and new binaries?)");
+                       ipc_server_disconnect(server);
+                       return;
+               }
+               server->version_received = TRUE;
+       }
+
+       while ((line = i_stream_next_line(server->input)) != NULL)
+               ipc_server_input_line(server, line);
+}
+
+static void ipc_server_connect(struct ipc_server *server)
+{
+       i_assert(server->fd == -1);
+
+       if (server->to != NULL)
+               timeout_remove(&server->to);
+
+       server->fd = net_connect_unix(server->path);
+       if (server->fd == -1) {
+               i_error("connect(%s) failed: %m", server->path);
+               server->to = timeout_add(IPC_SERVER_RECONNECT_MSECS,
+                                        ipc_server_connect, server);
+               return;
+       }
+
+       server->io = io_add(server->fd, IO_READ, ipc_server_input, server);
+       server->input = i_stream_create_fd(server->fd, (size_t)-1, FALSE);
+       server->output = o_stream_create_fd(server->fd, (size_t)-1, FALSE);
+       o_stream_send_str(server->output,
+               t_strdup_printf(IPC_SERVER_HANDSHAKE, server->name, my_pid));
+       o_stream_cork(server->output);
+}
+
+static void ipc_server_disconnect(struct ipc_server *server)
+{
+       if (server->fd == -1)
+               return;
+
+       io_remove(&server->io);
+       i_stream_destroy(&server->input);
+       o_stream_destroy(&server->output);
+       if (close(server->fd) < 0)
+               i_error("close(%s) failed: %m", server->path);
+}
+
+struct ipc_server *
+ipc_server_init(const char *ipc_socket_path, const char *name,
+               ipc_command_callback_t *callback)
+{
+       struct ipc_server *server;
+
+       server = i_new(struct ipc_server, 1);
+       server->name = i_strdup(name);
+       server->path = i_strdup(ipc_socket_path);
+       server->callback = callback;
+       server->fd = -1;
+       ipc_server_connect(server);
+       return server;
+}
+
+void ipc_server_deinit(struct ipc_server **_server)
+{
+       struct ipc_server *server = *_server;
+
+       *_server = NULL;
+
+       i_assert(server->ipc_cmd_refcount == 0);
+
+       ipc_server_disconnect(server);
+       i_free(server->name);
+       i_free(server->path);
+       i_free(server);
+}
+
+void ipc_cmd_send(struct ipc_cmd *cmd, const char *data)
+{
+       o_stream_send_str(cmd->server->output,
+                         t_strdup_printf("%u\t:%s\n", cmd->tag, data));
+}
+
+static void ipc_cmd_finish(struct ipc_cmd *cmd, const char *line)
+{
+       o_stream_send_str(cmd->server->output,
+                         t_strdup_printf("%u\t%s\n", cmd->tag, line));
+       o_stream_uncork(cmd->server->output);
+
+       i_assert(cmd->server->ipc_cmd_refcount > 0);
+       cmd->server->ipc_cmd_refcount--;
+}
+
+void ipc_cmd_success(struct ipc_cmd **_cmd)
+{
+       ipc_cmd_success_reply(_cmd, NULL);
+}
+
+void ipc_cmd_success_reply(struct ipc_cmd **_cmd, const char *data)
+{
+       struct ipc_cmd *cmd = *_cmd;
+
+       *_cmd = NULL;
+       ipc_cmd_finish(cmd, t_strconcat("+", data, NULL));
+}
+
+void ipc_cmd_fail(struct ipc_cmd **_cmd, const char *errormsg)
+{
+       struct ipc_cmd *cmd = *_cmd;
+
+       i_assert(errormsg != NULL);
+
+       *_cmd = NULL;
+       ipc_cmd_finish(cmd, t_strconcat("-", errormsg, NULL));
+}
diff --git a/src/lib-master/ipc-server.h b/src/lib-master/ipc-server.h
new file mode 100644 (file)
index 0000000..e9e2736
--- /dev/null
@@ -0,0 +1,20 @@
+#ifndef IPC_SERVER_H
+#define IPC_SERVER_H
+
+struct ipc_cmd;
+
+/* The callback must eventually free the cmd by calling ip_cmd_success/fail().
+   line is guaranteed to be non-empty. */
+typedef void ipc_command_callback_t(struct ipc_cmd *cmd, const char *line);
+
+struct ipc_server *
+ipc_server_init(const char *ipc_socket_path, const char *name,
+               ipc_command_callback_t *callback);
+void ipc_server_deinit(struct ipc_server **server);
+
+void ipc_cmd_send(struct ipc_cmd *cmd, const char *data);
+void ipc_cmd_success(struct ipc_cmd **cmd);
+void ipc_cmd_success_reply(struct ipc_cmd **cmd, const char *data);
+void ipc_cmd_fail(struct ipc_cmd **cmd, const char *errormsg);
+
+#endif