return now(CLOCK_MONOTONIC);
}
-sd_json_variant** json_stream_queue_item_get_data(JsonStreamQueueItem *q) {
- assert(q);
- return &q->data;
-}
-
-JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q) {
+static JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q) {
if (!q)
return NULL;
s->output_fds = mfree(s->output_fds);
s->n_output_fds = 0;
- close_many(s->pushed_fds, s->n_pushed_fds);
- s->pushed_fds = mfree(s->pushed_fds);
- s->n_pushed_fds = 0;
-
LIST_CLEAR(queue, s->output_queue, json_stream_queue_item_free);
s->output_queue_tail = NULL;
s->n_output_queue = 0;
return ret;
}
-int json_stream_push_fd(JsonStream *s, int fd) {
- int i;
-
- assert(s);
- assert(fd >= 0);
-
- if (s->n_pushed_fds >= SCM_MAX_FD) /* Kernel doesn't support more than 253 fds per message */
- return -ENOBUFS;
-
- if (!GREEDY_REALLOC(s->pushed_fds, s->n_pushed_fds + 1))
- return -ENOMEM;
-
- i = (int) s->n_pushed_fds;
- s->pushed_fds[s->n_pushed_fds++] = fd;
- return i;
-}
-
-void json_stream_reset_pushed_fds(JsonStream *s) {
- assert(s);
-
- close_many(s->pushed_fds, s->n_pushed_fds);
- s->n_pushed_fds = 0;
-}
-
int json_stream_peek_input_fd(const JsonStream *s, size_t i) {
assert(s);
return 0;
}
-int json_stream_enqueue_item(JsonStream *s, JsonStreamQueueItem *q) {
- assert(s);
- assert(q);
-
- if (s->n_output_queue >= s->queue_max)
- return -ENOBUFS;
-
- LIST_INSERT_AFTER(queue, s->output_queue, s->output_queue_tail, q);
- s->output_queue_tail = q;
- s->n_output_queue++;
- return 0;
-}
-
-int json_stream_enqueue(JsonStream *s, sd_json_variant *m) {
- JsonStreamQueueItem *q;
-
+int json_stream_enqueue_full(JsonStream *s, sd_json_variant *m, const int fds[], size_t n_fds) {
assert(s);
assert(m);
+ assert(fds || n_fds == 0);
- /* Fast path: no fds pending and no items currently queued — append directly into the
+ /* Fast path: no fds and no items currently queued — append directly into the
* output buffer to avoid the queue allocation. */
- if (s->n_pushed_fds == 0 && !s->output_queue)
+ if (n_fds == 0 && !s->output_queue)
return json_stream_format_json(s, m);
if (s->n_output_queue >= s->queue_max)
return -ENOBUFS;
- q = json_stream_queue_item_new(m, s->pushed_fds, s->n_pushed_fds);
+ JsonStreamQueueItem *q = json_stream_queue_item_new(m, fds, n_fds);
if (!q)
return -ENOMEM;
- s->n_pushed_fds = 0; /* fds belong to the queue entry now */
-
- assert_se(json_stream_enqueue_item(s, q) >= 0);
- return 0;
-}
-
-int json_stream_make_queue_item(JsonStream *s, sd_json_variant *m, JsonStreamQueueItem **ret) {
- JsonStreamQueueItem *q;
-
- assert(s);
- assert(m);
- assert(ret);
-
- q = json_stream_queue_item_new(m, s->pushed_fds, s->n_pushed_fds);
- if (!q)
- return -ENOMEM;
-
- s->n_pushed_fds = 0; /* fds belong to the queue entry now */
-
- *ret = q;
+ LIST_INSERT_AFTER(queue, s->output_queue, s->output_queue_tail, q);
+ s->output_queue_tail = q;
+ s->n_output_queue++;
return 0;
}
JsonStreamQueueItem *output_queue_tail;
size_t n_output_queue;
- int *pushed_fds;
- size_t n_pushed_fds;
-
JsonStreamFlags flags;
} JsonStream;
int json_stream_set_allow_fd_passing_input(JsonStream *s, bool enabled, bool with_sockopt);
int json_stream_set_allow_fd_passing_output(JsonStream *s, bool enabled);
-/* Output: enqueue a JSON variant. Fast path concatenates into the output buffer; if
- * pushed_fds are present or the queue is non-empty the message is queued instead, so that
- * fd-to-message boundaries are preserved. */
-int json_stream_enqueue(JsonStream *s, sd_json_variant *m);
-
-/* Allocate a queue item carrying `m` and the currently pushed fds. The pushed fds are
- * transferred to the new item; on success n_pushed_fds is reset to 0. The caller may
- * later submit the item via json_stream_enqueue_item() or free it. */
-int json_stream_make_queue_item(JsonStream *s, sd_json_variant *m, JsonStreamQueueItem **ret);
-int json_stream_enqueue_item(JsonStream *s, JsonStreamQueueItem *q);
-JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q);
-DEFINE_TRIVIAL_CLEANUP_FUNC(JsonStreamQueueItem*, json_stream_queue_item_free);
-sd_json_variant** json_stream_queue_item_get_data(JsonStreamQueueItem *q);
-
-/* fd push/peek/take */
-int json_stream_push_fd(JsonStream *s, int fd);
-void json_stream_reset_pushed_fds(JsonStream *s);
+/* Output: enqueue a JSON variant together with an optional set of file descriptors. Fast
+ * path concatenates into the output buffer when fds is empty and the queue is empty; if fds
+ * are present or the queue is non-empty the message is queued instead, so that
+ * fd-to-message boundaries are preserved. The queue item copies the fd values; On success,
+ * ownership of the fd values transfers to the queue item (the caller must free its array
+ * without closing the fds). On failure, the fds remain untouched and the caller retains
+ * ownership. */
+int json_stream_enqueue_full(JsonStream *s, sd_json_variant *m, const int fds[], size_t n_fds);
+
+static inline int json_stream_enqueue(JsonStream *s, sd_json_variant *m) {
+ return json_stream_enqueue_full(s, m, NULL, 0);
+}
int json_stream_peek_input_fd(const JsonStream *s, size_t i);
int json_stream_take_input_fd(JsonStream *s, size_t i);
json_stream_close_input_fds(&v->stream);
- v->previous = json_stream_queue_item_free(v->previous);
+ v->previous = sd_json_variant_unref(v->previous);
+ close_many(v->previous_fds, v->n_previous_fds);
+ v->previous_fds = mfree(v->previous_fds);
+ v->n_previous_fds = 0;
if (v->sentinel != POINTER_MAX)
v->sentinel = mfree(v->sentinel);
else
/* Detach event sources first so the kernel no longer has epoll watches on the
* stream's fds, then free the stream — json_stream_done() closes the input/output
- * fds, the cached peer_pidfd, the received input fds, the queued output fds, and
- * the pushed fds. */
+ * fds, the cached peer_pidfd, the received input fds, and the queued output fds. */
sd_varlink_detach_event(v);
varlink_clear_current(v);
json_stream_done(&v->stream);
+ close_many(v->pushed_fds, v->n_pushed_fds);
+ v->pushed_fds = mfree(v->pushed_fds);
+ v->n_pushed_fds = 0;
+
pidref_done_sigterm_wait(&v->exec_pidref);
}
return 1;
}
+static int varlink_enqueue(sd_varlink *v, sd_json_variant *m) {
+ int r;
+
+ assert(v);
+ assert(m);
+
+ r = json_stream_enqueue_full(&v->stream, m, v->pushed_fds, v->n_pushed_fds);
+ if (r >= 0)
+ v->n_pushed_fds = 0; /* fds belong to the queue entry now */
+ /* We don't free v->pushed_fds so it can be reused for the next message. */
+
+ return r;
+}
+
static int varlink_write(sd_varlink *v) {
assert(v);
r = sd_varlink_error_errno(v, r);
} else if (v->sentinel) {
if (v->previous) {
- r = json_stream_enqueue_item(&v->stream, v->previous);
+ r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds);
if (r >= 0) {
- TAKE_PTR(v->previous);
+ v->previous = sd_json_variant_unref(v->previous);
+ v->previous_fds = mfree(v->previous_fds);
+ v->n_previous_fds = 0;
varlink_set_state(v, VARLINK_PROCESSED_METHOD);
}
} else {
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = json_stream_enqueue(&v->stream, m);
+ r = varlink_enqueue(v, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = json_stream_enqueue(&v->stream, m);
+ r = varlink_enqueue(v, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = json_stream_enqueue(&v->stream, m);
+ r = varlink_enqueue(v, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
* that we can assign a new reply shortly. */
varlink_clear_current(v);
- r = json_stream_enqueue(&v->stream, request);
+ r = varlink_enqueue(v, request);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = json_stream_enqueue(&v->stream, m);
+ r = varlink_enqueue(v, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
if (more && v->sentinel) {
if (v->previous) {
- r = sd_json_variant_set_field_boolean(json_stream_queue_item_get_data(v->previous), "continues", true);
+ r = sd_json_variant_set_field_boolean(&v->previous, "continues", true);
if (r < 0)
return r;
- r = json_stream_enqueue_item(&v->stream, v->previous);
+ r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
+
+ v->previous = sd_json_variant_unref(v->previous);
+ v->previous_fds = mfree(v->previous_fds);
+ v->n_previous_fds = 0;
}
- r = json_stream_make_queue_item(&v->stream, m, &v->previous);
- if (r < 0)
- return r;
+ v->previous = sd_json_variant_ref(m);
+ v->previous_fds = TAKE_PTR(v->pushed_fds);
+ v->n_previous_fds = v->n_pushed_fds;
+ v->n_pushed_fds = 0;
return 1;
}
- r = json_stream_enqueue(&v->stream, m);
+ r = varlink_enqueue(v, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = json_stream_enqueue(&v->stream, m);
+ r = varlink_enqueue(v, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
* rollback the fds. Note that this is implicitly called whenever an error reply is sent, see
* below. */
- json_stream_reset_pushed_fds(&v->stream);
+ close_many(v->pushed_fds, v->n_pushed_fds);
+ v->n_pushed_fds = 0;
return 0;
}
return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
if (v->previous) {
- r = sd_json_variant_set_field_boolean(json_stream_queue_item_get_data(v->previous), "continues", true);
+ r = sd_json_variant_set_field_boolean(&v->previous, "continues", true);
if (r < 0)
return r;
/* If we have a previous reply still ready make sure we queue it before the error. We only
* ever set "previous" if we're in a streaming method so we pass more=true unconditionally
* here as we know we're still going to queue an error afterwards. */
- r = json_stream_enqueue_item(&v->stream, v->previous);
+ r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
- TAKE_PTR(v->previous);
+ v->previous = sd_json_variant_unref(v->previous);
+ v->previous_fds = mfree(v->previous_fds);
+ v->n_previous_fds = 0;
}
/* Reset the list of pushed file descriptors before sending an error reply. We do this here to
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = json_stream_enqueue(&v->stream, m);
+ r = varlink_enqueue(v, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = json_stream_enqueue(&v->stream, m);
+ r = varlink_enqueue(v, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
if (!json_stream_flags_set(&v->stream, JSON_STREAM_ALLOW_FD_PASSING_OUTPUT))
return -EPERM;
- return json_stream_push_fd(&v->stream, fd);
+ if (v->n_pushed_fds >= SCM_MAX_FD) /* Kernel doesn't support more than 253 fds per message, refuse early hence */
+ return -ENOBUFS;
+
+ if (!GREEDY_REALLOC(v->pushed_fds, v->n_pushed_fds + 1))
+ return -ENOMEM;
+
+ int i = (int) v->n_pushed_fds;
+ v->pushed_fds[v->n_pushed_fds++] = fd;
+ return i;
}
_public_ int sd_varlink_push_dup_fd(sd_varlink *v, int fd) {
DEFINE_TRIVIAL_CLEANUP_FUNC(QmpClientArgs*, qmp_client_args_close_fds);
-/* Transfer fds to the stream. On partial failure narrow args to the unstaged tail so
- * the caller's cleanup closes only the untransferred fds. */
-static int qmp_client_stage_fds(QmpClient *c, QmpClientArgs *args) {
- int r;
-
- assert(c);
-
- if (!args || args->n_fds == 0)
- return 0;
-
- assert(args->fds_consume);
-
- for (size_t i = 0; i < args->n_fds; i++) {
- r = json_stream_push_fd(&c->stream, args->fds_consume[i]);
- if (r < 0) {
- /* Already-staged are owned by the stream; narrow args to the rest. */
- json_stream_reset_pushed_fds(&c->stream);
- args->fds_consume = &args->fds_consume[i];
- args->n_fds -= i;
- return r;
- }
- }
-
- args->n_fds = 0;
- return 0;
-}
-
int qmp_client_invoke(
QmpClient *c,
const char *command,
_cleanup_(sd_json_variant_unrefp) sd_json_variant *cmd = NULL;
_cleanup_free_ QmpSlot *pending = NULL;
- /* Closes any fds in args not yet handed to the stream on every early-return path;
- * TAKE_PTR()'d on the success path below once stage_fds has consumed them. */
+ /* Closes any fds in args on every early-return path; TAKE_PTR()'d on the success path
+ * below once json_stream_enqueue_full() has taken ownership of them. */
_cleanup_(qmp_client_args_close_fdsp) QmpClientArgs *fds_owner = args;
uint64_t id;
int r;
return r;
assert(r > 0);
- /* Stage AFTER ensure_running() drained internal enqueues so the next enqueue is ours. */
- r = qmp_client_stage_fds(c, args);
- if (r < 0) {
- set_remove(c->slots, pending);
- return r;
- }
-
- r = json_stream_enqueue(&c->stream, cmd);
+ r = json_stream_enqueue_full(&c->stream, cmd,
+ args ? args->fds_consume : NULL,
+ args ? args->n_fds : 0);
if (r < 0) {
- json_stream_reset_pushed_fds(&c->stream);
set_remove(c->slots, pending);
return r;
}