]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Start work on worker->main pipe interface
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 25 Nov 2015 14:14:09 +0000 (14:14 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 25 Nov 2015 14:14:09 +0000 (14:14 +0000)
src/libserver/rspamd_control.c
src/libserver/rspamd_control.h
src/libserver/worker_util.c
src/rspamd.h

index 9e17f212017923172e7ccf1bd907d8a7bc85a545..f16b095ff8a92eaf9de3a9e869efe36f59f5135b 100644 (file)
@@ -484,3 +484,132 @@ rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker,
        cd->handlers[type].handler = handler;
        cd->handlers[type].ud = ud;
 }
+
+struct rspamd_srv_reply_data {
+       struct rspamd_worker *worker;
+       gint fd;
+       struct rspamd_srv_reply rep;
+};
+
+static void
+rspamd_srv_handler (gint fd, short what, gpointer ud)
+{
+       struct rspamd_worker *worker;
+       struct rspamd_srv_command cmd;
+       struct rspamd_main *srv;
+       struct rspamd_srv_reply_data *rdata;
+       struct msghdr msg;
+       struct cmsghdr *cmsg;
+       struct iovec iov;
+       guchar fdspace[CMSG_SPACE(sizeof (int))];
+       gint *spair;
+       gchar *nid;
+       gssize r;
+
+       if (what == EV_READ) {
+               worker = ud;
+               srv = worker->srv;
+
+               r = read (fd, &cmd, sizeof (cmd));
+
+               if (r == -1) {
+                       msg_err ("cannot read from worker's srv pipe: %s",
+                                       strerror (errno));
+               }
+               else if (r != sizeof (cmd)) {
+                       msg_err ("cannot read from worker's srv pipe incomplete command: %d",
+                                       (gint) r);
+               }
+               else {
+                       rdata = g_slice_alloc0 (sizeof (*rdata));
+                       rdata->worker = worker;
+                       rdata->rep.id = cmd.id;
+                       rdata->rep.type = cmd.type;
+                       rdata->fd = -1;
+
+                       switch (cmd.type) {
+                       case RSPAMD_SRV_SOCKETPAIR:
+                               spair = g_hash_table_lookup (srv->spairs, cmd.cmd.spair.pair_id);
+                               if (spair == NULL) {
+                                       spair = g_malloc (sizeof (gint) * 2);
+                                       if (rspamd_socketpair (spair) == -1) {
+                                               rdata->rep.reply.spair.code = errno;
+                                               msg_err ("cannot create socket pair: %s", strerror (errno));
+                                       }
+                                       else {
+                                               nid = g_malloc (sizeof (cmd.cmd.spair.pair_id));
+                                               memcpy (nid, cmd.cmd.spair.pair_id,
+                                                               sizeof (cmd.cmd.spair.pair_id));
+                                               g_hash_table_insert (srv->spairs,
+                                                               cmd.cmd.spair.pair_id, spair);
+                                               rdata->rep.reply.spair.code = 0;
+                                               rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
+                                       }
+                               }
+                               else {
+                                       rdata->rep.reply.spair.code = 0;
+                                       rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
+                               }
+                               break;
+                       default:
+                               msg_err ("unknown command type: %d", cmd.type);
+                               break;
+                       }
+
+                       /* Now plan write event and send data back */
+                       event_del (&worker->srv_ev);
+                       event_set (&worker->srv_ev,
+                                       worker->srv_pipe[0],
+                                       EV_WRITE,
+                                       rspamd_srv_handler,
+                                       rdata);
+                       event_add (&worker->srv_ev, NULL);
+               }
+       }
+       else if (what == EV_WRITE) {
+               rdata = ud;
+
+               memset (&msg, 0, sizeof (msg));
+
+               /* Attach fd to the message */
+               if (rdata->fd != -1) {
+                       msg.msg_control = fdspace;
+                       msg.msg_controllen = sizeof (fdspace);
+                       cmsg = CMSG_FIRSTHDR (&msg);
+                       cmsg->cmsg_level = SOL_SOCKET;
+                       cmsg->cmsg_type = SCM_RIGHTS;
+                       cmsg->cmsg_len = CMSG_LEN (sizeof (int));
+                       memcpy (CMSG_DATA (cmsg), &rdata->fd, sizeof (int));
+               }
+
+               iov.iov_base = &rdata->rep;
+               iov.iov_len = sizeof (rdata->rep);
+               msg.msg_iov = &iov;
+               msg.msg_iovlen = 1;
+
+               r = sendmsg (fd, &msg, 0);
+
+               if (r == -1) {
+                       msg_err ("cannot write to worker's srv pipe: %s",
+                                       strerror (errno));
+               }
+
+               g_slice_free1 (sizeof (*rdata), rdata);
+               event_del (&worker->srv_ev);
+               event_set (&worker->srv_ev,
+                               worker->srv_pipe[0],
+                               EV_READ | EV_PERSIST,
+                               rspamd_srv_handler,
+                               worker);
+               event_add (&worker->srv_ev, NULL);
+       }
+}
+
+void rspamd_main_start_watching (struct rspamd_worker *worker,
+               struct event_base *ev_base)
+{
+       event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST,
+                       rspamd_srv_handler, worker);
+       event_base_set (ev_base, &worker->srv_ev);
+       event_add (&worker->srv_ev, NULL);
+}
index ddca143cc75dbcd295fbd6464646c65bf23ae441..24874e78536118d88cfae9e593180dac8b088846 100644 (file)
@@ -38,6 +38,10 @@ enum rspamd_control_type {
        RSPAMD_CONTROL_MAX
 };
 
