From: Daan De Meyer Date: Mon, 13 Apr 2026 08:18:04 +0000 (+0000) Subject: json-stream: hide JsonStreamQueueItem as an implementation detail X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8ad4adcb6f900c333da2cb6ac63e21f581d2ce4f;p=thirdparty%2Fsystemd.git json-stream: hide JsonStreamQueueItem as an implementation detail The json-stream API previously exposed JsonStreamQueueItem and several functions operating on it (json_stream_make_queue_item(), json_stream_enqueue_item(), json_stream_queue_item_free(), json_stream_queue_item_get_data()). These existed solely to support sd-varlink's "defer-and-modify" pattern for streaming replies, where a reply is held back so its "continues" field can be set before transmission. This is a varlink protocol concern that should not leak into the generic transport layer. Similarly, the fd pushing API (json_stream_push_fd(), json_stream_reset_pushed_fds()) and the pushed_fds state lived inside JsonStream, even though fd-to-message association is a protocol-level concern managed entirely by sd-varlink. Rework the API so that: - JsonStreamQueueItem and all its functions become static to json-stream.c. The only output API is now json_stream_enqueue_full() (accepting explicit fds) and the inline json_stream_enqueue() wrapper for the common no-fds case. - The pushed_fds state moves from JsonStream into sd_varlink, where sd_varlink_push_fd() and sd_varlink_reset_fds() manage it directly. - The deferred reply in sd-varlink changes from a JsonStreamQueueItem* to a plain sd_json_variant* plus a separate previous_fds/n_previous_fds pair, keeping the protocol-specific bookkeeping in sd-varlink where it belongs. - A new varlink_enqueue() helper wraps json_stream_enqueue_full() with the varlink connection's pushed fds, transferring fd ownership to the queue item on success. qmp-client.c is fixed to use the new API as well. --- diff --git a/src/libsystemd/sd-json/json-stream.c b/src/libsystemd/sd-json/json-stream.c index d8475ae8734..1900d1c7da4 100644 --- a/src/libsystemd/sd-json/json-stream.c +++ b/src/libsystemd/sd-json/json-stream.c @@ -50,12 +50,7 @@ static usec_t json_stream_now(const JsonStream *s) { return now(CLOCK_MONOTONIC); } -sd_json_variant** json_stream_queue_item_get_data(JsonStreamQueueItem *q) { - assert(q); - return &q->data; -} - -JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q) { +static JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q) { if (!q) return NULL; @@ -160,10 +155,6 @@ static void json_stream_clear(JsonStream *s) { s->output_fds = mfree(s->output_fds); s->n_output_fds = 0; - close_many(s->pushed_fds, s->n_pushed_fds); - s->pushed_fds = mfree(s->pushed_fds); - s->n_pushed_fds = 0; - LIST_CLEAR(queue, s->output_queue, json_stream_queue_item_free); s->output_queue_tail = NULL; s->n_output_queue = 0; @@ -883,30 +874,6 @@ int json_stream_flush(JsonStream *s) { return ret; } -int json_stream_push_fd(JsonStream *s, int fd) { - int i; - - assert(s); - assert(fd >= 0); - - if (s->n_pushed_fds >= SCM_MAX_FD) /* Kernel doesn't support more than 253 fds per message */ - return -ENOBUFS; - - if (!GREEDY_REALLOC(s->pushed_fds, s->n_pushed_fds + 1)) - return -ENOMEM; - - i = (int) s->n_pushed_fds; - s->pushed_fds[s->n_pushed_fds++] = fd; - return i; -} - -void json_stream_reset_pushed_fds(JsonStream *s) { - assert(s); - - close_many(s->pushed_fds, s->n_pushed_fds); - s->n_pushed_fds = 0; -} - int json_stream_peek_input_fd(const JsonStream *s, size_t i) { assert(s); @@ -1060,57 +1027,26 @@ static int json_stream_format_queue(JsonStream *s) { return 0; } -int json_stream_enqueue_item(JsonStream *s, JsonStreamQueueItem *q) { - assert(s); - assert(q); - - if (s->n_output_queue >= s->queue_max) - return -ENOBUFS; - - LIST_INSERT_AFTER(queue, s->output_queue, s->output_queue_tail, q); - s->output_queue_tail = q; - s->n_output_queue++; - return 0; -} - -int json_stream_enqueue(JsonStream *s, sd_json_variant *m) { - JsonStreamQueueItem *q; - +int json_stream_enqueue_full(JsonStream *s, sd_json_variant *m, const int fds[], size_t n_fds) { assert(s); assert(m); + assert(fds || n_fds == 0); - /* Fast path: no fds pending and no items currently queued — append directly into the + /* Fast path: no fds and no items currently queued — append directly into the * output buffer to avoid the queue allocation. */ - if (s->n_pushed_fds == 0 && !s->output_queue) + if (n_fds == 0 && !s->output_queue) return json_stream_format_json(s, m); if (s->n_output_queue >= s->queue_max) return -ENOBUFS; - q = json_stream_queue_item_new(m, s->pushed_fds, s->n_pushed_fds); + JsonStreamQueueItem *q = json_stream_queue_item_new(m, fds, n_fds); if (!q) return -ENOMEM; - s->n_pushed_fds = 0; /* fds belong to the queue entry now */ - - assert_se(json_stream_enqueue_item(s, q) >= 0); - return 0; -} - -int json_stream_make_queue_item(JsonStream *s, sd_json_variant *m, JsonStreamQueueItem **ret) { - JsonStreamQueueItem *q; - - assert(s); - assert(m); - assert(ret); - - q = json_stream_queue_item_new(m, s->pushed_fds, s->n_pushed_fds); - if (!q) - return -ENOMEM; - - s->n_pushed_fds = 0; /* fds belong to the queue entry now */ - - *ret = q; + LIST_INSERT_AFTER(queue, s->output_queue, s->output_queue_tail, q); + s->output_queue_tail = q; + s->n_output_queue++; return 0; } diff --git a/src/libsystemd/sd-json/json-stream.h b/src/libsystemd/sd-json/json-stream.h index 671b0f8985c..b502c98676e 100644 --- a/src/libsystemd/sd-json/json-stream.h +++ b/src/libsystemd/sd-json/json-stream.h @@ -119,9 +119,6 @@ typedef struct JsonStream { JsonStreamQueueItem *output_queue_tail; size_t n_output_queue; - int *pushed_fds; - size_t n_pushed_fds; - JsonStreamFlags flags; } JsonStream; @@ -174,23 +171,18 @@ bool json_stream_should_disconnect(const JsonStream *s); int json_stream_set_allow_fd_passing_input(JsonStream *s, bool enabled, bool with_sockopt); int json_stream_set_allow_fd_passing_output(JsonStream *s, bool enabled); -/* Output: enqueue a JSON variant. Fast path concatenates into the output buffer; if - * pushed_fds are present or the queue is non-empty the message is queued instead, so that - * fd-to-message boundaries are preserved. */ -int json_stream_enqueue(JsonStream *s, sd_json_variant *m); - -/* Allocate a queue item carrying `m` and the currently pushed fds. The pushed fds are - * transferred to the new item; on success n_pushed_fds is reset to 0. The caller may - * later submit the item via json_stream_enqueue_item() or free it. */ -int json_stream_make_queue_item(JsonStream *s, sd_json_variant *m, JsonStreamQueueItem **ret); -int json_stream_enqueue_item(JsonStream *s, JsonStreamQueueItem *q); -JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q); -DEFINE_TRIVIAL_CLEANUP_FUNC(JsonStreamQueueItem*, json_stream_queue_item_free); -sd_json_variant** json_stream_queue_item_get_data(JsonStreamQueueItem *q); - -/* fd push/peek/take */ -int json_stream_push_fd(JsonStream *s, int fd); -void json_stream_reset_pushed_fds(JsonStream *s); +/* Output: enqueue a JSON variant together with an optional set of file descriptors. Fast + * path concatenates into the output buffer when fds is empty and the queue is empty; if fds + * are present or the queue is non-empty the message is queued instead, so that + * fd-to-message boundaries are preserved. The queue item copies the fd values; On success, + * ownership of the fd values transfers to the queue item (the caller must free its array + * without closing the fds). On failure, the fds remain untouched and the caller retains + * ownership. */ +int json_stream_enqueue_full(JsonStream *s, sd_json_variant *m, const int fds[], size_t n_fds); + +static inline int json_stream_enqueue(JsonStream *s, sd_json_variant *m) { + return json_stream_enqueue_full(s, m, NULL, 0); +} int json_stream_peek_input_fd(const JsonStream *s, size_t i); int json_stream_take_input_fd(JsonStream *s, size_t i); diff --git a/src/libsystemd/sd-varlink/sd-varlink.c b/src/libsystemd/sd-varlink/sd-varlink.c index 7130be69a4b..2a5f677ef37 100644 --- a/src/libsystemd/sd-varlink/sd-varlink.c +++ b/src/libsystemd/sd-varlink/sd-varlink.c @@ -590,7 +590,10 @@ static void varlink_clear_current(sd_varlink *v) { json_stream_close_input_fds(&v->stream); - v->previous = json_stream_queue_item_free(v->previous); + v->previous = sd_json_variant_unref(v->previous); + close_many(v->previous_fds, v->n_previous_fds); + v->previous_fds = mfree(v->previous_fds); + v->n_previous_fds = 0; if (v->sentinel != POINTER_MAX) v->sentinel = mfree(v->sentinel); else @@ -602,14 +605,17 @@ static void varlink_clear(sd_varlink *v) { /* Detach event sources first so the kernel no longer has epoll watches on the * stream's fds, then free the stream — json_stream_done() closes the input/output - * fds, the cached peer_pidfd, the received input fds, the queued output fds, and - * the pushed fds. */ + * fds, the cached peer_pidfd, the received input fds, and the queued output fds. */ sd_varlink_detach_event(v); varlink_clear_current(v); json_stream_done(&v->stream); + close_many(v->pushed_fds, v->n_pushed_fds); + v->pushed_fds = mfree(v->pushed_fds); + v->n_pushed_fds = 0; + pidref_done_sigterm_wait(&v->exec_pidref); } @@ -642,6 +648,20 @@ static int varlink_test_disconnect(sd_varlink *v) { return 1; } +static int varlink_enqueue(sd_varlink *v, sd_json_variant *m) { + int r; + + assert(v); + assert(m); + + r = json_stream_enqueue_full(&v->stream, m, v->pushed_fds, v->n_pushed_fds); + if (r >= 0) + v->n_pushed_fds = 0; /* fds belong to the queue entry now */ + /* We don't free v->pushed_fds so it can be reused for the next message. */ + + return r; +} + static int varlink_write(sd_varlink *v) { assert(v); @@ -1098,9 +1118,11 @@ static int varlink_dispatch_method(sd_varlink *v) { r = sd_varlink_error_errno(v, r); } else if (v->sentinel) { if (v->previous) { - r = json_stream_enqueue_item(&v->stream, v->previous); + r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds); if (r >= 0) { - TAKE_PTR(v->previous); + v->previous = sd_json_variant_unref(v->previous); + v->previous_fds = mfree(v->previous_fds); + v->n_previous_fds = 0; varlink_set_state(v, VARLINK_PROCESSED_METHOD); } } else { @@ -1538,7 +1560,7 @@ _public_ int sd_varlink_send(sd_varlink *v, const char *method, sd_json_variant if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = json_stream_enqueue(&v->stream, m); + r = varlink_enqueue(v, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -1585,7 +1607,7 @@ _public_ int sd_varlink_invoke(sd_varlink *v, const char *method, sd_json_varian if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = json_stream_enqueue(&v->stream, m); + r = varlink_enqueue(v, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -1636,7 +1658,7 @@ _public_ int sd_varlink_observe(sd_varlink *v, const char *method, sd_json_varia if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = json_stream_enqueue(&v->stream, m); + r = varlink_enqueue(v, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -1683,7 +1705,7 @@ static int varlink_call_internal(sd_varlink *v, sd_json_variant *request) { * that we can assign a new reply shortly. */ varlink_clear_current(v); - r = json_stream_enqueue(&v->stream, request); + r = varlink_enqueue(v, request); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -1991,7 +2013,7 @@ _public_ int sd_varlink_collect_full( if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = json_stream_enqueue(&v->stream, m); + r = varlink_enqueue(v, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -2159,23 +2181,28 @@ _public_ int sd_varlink_reply(sd_varlink *v, sd_json_variant *parameters) { if (more && v->sentinel) { if (v->previous) { - r = sd_json_variant_set_field_boolean(json_stream_queue_item_get_data(v->previous), "continues", true); + r = sd_json_variant_set_field_boolean(&v->previous, "continues", true); if (r < 0) return r; - r = json_stream_enqueue_item(&v->stream, v->previous); + r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); + + v->previous = sd_json_variant_unref(v->previous); + v->previous_fds = mfree(v->previous_fds); + v->n_previous_fds = 0; } - r = json_stream_make_queue_item(&v->stream, m, &v->previous); - if (r < 0) - return r; + v->previous = sd_json_variant_ref(m); + v->previous_fds = TAKE_PTR(v->pushed_fds); + v->n_previous_fds = v->n_pushed_fds; + v->n_pushed_fds = 0; return 1; } - r = json_stream_enqueue(&v->stream, m); + r = varlink_enqueue(v, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -2255,7 +2282,7 @@ _public_ int sd_varlink_reply_and_upgrade(sd_varlink *v, sd_json_variant *parame if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = json_stream_enqueue(&v->stream, m); + r = varlink_enqueue(v, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -2289,7 +2316,8 @@ _public_ int sd_varlink_reset_fds(sd_varlink *v) { * rollback the fds. Note that this is implicitly called whenever an error reply is sent, see * below. */ - json_stream_reset_pushed_fds(&v->stream); + close_many(v->pushed_fds, v->n_pushed_fds); + v->n_pushed_fds = 0; return 0; } @@ -2308,18 +2336,20 @@ _public_ int sd_varlink_error(sd_varlink *v, const char *error_id, sd_json_varia return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy."); if (v->previous) { - r = sd_json_variant_set_field_boolean(json_stream_queue_item_get_data(v->previous), "continues", true); + r = sd_json_variant_set_field_boolean(&v->previous, "continues", true); if (r < 0) return r; /* If we have a previous reply still ready make sure we queue it before the error. We only * ever set "previous" if we're in a streaming method so we pass more=true unconditionally * here as we know we're still going to queue an error afterwards. */ - r = json_stream_enqueue_item(&v->stream, v->previous); + r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); - TAKE_PTR(v->previous); + v->previous = sd_json_variant_unref(v->previous); + v->previous_fds = mfree(v->previous_fds); + v->n_previous_fds = 0; } /* Reset the list of pushed file descriptors before sending an error reply. We do this here to @@ -2350,7 +2380,7 @@ _public_ int sd_varlink_error(sd_varlink *v, const char *error_id, sd_json_varia if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = json_stream_enqueue(&v->stream, m); + r = varlink_enqueue(v, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -2490,7 +2520,7 @@ _public_ int sd_varlink_notify(sd_varlink *v, sd_json_variant *parameters) { if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = json_stream_enqueue(&v->stream, m); + r = varlink_enqueue(v, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -2722,7 +2752,15 @@ _public_ int sd_varlink_push_fd(sd_varlink *v, int fd) { if (!json_stream_flags_set(&v->stream, JSON_STREAM_ALLOW_FD_PASSING_OUTPUT)) return -EPERM; - return json_stream_push_fd(&v->stream, fd); + if (v->n_pushed_fds >= SCM_MAX_FD) /* Kernel doesn't support more than 253 fds per message, refuse early hence */ + return -ENOBUFS; + + if (!GREEDY_REALLOC(v->pushed_fds, v->n_pushed_fds + 1)) + return -ENOMEM; + + int i = (int) v->n_pushed_fds; + v->pushed_fds[v->n_pushed_fds++] = fd; + return i; } _public_ int sd_varlink_push_dup_fd(sd_varlink *v, int fd) { diff --git a/src/libsystemd/sd-varlink/varlink-internal.h b/src/libsystemd/sd-varlink/varlink-internal.h index 966afd0b06c..8087c2c4324 100644 --- a/src/libsystemd/sd-varlink/varlink-internal.h +++ b/src/libsystemd/sd-varlink/varlink-internal.h @@ -94,7 +94,12 @@ typedef struct sd_varlink { sd_varlink_reply_flags_t current_reply_flags; sd_varlink_symbol *current_method; - JsonStreamQueueItem *previous; + int *pushed_fds; + size_t n_pushed_fds; + + sd_json_variant *previous; + int *previous_fds; + size_t n_previous_fds; char *sentinel; /* Per-call protocol-upgrade marker: set when the *current* method call carries the diff --git a/src/shared/qmp-client.c b/src/shared/qmp-client.c index 6a92550e727..dec965a4603 100644 --- a/src/shared/qmp-client.c +++ b/src/shared/qmp-client.c @@ -692,33 +692,6 @@ static QmpClientArgs* qmp_client_args_close_fds(QmpClientArgs *p) { DEFINE_TRIVIAL_CLEANUP_FUNC(QmpClientArgs*, qmp_client_args_close_fds); -/* Transfer fds to the stream. On partial failure narrow args to the unstaged tail so - * the caller's cleanup closes only the untransferred fds. */ -static int qmp_client_stage_fds(QmpClient *c, QmpClientArgs *args) { - int r; - - assert(c); - - if (!args || args->n_fds == 0) - return 0; - - assert(args->fds_consume); - - for (size_t i = 0; i < args->n_fds; i++) { - r = json_stream_push_fd(&c->stream, args->fds_consume[i]); - if (r < 0) { - /* Already-staged are owned by the stream; narrow args to the rest. */ - json_stream_reset_pushed_fds(&c->stream); - args->fds_consume = &args->fds_consume[i]; - args->n_fds -= i; - return r; - } - } - - args->n_fds = 0; - return 0; -} - int qmp_client_invoke( QmpClient *c, const char *command, @@ -728,8 +701,8 @@ int qmp_client_invoke( _cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL; _cleanup_free_ QmpSlot *pending = NULL; - /* Closes any fds in args not yet handed to the stream on every early-return path; - * TAKE_PTR()'d on the success path below once stage_fds has consumed them. */ + /* Closes any fds in args on every early-return path; TAKE_PTR()'d on the success path + * below once json_stream_enqueue_full() has taken ownership of them. */ _cleanup_(qmp_client_args_close_fdsp) QmpClientArgs *fds_owner = args; uint64_t id; int r; @@ -761,16 +734,10 @@ int qmp_client_invoke( return r; assert(r > 0); - /* Stage AFTER ensure_running() drained internal enqueues so the next enqueue is ours. */ - r = qmp_client_stage_fds(c, args); - if (r < 0) { - set_remove(c->slots, pending); - return r; - } - - r = json_stream_enqueue(&c->stream, cmd); + r = json_stream_enqueue_full(&c->stream, cmd, + args ? args->fds_consume : NULL, + args ? args->n_fds : 0); if (r < 0) { - json_stream_reset_pushed_fds(&c->stream); set_remove(c->slots, pending); return r; }