]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
json-stream: hide JsonStreamQueueItem as an implementation detail
authorDaan De Meyer <daan@amutable.com>
Mon, 13 Apr 2026 08:18:04 +0000 (08:18 +0000)
committerDaan De Meyer <daan.j.demeyer@gmail.com>
Mon, 20 Apr 2026 17:52:32 +0000 (19:52 +0200)
The json-stream API previously exposed JsonStreamQueueItem and several
functions operating on it (json_stream_make_queue_item(),
json_stream_enqueue_item(), json_stream_queue_item_free(),
json_stream_queue_item_get_data()). These existed solely to support
sd-varlink's "defer-and-modify" pattern for streaming replies, where a
reply is held back so its "continues" field can be set before
transmission. This is a varlink protocol concern that should not leak
into the generic transport layer.

Similarly, the fd pushing API (json_stream_push_fd(),
json_stream_reset_pushed_fds()) and the pushed_fds state lived inside
JsonStream, even though fd-to-message association is a protocol-level
concern managed entirely by sd-varlink.

Rework the API so that:

- JsonStreamQueueItem and all its functions become static to
  json-stream.c. The only output API is now json_stream_enqueue_full()
  (accepting explicit fds) and the inline json_stream_enqueue() wrapper
  for the common no-fds case.

- The pushed_fds state moves from JsonStream into sd_varlink, where
  sd_varlink_push_fd() and sd_varlink_reset_fds() manage it directly.

- The deferred reply in sd-varlink changes from a JsonStreamQueueItem*
  to a plain sd_json_variant* plus a separate previous_fds/n_previous_fds
  pair, keeping the protocol-specific bookkeeping in sd-varlink where it
  belongs.

- A new varlink_enqueue() helper wraps json_stream_enqueue_full() with
  the varlink connection's pushed fds, transferring fd ownership to the
  queue item on success.

qmp-client.c is fixed to use the new API as well.

src/libsystemd/sd-json/json-stream.c
src/libsystemd/sd-json/json-stream.h
src/libsystemd/sd-varlink/sd-varlink.c
src/libsystemd/sd-varlink/varlink-internal.h
src/shared/qmp-client.c

index d8475ae87342434853d6285c7440d7012cebf145..1900d1c7da4d3c101dae5811dd3167da7915bb0a 100644 (file)
@@ -50,12 +50,7 @@ static usec_t json_stream_now(const JsonStream *s) {
         return now(CLOCK_MONOTONIC);
 }
 
