]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib-master: anvil-client - Add support for handling incoming commands
authorTimo Sirainen <timo.sirainen@open-xchange.com>
Tue, 4 Jan 2022 16:28:22 +0000 (18:28 +0200)
committerTimo Sirainen <timo.sirainen@open-xchange.com>
Tue, 8 Feb 2022 09:48:24 +0000 (10:48 +0100)
src/lib-master/anvil-client.c
src/lib-master/anvil-client.h

index d6a95e306e2c5919f5ab03dfebab77708d974666..4d4ea8f34da3de5c6d79ce29252eb52424c335b8 100644 (file)
@@ -15,6 +15,8 @@
 #include "master-service.h"
 #include "anvil-client.h"
 
+#define ANVIL_CMD_CHANNEL_ID 1
+
 struct anvil_query {
        anvil_callback_t *callback;
        void *context;
@@ -30,9 +32,14 @@ struct anvil_client {
        ARRAY(struct anvil_query *) queries_arr;
        struct aqueue *queries;
 
+       struct istream *cmd_input;
+       struct ostream *cmd_output;
+       struct io *cmd_io;
+
        struct anvil_client_callbacks callbacks;
        enum anvil_client_flags flags;
        bool deinitializing;
+       bool reply_pending:1;
 };
 
 #define ANVIL_INBUF_SIZE 1024
@@ -102,6 +109,40 @@ void anvil_client_deinit(struct anvil_client **_client)
                connection_list_deinit(&anvil_connections);
 }
 
+static void anvil_client_cmd_pending_input(struct anvil_client *client)
+{
+       const char *line, *cmd, *const *args;
+
+       while (!client->reply_pending &&
+              (line = i_stream_next_line(client->cmd_input)) != NULL) T_BEGIN {
+               args = t_strsplit_tabescaped(line);
+               cmd = args[0]; args++;
+               if (cmd == NULL) {
+                       o_stream_nsend_str(client->cmd_output,
+                                          "-Empty command\n");
+               } else {
+                       /* Set reply_pending before the callback, since it
+                          can immediately call anvil_client_send_reply() */
+                       client->reply_pending = TRUE;
+                       if (!client->callbacks.command(cmd, args)) {
+                               client->reply_pending = FALSE;
+                               o_stream_nsend_str(client->cmd_output,
+                                                  "-Unknown command\n");
+                       }
+               }
+       } T_END;
+       if (client->reply_pending)
+               io_remove(&client->cmd_io);
+}
+
+static void anvil_client_cmd_input(struct anvil_client *client)
+{
+       anvil_client_cmd_pending_input(client);
+       if (connection_input_read_stream(&client->conn, client->cmd_input) < 0)
+               return;
+       anvil_client_cmd_pending_input(client);
+}
+
 static void anvil_client_start_multiplex_input(struct anvil_client *client)
 {
        struct istream *orig_input = client->conn.input;
@@ -109,6 +150,11 @@ static void anvil_client_start_multiplex_input(struct anvil_client *client)
        i_stream_unref(&orig_input);
 
        connection_streams_changed(&client->conn);
+
+       client->cmd_input = i_stream_multiplex_add_channel(client->conn.input,
+                                                          ANVIL_CMD_CHANNEL_ID);
+       client->cmd_io = io_add_istream(client->cmd_input,
+                                       anvil_client_cmd_input, client);
 }
 
 static void
@@ -118,6 +164,9 @@ anvil_client_start_multiplex_output(struct anvil_client *client)
        client->conn.output = o_stream_create_multiplex(orig_output, SIZE_MAX);
        o_stream_set_no_error_handling(client->conn.output, TRUE);
        o_stream_unref(&orig_output);
+
+       client->cmd_output = o_stream_multiplex_add_channel(client->conn.output,
+                                                           ANVIL_CMD_CHANNEL_ID);
 }
 
 static void anvil_client_reconnect(struct anvil_client *client)
@@ -156,7 +205,8 @@ static int anvil_client_input_line(struct connection *conn, const char *line)
                                CONNECTION_DISCONNECT_HANDSHAKE_FAILED;
                        return -1;
                }
-               anvil_client_start_multiplex_input(client);
+               if (client->callbacks.command != NULL)
+                       anvil_client_start_multiplex_input(client);
                return 1;
        }
 
@@ -201,12 +251,13 @@ int anvil_client_connect(struct anvil_client *client, bool retry)
        }
        timeout_remove(&client->to_reconnect);
 
-       const char *anvil_handshake =
+       const char *anvil_handshake = client->callbacks.command == NULL ? "\n" :
                t_strdup_printf("%s\t%s\n",
                                master_service_get_name(master_service),
                                my_pid);
        o_stream_nsend_str(client->conn.output, anvil_handshake);
-       anvil_client_start_multiplex_output(client);
+       if (client->callbacks.command != NULL)
+               anvil_client_start_multiplex_output(client);
        return 0;
 }
 
@@ -231,6 +282,9 @@ static void anvil_client_destroy(struct connection *conn)
        struct anvil_client *client =
                container_of(conn, struct anvil_client, conn);
 
+       io_remove(&client->cmd_io);
+       i_stream_destroy(&client->cmd_input);
+       o_stream_destroy(&client->cmd_output);
        connection_disconnect(&client->conn);
        anvil_client_cancel_queries(client);
        timeout_remove(&client->to_reconnect);
@@ -311,6 +365,25 @@ void anvil_client_query_abort(struct anvil_client *client,
        i_panic("anvil query to be aborted doesn't exist");
 }
 
+void anvil_client_send_reply(struct anvil_client *client, const char *reply)
+{
+       i_assert(client->reply_pending);
+
+       struct const_iovec iov[] = {
+               { reply, strlen(reply) },
+               { "\n", 1 }
+       };
+       o_stream_nsendv(client->cmd_output, iov, N_ELEMENTS(iov));
+
+       if (client->cmd_io == NULL) {
+               /* asynchronous reply from cmd_callback() */
+               client->cmd_io = io_add_istream(client->cmd_input,
+                                               anvil_client_cmd_input, client);
+               i_stream_set_input_pending(client->cmd_input, TRUE);
+       }
+       client->reply_pending = FALSE;
+}
+
 void anvil_client_cmd(struct anvil_client *client, const char *cmd)
 {
        (void)anvil_client_send(client, cmd);
index 997791ffdf97481aae8a1785ec88cb0c2335f3a0..a085a6a2d2f07cef7e3aa9c28569a2a504e47d6a 100644 (file)
@@ -10,6 +10,13 @@ struct anvil_client_callbacks {
        /* Called when connection is lost. If it returns FALSE, reconnection
           isn't attempted. */
        bool (*reconnect)(void);
+
+       /* Handle any command sent by anvil process. Send the reply with
+          anvil_client_send_reply(). The command can be processed
+          asynchronously, but the next command callback isn't called before
+          the first one is replied to. Returns TRUE if the command was handled,
+          FALSE if the command was unknown. */
+       bool (*command)(const char *cmd, const char *const *args);
 };
 
 /* reply=NULL if query failed */
@@ -38,6 +45,9 @@ void anvil_client_query_abort(struct anvil_client *client,
 /* Send a command to anvil, don't expect any replies. */
 void anvil_client_cmd(struct anvil_client *client, const char *cmd);
 
+/* Send reply to anvil for a command from anvil_client_callbacks.command(). */
+void anvil_client_send_reply(struct anvil_client *client, const char *reply);
+
 /* Returns TRUE if anvil is connected to. */
 bool anvil_client_is_connected(struct anvil_client *client);