+enum rspamd_srv_type {
+       RSPAMD_SRV_SOCKETPAIR = 0,
+};
+
 struct rspamd_control_command {
        enum rspamd_control_type type;
        union {
@@ -72,6 +76,29 @@ struct rspamd_control_reply {
        } reply;
 };
 
+#define PAIR_ID_LEN 16
+struct rspamd_srv_command {
+       enum rspamd_srv_type type;
+       guint64 id;
+       union {
+               struct {
+                       gint af;
+                       gchar pair_id[PAIR_ID_LEN];
+                       guint pair_num;
+               } spair;
+       } cmd;
+};
+
+struct rspamd_srv_reply {
+       enum rspamd_srv_type type;
+       guint64 id;
+       union {
+               struct {
+                       gint code;
+               } spair;
+       } reply;
+};
+
 typedef gboolean (*rspamd_worker_control_handler) (struct rspamd_main *rspamd_main,
                struct rspamd_worker *worker, gint fd,
                struct rspamd_control_command *cmd,
@@ -97,4 +124,10 @@ void rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker,
                rspamd_worker_control_handler handler,
                gpointer ud);
 
+/**
+ * Start watching on srv pipe
+ */
+void rspamd_main_start_watching (struct rspamd_worker *worker,
+               struct event_base *ev_base);
+
 #endif
index f404a329588a2f6f8ee8b98561457b709e405ed5..4b24ee377010bca86e81d327a6c9c2aa6617f057 100644 (file)
@@ -448,6 +448,11 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
                exit (-errno);
        }
 
+       if (!rspamd_socketpair (cur->srv_pipe)) {
+               msg_err ("socketpair failure: %s", strerror (errno));
+               exit (-errno);
+       }
+
        cur->srv = rspamd_main;
        cur->type = cf->type;
        cur->cf = g_malloc (sizeof (struct rspamd_worker_conf));
index f10a8e90af061fd67b6be45bf227d922a0773c64..61ef25cd4e74f0eeec24e1f255e64b47bf879bce 100644 (file)
@@ -56,6 +56,9 @@ struct rspamd_worker {
        gpointer ctx;                   /**< worker's specific data                                                     */
        gint control_pipe[2];           /**< control pipe. [0] is used by main process,
                                                           [1] is used by a worker                      */
+       gint srv_pipe[2];               /**< used by workers to request something from the
+                                            main process. [0] - main, [1] - worker                     */
+       struct event srv_ev;            /**< used by main for read workers' requests            */
        gpointer control_data;          /**< used by control protocol to handle commands        */
 };
 
@@ -160,11 +163,11 @@ struct rspamd_main {
        /* Pid file structure */
        rspamd_pidfh_t *pfh;                                        /**< struct pidfh for pidfile                                               */
        GQuark type;                                                /**< process type                                                                   */
-       guint ev_initialized;                                       /**< is event system is initialized                                 */
        struct rspamd_stat *stat;                                   /**< pointer to statistics                                                  */
 
-       rspamd_mempool_t *server_pool;                                  /**< server's memory pool                                                       */
+       rspamd_mempool_t *server_pool;                              /**< server's memory pool                                                   */
        GHashTable *workers;                                        /**< workers pool indexed by pid                    */
+       GHashTable *spairs;                                         /**< socket pairs requested by workers                              */
        rspamd_logger_t *logger;
        uid_t workers_uid;                                          /**< worker's uid running to                        */
        gid_t workers_gid;                                          /**< worker's gid running to                                                */