-sd_json_variant** json_stream_queue_item_get_data(JsonStreamQueueItem *q) {
-        assert(q);
-        return &q->data;
-}
-
-JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q) {
+static JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q) {
         if (!q)
                 return NULL;
 
@@ -160,10 +155,6 @@ static void json_stream_clear(JsonStream *s) {
         s->output_fds = mfree(s->output_fds);
         s->n_output_fds = 0;
 
-        close_many(s->pushed_fds, s->n_pushed_fds);
-        s->pushed_fds = mfree(s->pushed_fds);
-        s->n_pushed_fds = 0;
-
         LIST_CLEAR(queue, s->output_queue, json_stream_queue_item_free);
         s->output_queue_tail = NULL;
         s->n_output_queue = 0;
@@ -883,30 +874,6 @@ int json_stream_flush(JsonStream *s) {
         return ret;
 }
 
-int json_stream_push_fd(JsonStream *s, int fd) {
-        int i;
-
-        assert(s);
-        assert(fd >= 0);
-
-        if (s->n_pushed_fds >= SCM_MAX_FD) /* Kernel doesn't support more than 253 fds per message */
-                return -ENOBUFS;
-
-        if (!GREEDY_REALLOC(s->pushed_fds, s->n_pushed_fds + 1))
-                return -ENOMEM;
-
-        i = (int) s->n_pushed_fds;
-        s->pushed_fds[s->n_pushed_fds++] = fd;
-        return i;
-}
-
-void json_stream_reset_pushed_fds(JsonStream *s) {
-        assert(s);
-
-        close_many(s->pushed_fds, s->n_pushed_fds);
-        s->n_pushed_fds = 0;
-}
-
 int json_stream_peek_input_fd(const JsonStream *s, size_t i) {
         assert(s);
 
@@ -1060,57 +1027,26 @@ static int json_stream_format_queue(JsonStream *s) {
         return 0;
 }
 
-int json_stream_enqueue_item(JsonStream *s, JsonStreamQueueItem *q) {
-        assert(s);
-        assert(q);
-
-        if (s->n_output_queue >= s->queue_max)
-                return -ENOBUFS;
-
-        LIST_INSERT_AFTER(queue, s->output_queue, s->output_queue_tail, q);
-        s->output_queue_tail = q;
-        s->n_output_queue++;
-        return 0;
-}
-
-int json_stream_enqueue(JsonStream *s, sd_json_variant *m) {
-        JsonStreamQueueItem *q;
-
+int json_stream_enqueue_full(JsonStream *s, sd_json_variant *m, const int fds[], size_t n_fds) {
         assert(s);
         assert(m);
+        assert(fds || n_fds == 0);
 
-        /* Fast path: no fds pending and no items currently queued — append directly into the
+        /* Fast path: no fds and no items currently queued — append directly into the
          * output buffer to avoid the queue allocation. */
-        if (s->n_pushed_fds == 0 && !s->output_queue)
+        if (n_fds == 0 && !s->output_queue)
                 return json_stream_format_json(s, m);
 
         if (s->n_output_queue >= s->queue_max)
                 return -ENOBUFS;
 
-        q = json_stream_queue_item_new(m, s->pushed_fds, s->n_pushed_fds);
+        JsonStreamQueueItem *q = json_stream_queue_item_new(m, fds, n_fds);
         if (!q)
                 return -ENOMEM;
 
-        s->n_pushed_fds = 0; /* fds belong to the queue entry now */
-
-        assert_se(json_stream_enqueue_item(s, q) >= 0);
-        return 0;
-}
-
-int json_stream_make_queue_item(JsonStream *s, sd_json_variant *m, JsonStreamQueueItem **ret) {
-        JsonStreamQueueItem *q;
-
-        assert(s);
-        assert(m);
-        assert(ret);
-
-        q = json_stream_queue_item_new(m, s->pushed_fds, s->n_pushed_fds);
-        if (!q)
-                return -ENOMEM;
-
-        s->n_pushed_fds = 0; /* fds belong to the queue entry now */
-
-        *ret = q;
+        LIST_INSERT_AFTER(queue, s->output_queue, s->output_queue_tail, q);
+        s->output_queue_tail = q;
+        s->n_output_queue++;
         return 0;
 }
 
index 671b0f8985c9ca0a8b2c3232691b814fb6dea2b5..b502c98676e12a8c08b7c8745c45eaca383141d4 100644 (file)
@@ -119,9 +119,6 @@ typedef struct JsonStream {
         JsonStreamQueueItem *output_queue_tail;
         size_t n_output_queue;
 
-        int *pushed_fds;
-        size_t n_pushed_fds;
-
         JsonStreamFlags flags;
 } JsonStream;
 
@@ -174,23 +171,18 @@ bool json_stream_should_disconnect(const JsonStream *s);
 int json_stream_set_allow_fd_passing_input(JsonStream *s, bool enabled, bool with_sockopt);
 int json_stream_set_allow_fd_passing_output(JsonStream *s, bool enabled);
 
-/* Output: enqueue a JSON variant. Fast path concatenates into the output buffer; if
- * pushed_fds are present or the queue is non-empty the message is queued instead, so that
- * fd-to-message boundaries are preserved. */
-int json_stream_enqueue(JsonStream *s, sd_json_variant *m);
-
-/* Allocate a queue item carrying `m` and the currently pushed fds. The pushed fds are
- * transferred to the new item; on success n_pushed_fds is reset to 0. The caller may
- * later submit the item via json_stream_enqueue_item() or free it. */
-int json_stream_make_queue_item(JsonStream *s, sd_json_variant *m, JsonStreamQueueItem **ret);
-int json_stream_enqueue_item(JsonStream *s, JsonStreamQueueItem *q);
-JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q);
-DEFINE_TRIVIAL_CLEANUP_FUNC(JsonStreamQueueItem*, json_stream_queue_item_free);
-sd_json_variant** json_stream_queue_item_get_data(JsonStreamQueueItem *q);
-
-/* fd push/peek/take */
-int json_stream_push_fd(JsonStream *s, int fd);
-void json_stream_reset_pushed_fds(JsonStream *s);
+/* Output: enqueue a JSON variant together with an optional set of file descriptors. Fast
+ * path concatenates into the output buffer when fds is empty and the queue is empty; if fds
+ * are present or the queue is non-empty the message is queued instead, so that
+ * fd-to-message boundaries are preserved. The queue item copies the fd values; On success,
+ * ownership of the fd values transfers to the queue item (the caller must free its array
+ * without closing the fds). On failure, the fds remain untouched and the caller retains
+ * ownership. */
+int json_stream_enqueue_full(JsonStream *s, sd_json_variant *m, const int fds[], size_t n_fds);
+
+static inline int json_stream_enqueue(JsonStream *s, sd_json_variant *m) {
+        return json_stream_enqueue_full(s, m, NULL, 0);
+}
 
 int json_stream_peek_input_fd(const JsonStream *s, size_t i);
 int json_stream_take_input_fd(JsonStream *s, size_t i);
index 7130be69a4bf5ab1b8788cdcc5b3f4248b6c6843..2a5f677ef37d0f3fa15819ff1d94953a62f4d094 100644 (file)
@@ -590,7 +590,10 @@ static void varlink_clear_current(sd_varlink *v) {
 
         json_stream_close_input_fds(&v->stream);
 
-        v->previous = json_stream_queue_item_free(v->previous);
+        v->previous = sd_json_variant_unref(v->previous);
+        close_many(v->previous_fds, v->n_previous_fds);
+        v->previous_fds = mfree(v->previous_fds);
+        v->n_previous_fds = 0;
         if (v->sentinel != POINTER_MAX)
                 v->sentinel = mfree(v->sentinel);
         else
@@ -602,14 +605,17 @@ static void varlink_clear(sd_varlink *v) {
 
         /* Detach event sources first so the kernel no longer has epoll watches on the
          * stream's fds, then free the stream — json_stream_done() closes the input/output
-         * fds, the cached peer_pidfd, the received input fds, the queued output fds, and
-         * the pushed fds. */
+         * fds, the cached peer_pidfd, the received input fds, and the queued output fds. */
         sd_varlink_detach_event(v);
 
         varlink_clear_current(v);
 
         json_stream_done(&v->stream);
 
+        close_many(v->pushed_fds, v->n_pushed_fds);
+        v->pushed_fds = mfree(v->pushed_fds);
+        v->n_pushed_fds = 0;
+
         pidref_done_sigterm_wait(&v->exec_pidref);
 }
 
@@ -642,6 +648,20 @@ static int varlink_test_disconnect(sd_varlink *v) {
         return 1;
 }
 
+static int varlink_enqueue(sd_varlink *v, sd_json_variant *m) {
+        int r;
+
+        assert(v);
+        assert(m);
+
+        r = json_stream_enqueue_full(&v->stream, m, v->pushed_fds, v->n_pushed_fds);
+        if (r >= 0)
+                v->n_pushed_fds = 0; /* fds belong to the queue entry now */
+                /* We don't free v->pushed_fds so it can be reused for the next message. */
+
+        return r;
+}
+
 static int varlink_write(sd_varlink *v) {
         assert(v);
 
@@ -1098,9 +1118,11 @@ static int varlink_dispatch_method(sd_varlink *v) {
                                         r = sd_varlink_error_errno(v, r);
                                 } else if (v->sentinel) {
                                         if (v->previous) {
-                                                r = json_stream_enqueue_item(&v->stream, v->previous);
+                                                r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds);
                                                 if (r >= 0) {
-                                                        TAKE_PTR(v->previous);
+                                                        v->previous = sd_json_variant_unref(v->previous);
+                                                        v->previous_fds = mfree(v->previous_fds);
+                                                        v->n_previous_fds = 0;
                                                         varlink_set_state(v, VARLINK_PROCESSED_METHOD);
                                                 }
                                         } else {
@@ -1538,7 +1560,7 @@ _public_ int sd_varlink_send(sd_varlink *v, const char *method, sd_json_variant
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to build json message: %m");
 
-        r = json_stream_enqueue(&v->stream, m);
+        r = varlink_enqueue(v, m);
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
 
@@ -1585,7 +1607,7 @@ _public_ int sd_varlink_invoke(sd_varlink *v, const char *method, sd_json_varian
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to build json message: %m");
 
-        r = json_stream_enqueue(&v->stream, m);
+        r = varlink_enqueue(v, m);
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
 
@@ -1636,7 +1658,7 @@ _public_ int sd_varlink_observe(sd_varlink *v, const char *method, sd_json_varia
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to build json message: %m");
 
-        r = json_stream_enqueue(&v->stream, m);
+        r = varlink_enqueue(v, m);
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
 
@@ -1683,7 +1705,7 @@ static int varlink_call_internal(sd_varlink *v, sd_json_variant *request) {
          * that we can assign a new reply shortly. */
         varlink_clear_current(v);
 
-        r = json_stream_enqueue(&v->stream, request);
+        r = varlink_enqueue(v, request);
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
 
@@ -1991,7 +2013,7 @@ _public_ int sd_varlink_collect_full(
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to build json message: %m");
 
-        r = json_stream_enqueue(&v->stream, m);
+        r = varlink_enqueue(v, m);
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
 
@@ -2159,23 +2181,28 @@ _public_ int sd_varlink_reply(sd_varlink *v, sd_json_variant *parameters) {
 
         if (more && v->sentinel) {
                 if (v->previous) {
-                        r = sd_json_variant_set_field_boolean(json_stream_queue_item_get_data(v->previous), "continues", true);
+                        r = sd_json_variant_set_field_boolean(&v->previous, "continues", true);
                         if (r < 0)
                                 return r;
 
-                        r = json_stream_enqueue_item(&v->stream, v->previous);
+                        r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds);
                         if (r < 0)
                                 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
+
+                        v->previous = sd_json_variant_unref(v->previous);
+                        v->previous_fds = mfree(v->previous_fds);
+                        v->n_previous_fds = 0;
                 }
 
-                r = json_stream_make_queue_item(&v->stream, m, &v->previous);
-                if (r < 0)
-                        return r;
+                v->previous = sd_json_variant_ref(m);
+                v->previous_fds = TAKE_PTR(v->pushed_fds);
+                v->n_previous_fds = v->n_pushed_fds;
+                v->n_pushed_fds = 0;
 
                 return 1;
         }
 
-        r = json_stream_enqueue(&v->stream, m);
+        r = varlink_enqueue(v, m);
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
 
@@ -2255,7 +2282,7 @@ _public_ int sd_varlink_reply_and_upgrade(sd_varlink *v, sd_json_variant *parame
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to build json message: %m");
 
-        r = json_stream_enqueue(&v->stream, m);
+        r = varlink_enqueue(v, m);
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
 
@@ -2289,7 +2316,8 @@ _public_ int sd_varlink_reset_fds(sd_varlink *v) {
          * rollback the fds. Note that this is implicitly called whenever an error reply is sent, see
          * below. */
 
-        json_stream_reset_pushed_fds(&v->stream);
+        close_many(v->pushed_fds, v->n_pushed_fds);
+        v->n_pushed_fds = 0;
         return 0;
 }
 
@@ -2308,18 +2336,20 @@ _public_ int sd_varlink_error(sd_varlink *v, const char *error_id, sd_json_varia
                 return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
 
         if (v->previous) {
-                r = sd_json_variant_set_field_boolean(json_stream_queue_item_get_data(v->previous), "continues", true);
+                r = sd_json_variant_set_field_boolean(&v->previous, "continues", true);
                 if (r < 0)
                         return r;
 
                 /* If we have a previous reply still ready make sure we queue it before the error. We only
                  * ever set "previous" if we're in a streaming method so we pass more=true unconditionally
                  * here as we know we're still going to queue an error afterwards. */
-                r = json_stream_enqueue_item(&v->stream, v->previous);
+                r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds);
                 if (r < 0)
                         return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
 
-                TAKE_PTR(v->previous);
+                v->previous = sd_json_variant_unref(v->previous);
+                v->previous_fds = mfree(v->previous_fds);
+                v->n_previous_fds = 0;
         }
 
         /* Reset the list of pushed file descriptors before sending an error reply. We do this here to
@@ -2350,7 +2380,7 @@ _public_ int sd_varlink_error(sd_varlink *v, const char *error_id, sd_json_varia
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to build json message: %m");
 
-        r = json_stream_enqueue(&v->stream, m);
+        r = varlink_enqueue(v, m);
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
 
@@ -2490,7 +2520,7 @@ _public_ int sd_varlink_notify(sd_varlink *v, sd_json_variant *parameters) {
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to build json message: %m");
 
-        r = json_stream_enqueue(&v->stream, m);
+        r = varlink_enqueue(v, m);
         if (r < 0)
                 return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
 
@@ -2722,7 +2752,15 @@ _public_ int sd_varlink_push_fd(sd_varlink *v, int fd) {
         if (!json_stream_flags_set(&v->stream, JSON_STREAM_ALLOW_FD_PASSING_OUTPUT))
                 return -EPERM;
 
-        return json_stream_push_fd(&v->stream, fd);
+        if (v->n_pushed_fds >= SCM_MAX_FD) /* Kernel doesn't support more than 253 fds per message, refuse early hence */
+                return -ENOBUFS;
+
+        if (!GREEDY_REALLOC(v->pushed_fds, v->n_pushed_fds + 1))
+                return -ENOMEM;
+
+        int i = (int) v->n_pushed_fds;
+        v->pushed_fds[v->n_pushed_fds++] = fd;
+        return i;
 }
 
 _public_ int sd_varlink_push_dup_fd(sd_varlink *v, int fd) {
index 966afd0b06cffaee73c779443c971aee98f810f0..8087c2c43246498276a36aa332ca91178818cf1f 100644 (file)
@@ -94,7 +94,12 @@ typedef struct sd_varlink {
         sd_varlink_reply_flags_t current_reply_flags;
         sd_varlink_symbol *current_method;
 
-        JsonStreamQueueItem *previous;
+        int *pushed_fds;
+        size_t n_pushed_fds;
+
+        sd_json_variant *previous;
+        int *previous_fds;
+        size_t n_previous_fds;
         char *sentinel;
 
         /* Per-call protocol-upgrade marker: set when the *current* method call carries the
index 6a92550e727bf67923714165d2b33101729103ea..dec965a46032b2fcc97f082bfa9da405ea9e33cc 100644 (file)
@@ -692,33 +692,6 @@ static QmpClientArgs* qmp_client_args_close_fds(QmpClientArgs *p) {
 
 DEFINE_TRIVIAL_CLEANUP_FUNC(QmpClientArgs*, qmp_client_args_close_fds);
 
-/* Transfer fds to the stream. On partial failure narrow args to the unstaged tail so
- * the caller's cleanup closes only the untransferred fds. */
-static int qmp_client_stage_fds(QmpClient *c, QmpClientArgs *args) {
-        int r;
-
-        assert(c);
-
-        if (!args || args->n_fds == 0)
-                return 0;
-
-        assert(args->fds_consume);
-
-        for (size_t i = 0; i < args->n_fds; i++) {
-                r = json_stream_push_fd(&c->stream, args->fds_consume[i]);
-                if (r < 0) {
-                        /* Already-staged are owned by the stream; narrow args to the rest. */
-                        json_stream_reset_pushed_fds(&c->stream);
-                        args->fds_consume = &args->fds_consume[i];
-                        args->n_fds -= i;
-                        return r;
-                }
-        }
-
-        args->n_fds = 0;
-        return 0;
-}
-
 int qmp_client_invoke(
                 QmpClient *c,
                 const char *command,
@@ -728,8 +701,8 @@ int qmp_client_invoke(
 
         _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL;
         _cleanup_free_ QmpSlot *pending = NULL;
-        /* Closes any fds in args not yet handed to the stream on every early-return path;
-         * TAKE_PTR()'d on the success path below once stage_fds has consumed them. */
+        /* Closes any fds in args on every early-return path; TAKE_PTR()'d on the success path
+         * below once json_stream_enqueue_full() has taken ownership of them. */
         _cleanup_(qmp_client_args_close_fdsp) QmpClientArgs *fds_owner = args;
         uint64_t id;
         int r;
@@ -761,16 +734,10 @@ int qmp_client_invoke(
                 return r;
         assert(r > 0);
 
-        /* Stage AFTER ensure_running() drained internal enqueues so the next enqueue is ours. */
-        r = qmp_client_stage_fds(c, args);
-        if (r < 0) {
-                set_remove(c->slots, pending);
-                return r;
-        }
-
-        r = json_stream_enqueue(&c->stream, cmd);
+        r = json_stream_enqueue_full(&c->stream, cmd,
+                                     args ? args->fds_consume : NULL,
+                                     args ? args->n_fds : 0);
         if (r < 0) {
-                json_stream_reset_pushed_fds(&c->stream);
                 set_remove(c->slots, pending);
                 return r;
         }