]> git.ipfire.org Git - thirdparty/systemd.git/blobdiff - src/shared/varlink.c
varlink: fix support for more/continues method calls
[thirdparty/systemd.git] / src / shared / varlink.c
index 99343167f66c2b2a4d59e542e544967487c565ff..7a566762fa63dda26d137e264f736a23e2b3165a 100644 (file)
@@ -29,6 +29,7 @@ typedef enum VarlinkState {
         /* Client side states */
         VARLINK_IDLE_CLIENT,
         VARLINK_AWAITING_REPLY,
+        VARLINK_AWAITING_REPLY_MORE,
         VARLINK_CALLING,
         VARLINK_CALLED,
         VARLINK_PROCESSING_REPLY,
@@ -39,7 +40,6 @@ typedef enum VarlinkState {
         VARLINK_PROCESSING_METHOD_MORE,
         VARLINK_PROCESSING_METHOD_ONEWAY,
         VARLINK_PROCESSED_METHOD,
-        VARLINK_PROCESSED_METHOD_MORE,
         VARLINK_PENDING_METHOD,
         VARLINK_PENDING_METHOD_MORE,
 
@@ -63,6 +63,7 @@ typedef enum VarlinkState {
         IN_SET(state,                                   \
                VARLINK_IDLE_CLIENT,                     \
                VARLINK_AWAITING_REPLY,                  \
+               VARLINK_AWAITING_REPLY_MORE,             \
                VARLINK_CALLING,                         \
                VARLINK_CALLED,                          \
                VARLINK_PROCESSING_REPLY,                \
@@ -71,7 +72,6 @@ typedef enum VarlinkState {
                VARLINK_PROCESSING_METHOD_MORE,          \
                VARLINK_PROCESSING_METHOD_ONEWAY,        \
                VARLINK_PROCESSED_METHOD,                \
-               VARLINK_PROCESSED_METHOD_MORE,           \
                VARLINK_PENDING_METHOD,                  \
                VARLINK_PENDING_METHOD_MORE)
 
@@ -185,6 +185,7 @@ struct VarlinkServer {
 static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
         [VARLINK_IDLE_CLIENT]              = "idle-client",
         [VARLINK_AWAITING_REPLY]           = "awaiting-reply",
+        [VARLINK_AWAITING_REPLY_MORE]      = "awaiting-reply-more",
         [VARLINK_CALLING]                  = "calling",
         [VARLINK_CALLED]                   = "called",
         [VARLINK_PROCESSING_REPLY]         = "processing-reply",
@@ -193,7 +194,6 @@ static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
         [VARLINK_PROCESSING_METHOD_MORE]   = "processing-method-more",
         [VARLINK_PROCESSING_METHOD_ONEWAY] = "processing-method-oneway",
         [VARLINK_PROCESSED_METHOD]         = "processed-method",
-        [VARLINK_PROCESSED_METHOD_MORE]    = "processed-method-more",
         [VARLINK_PENDING_METHOD]           = "pending-method",
         [VARLINK_PENDING_METHOD_MORE]      = "pending-method-more",
         [VARLINK_PENDING_DISCONNECT]       = "pending-disconnect",
@@ -405,7 +405,7 @@ static int varlink_test_disconnect(Varlink *v) {
                 goto disconnect;
 
         /* If we are waiting for incoming data but the read side is shut down, disconnect. */
-        if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected)
+        if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected)
                 goto disconnect;
 
         /* Similar, if are a client that hasn't written anything yet but the write side is dead, also
@@ -478,7 +478,7 @@ static int varlink_read(Varlink *v) {
 
         assert(v);
 
-        if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER))
+        if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER))
                 return 0;
         if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
                 return 0;
@@ -596,7 +596,7 @@ static int varlink_parse_message(Varlink *v) {
 static int varlink_test_timeout(Varlink *v) {
         assert(v);
 
-        if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING))
+        if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
                 return 0;
         if (v->timeout == USEC_INFINITY)
                 return 0;
@@ -673,7 +673,7 @@ static int varlink_dispatch_reply(Varlink *v) {
 
         assert(v);
 
-        if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING))
+        if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
                 return 0;
         if (!v->current)
                 return 0;
@@ -715,6 +715,11 @@ static int varlink_dispatch_reply(Varlink *v) {
                         goto invalid;
         }
 
+        /* Replies with 'continue' set are only OK if we set 'more' when the method call was initiated */
+        if (v->state != VARLINK_AWAITING_REPLY_MORE && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
+                goto invalid;
+
+        /* An error is final */
         if (error && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
                 goto invalid;
 
@@ -722,7 +727,7 @@ static int varlink_dispatch_reply(Varlink *v) {
         if (r < 0)
                 goto invalid;
 
-        if (v->state == VARLINK_AWAITING_REPLY) {
+        if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE)) {
                 varlink_set_state(v, VARLINK_PROCESSING_REPLY);
 
                 if (v->reply_callback) {
@@ -734,17 +739,18 @@ static int varlink_dispatch_reply(Varlink *v) {
                 v->current = json_variant_unref(v->current);
 
                 if (v->state == VARLINK_PROCESSING_REPLY) {
+
                         assert(v->n_pending > 0);
-                        v->n_pending--;
 
-                        varlink_set_state(v, v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY);
+                        if (!FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
+                                v->n_pending--;
+
+                        varlink_set_state(v,
+                                          FLAGS_SET(flags, VARLINK_REPLY_CONTINUES) ? VARLINK_AWAITING_REPLY_MORE :
+                                          v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY);
                 }
         } else {
                 assert(v->state == VARLINK_CALLING);
-
-                if (FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
-                        goto invalid;
-
                 varlink_set_state(v, VARLINK_CALLED);
         }
 
@@ -878,7 +884,6 @@ static int varlink_dispatch_method(Varlink *v) {
                 varlink_set_state(v, VARLINK_PENDING_METHOD);
                 break;
 
-        case VARLINK_PROCESSED_METHOD_MORE:  /* One reply for a "more" message was sent, more to come */
         case VARLINK_PROCESSING_METHOD_MORE: /* No reply for a "more" message was sent, more to come */
                 varlink_set_state(v, VARLINK_PENDING_METHOD_MORE);
                 break;
@@ -1073,7 +1078,7 @@ int varlink_get_events(Varlink *v) {
                 return EPOLLOUT;
 
         if (!v->read_disconnected &&
-            IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) &&
+            IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) &&
             !v->current &&
             v->input_buffer_unscanned <= 0)
                 ret |= EPOLLIN;
@@ -1091,7 +1096,7 @@ int varlink_get_timeout(Varlink *v, usec_t *ret) {
         if (v->state == VARLINK_DISCONNECTED)
                 return -ENOTCONN;
 
-        if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING) &&
+        if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING) &&
             v->timeout != USEC_INFINITY) {
                 if (ret)
                         *ret = usec_add(v->timestamp, v->timeout);
@@ -1259,6 +1264,8 @@ int varlink_send(Varlink *v, const char *method, JsonVariant *parameters) {
 
         if (v->state == VARLINK_DISCONNECTED)
                 return -ENOTCONN;
+
+        /* We allow enqueuing multiple method calls at once! */
         if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
                 return -EBUSY;
 
@@ -1308,6 +1315,8 @@ int varlink_invoke(Varlink *v, const char *method, JsonVariant *parameters) {
 
         if (v->state == VARLINK_DISCONNECTED)
                 return -ENOTCONN;
+
+        /* We allow enqueing multiple method calls at once! */
         if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
                 return -EBUSY;
 
@@ -1349,6 +1358,60 @@ int varlink_invokeb(Varlink *v, const char *method, ...) {
         return varlink_invoke(v, method, parameters);
 }
 
+int varlink_observe(Varlink *v, const char *method, JsonVariant *parameters) {
+        _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
+        int r;
+
+        assert_return(v, -EINVAL);
+        assert_return(method, -EINVAL);
+
+        if (v->state == VARLINK_DISCONNECTED)
+                return -ENOTCONN;
+        /* Note that we don't allow enqueuing multiple method calls when we are in more/continues mode! We
+         * thus insist on an idle client here. */
+        if (v->state != VARLINK_IDLE_CLIENT)
+                return -EBUSY;
+
+        r = varlink_sanitize_parameters(&parameters);
+        if (r < 0)
+                return r;
+
+        r = json_build(&m, JSON_BUILD_OBJECT(
+                                       JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
+                                       JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
+                                       JSON_BUILD_PAIR("more", JSON_BUILD_BOOLEAN(true))));
+        if (r < 0)
+                return r;
+
+        r = varlink_enqueue_json(v, m);
+        if (r < 0)
+                return r;
+
+
+        varlink_set_state(v, VARLINK_AWAITING_REPLY_MORE);
+        v->n_pending++;
+        v->timestamp = now(CLOCK_MONOTONIC);
+
+        return 0;
+}
+
+int varlink_observeb(Varlink *v, const char *method, ...) {
+        _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
+        va_list ap;
+        int r;
+
+        assert_return(v, -EINVAL);
+
+        va_start(ap, method);
+        r = json_buildv(&parameters, ap);
+        va_end(ap);
+
+        if (r < 0)
+                return r;
+
+        return varlink_observe(v, method, parameters);
+}
+
 int varlink_call(
                 Varlink *v,
                 const char *method,