]> git.ipfire.org Git - thirdparty/systemd.git/blobdiff - src/shared/varlink.c
Merge pull request #14109 from poettering/varlink-tweaks
[thirdparty/systemd.git] / src / shared / varlink.c
index 3256a934901e213ba0ed0f6d4e7531c98574485f..ee4fb9e843961105ec478fc4fe133cc8f7a3f451 100644 (file)
@@ -29,6 +29,7 @@ typedef enum VarlinkState {
         /* Client side states */
         VARLINK_IDLE_CLIENT,
         VARLINK_AWAITING_REPLY,
+        VARLINK_AWAITING_REPLY_MORE,
         VARLINK_CALLING,
         VARLINK_CALLED,
         VARLINK_PROCESSING_REPLY,
@@ -39,7 +40,6 @@ typedef enum VarlinkState {
         VARLINK_PROCESSING_METHOD_MORE,
         VARLINK_PROCESSING_METHOD_ONEWAY,
         VARLINK_PROCESSED_METHOD,
-        VARLINK_PROCESSED_METHOD_MORE,
         VARLINK_PENDING_METHOD,
         VARLINK_PENDING_METHOD_MORE,
 
@@ -63,6 +63,7 @@ typedef enum VarlinkState {
         IN_SET(state,                                   \
                VARLINK_IDLE_CLIENT,                     \
                VARLINK_AWAITING_REPLY,                  \
+               VARLINK_AWAITING_REPLY_MORE,             \
                VARLINK_CALLING,                         \
                VARLINK_CALLED,                          \
                VARLINK_PROCESSING_REPLY,                \
@@ -71,7 +72,6 @@ typedef enum VarlinkState {
                VARLINK_PROCESSING_METHOD_MORE,          \
                VARLINK_PROCESSING_METHOD_ONEWAY,        \
                VARLINK_PROCESSED_METHOD,                \
-               VARLINK_PROCESSED_METHOD_MORE,           \
                VARLINK_PENDING_METHOD,                  \
                VARLINK_PENDING_METHOD_MORE)
 
@@ -185,6 +185,7 @@ struct VarlinkServer {
 static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
         [VARLINK_IDLE_CLIENT]              = "idle-client",
         [VARLINK_AWAITING_REPLY]           = "awaiting-reply",
+        [VARLINK_AWAITING_REPLY_MORE]      = "awaiting-reply-more",
         [VARLINK_CALLING]                  = "calling",
         [VARLINK_CALLED]                   = "called",
         [VARLINK_PROCESSING_REPLY]         = "processing-reply",
@@ -193,7 +194,6 @@ static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
         [VARLINK_PROCESSING_METHOD_MORE]   = "processing-method-more",
         [VARLINK_PROCESSING_METHOD_ONEWAY] = "processing-method-oneway",
         [VARLINK_PROCESSED_METHOD]         = "processed-method",
-        [VARLINK_PROCESSED_METHOD_MORE]    = "processed-method-more",
         [VARLINK_PENDING_METHOD]           = "pending-method",
         [VARLINK_PENDING_METHOD_MORE]      = "pending-method-more",
         [VARLINK_PENDING_DISCONNECT]       = "pending-disconnect",
@@ -246,8 +246,7 @@ static int varlink_new(Varlink **ret) {
 
         assert(ret);
 
-        /* Here use new0 as the below structured initializer is nested. */
-        v = new0(Varlink, 1);
+        v = new(Varlink, 1);
         if (!v)
                 return -ENOMEM;
 
@@ -288,6 +287,8 @@ int varlink_connect_address(Varlink **ret, const char *address) {
         if (v->fd < 0)
                 return -errno;
 
+        v->fd = fd_move_above_stdio(v->fd);
+
         if (connect(v->fd, &sockaddr.sa, SOCKADDR_UN_LEN(sockaddr.un)) < 0) {
                 if (!IN_SET(errno, EAGAIN, EINPROGRESS))
                         return -errno;
@@ -342,11 +343,8 @@ static void varlink_detach_event_sources(Varlink *v) {
         assert(v);
 
         v->io_event_source = sd_event_source_disable_unref(v->io_event_source);
-
         v->time_event_source = sd_event_source_disable_unref(v->time_event_source);
-
         v->quit_event_source = sd_event_source_disable_unref(v->quit_event_source);
-
         v->defer_event_source = sd_event_source_disable_unref(v->defer_event_source);
 }
 
@@ -406,7 +404,7 @@ static int varlink_test_disconnect(Varlink *v) {
                 goto disconnect;
 
         /* If we are waiting for incoming data but the read side is shut down, disconnect. */
-        if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected)
+        if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected)
                 goto disconnect;
 
         /* Similar, if are a client that hasn't written anything yet but the write side is dead, also
@@ -479,7 +477,7 @@ static int varlink_read(Varlink *v) {
 
         assert(v);
 
-        if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER))
+        if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER))
                 return 0;
         if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
                 return 0;
@@ -597,7 +595,7 @@ static int varlink_parse_message(Varlink *v) {
 static int varlink_test_timeout(Varlink *v) {
         assert(v);
 
-        if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING))
+        if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
                 return 0;
         if (v->timeout == USEC_INFINITY)
                 return 0;
@@ -674,7 +672,7 @@ static int varlink_dispatch_reply(Varlink *v) {
 
         assert(v);
 
-        if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING))
+        if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
                 return 0;
         if (!v->current)
                 return 0;
@@ -716,6 +714,11 @@ static int varlink_dispatch_reply(Varlink *v) {
                         goto invalid;
         }
 
+        /* Replies with 'continue' set are only OK if we set 'more' when the method call was initiated */
+        if (v->state != VARLINK_AWAITING_REPLY_MORE && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
+                goto invalid;
+
+        /* An error is final */
         if (error && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
                 goto invalid;
 
@@ -723,7 +726,7 @@ static int varlink_dispatch_reply(Varlink *v) {
         if (r < 0)
                 goto invalid;
 
-        if (v->state == VARLINK_AWAITING_REPLY) {
+        if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE)) {
                 varlink_set_state(v, VARLINK_PROCESSING_REPLY);
 
                 if (v->reply_callback) {
@@ -735,17 +738,18 @@ static int varlink_dispatch_reply(Varlink *v) {
                 v->current = json_variant_unref(v->current);
 
                 if (v->state == VARLINK_PROCESSING_REPLY) {
+
                         assert(v->n_pending > 0);
-                        v->n_pending--;
 
-                        varlink_set_state(v, v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY);
+                        if (!FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
+                                v->n_pending--;
+
+                        varlink_set_state(v,
+                                          FLAGS_SET(flags, VARLINK_REPLY_CONTINUES) ? VARLINK_AWAITING_REPLY_MORE :
+                                          v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY);
                 }
         } else {
                 assert(v->state == VARLINK_CALLING);
-
-                if (FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
-                        goto invalid;
-
                 varlink_set_state(v, VARLINK_CALLED);
         }
 
@@ -879,7 +883,6 @@ static int varlink_dispatch_method(Varlink *v) {
                 varlink_set_state(v, VARLINK_PENDING_METHOD);
                 break;
 
-        case VARLINK_PROCESSED_METHOD_MORE:  /* One reply for a "more" message was sent, more to come */
         case VARLINK_PROCESSING_METHOD_MORE: /* No reply for a "more" message was sent, more to come */
                 varlink_set_state(v, VARLINK_PENDING_METHOD_MORE);
                 break;
@@ -1003,7 +1006,6 @@ int varlink_wait(Varlink *v, usec_t timeout) {
         usec_t t;
 
         assert_return(v, -EINVAL);
-        assert_return(!v->server, -ENOTTY);
 
         if (v->state == VARLINK_DISCONNECTED)
                 return -ENOTCONN;
@@ -1075,7 +1077,7 @@ int varlink_get_events(Varlink *v) {
                 return EPOLLOUT;
 
         if (!v->read_disconnected &&
-            IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) &&
+            IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) &&
             !v->current &&
             v->input_buffer_unscanned <= 0)
                 ret |= EPOLLIN;
@@ -1093,7 +1095,7 @@ int varlink_get_timeout(Varlink *v, usec_t *ret) {
         if (v->state == VARLINK_DISCONNECTED)
                 return -ENOTCONN;
 
-        if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING) &&
+        if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING) &&
             v->timeout != USEC_INFINITY) {
                 if (ret)
                         *ret = usec_add(v->timestamp, v->timeout);
@@ -1212,6 +1214,7 @@ static int varlink_enqueue_json(Varlink *v, JsonVariant *m) {
         r = json_variant_format(m, 0, &text);
         if (r < 0)
                 return r;
+        assert(text[r] == '\0');
 
         if (v->output_buffer_size + r + 1 > VARLINK_BUFFER_MAX)
                 return -ENOBUFS;
@@ -1260,6 +1263,8 @@ int varlink_send(Varlink *v, const char *method, JsonVariant *parameters) {
 
         if (v->state == VARLINK_DISCONNECTED)
                 return -ENOTCONN;
+
+        /* We allow enqueuing multiple method calls at once! */
         if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
                 return -EBUSY;
 
@@ -1309,6 +1314,8 @@ int varlink_invoke(Varlink *v, const char *method, JsonVariant *parameters) {
 
         if (v->state == VARLINK_DISCONNECTED)
                 return -ENOTCONN;
+
+        /* We allow enqueing multiple method calls at once! */
         if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
                 return -EBUSY;
 
@@ -1350,6 +1357,60 @@ int varlink_invokeb(Varlink *v, const char *method, ...) {
         return varlink_invoke(v, method, parameters);
 }
 
+int varlink_observe(Varlink *v, const char *method, JsonVariant *parameters) {
+        _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
+        int r;
+
+        assert_return(v, -EINVAL);
+        assert_return(method, -EINVAL);
+
+        if (v->state == VARLINK_DISCONNECTED)
+                return -ENOTCONN;
+        /* Note that we don't allow enqueuing multiple method calls when we are in more/continues mode! We
+         * thus insist on an idle client here. */
+        if (v->state != VARLINK_IDLE_CLIENT)
+                return -EBUSY;
+
+        r = varlink_sanitize_parameters(&parameters);
+        if (r < 0)
+                return r;
+
+        r = json_build(&m, JSON_BUILD_OBJECT(
+                                       JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
+                                       JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
+                                       JSON_BUILD_PAIR("more", JSON_BUILD_BOOLEAN(true))));
+        if (r < 0)
+                return r;
+
+        r = varlink_enqueue_json(v, m);
+        if (r < 0)
+                return r;
+
+
+        varlink_set_state(v, VARLINK_AWAITING_REPLY_MORE);
+        v->n_pending++;
+        v->timestamp = now(CLOCK_MONOTONIC);
+
+        return 0;
+}
+
+int varlink_observeb(Varlink *v, const char *method, ...) {
+        _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
+        va_list ap;
+        int r;
+
+        assert_return(v, -EINVAL);
+
+        va_start(ap, method);
+        r = json_buildv(&parameters, ap);
+        va_end(ap);
+
+        if (r < 0)
+                return r;
+
+        return varlink_observe(v, method, parameters);
+}
+
 int varlink_call(
                 Varlink *v,
                 const char *method,
@@ -1770,6 +1831,7 @@ static int prepare_callback(sd_event_source *s, void *userdata) {
         Varlink *v = userdata;
         int r, e;
         usec_t until;
+        bool have_timeout;
 
         assert(s);
         assert(v);
@@ -1785,13 +1847,15 @@ static int prepare_callback(sd_event_source *s, void *userdata) {
         r = varlink_get_timeout(v, &until);
         if (r < 0)
                 return r;
-        if (r > 0) {
+        have_timeout = r > 0;
+
+        if (have_timeout) {
                 r = sd_event_source_set_time(v->time_event_source, until);
                 if (r < 0)
                         return r;
         }
 
-        r = sd_event_source_set_enabled(v->time_event_source, r > 0 ? SD_EVENT_ON : SD_EVENT_OFF);
+        r = sd_event_source_set_enabled(v->time_event_source, have_timeout ? SD_EVENT_ON : SD_EVENT_OFF);
         if (r < 0)
                 return r;
 
@@ -1875,7 +1939,6 @@ fail:
         return r;
 }
 
-
 void varlink_detach_event(Varlink *v) {
         if (!v)
                 return;
@@ -2053,12 +2116,14 @@ int varlink_server_add_connection(VarlinkServer *server, int fd, Varlink **ret)
 
         varlink_set_state(v, VARLINK_IDLE_SERVER);
 
-        r = varlink_attach_event(v, server->event, server->event_priority);
-        if (r < 0) {
-                varlink_log_errno(v, r, "Failed to attach new connection: %m");
-                v->fd = -1; /* take the fd out of the connection again */
-                varlink_close(v);
-                return r;
+        if (server->event) {
+                r = varlink_attach_event(v, server->event, server->event_priority);
+                if (r < 0) {
+                        varlink_log_errno(v, r, "Failed to attach new connection: %m");
+                        v->fd = -1; /* take the fd out of the connection again */
+                        varlink_close(v);
+                        return r;
+                }
         }
 
         if (ret)
@@ -2157,6 +2222,8 @@ int varlink_server_listen_address(VarlinkServer *s, const char *address, mode_t
         if (fd < 0)
                 return -errno;
 
+        fd = fd_move_above_stdio(fd);
+
         (void) sockaddr_un_unlink(&sockaddr.un);
 
         RUN_WITH_UMASK(~m & 0777)
@@ -2304,7 +2371,7 @@ int varlink_server_bind_method(VarlinkServer *s, const char *method, VarlinkMeth
 
 int varlink_server_bind_method_many_internal(VarlinkServer *s, ...) {
         va_list ap;
-        int r;
+        int r = 0;
 
         assert_return(s, -EINVAL);
 
@@ -2321,10 +2388,11 @@ int varlink_server_bind_method_many_internal(VarlinkServer *s, ...) {
 
                 r = varlink_server_bind_method(s, method, callback);
                 if (r < 0)
-                        return r;
+                        break;
         }
+        va_end(ap);
 
-        return 0;
+        return r;
 }
 
 int varlink_server_bind_connect(VarlinkServer *s, VarlinkConnect callback) {
@@ -2338,17 +2406,18 @@ int varlink_server_bind_connect(VarlinkServer *s, VarlinkConnect callback) {
 }
 
 unsigned varlink_server_connections_max(VarlinkServer *s) {
-        struct rlimit rl;
+        int dts;
 
         /* If a server is specified, return the setting for that server, otherwise the default value */
         if (s)
                 return s->connections_max;
 
-        assert_se(getrlimit(RLIMIT_NOFILE, &rl) >= 0);
+        dts = getdtablesize();
+        assert_se(dts > 0);
 
         /* Make sure we never use up more than ¾th of RLIMIT_NOFILE for IPC */
-        if (VARLINK_DEFAULT_CONNECTIONS_MAX > rl.rlim_cur / 4 * 3)
-                return rl.rlim_cur / 4 * 3;
+        if (VARLINK_DEFAULT_CONNECTIONS_MAX > (unsigned) dts / 4 * 3)
+                return dts / 4 * 3;
 
         return VARLINK_DEFAULT_CONNECTIONS_MAX;
 }