From 6b1a59d59426cdda56648b00394addde2d454418 Mon Sep 17 00:00:00 2001 From: Daan De Meyer Date: Thu, 9 Apr 2026 14:54:21 +0000 Subject: [PATCH] sd-json: add JsonStream transport-layer module and migrate sd-varlink MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Introduces JsonStream, a generic transport layer for JSON-line message exchange over a pair of file descriptors. It owns the input/output buffers, SCM_RIGHTS fd passing, the deferred output queue, the read/write/parse step functions, sd-event integration (input/output/time event sources), the idle timeout machinery, and peer credential caching, but knows nothing about the specific JSON protocol on top — the consumer drives its state machine via phase/dispatch callbacks supplied at construction. sd-varlink is reworked to delegate the entire transport layer to a JsonStream owned by sd_varlink. The varlink struct drops every transport-related field (input/output buffers and fds, output queue, fd-passing state, ucred/pidfd cache, prefer_read/write fallback, idle timeout, description, event sources) — all of that lives in JsonStream now. What remains in sd_varlink is the varlink-protocol state machine (state, n_pending, current/previous/sentinel, server linkage, peer credentials accounting, exec_pidref, the varlink-specific quit and defer sources) and a thin wrapper layer over the JsonStream API. The should_disconnect / get_timeout / get_events / wait helpers all live in JsonStream now and are driven by a JsonStreamPhase the consumer reports via its phase callback. --- src/libsystemd/meson.build | 1 + src/libsystemd/sd-json/json-stream.c | 1438 ++++++++++++++++++ src/libsystemd/sd-json/json-stream.h | 267 ++++ src/libsystemd/sd-varlink/sd-varlink.c | 1399 +++-------------- src/libsystemd/sd-varlink/varlink-internal.h | 116 +- 5 files changed, 1959 insertions(+), 1262 deletions(-) create mode 100644 src/libsystemd/sd-json/json-stream.c create mode 100644 src/libsystemd/sd-json/json-stream.h diff --git a/src/libsystemd/meson.build b/src/libsystemd/meson.build index 976f0e99876..08d8d7c5c39 100644 --- a/src/libsystemd/meson.build +++ b/src/libsystemd/meson.build @@ -80,6 +80,7 @@ sd_login_sources = files('sd-login/sd-login.c') ############################################################ sd_json_sources = files( + 'sd-json/json-stream.c', 'sd-json/json-util.c', 'sd-json/sd-json.c', ) diff --git a/src/libsystemd/sd-json/json-stream.c b/src/libsystemd/sd-json/json-stream.c new file mode 100644 index 00000000000..d378d18a282 --- /dev/null +++ b/src/libsystemd/sd-json/json-stream.c @@ -0,0 +1,1438 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ + +#include +#include +#include +#include + +#include "sd-event.h" +#include "sd-json.h" + +#include "alloc-util.h" +#include "errno-util.h" +#include "fd-util.h" +#include "io-util.h" +#include "iovec-util.h" +#include "json-stream.h" +#include "list.h" +#include "log.h" +#include "memory-util.h" +#include "process-util.h" +#include "socket-util.h" +#include "string-util.h" +#include "time-util.h" +#include "user-util.h" + +#define JSON_STREAM_BUFFER_MAX_DEFAULT (16U * 1024U * 1024U) +#define JSON_STREAM_READ_SIZE_DEFAULT (64U * 1024U) +#define JSON_STREAM_QUEUE_MAX_DEFAULT (64U * 1024U) +#define JSON_STREAM_FDS_MAX (16U * 1024U) + +struct JsonStreamQueueItem { + LIST_FIELDS(JsonStreamQueueItem, queue); + sd_json_variant *data; + size_t n_fds; + int fds[]; +}; + +static const char* json_stream_description(const JsonStream *s) { + return (s ? s->description : NULL) ?: "json-stream"; +} + +/* Returns the size of the framing delimiter in bytes: strlen(delimiter) for multi-char + * delimiters (e.g. "\r\n"), or 1 for the default NUL-byte delimiter (delimiter == NULL). */ +static size_t json_stream_delimiter_size(const JsonStream *s) { + return strlen_ptr(s->delimiter) ?: 1; +} + +static usec_t json_stream_now(const JsonStream *s) { + usec_t t; + + if (s->event && sd_event_now(s->event, CLOCK_MONOTONIC, &t) >= 0) + return t; + + return now(CLOCK_MONOTONIC); +} + +#define json_stream_log(s, fmt, ...) \ + log_debug("%s: " fmt, json_stream_description(s), ##__VA_ARGS__) + +#define json_stream_log_errno(s, error, fmt, ...) \ + log_debug_errno((error), "%s: " fmt, json_stream_description(s), ##__VA_ARGS__) + +sd_json_variant** json_stream_queue_item_get_data(JsonStreamQueueItem *q) { + assert(q); + return &q->data; +} + +JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q) { + if (!q) + return NULL; + + sd_json_variant_unref(q->data); + close_many(q->fds, q->n_fds); + + return mfree(q); +} + +static JsonStreamQueueItem* json_stream_queue_item_new(sd_json_variant *m, const int fds[], size_t n_fds) { + JsonStreamQueueItem *q; + + assert(m); + assert(fds || n_fds == 0); + + size_t sz = sizeof(int); + if (!MUL_SAFE(&sz, sz, n_fds) || + !INC_SAFE(&sz, offsetof(JsonStreamQueueItem, fds))) + return NULL; + + q = malloc(sz); + if (!q) + return NULL; + + *q = (JsonStreamQueueItem) { + .data = sd_json_variant_ref(m), + .n_fds = n_fds, + }; + + memcpy_safe(q->fds, fds, n_fds * sizeof(int)); + + return TAKE_PTR(q); +} + +int json_stream_init(JsonStream *s, const JsonStreamParams *params) { + assert(s); + assert(params); + assert(params->phase); + assert(params->dispatch); + + char *delimiter = NULL; + if (params->delimiter) { + delimiter = strdup(params->delimiter); + if (!delimiter) + return -ENOMEM; + } + + *s = (JsonStream) { + .delimiter = delimiter, + .buffer_max = params->buffer_max > 0 ? params->buffer_max : JSON_STREAM_BUFFER_MAX_DEFAULT, + .read_chunk = params->read_chunk > 0 ? params->read_chunk : JSON_STREAM_READ_SIZE_DEFAULT, + .queue_max = params->queue_max > 0 ? params->queue_max : JSON_STREAM_QUEUE_MAX_DEFAULT, + .phase_cb = params->phase, + .dispatch_cb = params->dispatch, + .userdata = params->userdata, + .input_fd = -EBADF, + .output_fd = -EBADF, + .timeout = USEC_INFINITY, + .last_activity = USEC_INFINITY, + .ucred = UCRED_INVALID, + .peer_pidfd = -EBADF, + .af = -1, + }; + + return 0; +} + +static void json_stream_clear(JsonStream *s) { + if (!s) + return; + + json_stream_detach_event(s); + + s->delimiter = mfree(s->delimiter); + s->description = mfree(s->description); + + if (s->input_fd != s->output_fd) { + s->input_fd = safe_close(s->input_fd); + s->output_fd = safe_close(s->output_fd); + } else + s->output_fd = s->input_fd = safe_close(s->input_fd); + + s->peer_pidfd = safe_close(s->peer_pidfd); + s->ucred_acquired = false; + s->af = -1; + + close_many(s->input_fds, s->n_input_fds); + s->input_fds = mfree(s->input_fds); + s->n_input_fds = 0; + + s->input_buffer = FLAGS_SET(s->flags, JSON_STREAM_INPUT_SENSITIVE) ? erase_and_free(s->input_buffer) : mfree(s->input_buffer); + s->input_buffer_index = s->input_buffer_size = s->input_buffer_unscanned = 0; + + s->output_buffer = FLAGS_SET(s->flags, JSON_STREAM_OUTPUT_BUFFER_SENSITIVE) ? erase_and_free(s->output_buffer) : mfree(s->output_buffer); + s->output_buffer_index = s->output_buffer_size = 0; + s->flags &= ~JSON_STREAM_OUTPUT_BUFFER_SENSITIVE; + + s->input_control_buffer = mfree(s->input_control_buffer); + s->input_control_buffer_size = 0; + + close_many(s->output_fds, s->n_output_fds); + s->output_fds = mfree(s->output_fds); + s->n_output_fds = 0; + + close_many(s->pushed_fds, s->n_pushed_fds); + s->pushed_fds = mfree(s->pushed_fds); + s->n_pushed_fds = 0; + + LIST_CLEAR(queue, s->output_queue, json_stream_queue_item_free); + s->output_queue_tail = NULL; + s->n_output_queue = 0; +} + +void json_stream_done(JsonStream *s) { + if (!s) + return; + + json_stream_clear(s); +} + +int json_stream_set_description(JsonStream *s, const char *description) { + assert(s); + return free_and_strdup(&s->description, description); +} + +const char* json_stream_get_description(const JsonStream *s) { + assert(s); + return s->description; +} + +int json_stream_connect_address(JsonStream *s, const char *address) { + union sockaddr_union sockaddr; + int r; + + assert(s); + assert(address); + + _cleanup_close_ int sock_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); + if (sock_fd < 0) + return json_stream_log_errno(s, errno, "Failed to create AF_UNIX socket: %m"); + + sock_fd = fd_move_above_stdio(sock_fd); + + r = sockaddr_un_set_path(&sockaddr.un, address); + if (r < 0) { + if (r != -ENAMETOOLONG) + return json_stream_log_errno(s, r, "Failed to set socket address '%s': %m", address); + + /* Path too long to fit into sockaddr_un, connect via O_PATH instead. */ + r = connect_unix_path(sock_fd, AT_FDCWD, address); + } else + r = RET_NERRNO(connect(sock_fd, &sockaddr.sa, r)); + + if (r < 0) { + if (!IN_SET(r, -EAGAIN, -EINPROGRESS)) + return json_stream_log_errno(s, r, "Failed to connect to %s: %m", address); + + /* The connect() is being processed in the background. As long as that's the + * case the socket is in a special state: we can poll it for POLLOUT, but + * write()s before POLLOUT will fail with ENOTCONN (rather than EAGAIN). Since + * ENOTCONN can mean two different things (not yet connected vs. already + * disconnected), we track this as a separate flag. */ + s->flags |= JSON_STREAM_CONNECTING; + } + + int fd = TAKE_FD(sock_fd); + return json_stream_attach_fds(s, fd, fd); +} + +int json_stream_attach_fds(JsonStream *s, int input_fd, int output_fd) { + struct stat st; + + assert(s); + + /* NB: input_fd and output_fd are donated to the JsonStream instance! */ + + if (s->input_fd != s->output_fd) { + safe_close(s->input_fd); + safe_close(s->output_fd); + } else + safe_close(s->input_fd); + + s->input_fd = input_fd; + s->output_fd = output_fd; + s->flags &= ~(JSON_STREAM_PREFER_READ|JSON_STREAM_PREFER_WRITE); + + /* Detect non-socket fds up front so the read/write paths use read()/write() for + * non-socket fds and send()/recv() for sockets (mostly for MSG_NOSIGNAL). */ + if (input_fd >= 0) { + if (fstat(input_fd, &st) < 0) + return -errno; + if (!S_ISSOCK(st.st_mode)) + s->flags |= JSON_STREAM_PREFER_READ; + } + + if (output_fd >= 0 && output_fd != input_fd) { + if (fstat(output_fd, &st) < 0) + return -errno; + if (!S_ISSOCK(st.st_mode)) + s->flags |= JSON_STREAM_PREFER_WRITE; + } else if (FLAGS_SET(s->flags, JSON_STREAM_PREFER_READ)) + s->flags |= JSON_STREAM_PREFER_WRITE; + + return 0; +} + +int json_stream_connect_fd_pair(JsonStream *s, int input_fd, int output_fd) { + int r; + + assert(s); + assert(input_fd >= 0); + assert(output_fd >= 0); + + r = fd_nonblock(input_fd, true); + if (r < 0) + return json_stream_log_errno(s, r, "Failed to make input fd %d nonblocking: %m", input_fd); + + if (input_fd != output_fd) { + r = fd_nonblock(output_fd, true); + if (r < 0) + return json_stream_log_errno(s, r, "Failed to make output fd %d nonblocking: %m", output_fd); + } + + return json_stream_attach_fds(s, input_fd, output_fd); +} + +bool json_stream_flags_set(const JsonStream *s, JsonStreamFlags flags) { + assert(s); + assert((flags & ~(JSON_STREAM_BOUNDED_READS|JSON_STREAM_INPUT_SENSITIVE|JSON_STREAM_ALLOW_FD_PASSING_INPUT|JSON_STREAM_ALLOW_FD_PASSING_OUTPUT)) == 0); + + return FLAGS_SET(s->flags, flags); +} + +/* Multiple flags may be passed — all are set or cleared together. */ +void json_stream_set_flags(JsonStream *s, JsonStreamFlags flags, bool b) { + assert(s); + assert((flags & ~(JSON_STREAM_BOUNDED_READS|JSON_STREAM_INPUT_SENSITIVE)) == 0); + + SET_FLAG(s->flags, flags, b); +} + +bool json_stream_has_buffered_input(const JsonStream *s) { + assert(s); + return s->input_buffer_size > 0; +} + +/* Query the consumer's current phase. The callback is mandatory (asserted at construction + * time), so we can call it unconditionally. */ +static JsonStreamPhase json_stream_current_phase(const JsonStream *s) { + assert(s); + return s->phase_cb(s->userdata); +} + +/* Both READING and AWAITING_REPLY mean "we want POLLIN and would lose if the read side + * died" — they only differ in whether the idle timeout is in force. */ +static bool phase_is_reading(JsonStreamPhase p) { + return IN_SET(p, JSON_STREAM_PHASE_READING, JSON_STREAM_PHASE_AWAITING_REPLY); +} + +bool json_stream_should_disconnect(const JsonStream *s) { + assert(s); + + /* Carefully decide when the consumer should initiate a teardown. We err on the side + * of staying around so half-open connections can flush remaining data and reads can + * surface buffered messages before we tear everything down. */ + + /* Wait until any in-flight async connect() completes — there's nothing reasonable + * to do until we know whether the socket is connected or not. */ + if (FLAGS_SET(s->flags, JSON_STREAM_CONNECTING)) + return false; + + /* Still bytes to write and we can write? Stay around so the flush can complete. */ + if (s->output_buffer_size > 0 && !FLAGS_SET(s->flags, JSON_STREAM_WRITE_DISCONNECTED)) + return false; + + /* Both sides gone already? Then there's no point in lingering. */ + if (FLAGS_SET(s->flags, JSON_STREAM_READ_DISCONNECTED|JSON_STREAM_WRITE_DISCONNECTED)) + return true; + + JsonStreamPhase phase = json_stream_current_phase(s); + + /* Caller is waiting for input but the read side is shut down — we'll never see + * another message. */ + if (phase_is_reading(phase) && FLAGS_SET(s->flags, JSON_STREAM_READ_DISCONNECTED)) + return true; + + /* Idle client whose write side has died, or we saw POLLHUP. We explicitly check for + * POLLHUP because we likely won't notice the write side being down via send() if we + * never wrote anything in the first place. */ + if (phase == JSON_STREAM_PHASE_IDLE_CLIENT && + (s->flags & (JSON_STREAM_WRITE_DISCONNECTED|JSON_STREAM_GOT_POLLHUP))) + return true; + + /* Caller has more output to send but the peer hung up, and we're either out of + * bytes or already saw a write error. Nothing left to do. */ + if (phase == JSON_STREAM_PHASE_PENDING_OUTPUT && + (FLAGS_SET(s->flags, JSON_STREAM_WRITE_DISCONNECTED) || s->output_buffer_size == 0) && + FLAGS_SET(s->flags, JSON_STREAM_GOT_POLLHUP)) + return true; + + return false; +} + +int json_stream_get_events(const JsonStream *s) { + int ret = 0; + + assert(s); + + /* While an asynchronous connect() is still in flight we only ask for POLLOUT, which + * tells us once the connection is fully established. We must not read or write before + * that. */ + if (FLAGS_SET(s->flags, JSON_STREAM_CONNECTING)) + return POLLOUT; + + if (phase_is_reading(json_stream_current_phase(s)) && + !FLAGS_SET(s->flags, JSON_STREAM_READ_DISCONNECTED) && + s->input_buffer_unscanned == 0) + ret |= POLLIN; + + if (!FLAGS_SET(s->flags, JSON_STREAM_WRITE_DISCONNECTED) && (s->output_queue || s->output_buffer_size > 0)) + ret |= POLLOUT; + + return ret; +} + +static void json_stream_handle_revents(JsonStream *s, int revents) { + assert(s); + + if (FLAGS_SET(s->flags, JSON_STREAM_CONNECTING)) { + /* If we have seen POLLOUT or POLLHUP on a socket we are asynchronously waiting a + * connect() to complete on, we know we are ready. We don't read the connection + * error here though — we'll get it on the next read() or write(). */ + if ((revents & (POLLOUT|POLLHUP)) == 0) + return; + + json_stream_log(s, "Asynchronous connection completed."); + s->flags &= ~JSON_STREAM_CONNECTING; + return; + } + + /* Note that we don't care much about POLLIN/POLLOUT here, we'll just try reading and + * writing what we can. However, we do care about POLLHUP to detect connection + * termination even if we momentarily don't want to read nor write anything. */ + if (FLAGS_SET(revents, POLLHUP)) { + json_stream_log(s, "Got POLLHUP from socket."); + s->flags |= JSON_STREAM_GOT_POLLHUP; + } +} + +int json_stream_wait(JsonStream *s, usec_t timeout) { + int events, r; + + assert(s); + + events = json_stream_get_events(s); + if (events < 0) + return events; + + /* MIN the caller's timeout with our own deadline (if any) so that we wake up to + * fire the idle timeout. */ + usec_t deadline = json_stream_get_timeout(s); + if (deadline != USEC_INFINITY) + timeout = MIN(timeout, usec_sub_unsigned(deadline, now(CLOCK_MONOTONIC))); + + struct pollfd pollfd[2]; + size_t n_poll_fd = 0; + + if (s->input_fd == s->output_fd) { + pollfd[n_poll_fd++] = (struct pollfd) { + .fd = s->input_fd, + .events = events, + }; + } else { + pollfd[n_poll_fd++] = (struct pollfd) { + .fd = s->input_fd, + .events = events & POLLIN, + }; + pollfd[n_poll_fd++] = (struct pollfd) { + .fd = s->output_fd, + .events = events & POLLOUT, + }; + } + + r = ppoll_usec(pollfd, n_poll_fd, timeout); + if (ERRNO_IS_NEG_TRANSIENT(r)) + /* Treat EINTR as not a timeout, but also nothing happened, and the caller gets + * a chance to call back into us. */ + return 1; + if (r <= 0) + return r; + + int revents = 0; + FOREACH_ARRAY(p, pollfd, n_poll_fd) + revents |= p->revents; + + json_stream_handle_revents(s, revents); + return 1; +} + +/* ===== Timeout management ===== */ + +static usec_t json_stream_get_deadline(const JsonStream *s) { + assert(s); + + return usec_add(s->last_activity, s->timeout); +} + +usec_t json_stream_get_timeout(const JsonStream *s) { + assert(s); + + /* The deadline is in force only when the consumer is in PHASE_AWAITING_REPLY. In + * other phases (idle server, between operations) we ignore the cached deadline even + * if it's still set from a previous operation. */ + if (json_stream_current_phase(s) != JSON_STREAM_PHASE_AWAITING_REPLY) + return USEC_INFINITY; + + return json_stream_get_deadline(s); +} + +static void json_stream_rearm_time_source(JsonStream *s) { + int r; + + assert(s); + + if (!s->time_event_source) + return; + + usec_t deadline = json_stream_get_timeout(s); + if (deadline == USEC_INFINITY) { + (void) sd_event_source_set_enabled(s->time_event_source, SD_EVENT_OFF); + return; + } + + r = sd_event_source_set_time(s->time_event_source, deadline); + if (r < 0) { + json_stream_log_errno(s, r, "Failed to set time source deadline: %m"); + return; + } + + (void) sd_event_source_set_enabled(s->time_event_source, SD_EVENT_ON); +} + +void json_stream_set_timeout(JsonStream *s, usec_t timeout) { + assert(s); + + s->timeout = timeout; + + /* If the configured timeout changes mid-flight, rearm the time source so the new + * deadline takes effect immediately rather than waiting for the next mark_activity + * or successful write. */ + json_stream_rearm_time_source(s); +} + +void json_stream_mark_activity(JsonStream *s) { + assert(s); + + s->last_activity = json_stream_now(s); + json_stream_rearm_time_source(s); +} + +static int json_stream_acquire_peer_ucred(JsonStream *s, struct ucred *ret) { + int r; + + assert(s); + assert(ret); + + if (!s->ucred_acquired) { + /* Peer credentials only make sense for a bidirectional socket. */ + if (s->input_fd != s->output_fd) + return -EBADF; + + r = getpeercred(s->input_fd, &s->ucred); + if (r < 0) + return r; + + s->ucred_acquired = true; + } + + *ret = s->ucred; + return 0; +} + +int json_stream_acquire_peer_uid(JsonStream *s, uid_t *ret) { + struct ucred ucred; + int r; + + assert(s); + assert(ret); + + r = json_stream_acquire_peer_ucred(s, &ucred); + if (r < 0) + return json_stream_log_errno(s, r, "Failed to acquire credentials: %m"); + + if (!uid_is_valid(ucred.uid)) + return json_stream_log_errno(s, SYNTHETIC_ERRNO(ENODATA), "Peer UID is invalid."); + + *ret = ucred.uid; + return 0; +} + +int json_stream_acquire_peer_gid(JsonStream *s, gid_t *ret) { + struct ucred ucred; + int r; + + assert(s); + assert(ret); + + r = json_stream_acquire_peer_ucred(s, &ucred); + if (r < 0) + return json_stream_log_errno(s, r, "Failed to acquire credentials: %m"); + + if (!gid_is_valid(ucred.gid)) + return json_stream_log_errno(s, SYNTHETIC_ERRNO(ENODATA), "Peer GID is invalid."); + + *ret = ucred.gid; + return 0; +} + +int json_stream_acquire_peer_pid(JsonStream *s, pid_t *ret) { + struct ucred ucred; + int r; + + assert(s); + assert(ret); + + r = json_stream_acquire_peer_ucred(s, &ucred); + if (r < 0) + return json_stream_log_errno(s, r, "Failed to acquire credentials: %m"); + + if (!pid_is_valid(ucred.pid)) + return json_stream_log_errno(s, SYNTHETIC_ERRNO(ENODATA), "Peer PID is invalid."); + + *ret = ucred.pid; + return 0; +} + +int json_stream_get_peer_ucred(const JsonStream *s, struct ucred *ret) { + assert(s); + assert(ret); + + if (!s->ucred_acquired) + return -ENODATA; + + *ret = s->ucred; + return 0; +} + +void json_stream_set_peer_ucred(JsonStream *s, const struct ucred *ucred) { + assert(s); + assert(ucred); + + s->ucred = *ucred; + s->ucred_acquired = true; +} + +int json_stream_acquire_peer_pidfd(JsonStream *s) { + assert(s); + + if (s->peer_pidfd >= 0) + return s->peer_pidfd; + + if (s->input_fd != s->output_fd) + return json_stream_log_errno(s, SYNTHETIC_ERRNO(EBADF), "Failed to acquire pidfd of peer: separate input/output fds"); + + s->peer_pidfd = getpeerpidfd(s->input_fd); + if (s->peer_pidfd < 0) + return json_stream_log_errno(s, s->peer_pidfd, "Failed to acquire pidfd of peer: %m"); + + return s->peer_pidfd; +} + +static int json_stream_verify_unix_socket(JsonStream *s) { + assert(s); + + /* Returns: + * • 0 if this is an AF_UNIX socket + * • -ENOTSOCK if this is not a socket at all + * • -ENOMEDIUM if this is a socket, but not an AF_UNIX socket + * + * The result is cached after the first call. af < 0 = unchecked, af == AF_UNSPEC = + * checked but not a socket, otherwise af is the resolved address family. */ + + if (s->af < 0) { + /* If we have distinct input + output fds, we don't consider ourselves to be + * connected via a regular AF_UNIX socket. */ + if (s->input_fd != s->output_fd) { + s->af = AF_UNSPEC; + return -ENOTSOCK; + } + + struct stat st; + + if (fstat(s->input_fd, &st) < 0) + return -errno; + if (!S_ISSOCK(st.st_mode)) { + s->af = AF_UNSPEC; + return -ENOTSOCK; + } + + s->af = socket_get_family(s->input_fd); + if (s->af < 0) + return s->af; + } + + if (s->af == AF_UNIX) + return 0; + if (s->af == AF_UNSPEC) + return -ENOTSOCK; + + return -ENOMEDIUM; +} + +int json_stream_set_allow_fd_passing_input(JsonStream *s, bool enabled, bool with_sockopt) { + int r; + + assert(s); + + if (FLAGS_SET(s->flags, JSON_STREAM_ALLOW_FD_PASSING_INPUT) == enabled) + return 0; + + r = json_stream_verify_unix_socket(s); + if (r < 0) { + /* If the caller is disabling, accept the verify failure silently — we just + * leave the flag as it was (or set it to false if currently true). */ + if (!enabled) { + s->flags &= ~JSON_STREAM_ALLOW_FD_PASSING_INPUT; + return 0; + } + return r; + } + + if (with_sockopt) { + r = setsockopt_int(s->input_fd, SOL_SOCKET, SO_PASSRIGHTS, enabled); + if (r < 0 && !ERRNO_IS_NEG_NOT_SUPPORTED(r)) + json_stream_log_errno(s, r, "Failed to set SO_PASSRIGHTS socket option: %m"); + } + + SET_FLAG(s->flags, JSON_STREAM_ALLOW_FD_PASSING_INPUT, enabled); + return 1; +} + +int json_stream_set_allow_fd_passing_output(JsonStream *s, bool enabled) { + int r; + + assert(s); + + if (FLAGS_SET(s->flags, JSON_STREAM_ALLOW_FD_PASSING_OUTPUT) == enabled) + return 0; + + r = json_stream_verify_unix_socket(s); + if (r < 0) + return r; + + SET_FLAG(s->flags, JSON_STREAM_ALLOW_FD_PASSING_OUTPUT, enabled); + return 1; +} + +/* ===== sd-event integration ===== */ + +static int json_stream_io_callback(sd_event_source *source, int fd, uint32_t revents, void *userdata) { + JsonStream *s = ASSERT_PTR(userdata); + int r; + + json_stream_handle_revents(s, revents); + + r = s->dispatch_cb(s->userdata); + if (r < 0) + json_stream_log_errno(s, r, "Dispatch callback failed, ignoring: %m"); + + return 1; +} + +static int json_stream_time_callback(sd_event_source *source, uint64_t usec, void *userdata) { + JsonStream *s = ASSERT_PTR(userdata); + int r; + + /* Disable the source: it must not fire again until activity is marked. The consumer + * notices the timeout by comparing now() to json_stream_get_timeout() in its dispatch + * callback. */ + (void) sd_event_source_set_enabled(s->time_event_source, SD_EVENT_OFF); + + r = s->dispatch_cb(s->userdata); + if (r < 0) + json_stream_log_errno(s, r, "Dispatch callback failed, ignoring: %m"); + + return 1; +} + +static int json_stream_prepare_callback(sd_event_source *source, void *userdata) { + JsonStream *s = ASSERT_PTR(userdata); + int r, e; + + e = json_stream_get_events(s); + if (e < 0) + return e; + + if (s->input_event_source == s->output_event_source) + /* Same fd for input + output */ + r = sd_event_source_set_io_events(s->input_event_source, e); + else { + r = sd_event_source_set_io_events(s->input_event_source, e & POLLIN); + if (r >= 0) + r = sd_event_source_set_io_events(s->output_event_source, e & POLLOUT); + } + if (r < 0) + return json_stream_log_errno(s, r, "Failed to set io events: %m"); + + /* Rearm the timeout on every prepare cycle so that phase transitions (e.g. entering + * AWAITING_REPLY) are picked up without requiring the consumer to explicitly call + * mark_activity at every state change. */ + json_stream_rearm_time_source(s); + + return 1; +} + +void json_stream_detach_event(JsonStream *s) { + if (!s) + return; + + s->input_event_source = sd_event_source_disable_unref(s->input_event_source); + s->output_event_source = sd_event_source_disable_unref(s->output_event_source); + s->time_event_source = sd_event_source_disable_unref(s->time_event_source); + s->event = sd_event_unref(s->event); +} + +sd_event* json_stream_get_event(const JsonStream *s) { + assert(s); + return s->event; +} + +int json_stream_attach_event(JsonStream *s, sd_event *event, int64_t priority) { + int r; + + assert(s); + assert(!s->event); + assert(s->input_fd >= 0); + assert(s->output_fd >= 0); + + if (event) + s->event = sd_event_ref(event); + else { + r = sd_event_default(&s->event); + if (r < 0) + return json_stream_log_errno(s, r, "Failed to acquire default event loop: %m"); + } + + r = sd_event_add_io(s->event, &s->input_event_source, s->input_fd, 0, json_stream_io_callback, s); + if (r < 0) + goto fail; + + r = sd_event_source_set_prepare(s->input_event_source, json_stream_prepare_callback); + if (r < 0) + goto fail; + + r = sd_event_source_set_priority(s->input_event_source, priority); + if (r < 0) + goto fail; + + (void) sd_event_source_set_description(s->input_event_source, "json-stream-input"); + + if (s->input_fd == s->output_fd) + s->output_event_source = sd_event_source_ref(s->input_event_source); + else { + r = sd_event_add_io(s->event, &s->output_event_source, s->output_fd, 0, json_stream_io_callback, s); + if (r < 0) + goto fail; + + r = sd_event_source_set_priority(s->output_event_source, priority); + if (r < 0) + goto fail; + + (void) sd_event_source_set_description(s->output_event_source, "json-stream-output"); + } + + r = sd_event_add_time(s->event, &s->time_event_source, CLOCK_MONOTONIC, /* usec= */ 0, /* accuracy= */ 0, + json_stream_time_callback, s); + if (r < 0) + goto fail; + + r = sd_event_source_set_priority(s->time_event_source, priority); + if (r < 0) + goto fail; + + (void) sd_event_source_set_description(s->time_event_source, "json-stream-time"); + + /* Initially disabled — only enabled by mark_activity once a timeout is configured. */ + (void) sd_event_source_set_enabled(s->time_event_source, SD_EVENT_OFF); + json_stream_rearm_time_source(s); + + return 0; + +fail: + json_stream_log_errno(s, r, "Failed to attach event source: %m"); + json_stream_detach_event(s); + return r; +} + +int json_stream_flush(JsonStream *s) { + int ret = 0, r; + + assert(s); + + for (;;) { + if (s->output_buffer_size == 0 && !s->output_queue) + break; + if (FLAGS_SET(s->flags, JSON_STREAM_WRITE_DISCONNECTED)) + return -ECONNRESET; + + r = json_stream_write(s); + if (r < 0) + return r; + if (r > 0) { + ret = 1; + continue; + } + + r = json_stream_wait(s, USEC_INFINITY); + if (ERRNO_IS_NEG_TRANSIENT(r)) + continue; + if (r < 0) + return json_stream_log_errno(s, r, "Poll failed on fd: %m"); + assert(r > 0); + } + + return ret; +} + +int json_stream_push_fd(JsonStream *s, int fd) { + int i; + + assert(s); + assert(fd >= 0); + + if (s->n_pushed_fds >= SCM_MAX_FD) /* Kernel doesn't support more than 253 fds per message */ + return -ENOBUFS; + + if (!GREEDY_REALLOC(s->pushed_fds, s->n_pushed_fds + 1)) + return -ENOMEM; + + i = (int) s->n_pushed_fds; + s->pushed_fds[s->n_pushed_fds++] = fd; + return i; +} + +void json_stream_reset_pushed_fds(JsonStream *s) { + assert(s); + + close_many(s->pushed_fds, s->n_pushed_fds); + s->n_pushed_fds = 0; +} + +int json_stream_peek_input_fd(const JsonStream *s, size_t i) { + assert(s); + + if (i >= s->n_input_fds) + return -ENXIO; + + return s->input_fds[i]; +} + +int json_stream_take_input_fd(JsonStream *s, size_t i) { + assert(s); + + if (i >= s->n_input_fds) + return -ENXIO; + + return TAKE_FD(s->input_fds[i]); +} + +size_t json_stream_get_n_input_fds(const JsonStream *s) { + assert(s); + return s->n_input_fds; +} + +void json_stream_close_input_fds(JsonStream *s) { + assert(s); + + close_many(s->input_fds, s->n_input_fds); + s->input_fds = mfree(s->input_fds); + s->n_input_fds = 0; +} + +/* ===== Output formatting ===== */ + +static int json_stream_format_json(JsonStream *s, sd_json_variant *m) { + _cleanup_(erase_and_freep) char *text = NULL; + ssize_t sz, r; + + assert(s); + assert(m); + + sz = sd_json_variant_format(m, /* flags= */ 0, &text); + if (sz < 0) + return sz; + assert(text[sz] == '\0'); + + size_t dsz = json_stream_delimiter_size(s); + + /* Append the framing delimiter after the formatted JSON. For varlink (delimiter == + * NULL) this keeps the trailing NUL already placed by sd_json_variant_format(); for + * multi-char delimiters (e.g. "\r\n") we grow the buffer and copy them in. */ + if (s->delimiter) { + if (!GREEDY_REALLOC(text, sz + dsz)) + return -ENOMEM; + memcpy(text + sz, s->delimiter, dsz); + } + + if (s->output_buffer_size + sz + dsz > s->buffer_max) + return -ENOBUFS; + + if (DEBUG_LOGGING) { + _cleanup_(erase_and_freep) char *censored_text = NULL; + + /* Suppress sensitive fields in the debug output */ + r = sd_json_variant_format(m, SD_JSON_FORMAT_CENSOR_SENSITIVE, &censored_text); + if (r >= 0) + json_stream_log(s, "Sending message: %s", censored_text); + } + + if (s->output_buffer_size == 0) { + if (FLAGS_SET(s->flags, JSON_STREAM_OUTPUT_BUFFER_SENSITIVE)) { + s->output_buffer = erase_and_free(s->output_buffer); + s->flags &= ~JSON_STREAM_OUTPUT_BUFFER_SENSITIVE; + } + + free_and_replace(s->output_buffer, text); + + s->output_buffer_size = sz + dsz; + s->output_buffer_index = 0; + + } else if (!FLAGS_SET(s->flags, JSON_STREAM_OUTPUT_BUFFER_SENSITIVE) && s->output_buffer_index == 0) { + if (!GREEDY_REALLOC(s->output_buffer, s->output_buffer_size + sz + dsz)) + return -ENOMEM; + + memcpy(s->output_buffer + s->output_buffer_size, text, sz + dsz); + s->output_buffer_size += sz + dsz; + } else { + const size_t new_size = s->output_buffer_size + sz + dsz; + + char *n = new(char, new_size); + if (!n) + return -ENOMEM; + + memcpy(mempcpy(n, s->output_buffer + s->output_buffer_index, s->output_buffer_size), text, sz + dsz); + + if (FLAGS_SET(s->flags, JSON_STREAM_OUTPUT_BUFFER_SENSITIVE)) + s->output_buffer = erase_and_free(s->output_buffer); + else + free(s->output_buffer); + s->output_buffer = n; + s->output_buffer_size = new_size; + s->output_buffer_index = 0; + } + + if (sd_json_variant_is_sensitive_recursive(m)) + s->flags |= JSON_STREAM_OUTPUT_BUFFER_SENSITIVE; + else + text = mfree(text); /* Skip the erase_and_free() destructor declared above */ + + return 0; +} + +static int json_stream_format_queue(JsonStream *s) { + int r; + + assert(s); + + /* Drain entries out of the output queue and format them into the output buffer. Stop + * if there are unwritten output_fds, since adding more would corrupt the fd boundary. */ + + while (s->output_queue) { + assert(s->n_output_queue > 0); + + if (s->n_output_fds > 0) + return 0; + + JsonStreamQueueItem *q = s->output_queue; + _cleanup_free_ int *array = NULL; + + if (q->n_fds > 0) { + array = newdup(int, q->fds, q->n_fds); + if (!array) + return -ENOMEM; + } + + r = json_stream_format_json(s, q->data); + if (r < 0) + return r; + + free_and_replace(s->output_fds, array); + s->n_output_fds = q->n_fds; + q->n_fds = 0; + + LIST_REMOVE(queue, s->output_queue, q); + if (!s->output_queue) + s->output_queue_tail = NULL; + s->n_output_queue--; + + json_stream_queue_item_free(q); + } + + return 0; +} + +int json_stream_enqueue_item(JsonStream *s, JsonStreamQueueItem *q) { + assert(s); + assert(q); + + if (s->n_output_queue >= s->queue_max) + return -ENOBUFS; + + LIST_INSERT_AFTER(queue, s->output_queue, s->output_queue_tail, q); + s->output_queue_tail = q; + s->n_output_queue++; + return 0; +} + +int json_stream_enqueue(JsonStream *s, sd_json_variant *m) { + JsonStreamQueueItem *q; + + assert(s); + assert(m); + + /* Fast path: no fds pending and no items currently queued — append directly into the + * output buffer to avoid the queue allocation. */ + if (s->n_pushed_fds == 0 && !s->output_queue) + return json_stream_format_json(s, m); + + if (s->n_output_queue >= s->queue_max) + return -ENOBUFS; + + q = json_stream_queue_item_new(m, s->pushed_fds, s->n_pushed_fds); + if (!q) + return -ENOMEM; + + s->n_pushed_fds = 0; /* fds belong to the queue entry now */ + + assert_se(json_stream_enqueue_item(s, q) >= 0); + return 0; +} + +int json_stream_make_queue_item(JsonStream *s, sd_json_variant *m, JsonStreamQueueItem **ret) { + JsonStreamQueueItem *q; + + assert(s); + assert(m); + assert(ret); + + q = json_stream_queue_item_new(m, s->pushed_fds, s->n_pushed_fds); + if (!q) + return -ENOMEM; + + s->n_pushed_fds = 0; /* fds belong to the queue entry now */ + + *ret = q; + return 0; +} + +/* ===== Write side ===== */ + +int json_stream_write(JsonStream *s) { + ssize_t n; + int r; + + assert(s); + + if (FLAGS_SET(s->flags, JSON_STREAM_CONNECTING)) + return 0; + if (FLAGS_SET(s->flags, JSON_STREAM_WRITE_DISCONNECTED)) + return 0; + + /* Drain the deferred queue into the output buffer if possible */ + r = json_stream_format_queue(s); + if (r < 0) + return r; + + if (s->output_buffer_size == 0) + return 0; + + assert(s->output_fd >= 0); + + if (s->n_output_fds > 0) { + struct iovec iov = { + .iov_base = s->output_buffer + s->output_buffer_index, + .iov_len = s->output_buffer_size, + }; + struct msghdr mh = { + .msg_iov = &iov, + .msg_iovlen = 1, + .msg_controllen = CMSG_SPACE(sizeof(int) * s->n_output_fds), + }; + + mh.msg_control = alloca0(mh.msg_controllen); + + struct cmsghdr *control = CMSG_FIRSTHDR(&mh); + control->cmsg_len = CMSG_LEN(sizeof(int) * s->n_output_fds); + control->cmsg_level = SOL_SOCKET; + control->cmsg_type = SCM_RIGHTS; + memcpy(CMSG_DATA(control), s->output_fds, sizeof(int) * s->n_output_fds); + + n = sendmsg(s->output_fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL); + } else if (FLAGS_SET(s->flags, JSON_STREAM_PREFER_WRITE)) + n = write(s->output_fd, s->output_buffer + s->output_buffer_index, s->output_buffer_size); + else + n = send(s->output_fd, s->output_buffer + s->output_buffer_index, s->output_buffer_size, MSG_DONTWAIT|MSG_NOSIGNAL); + if (n < 0) { + if (ERRNO_IS_TRANSIENT(errno)) + return 0; + + if (ERRNO_IS_DISCONNECT(errno)) { + s->flags |= JSON_STREAM_WRITE_DISCONNECTED; + return 1; + } + + return -errno; + } + + if (FLAGS_SET(s->flags, JSON_STREAM_OUTPUT_BUFFER_SENSITIVE)) + explicit_bzero_safe(s->output_buffer + s->output_buffer_index, n); + + s->output_buffer_size -= n; + + if (s->output_buffer_size == 0) { + s->output_buffer_index = 0; + s->flags &= ~JSON_STREAM_OUTPUT_BUFFER_SENSITIVE; + } else + s->output_buffer_index += n; + + close_many(s->output_fds, s->n_output_fds); + s->n_output_fds = 0; + + /* Refresh activity timestamp on real progress (and rearm the time source if attached + * to an event loop). */ + s->last_activity = json_stream_now(s); + json_stream_rearm_time_source(s); + + return 1; +} + +/* ===== Read side ===== */ + +/* In bounded-reads mode, peek at the socket data to find the delimiter and return a read + * size that won't consume past it. This prevents over-reading data that belongs to whatever + * protocol the socket is being handed off to. Falls back to byte-by-byte for non-socket fds + * where MSG_PEEK is not available. */ +static ssize_t json_stream_peek_message_boundary(JsonStream *s, void *p, size_t rs) { + assert(s); + + if (!FLAGS_SET(s->flags, JSON_STREAM_BOUNDED_READS)) + return rs; + + if (FLAGS_SET(s->flags, JSON_STREAM_PREFER_READ)) + return 1; + + ssize_t peeked = recv(s->input_fd, p, rs, MSG_PEEK|MSG_DONTWAIT); + if (peeked < 0) { + if (!ERRNO_IS_TRANSIENT(errno)) + return -errno; + + /* Transient error: shouldn't happen but fall back to byte-by-byte */ + return 1; + } + /* EOF: the real recv() will also see it; what we return here doesn't matter */ + if (peeked == 0) + return rs; + + size_t dsz = json_stream_delimiter_size(s); + void *delim = memmem_safe(p, peeked, s->delimiter ?: "\0", dsz); + if (delim) + return (ssize_t) ((char*) delim - (char*) p) + dsz; + + return peeked; +} + +int json_stream_read(JsonStream *s) { + struct iovec iov; + struct msghdr mh; + ssize_t rs; + ssize_t n; + void *p; + + assert(s); + + if (FLAGS_SET(s->flags, JSON_STREAM_CONNECTING)) + return 0; + if (s->input_buffer_unscanned > 0) + return 0; + if (FLAGS_SET(s->flags, JSON_STREAM_READ_DISCONNECTED)) + return 0; + + if (s->input_buffer_size >= s->buffer_max) + return -ENOBUFS; + + assert(s->input_fd >= 0); + + if (MALLOC_SIZEOF_SAFE(s->input_buffer) <= s->input_buffer_index + s->input_buffer_size) { + size_t add; + + add = MIN(s->buffer_max - s->input_buffer_size, s->read_chunk); + + if (!FLAGS_SET(s->flags, JSON_STREAM_INPUT_SENSITIVE) && s->input_buffer_index == 0) { + if (!GREEDY_REALLOC(s->input_buffer, s->input_buffer_size + add)) + return -ENOMEM; + } else { + char *b; + + b = new(char, s->input_buffer_size + add); + if (!b) + return -ENOMEM; + + memcpy(b, s->input_buffer + s->input_buffer_index, s->input_buffer_size); + + if (FLAGS_SET(s->flags, JSON_STREAM_INPUT_SENSITIVE)) + s->input_buffer = erase_and_free(s->input_buffer); + else + free(s->input_buffer); + s->input_buffer = b; + s->input_buffer_index = 0; + } + } + + p = s->input_buffer + s->input_buffer_index + s->input_buffer_size; + + rs = MALLOC_SIZEOF_SAFE(s->input_buffer) - (s->input_buffer_index + s->input_buffer_size); + + /* If a protocol upgrade may follow, ensure we don't consume any post-upgrade bytes by + * limiting the read to the next delimiter. Uses MSG_PEEK on sockets, single-byte reads + * otherwise. */ + rs = json_stream_peek_message_boundary(s, p, rs); + if (rs < 0) + return json_stream_log_errno(s, (int) rs, "Failed to peek message boundary: %m"); + + if (FLAGS_SET(s->flags, JSON_STREAM_ALLOW_FD_PASSING_INPUT)) { + iov = IOVEC_MAKE(p, rs); + + if (!s->input_control_buffer) { + s->input_control_buffer_size = CMSG_SPACE(sizeof(int) * JSON_STREAM_FDS_MAX); + s->input_control_buffer = malloc(s->input_control_buffer_size); + if (!s->input_control_buffer) + return -ENOMEM; + } + + mh = (struct msghdr) { + .msg_iov = &iov, + .msg_iovlen = 1, + .msg_control = s->input_control_buffer, + .msg_controllen = s->input_control_buffer_size, + }; + + n = recvmsg_safe(s->input_fd, &mh, MSG_DONTWAIT|MSG_CMSG_CLOEXEC); + } else if (FLAGS_SET(s->flags, JSON_STREAM_PREFER_READ)) + n = RET_NERRNO(read(s->input_fd, p, rs)); + else + n = RET_NERRNO(recv(s->input_fd, p, rs, MSG_DONTWAIT)); + if (ERRNO_IS_NEG_TRANSIENT(n)) + return 0; + if (ERRNO_IS_NEG_DISCONNECT(n)) { + s->flags |= JSON_STREAM_READ_DISCONNECTED; + return 1; + } + if (n < 0) + return n; + if (n == 0) { /* EOF */ + if (FLAGS_SET(s->flags, JSON_STREAM_ALLOW_FD_PASSING_INPUT)) + cmsg_close_all(&mh); + + s->flags |= JSON_STREAM_READ_DISCONNECTED; + return 1; + } + + if (FLAGS_SET(s->flags, JSON_STREAM_ALLOW_FD_PASSING_INPUT)) { + struct cmsghdr *cmsg; + + cmsg = cmsg_find(&mh, SOL_SOCKET, SCM_RIGHTS, (socklen_t) -1); + if (cmsg) { + size_t add; + + /* fds are only allowed with the first byte of a message; receiving them + * mid-stream is a protocol violation. */ + if (s->input_buffer_size != 0) { + cmsg_close_all(&mh); + return -EPROTO; + } + + add = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int); + if (add > INT_MAX - s->n_input_fds) { + cmsg_close_all(&mh); + return -EBADF; + } + + if (!GREEDY_REALLOC(s->input_fds, s->n_input_fds + add)) { + cmsg_close_all(&mh); + return -ENOMEM; + } + + memcpy_safe(s->input_fds + s->n_input_fds, CMSG_TYPED_DATA(cmsg, int), add * sizeof(int)); + s->n_input_fds += add; + } + } + + s->input_buffer_size += n; + s->input_buffer_unscanned += n; + + return 1; +} + +/* ===== Parse ===== */ + +int json_stream_parse(JsonStream *s, sd_json_variant **ret) { + char *begin, *e; + size_t sz; + int r; + + assert(s); + assert(ret); + + if (s->input_buffer_unscanned == 0) { + *ret = NULL; + return 0; + } + + assert(s->input_buffer_unscanned <= s->input_buffer_size); + assert(s->input_buffer_index + s->input_buffer_size <= MALLOC_SIZEOF_SAFE(s->input_buffer)); + + begin = s->input_buffer + s->input_buffer_index; + + size_t dsz = json_stream_delimiter_size(s); + e = memmem_safe(begin + s->input_buffer_size - s->input_buffer_unscanned, s->input_buffer_unscanned, s->delimiter ?: "\0", dsz); + if (!e) { + s->input_buffer_unscanned = 0; + *ret = NULL; + return 0; + } + + sz = e - begin + dsz; + + /* For non-NUL delimiters (e.g. "\r\n" for QMP) sd_json_parse() needs a NUL-terminated + * string; overwrite the first delimiter byte with NUL in place. For NUL delimiters + * this is a no-op since the byte is already '\0'. */ + if (s->delimiter) + *e = '\0'; + + r = sd_json_parse(begin, SD_JSON_PARSE_MUST_BE_OBJECT, ret, /* reterr_line= */ NULL, /* reterr_column= */ NULL); + if (FLAGS_SET(s->flags, JSON_STREAM_INPUT_SENSITIVE)) + explicit_bzero_safe(begin, sz); + if (r < 0) { + /* Unrecoverable parse failure: drop all buffered data. */ + s->input_buffer_index = s->input_buffer_size = s->input_buffer_unscanned = 0; + return json_stream_log_errno(s, r, "Failed to parse JSON object: %m"); + } + + if (DEBUG_LOGGING) { + _cleanup_(erase_and_freep) char *censored_text = NULL; + + /* Suppress sensitive fields in the debug output */ + r = sd_json_variant_format(*ret, /* flags= */ SD_JSON_FORMAT_CENSOR_SENSITIVE, &censored_text); + if (r >= 0) + json_stream_log(s, "Received message: %s", censored_text); + } + + s->input_buffer_size -= sz; + + if (s->input_buffer_size == 0) + s->input_buffer_index = 0; + else + s->input_buffer_index += sz; + + s->input_buffer_unscanned = s->input_buffer_size; + return 1; +} diff --git a/src/libsystemd/sd-json/json-stream.h b/src/libsystemd/sd-json/json-stream.h new file mode 100644 index 00000000000..92a09d49419 --- /dev/null +++ b/src/libsystemd/sd-json/json-stream.h @@ -0,0 +1,267 @@ +/* SPDX-License-Identifier: LGPL-2.1-or-later */ +#pragma once + +#include + +#include "sd-forward.h" + +#include "list.h" + +/* JsonStream provides the transport layer used by sd-varlink (and other consumers like + * the QMP client) for exchanging length-delimited JSON messages over a pair of file + * descriptors. It owns the input/output buffers, the file-descriptor passing machinery + * (SCM_RIGHTS), the deferred output queue, and the read/write/parse step functions. It + * does not implement any state machine, dispatch, callback or event-source plumbing — + * those concerns belong to the consumer. */ + +typedef struct JsonStreamQueueItem JsonStreamQueueItem; + +typedef enum JsonStreamFlags { + JSON_STREAM_BOUNDED_READS = 1u << 0, + JSON_STREAM_INPUT_SENSITIVE = 1u << 1, + JSON_STREAM_ALLOW_FD_PASSING_INPUT = 1u << 2, + JSON_STREAM_ALLOW_FD_PASSING_OUTPUT = 1u << 3, + JSON_STREAM_CONNECTING = 1u << 4, + JSON_STREAM_GOT_POLLHUP = 1u << 5, + JSON_STREAM_WRITE_DISCONNECTED = 1u << 6, + JSON_STREAM_READ_DISCONNECTED = 1u << 7, + JSON_STREAM_PREFER_READ = 1u << 8, + JSON_STREAM_PREFER_WRITE = 1u << 9, + JSON_STREAM_OUTPUT_BUFFER_SENSITIVE = 1u << 10, +} JsonStreamFlags; + +/* What the consumer's high-level state machine is currently doing — used by the various + * "what should I do right now?" APIs (get_events, wait, should_disconnect) to decide + * whether to ask for read events, whether transport death matters, and whether the idle + * timeout deadline is currently in force. */ +typedef enum JsonStreamPhase { + JSON_STREAM_PHASE_READING, /* waiting for the next inbound message, no deadline */ + JSON_STREAM_PHASE_AWAITING_REPLY, /* waiting for a reply with the idle timeout deadline */ + JSON_STREAM_PHASE_IDLE_CLIENT, /* idle client, no in-flight call */ + JSON_STREAM_PHASE_PENDING_OUTPUT, /* has more output queued, waiting to send */ + JSON_STREAM_PHASE_OTHER, /* none of the above */ +} JsonStreamPhase; + +/* Consumer hooks supplied at construction time: + * • phase — queried by get_events / wait / should_disconnect / attach_event's prepare + * callback whenever the consumer's current phase is needed. + * • dispatch — invoked by attach_event's io and time callbacks after the stream has + * consumed the revents, so the consumer can drive its state machine + * forward. Should return 0 on success or a negative errno; the stream logs + * the failure and continues running. */ +typedef JsonStreamPhase (*json_stream_phase_t)(void *userdata); +typedef int (*json_stream_dispatch_t)(void *userdata); + +typedef struct JsonStreamParams { + const char *delimiter; /* message delimiter; NULL → single NUL byte (varlink), e.g. "\r\n" for QMP */ + size_t buffer_max; /* maximum bytes buffered before -ENOBUFS; 0 = 16 MiB default */ + size_t read_chunk; /* per-read chunk size; 0 = 64 KiB default */ + size_t queue_max; /* maximum number of queued output items; 0 = 64 Ki default */ + + /* Consumer hooks (see typedefs above). */ + json_stream_phase_t phase; + json_stream_dispatch_t dispatch; + void *userdata; +} JsonStreamParams; + +typedef struct JsonStream { + char *delimiter; /* message delimiter; NULL → NUL byte (varlink), e.g. "\r\n" for QMP */ + size_t buffer_max; + size_t read_chunk; + size_t queue_max; + + char *description; + + int input_fd; + int output_fd; + + usec_t timeout; /* relative; USEC_INFINITY = no timeout */ + usec_t last_activity; /* CLOCK_MONOTONIC */ + + /* Cached peer credentials */ + struct ucred ucred; + bool ucred_acquired; + int peer_pidfd; + + /* Cached socket address family. -1 = unchecked, AF_UNSPEC = checked-not-socket, + * otherwise the resolved family. */ + int af; + + sd_event *event; + sd_event_source *input_event_source; + sd_event_source *output_event_source; + sd_event_source *time_event_source; + + json_stream_phase_t phase_cb; + json_stream_dispatch_t dispatch_cb; + void *userdata; + + char *input_buffer; + size_t input_buffer_index; + size_t input_buffer_size; + size_t input_buffer_unscanned; + + void *input_control_buffer; + size_t input_control_buffer_size; + + char *output_buffer; + size_t output_buffer_index; + size_t output_buffer_size; + + int *input_fds; + size_t n_input_fds; + + int *output_fds; + size_t n_output_fds; + + LIST_HEAD(JsonStreamQueueItem, output_queue); + JsonStreamQueueItem *output_queue_tail; + size_t n_output_queue; + + int *pushed_fds; + size_t n_pushed_fds; + + JsonStreamFlags flags; +} JsonStream; + +int json_stream_init(JsonStream *s, const JsonStreamParams *params); +void json_stream_done(JsonStream *s); + +/* Optional description used as the prefix for the stream's debug log lines (sent/received + * messages, POLLHUP detection, async connect completion, etc.). The string is duped. */ +int json_stream_set_description(JsonStream *s, const char *description); +const char* json_stream_get_description(const JsonStream *s); + +/* fd ownership */ +int json_stream_attach_fds(JsonStream *s, int input_fd, int output_fd); + +/* Open an AF_UNIX SOCK_STREAM socket and connect to the given filesystem path, attaching + * the resulting fd to the stream. Handles paths too long for sockaddr_un by routing through + * O_PATH (connect_unix_path()). If the connect() returns EAGAIN/EINPROGRESS the stream's + * connecting state is set so that the consumer waits for POLLOUT before treating the + * connection as established. Returns 0 on success or successfully started async connect, + * negative errno on failure. */ +int json_stream_connect_address(JsonStream *s, const char *address); + +/* Adopt a pre-connected pair of fds, ensuring both are non-blocking. Equivalent to + * json_stream_attach_fds() but does the fd_nonblock() dance up front, so the caller can + * pass in fds without having to know whether they were already configured. */ +int json_stream_connect_fd_pair(JsonStream *s, int input_fd, int output_fd); + +bool json_stream_flags_set(const JsonStream *s, JsonStreamFlags flags); +void json_stream_set_flags(JsonStream *s, JsonStreamFlags flags, bool b); + +/* Combines the transport-level disconnect signals (write/read disconnected, buffered + * output, POLLHUP, async connect) with the consumer's current phase (queried via the + * registered get_phase callback) to answer "should the consumer initiate teardown right + * now?". The decision logic mirrors what the original varlink transport did but stays + * generic enough for other JSON-line consumers. */ +bool json_stream_should_disconnect(const JsonStream *s); + +/* Enable/disable fd passing. These verify the underlying fd is an AF_UNIX socket and + * (for input) optionally set SO_PASSRIGHTS. */ +int json_stream_set_allow_fd_passing_input(JsonStream *s, bool enabled, bool with_sockopt); +int json_stream_set_allow_fd_passing_output(JsonStream *s, bool enabled); + +/* Output: enqueue a JSON variant. Fast path concatenates into the output buffer; if + * pushed_fds are present or the queue is non-empty the message is queued instead, so that + * fd-to-message boundaries are preserved. */ +int json_stream_enqueue(JsonStream *s, sd_json_variant *m); + +/* Allocate a queue item carrying `m` and the currently pushed fds. The pushed fds are + * transferred to the new item; on success n_pushed_fds is reset to 0. The caller may + * later submit the item via json_stream_enqueue_item() or free it. */ +int json_stream_make_queue_item(JsonStream *s, sd_json_variant *m, JsonStreamQueueItem **ret); +int json_stream_enqueue_item(JsonStream *s, JsonStreamQueueItem *q); +JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q); +DEFINE_TRIVIAL_CLEANUP_FUNC(JsonStreamQueueItem*, json_stream_queue_item_free); +sd_json_variant** json_stream_queue_item_get_data(JsonStreamQueueItem *q); + +/* fd push/peek/take */ +int json_stream_push_fd(JsonStream *s, int fd); +void json_stream_reset_pushed_fds(JsonStream *s); + +int json_stream_peek_input_fd(const JsonStream *s, size_t i); +int json_stream_take_input_fd(JsonStream *s, size_t i); +size_t json_stream_get_n_input_fds(const JsonStream *s); + +/* Close and free all currently received input fds (used after consuming a message). */ +void json_stream_close_input_fds(JsonStream *s); + +/* I/O steps. Same return-value contract as the original varlink_{write,read,parse_message}: + * 1 = made progress (call again), + * 0 = nothing to do (wait for I/O), + * <0 = error. */ +int json_stream_write(JsonStream *s); +int json_stream_read(JsonStream *s); + +/* Extract the next complete JSON message from the input buffer (delimited per + * params.delimiter). Returns 1 with *ret set on success, 0 if no full message is + * available yet (with *ret == NULL), <0 on parse error. The buffer slot occupied by the + * parsed message is erased if input_sensitive was set. */ +int json_stream_parse(JsonStream *s, sd_json_variant **ret); + +/* Status accessors used by the consumer's state machine. */ +bool json_stream_has_buffered_input(const JsonStream *s); + +/* Compute the poll events the consumer should wait for. The stream queries the consumer's + * phase via the registered get_phase callback. In JSON_STREAM_PHASE_READING the stream asks + * for POLLIN (provided the input buffer is empty and the read side is still alive); POLLOUT + * is added whenever there's pending output. When connecting we only ask for POLLOUT to + * learn when the non-blocking connect() completes. */ +int json_stream_get_events(const JsonStream *s); + +/* Block on poll() for the configured fds for at most `timeout` µs. Internally updates the + * connecting / got_pollhup state based on the seen revents. + * 1 = some event was observed (call us again), + * 0 = timeout, + * <0 = error (negative errno from ppoll_usec). */ +int json_stream_wait(JsonStream *s, usec_t timeout); + +/* Block until the output buffer is fully drained (or the write side disconnects). + * 1 = some bytes were written during the flush, + * 0 = nothing to flush, + * -ECONNRESET if the write side became disconnected before everything could be sent, + * <0 on other I/O errors. */ +int json_stream_flush(JsonStream *s); + +/* Peer credential helpers. All refuse if the stream uses different input/output fds, since + * peer credentials are only meaningful for a bidirectional socket. + * • acquire_peer_uid/gid/pid/pidfd() query the kernel on first use, cache the result, + * and log failures (using the stream's description). They each return 0 on success + * with the value in *ret, or a negative errno on failure (kernel error or invalid + * field). + * • get_peer_ucred() returns the *already-cached* ucred (set via a prior acquire or via + * set_peer_ucred()) without triggering a kernel query — returns -ENODATA if nothing is + * cached. Used by consumers that want to react to a previously-known ucred without + * forcing a fresh query (e.g. teardown bookkeeping). */ +int json_stream_acquire_peer_uid(JsonStream *s, uid_t *ret); +int json_stream_acquire_peer_gid(JsonStream *s, gid_t *ret); +int json_stream_acquire_peer_pid(JsonStream *s, pid_t *ret); +int json_stream_acquire_peer_pidfd(JsonStream *s); +int json_stream_get_peer_ucred(const JsonStream *s, struct ucred *ret); +void json_stream_set_peer_ucred(JsonStream *s, const struct ucred *ucred); + +/* Per-operation idle timeout. The deadline is computed as last_activity + timeout. + * Successful writes refresh last_activity automatically; the consumer should also call + * json_stream_mark_activity() at operation start (e.g. when initiating a method call) to + * reset the deadline. + * + * When the deadline elapses the time event source attached via json_stream_attach_event() + * fires and the consumer's dispatch callback is invoked. The consumer detects the timeout + * by comparing now(CLOCK_MONOTONIC) against json_stream_get_timeout(). */ +void json_stream_set_timeout(JsonStream *s, usec_t timeout); +void json_stream_mark_activity(JsonStream *s); + +/* Returns the absolute deadline (in CLOCK_MONOTONIC microseconds) currently in force for + * the consumer's phase, or USEC_INFINITY if no timeout applies (no timeout configured, no + * activity yet, or the current phase isn't AWAITING_REPLY). */ +usec_t json_stream_get_timeout(const JsonStream *s); + +/* sd-event integration. JsonStream owns the input/output io event sources and the time + * event source for its idle timeout, and installs its own internal prepare and io callbacks + * on them. The hooks (get_phase, io_dispatch) supplied via JsonStreamParams at construction + * are wired up automatically. */ +int json_stream_attach_event(JsonStream *s, sd_event *event, int64_t priority); +void json_stream_detach_event(JsonStream *s); +sd_event* json_stream_get_event(const JsonStream *s); diff --git a/src/libsystemd/sd-varlink/sd-varlink.c b/src/libsystemd/sd-varlink/sd-varlink.c index a9bbb8f79c1..fdcbcff0e1f 100644 --- a/src/libsystemd/sd-varlink/sd-varlink.c +++ b/src/libsystemd/sd-varlink/sd-varlink.c @@ -18,8 +18,6 @@ #include "format-util.h" #include "glyph-util.h" #include "hashmap.h" -#include "io-util.h" -#include "iovec-util.h" #include "json-util.h" #include "list.h" #include "log.h" @@ -44,10 +42,7 @@ #define VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX 1024U #define VARLINK_DEFAULT_TIMEOUT_USEC (45U*USEC_PER_SEC) -#define VARLINK_BUFFER_MAX (16U*1024U*1024U) -#define VARLINK_READ_SIZE (64U*1024U) #define VARLINK_COLLECT_MAX 1024U -#define VARLINK_QUEUE_MAX (64U*1024U) static const char* const varlink_state_table[_VARLINK_STATE_MAX] = { [VARLINK_IDLE_CLIENT] = "idle-client", @@ -75,39 +70,8 @@ static const char* const varlink_state_table[_VARLINK_STATE_MAX] = { DEFINE_PRIVATE_STRING_TABLE_LOOKUP_TO_STRING(varlink_state, VarlinkState); -static int varlink_format_queue(sd_varlink *v); static void varlink_server_test_exit_on_idle(sd_varlink_server *s); -static VarlinkJsonQueueItem* varlink_json_queue_item_free(VarlinkJsonQueueItem *q) { - if (!q) - return NULL; - - sd_json_variant_unref(q->data); - close_many(q->fds, q->n_fds); - - return mfree(q); -} - -static VarlinkJsonQueueItem* varlink_json_queue_item_new(sd_json_variant *m, const int fds[], size_t n_fds) { - VarlinkJsonQueueItem *q; - - assert(m); - assert(fds || n_fds == 0); - - q = malloc(offsetof(VarlinkJsonQueueItem, fds) + sizeof(int) * n_fds); - if (!q) - return NULL; - - *q = (VarlinkJsonQueueItem) { - .data = sd_json_variant_ref(m), - .n_fds = n_fds, - }; - - memcpy_safe(q->fds, fds, n_fds * sizeof(int)); - - return TAKE_PTR(q); -} - static void varlink_set_state(sd_varlink *v, VarlinkState state) { assert(v); assert(state >= 0 && state < _VARLINK_STATE_MAX); @@ -124,8 +88,40 @@ static void varlink_set_state(sd_varlink *v, VarlinkState state) { v->state = state; } +/* Map the varlink state machine onto the generic transport-level "phase". The transport + * uses this to decide whether to ask for POLLIN, whether the connection is salvageable + * after a read/write disconnect, and whether the idle timeout deadline is in force. */ +static JsonStreamPhase varlink_phase(void *userdata) { + sd_varlink *v = ASSERT_PTR(userdata); + + /* Client side reading a reply with the per-call deadline in force. */ + if (IN_SET(v->state, + VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, + VARLINK_CALLING, VARLINK_COLLECTING) && + !v->current) + return JSON_STREAM_PHASE_AWAITING_REPLY; + + /* Server side reading the next request — no deadline applies. */ + if (v->state == VARLINK_IDLE_SERVER && !v->current) + return JSON_STREAM_PHASE_READING; + + if (v->state == VARLINK_IDLE_CLIENT) + return JSON_STREAM_PHASE_IDLE_CLIENT; + + if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) + return JSON_STREAM_PHASE_PENDING_OUTPUT; + + return JSON_STREAM_PHASE_OTHER; +} + +static int varlink_dispatch(void *userdata) { + sd_varlink *v = ASSERT_PTR(userdata); + return sd_varlink_process(v); +} + static int varlink_new(sd_varlink **ret) { - sd_varlink *v; + _cleanup_(sd_varlink_unrefp) sd_varlink *v = NULL; + int r; assert(ret); @@ -135,32 +131,28 @@ static int varlink_new(sd_varlink **ret) { *v = (sd_varlink) { .n_ref = 1, - .input_fd = -EBADF, - .output_fd = -EBADF, - .state = _VARLINK_STATE_INVALID, - - .ucred = UCRED_INVALID, - - .peer_pidfd = -EBADF, - - .timestamp = USEC_INFINITY, - .timeout = VARLINK_DEFAULT_TIMEOUT_USEC, - - .allow_fd_passing_input = -1, - - .af = -1, - .exec_pidref = PIDREF_NULL, }; - *ret = v; + r = json_stream_init( + &v->stream, + &(JsonStreamParams) { + .phase = varlink_phase, + .dispatch = varlink_dispatch, + .userdata = v, + }); + if (r < 0) + return r; + + json_stream_set_timeout(&v->stream, VARLINK_DEFAULT_TIMEOUT_USEC); + + *ret = TAKE_PTR(v); return 0; } _public_ int sd_varlink_connect_address(sd_varlink **ret, const char *address) { _cleanup_(sd_varlink_unrefp) sd_varlink *v = NULL; - union sockaddr_union sockaddr; int r; assert_return(ret, -EINVAL); @@ -170,39 +162,9 @@ _public_ int sd_varlink_connect_address(sd_varlink **ret, const char *address) { if (r < 0) return log_debug_errno(r, "Failed to create varlink object: %m"); - v->input_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); - if (v->input_fd < 0) - return log_debug_errno(errno, "Failed to create AF_UNIX socket: %m"); - - v->output_fd = v->input_fd = fd_move_above_stdio(v->input_fd); - v->af = AF_UNIX; - - r = sockaddr_un_set_path(&sockaddr.un, address); - if (r < 0) { - if (r != -ENAMETOOLONG) - return log_debug_errno(r, "Failed to set socket address '%s': %m", address); - - /* This is a file system path, and too long to fit into sockaddr_un. Let's connect via O_PATH - * to this socket. */ - - r = connect_unix_path(v->input_fd, AT_FDCWD, address); - } else - r = RET_NERRNO(connect(v->input_fd, &sockaddr.sa, r)); - - if (r < 0) { - if (!IN_SET(r, -EAGAIN, -EINPROGRESS)) - return log_debug_errno(r, "Failed to connect to %s: %m", address); - - v->connecting = true; /* We are asynchronously connecting, i.e. the connect() is being - * processed in the background. As long as that's the case the socket - * is in a special state: it's there, we can poll it for EPOLLOUT, but - * if we attempt to write() to it before we see EPOLLOUT we'll get - * ENOTCONN (and not EAGAIN, like we would for a normal connected - * socket that isn't writable at the moment). Since ENOTCONN on write() - * hence can mean two different things (i.e. connection not complete - * yet vs. already disconnected again), we store as a boolean whether - * we are still in connect(). */ - } + r = json_stream_connect_address(&v->stream, address); + if (r < 0) + return r; varlink_set_state(v, VARLINK_IDLE_CLIENT); @@ -291,8 +253,11 @@ _public_ int sd_varlink_connect_exec(sd_varlink **ret, const char *_command, cha if (r < 0) return log_debug_errno(r, "Failed to create varlink object: %m"); - v->output_fd = v->input_fd = TAKE_FD(pair[0]); - v->af = AF_UNIX; + int conn_fd = TAKE_FD(pair[0]); + r = json_stream_attach_fds(&v->stream, conn_fd, conn_fd); + if (r < 0) + return r; + v->exec_pidref = TAKE_PIDREF(pidref); varlink_set_state(v, VARLINK_IDLE_CLIENT); @@ -375,8 +340,11 @@ static int varlink_connect_ssh_unix(sd_varlink **ret, const char *where) { if (r < 0) return log_debug_errno(r, "Failed to create varlink object: %m"); - v->output_fd = v->input_fd = TAKE_FD(pair[0]); - v->af = AF_UNIX; + int conn_fd = TAKE_FD(pair[0]); + r = json_stream_attach_fds(&v->stream, conn_fd, conn_fd); + if (r < 0) + return r; + v->exec_pidref = TAKE_PIDREF(pidref); varlink_set_state(v, VARLINK_IDLE_CLIENT); @@ -467,9 +435,10 @@ static int varlink_connect_ssh_exec(sd_varlink **ret, const char *where) { if (r < 0) return log_debug_errno(r, "Failed to create varlink object: %m"); - v->input_fd = TAKE_FD(output_pipe[0]); - v->output_fd = TAKE_FD(input_pipe[1]); - v->af = AF_UNSPEC; + r = json_stream_attach_fds(&v->stream, TAKE_FD(output_pipe[0]), TAKE_FD(input_pipe[1])); + if (r < 0) + return r; + v->exec_pidref = TAKE_PIDREF(pidref); varlink_set_state(v, VARLINK_IDLE_CLIENT); @@ -575,35 +544,23 @@ _public_ int sd_varlink_connect_url(sd_varlink **ret, const char *url) { } _public_ int sd_varlink_connect_fd_pair(sd_varlink **ret, int input_fd, int output_fd, const struct ucred *override_ucred) { - sd_varlink *v; + _cleanup_(sd_varlink_unrefp) sd_varlink *v = NULL; int r; assert_return(ret, -EINVAL); assert_return(input_fd >= 0, -EBADF); assert_return(output_fd >= 0, -EBADF); - r = fd_nonblock(input_fd, true); - if (r < 0) - return log_debug_errno(r, "Failed to make input fd %d nonblocking: %m", input_fd); - - if (input_fd != output_fd) { - r = fd_nonblock(output_fd, true); - if (r < 0) - return log_debug_errno(r, "Failed to make output fd %d nonblocking: %m", output_fd); - } - r = varlink_new(&v); if (r < 0) return log_debug_errno(r, "Failed to create varlink object: %m"); - v->input_fd = input_fd; - v->output_fd = output_fd; - v->af = -1; + r = json_stream_connect_fd_pair(&v->stream, input_fd, output_fd); + if (r < 0) + return r; - if (override_ucred) { - v->ucred = *override_ucred; - v->ucred_acquired = true; - } + if (override_ucred) + json_stream_set_peer_ucred(&v->stream, override_ucred); varlink_set_state(v, VARLINK_IDLE_CLIENT); @@ -614,7 +571,7 @@ _public_ int sd_varlink_connect_fd_pair(sd_varlink **ret, int input_fd, int outp * varlink_connect_address() above, as there we do handle asynchronous connections ourselves and * avoid doing write() on it before we saw EPOLLOUT for the first time. */ - *ret = v; + *ret = TAKE_PTR(v); return 0; } @@ -622,16 +579,6 @@ _public_ int sd_varlink_connect_fd(sd_varlink **ret, int fd) { return sd_varlink_connect_fd_pair(ret, fd, fd, /* override_ucred= */ NULL); } -static void varlink_detach_event_sources(sd_varlink *v) { - assert(v); - - v->input_event_source = sd_event_source_disable_unref(v->input_event_source); - v->output_event_source = sd_event_source_disable_unref(v->output_event_source); - v->time_event_source = sd_event_source_disable_unref(v->time_event_source); - v->quit_event_source = sd_event_source_disable_unref(v->quit_event_source); - v->defer_event_source = sd_event_source_disable_unref(v->defer_event_source); -} - static void varlink_clear_current(sd_varlink *v) { assert(v); @@ -641,11 +588,9 @@ static void varlink_clear_current(sd_varlink *v) { v->current_method = NULL; v->current_reply_flags = 0; - close_many(v->input_fds, v->n_input_fds); - v->input_fds = mfree(v->input_fds); - v->n_input_fds = 0; + json_stream_close_input_fds(&v->stream); - v->previous = varlink_json_queue_item_free(v->previous); + v->previous = json_stream_queue_item_free(v->previous); if (v->sentinel != POINTER_MAX) v->sentinel = mfree(v->sentinel); else @@ -655,39 +600,17 @@ static void varlink_clear_current(sd_varlink *v) { static void varlink_clear(sd_varlink *v) { assert(v); - varlink_detach_event_sources(v); - - if (v->input_fd != v->output_fd) { - v->input_fd = safe_close(v->input_fd); - v->output_fd = safe_close(v->output_fd); - } else - v->output_fd = v->input_fd = safe_close(v->input_fd); + /* Detach event sources first so the kernel no longer has epoll watches on the + * stream's fds, then free the stream — json_stream_done() closes the input/output + * fds, the cached peer_pidfd, the received input fds, the queued output fds, and + * the pushed fds. */ + sd_varlink_detach_event(v); varlink_clear_current(v); - v->input_buffer = v->input_sensitive ? erase_and_free(v->input_buffer) : mfree(v->input_buffer); - v->output_buffer = v->output_buffer_sensitive ? erase_and_free(v->output_buffer) : mfree(v->output_buffer); - - v->input_control_buffer = mfree(v->input_control_buffer); - v->input_control_buffer_size = 0; - - close_many(v->output_fds, v->n_output_fds); - v->output_fds = mfree(v->output_fds); - v->n_output_fds = 0; - - close_many(v->pushed_fds, v->n_pushed_fds); - v->pushed_fds = mfree(v->pushed_fds); - v->n_pushed_fds = 0; - - LIST_CLEAR(queue, v->output_queue, varlink_json_queue_item_free); - v->output_queue_tail = NULL; - v->n_output_queue = 0; - - v->event = sd_event_unref(v->event); + json_stream_done(&v->stream); pidref_done_sigterm_wait(&v->exec_pidref); - - v->peer_pidfd = safe_close(v->peer_pidfd); } static sd_varlink* varlink_destroy(sd_varlink *v) { @@ -700,7 +623,6 @@ static sd_varlink* varlink_destroy(sd_varlink *v) { varlink_clear(v); - free(v->description); return mfree(v); } @@ -709,405 +631,67 @@ DEFINE_PUBLIC_TRIVIAL_REF_UNREF_FUNC(sd_varlink, sd_varlink, varlink_destroy); static int varlink_test_disconnect(sd_varlink *v) { assert(v); - /* Tests whether we the connection has been terminated. We are careful to not stop processing it - * prematurely, since we want to handle half-open connections as well as possible and want to flush - * out and read data before we close down if we can. */ - /* Already disconnected? */ if (!VARLINK_STATE_IS_ALIVE(v->state)) return 0; - /* Wait until connection setup is complete, i.e. until asynchronous connect() completes */ - if (v->connecting) - return 0; - - /* Still something to write and we can write? Stay around */ - if (v->output_buffer_size > 0 && !v->write_disconnected) + if (!json_stream_should_disconnect(&v->stream)) return 0; - /* Both sides gone already? Then there's no need to stick around */ - if (v->read_disconnected && v->write_disconnected) - goto disconnect; - - /* If we are waiting for incoming data but the read side is shut down, disconnect. */ - if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING, VARLINK_IDLE_SERVER) && v->read_disconnected) - goto disconnect; - - /* Similar, if are a client that hasn't written anything yet but the write side is dead, also - * disconnect. We also explicitly check for POLLHUP here since we likely won't notice the write side - * being down if we never wrote anything. */ - if (v->state == VARLINK_IDLE_CLIENT && (v->write_disconnected || v->got_pollhup)) - goto disconnect; - - /* We are on the server side and still want to send out more replies, but we saw POLLHUP already, and - * either got no buffered bytes to write anymore or already saw a write error. In that case we should - * shut down the varlink link. */ - if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE) && (v->write_disconnected || v->output_buffer_size == 0) && v->got_pollhup) - goto disconnect; - - return 0; - -disconnect: varlink_set_state(v, VARLINK_PENDING_DISCONNECT); return 1; } static int varlink_write(sd_varlink *v) { - ssize_t n; - int r; - assert(v); if (!VARLINK_STATE_IS_ALIVE(v->state)) return 0; - if (v->connecting) /* Writing while we are still wait for a non-blocking connect() to complete will - * result in ENOTCONN, hence exit early here */ - return 0; - if (v->write_disconnected) - return 0; - - /* If needed let's convert some output queue json variants into text form */ - r = varlink_format_queue(v); - if (r < 0) - return r; - - if (v->output_buffer_size == 0) - return 0; - - assert(v->output_fd >= 0); - - if (v->n_output_fds > 0) { /* If we shall send fds along, we must use sendmsg() */ - struct iovec iov = { - .iov_base = v->output_buffer + v->output_buffer_index, - .iov_len = v->output_buffer_size, - }; - struct msghdr mh = { - .msg_iov = &iov, - .msg_iovlen = 1, - .msg_controllen = CMSG_SPACE(sizeof(int) * v->n_output_fds), - }; - - mh.msg_control = alloca0(mh.msg_controllen); - - struct cmsghdr *control = CMSG_FIRSTHDR(&mh); - control->cmsg_len = CMSG_LEN(sizeof(int) * v->n_output_fds); - control->cmsg_level = SOL_SOCKET; - control->cmsg_type = SCM_RIGHTS; - memcpy(CMSG_DATA(control), v->output_fds, sizeof(int) * v->n_output_fds); - - n = sendmsg(v->output_fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL); - } else { - /* We generally prefer recv()/send() (mostly because of MSG_NOSIGNAL) but also want to be compatible - * with non-socket IO, hence fall back automatically. - * - * Use a local variable to help gcc figure out that we set 'n' in all cases. */ - bool prefer_write = v->prefer_write; - if (!prefer_write) { - n = send(v->output_fd, v->output_buffer + v->output_buffer_index, v->output_buffer_size, MSG_DONTWAIT|MSG_NOSIGNAL); - if (n < 0 && errno == ENOTSOCK) - prefer_write = v->prefer_write = true; - } - if (prefer_write) - n = write(v->output_fd, v->output_buffer + v->output_buffer_index, v->output_buffer_size); - } - if (n < 0) { - if (errno == EAGAIN) - return 0; - - if (ERRNO_IS_DISCONNECT(errno)) { - /* If we get informed about a disconnect on write, then let's remember that, but not - * act on it just yet. Let's wait for read() to report the issue first. */ - v->write_disconnected = true; - return 1; - } - - return -errno; - } - - if (v->output_buffer_sensitive) - explicit_bzero_safe(v->output_buffer + v->output_buffer_index, n); - - v->output_buffer_size -= n; - - if (v->output_buffer_size == 0) { - v->output_buffer_index = 0; - v->output_buffer_sensitive = false; /* We can reset the sensitive flag once the buffer is empty */ - } else - v->output_buffer_index += n; - - close_many(v->output_fds, v->n_output_fds); - v->n_output_fds = 0; - - v->timestamp = now(CLOCK_MONOTONIC); - return 1; -} - -#define VARLINK_FDS_MAX (16U*1024U) - -static bool varlink_may_protocol_upgrade(sd_varlink *v) { - return v->protocol_upgrade || (v->server && FLAGS_SET(v->server->flags, SD_VARLINK_SERVER_UPGRADABLE)); -} - -/* When a protocol upgrade might happen, peek at the socket data to find the \0 message - * boundary and return a read size that won't consume past it. This prevents over-reading - * raw post-upgrade data into the varlink input buffer. Falls back to byte-by-byte for - * non-socket fds where MSG_PEEK is not available. */ -static ssize_t varlink_peek_upgrade_boundary(sd_varlink *v, void *p, size_t rs) { - assert(v); - - if (!varlink_may_protocol_upgrade(v)) - return rs; - - if (v->prefer_read) - return 1; - - ssize_t peeked = recv(v->input_fd, p, rs, MSG_PEEK|MSG_DONTWAIT); - if (peeked < 0) { - if (errno == ENOTSOCK) { - v->prefer_read = true; - return 1; /* Not a socket, fall back to byte-to-byte */ - } else if (!ERRNO_IS_TRANSIENT(errno)) - return -errno; - - /* Transient error, this should not happen but fall back to byte-to-byte */ - return 1; - } - /* EOF, the real recv() will also get it so what we return does not matter */ - if (peeked == 0) - return rs; - - void *nul_chr = memchr(p, 0, peeked); - if (nul_chr) - return (ssize_t) ((char*) nul_chr - (char*) p) + 1; - return peeked; + return json_stream_write(&v->stream); } static int varlink_read(sd_varlink *v) { - struct iovec iov; - struct msghdr mh; - ssize_t rs; - ssize_t n; - void *p; - assert(v); if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING, VARLINK_IDLE_SERVER)) return 0; - if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */ - return 0; if (v->current) return 0; - if (v->input_buffer_unscanned > 0) - return 0; - if (v->read_disconnected) - return 0; - - if (v->input_buffer_size >= VARLINK_BUFFER_MAX) - return -ENOBUFS; - - assert(v->input_fd >= 0); - - if (MALLOC_SIZEOF_SAFE(v->input_buffer) <= v->input_buffer_index + v->input_buffer_size) { - size_t add; - - add = MIN(VARLINK_BUFFER_MAX - v->input_buffer_size, VARLINK_READ_SIZE); - - if (v->input_buffer_index == 0) { - - if (!GREEDY_REALLOC(v->input_buffer, v->input_buffer_size + add)) - return -ENOMEM; - - } else { - char *b; - - b = new(char, v->input_buffer_size + add); - if (!b) - return -ENOMEM; - - memcpy(b, v->input_buffer + v->input_buffer_index, v->input_buffer_size); - - free_and_replace(v->input_buffer, b); - v->input_buffer_index = 0; - } - } - - p = v->input_buffer + v->input_buffer_index + v->input_buffer_size; - - rs = MALLOC_SIZEOF_SAFE(v->input_buffer) - (v->input_buffer_index + v->input_buffer_size); - - /* When a protocol upgrade is requested we can't consume any post-upgrade data from the socket - * buffer. Use MSG_PEEK to find the \0 message boundary and only consume up to it. For non-socket - * fds (pipes) MSG_PEEK is not available, so fall back to byte-by-byte reading. */ - rs = varlink_peek_upgrade_boundary(v, p, rs); - if (rs < 0) - return varlink_log_errno(v, rs, "Failed to peek upgrade boundary: %m"); - - if (v->allow_fd_passing_input > 0) { - iov = IOVEC_MAKE(p, rs); - - /* Allocate the fd buffer on the heap, since we need a lot of space potentially */ - if (!v->input_control_buffer) { - v->input_control_buffer_size = CMSG_SPACE(sizeof(int) * VARLINK_FDS_MAX); - v->input_control_buffer = malloc(v->input_control_buffer_size); - if (!v->input_control_buffer) - return -ENOMEM; - } - - mh = (struct msghdr) { - .msg_iov = &iov, - .msg_iovlen = 1, - .msg_control = v->input_control_buffer, - .msg_controllen = v->input_control_buffer_size, - }; - - n = recvmsg_safe(v->input_fd, &mh, MSG_DONTWAIT|MSG_CMSG_CLOEXEC); - } else { - bool prefer_read = v->prefer_read; - if (!prefer_read) { - n = recv(v->input_fd, p, rs, MSG_DONTWAIT); - if (n < 0) - n = -errno; - if (n == -ENOTSOCK) - prefer_read = v->prefer_read = true; - } - if (prefer_read) { - n = read(v->input_fd, p, rs); - if (n < 0) - n = -errno; - } - } - if (ERRNO_IS_NEG_TRANSIENT(n)) - return 0; - if (ERRNO_IS_NEG_DISCONNECT(n)) { - v->read_disconnected = true; - return 1; - } - if (n < 0) - return n; - if (n == 0) { /* EOF */ - - if (v->allow_fd_passing_input > 0) - cmsg_close_all(&mh); - v->read_disconnected = true; - return 1; - } - - if (v->allow_fd_passing_input > 0) { - struct cmsghdr *cmsg; - - cmsg = cmsg_find(&mh, SOL_SOCKET, SCM_RIGHTS, (socklen_t) -1); - if (cmsg) { - size_t add; - - /* We only allow file descriptors to be passed along with the first byte of a - * message. If they are passed with any other byte this is a protocol violation. */ - if (v->input_buffer_size != 0) { - cmsg_close_all(&mh); - return -EPROTO; - } - - add = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int); - if (add > INT_MAX - v->n_input_fds) { - cmsg_close_all(&mh); - return -EBADF; - } - - if (!GREEDY_REALLOC(v->input_fds, v->n_input_fds + add)) { - cmsg_close_all(&mh); - return -ENOMEM; - } - - memcpy_safe(v->input_fds + v->n_input_fds, CMSG_TYPED_DATA(cmsg, int), add * sizeof(int)); - v->n_input_fds += add; - } - } - - v->input_buffer_size += n; - v->input_buffer_unscanned += n; - - return 1; + return json_stream_read(&v->stream); } static int varlink_parse_message(sd_varlink *v) { - const char *e; - char *begin; - size_t sz; int r; assert(v); if (v->current) return 0; - if (v->input_buffer_unscanned <= 0) - return 0; - - assert(v->input_buffer_unscanned <= v->input_buffer_size); - assert(v->input_buffer_index + v->input_buffer_size <= MALLOC_SIZEOF_SAFE(v->input_buffer)); - - begin = v->input_buffer + v->input_buffer_index; - e = memchr(begin + v->input_buffer_size - v->input_buffer_unscanned, 0, v->input_buffer_unscanned); - if (!e) { - v->input_buffer_unscanned = 0; - return 0; - } - - sz = e - begin + 1; - - r = sd_json_parse(begin, SD_JSON_PARSE_MUST_BE_OBJECT, &v->current, /* reterr_line= */ NULL, /* reterr_column= */ NULL); - if (v->input_sensitive) - explicit_bzero_safe(begin, sz); - if (r < 0) { - /* If we encounter a parse failure flush all data. We cannot possibly recover from this, - * hence drop all buffered data now. */ - v->input_buffer_index = v->input_buffer_size = v->input_buffer_unscanned = 0; - return varlink_log_errno(v, r, "Failed to parse JSON object: %m"); - } + r = json_stream_parse(&v->stream, &v->current); + if (r <= 0) + return r; - if (v->input_sensitive) { + if (json_stream_flags_set(&v->stream, JSON_STREAM_INPUT_SENSITIVE)) { /* Mark the parameters subfield as sensitive right-away, if that's requested */ sd_json_variant *parameters = sd_json_variant_by_key(v->current, "parameters"); if (parameters) sd_json_variant_sensitive(parameters); } - if (DEBUG_LOGGING) { - _cleanup_(erase_and_freep) char *censored_text = NULL; - - /* Suppress sensitive fields in the debug output */ - r = sd_json_variant_format(v->current, /* flags= */ SD_JSON_FORMAT_CENSOR_SENSITIVE, &censored_text); - if (r < 0) - return r; - - varlink_log(v, "Received message: %s", censored_text); - } - - v->input_buffer_size -= sz; - - if (v->input_buffer_size == 0) - v->input_buffer_index = 0; - else - v->input_buffer_index += sz; - - v->input_buffer_unscanned = v->input_buffer_size; return 1; } static int varlink_test_timeout(sd_varlink *v) { assert(v); - if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING)) - return 0; - if (v->timeout == USEC_INFINITY) - return 0; - - if (now(CLOCK_MONOTONIC) < usec_add(v->timestamp, v->timeout)) + usec_t deadline = json_stream_get_timeout(&v->stream); + if (deadline == USEC_INFINITY || now(CLOCK_MONOTONIC) < deadline) return 0; varlink_set_state(v, VARLINK_PENDING_TIMEOUT); - return 1; } @@ -1348,157 +932,11 @@ static int generic_method_get_interface_description( r = sd_varlink_idl_format(interface, &text); if (r < 0) - return r; - - return sd_varlink_replybo( - link, - SD_JSON_BUILD_PAIR_STRING("description", text)); -} - -static int varlink_format_json(sd_varlink *v, sd_json_variant *m) { - _cleanup_(erase_and_freep) char *text = NULL; - int sz, r; - - assert(v); - assert(m); - - sz = sd_json_variant_format(m, /* flags= */ 0, &text); - if (sz < 0) - return sz; - assert(text[sz] == '\0'); - - if (v->output_buffer_size + sz + 1 > VARLINK_BUFFER_MAX) - return -ENOBUFS; - - if (DEBUG_LOGGING) { - _cleanup_(erase_and_freep) char *censored_text = NULL; - - /* Suppress sensitive fields in the debug output */ - r = sd_json_variant_format(m, SD_JSON_FORMAT_CENSOR_SENSITIVE, &censored_text); - if (r < 0) - return r; - - varlink_log(v, "Sending message: %s", censored_text); - } - - if (v->output_buffer_size == 0) { - - free_and_replace(v->output_buffer, text); - - v->output_buffer_size = sz + 1; - v->output_buffer_index = 0; - - } else if (v->output_buffer_index == 0) { - - if (!GREEDY_REALLOC(v->output_buffer, v->output_buffer_size + sz + 1)) - return -ENOMEM; - - memcpy(v->output_buffer + v->output_buffer_size, text, sz + 1); - v->output_buffer_size += sz + 1; - } else { - char *n; - const size_t new_size = v->output_buffer_size + sz + 1; - - n = new(char, new_size); - if (!n) - return -ENOMEM; - - memcpy(mempcpy(n, v->output_buffer + v->output_buffer_index, v->output_buffer_size), text, sz + 1); - - free_and_replace(v->output_buffer, n); - v->output_buffer_size = new_size; - v->output_buffer_index = 0; - } - - if (sd_json_variant_is_sensitive_recursive(m)) - v->output_buffer_sensitive = true; /* Propagate sensitive flag */ - else - text = mfree(text); /* No point in the erase_and_free() destructor declared above */ - - return 0; -} - -static int varlink_format_queue(sd_varlink *v) { - int r; - - assert(v); - - /* Takes entries out of the output queue and formats them into the output buffer. But only if this - * would not corrupt our fd message boundaries */ - - while (v->output_queue) { - assert(v->n_output_queue > 0); - - if (v->n_output_fds > 0) /* unwritten fds? if we'd add more we'd corrupt the fd message boundaries, hence wait */ - return 0; - - VarlinkJsonQueueItem *q = v->output_queue; - _cleanup_free_ int *array = NULL; - - if (q->n_fds > 0) { - array = newdup(int, q->fds, q->n_fds); - if (!array) - return -ENOMEM; - } - - r = varlink_format_json(v, q->data); - if (r < 0) - return r; - - /* Take possession of the queue element's fds */ - free_and_replace(v->output_fds, array); - v->n_output_fds = q->n_fds; - q->n_fds = 0; - - LIST_REMOVE(queue, v->output_queue, q); - if (!v->output_queue) - v->output_queue_tail = NULL; - v->n_output_queue--; - - varlink_json_queue_item_free(q); - } - - return 0; -} - -static int varlink_enqueue_item(sd_varlink *v, VarlinkJsonQueueItem *q) { - assert(v); - assert(q); - - if (v->n_output_queue >= VARLINK_QUEUE_MAX) - return -ENOBUFS; - - LIST_INSERT_AFTER(queue, v->output_queue, v->output_queue_tail, q); - v->output_queue_tail = q; - v->n_output_queue++; - return 0; -} - -static int varlink_enqueue_json(sd_varlink *v, sd_json_variant *m) { - VarlinkJsonQueueItem *q; - - assert(v); - assert(m); - - /* If there are no file descriptors to be queued and no queue entries yet we can shortcut things and - * append this entry directly to the output buffer */ - if (v->n_pushed_fds == 0 && !v->output_queue) - return varlink_format_json(v, m); - - if (v->n_output_queue >= VARLINK_QUEUE_MAX) - return -ENOBUFS; - - /* Otherwise add a queue entry for this */ - q = varlink_json_queue_item_new(m, v->pushed_fds, v->n_pushed_fds); - if (!q) - return -ENOMEM; - - v->n_pushed_fds = 0; /* fds now belong to the queue entry */ - - /* We already checked the precondition ourselves so this call cannot fail. */ - assert_se(varlink_enqueue_item(v, q) >= 0); + return r; - return 0; + return sd_varlink_replybo( + link, + SD_JSON_BUILD_PAIR_STRING("description", text)); } static int varlink_dispatch_method(sd_varlink *v) { @@ -1586,10 +1024,17 @@ static int varlink_dispatch_method(sd_varlink *v) { (flags & SD_VARLINK_METHOD_ONEWAY) ? VARLINK_PROCESSING_METHOD_ONEWAY : VARLINK_PROCESSING_METHOD); - v->protocol_upgrade = FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE); - assert(v->server); + /* Reset the per-call upgrade marker on every dispatch — a previous method's + * UPGRADE flag must not bleed into this one. The transport-level bounded reads + * stay active for SD_VARLINK_SERVER_UPGRADABLE servers regardless. */ + v->protocol_upgrade = FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE); + json_stream_set_flags( + &v->stream, + JSON_STREAM_BOUNDED_READS, + v->protocol_upgrade || FLAGS_SET(v->server->flags, SD_VARLINK_SERVER_UPGRADABLE)); + /* First consult user supplied method implementations */ callback = hashmap_get(v->server->methods, method); if (!callback) { @@ -1653,7 +1098,7 @@ static int varlink_dispatch_method(sd_varlink *v) { r = sd_varlink_error_errno(v, r); } else if (v->sentinel) { if (v->previous) { - r = varlink_enqueue_item(v, v->previous); + r = json_stream_enqueue_item(&v->stream, v->previous); if (r >= 0) { TAKE_PTR(v->previous); varlink_set_state(v, VARLINK_PROCESSED_METHOD); @@ -1883,85 +1328,13 @@ _public_ int sd_varlink_get_current_parameters(sd_varlink *v, sd_json_variant ** return 0; } -static void handle_revents(sd_varlink *v, int revents) { - assert(v); - - if (v->connecting) { - /* If we have seen POLLOUT or POLLHUP on a socket we are asynchronously waiting a connect() - * to complete on, we know we are ready. We don't read the connection error here though, - * we'll get the error on the next read() or write(). */ - if ((revents & (POLLOUT|POLLHUP)) == 0) - return; - - varlink_log(v, "Asynchronous connection completed."); - v->connecting = false; - } else { - /* Note that we don't care much about POLLIN/POLLOUT here, we'll just try reading and writing - * what we can. However, we do care about POLLHUP to detect connection termination even if we - * momentarily don't want to read nor write anything. */ - - if (!FLAGS_SET(revents, POLLHUP)) - return; - - varlink_log(v, "Got POLLHUP from socket."); - v->got_pollhup = true; - } -} - _public_ int sd_varlink_wait(sd_varlink *v, uint64_t timeout) { - int r, events; - usec_t t; - assert_return(v, -EINVAL); if (v->state == VARLINK_DISCONNECTED) return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected."); - r = sd_varlink_get_timeout(v, &t); - if (r < 0) - return r; - if (t != USEC_INFINITY) - t = usec_sub_unsigned(t, now(CLOCK_MONOTONIC)); - - t = MIN(t, timeout); - - events = sd_varlink_get_events(v); - if (events < 0) - return events; - - struct pollfd pollfd[2]; - size_t n_poll_fd = 0; - - if (v->input_fd == v->output_fd) { - pollfd[n_poll_fd++] = (struct pollfd) { - .fd = v->input_fd, - .events = events, - }; - } else { - pollfd[n_poll_fd++] = (struct pollfd) { - .fd = v->input_fd, - .events = events & POLLIN, - }; - pollfd[n_poll_fd++] = (struct pollfd) { - .fd = v->output_fd, - .events = events & POLLOUT, - }; - }; - - r = ppoll_usec(pollfd, n_poll_fd, t); - if (ERRNO_IS_NEG_TRANSIENT(r)) /* Treat EINTR as not a timeout, but also nothing happened, and - * the caller gets a chance to call back into us */ - return 1; - if (r <= 0) - return r; - - /* Merge the seen events into one */ - int revents = 0; - FOREACH_ARRAY(p, pollfd, n_poll_fd) - revents |= p->revents; - - handle_revents(v, revents); - return 1; + return json_stream_wait(&v->stream, timeout); } _public_ int sd_varlink_is_idle(sd_varlink *v) { @@ -1982,68 +1355,55 @@ _public_ int sd_varlink_is_connected(sd_varlink *v) { } _public_ int sd_varlink_get_fd(sd_varlink *v) { - assert_return(v, -EINVAL); if (v->state == VARLINK_DISCONNECTED) return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected."); - if (v->input_fd != v->output_fd) + + int input_fd = v->stream.input_fd; + int output_fd = v->stream.output_fd; + + if (input_fd != output_fd) return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADF), "Separate file descriptors for input/output set."); - if (v->input_fd < 0) + if (input_fd < 0) return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADF), "No valid fd."); - return v->input_fd; + return input_fd; } _public_ int sd_varlink_get_input_fd(sd_varlink *v) { - assert_return(v, -EINVAL); if (v->state == VARLINK_DISCONNECTED) return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected."); - if (v->input_fd < 0) + + int input_fd = v->stream.input_fd; + if (input_fd < 0) return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADF), "No valid input fd."); - return v->input_fd; + return input_fd; } _public_ int sd_varlink_get_output_fd(sd_varlink *v) { - assert_return(v, -EINVAL); if (v->state == VARLINK_DISCONNECTED) return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected."); - if (v->output_fd < 0) + + int output_fd = v->stream.output_fd; + if (output_fd < 0) return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADF), "No valid output fd."); - return v->output_fd; + return output_fd; } _public_ int sd_varlink_get_events(sd_varlink *v) { - int ret = 0; - assert_return(v, -EINVAL); if (v->state == VARLINK_DISCONNECTED) return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected."); - if (v->connecting) /* When processing an asynchronous connect(), we only wait for EPOLLOUT, which - * tells us that the connection is now complete. Before that we should neither - * write() or read() from the fd. */ - return EPOLLOUT; - - if (!v->read_disconnected && - IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING, VARLINK_IDLE_SERVER) && - !v->current && - v->input_buffer_unscanned <= 0) - ret |= EPOLLIN; - - if (!v->write_disconnected && - (v->output_queue || - v->output_buffer_size > 0)) - ret |= EPOLLOUT; - - return ret; + return json_stream_get_events(&v->stream); } _public_ int sd_varlink_get_timeout(sd_varlink *v, uint64_t *ret) { @@ -2052,51 +1412,21 @@ _public_ int sd_varlink_get_timeout(sd_varlink *v, uint64_t *ret) { if (v->state == VARLINK_DISCONNECTED) return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected."); - if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING) && - v->timeout != USEC_INFINITY) { - if (ret) - *ret = usec_add(v->timestamp, v->timeout); - return 1; - } else { - if (ret) - *ret = USEC_INFINITY; - return 0; - } + usec_t deadline = json_stream_get_timeout(&v->stream); + + if (ret) + *ret = deadline; + + return deadline != USEC_INFINITY; } _public_ int sd_varlink_flush(sd_varlink *v) { - int ret = 0, r; - assert_return(v, -EINVAL); if (v->state == VARLINK_DISCONNECTED) return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected."); - for (;;) { - if (v->output_buffer_size == 0 && !v->output_queue) - break; - if (v->write_disconnected) - return -ECONNRESET; - - r = varlink_write(v); - if (r < 0) - return r; - if (r > 0) { - ret = 1; - continue; - } - - r = fd_wait_for_event(v->output_fd, POLLOUT, USEC_INFINITY); - if (ERRNO_IS_NEG_TRANSIENT(r)) - continue; - if (r < 0) - return varlink_log_errno(v, r, "Poll failed on fd: %m"); - assert(r > 0); - - handle_revents(v, r); - } - - return ret; + return json_stream_flush(&v->stream); } static void varlink_detach_server(sd_varlink *v) { @@ -2107,18 +1437,22 @@ static void varlink_detach_server(sd_varlink *v) { if (!v->server) return; + /* Only touch by_uid for connections we already counted in count_connection() — + * those are exactly the ones for which the ucred was acquired or injected during + * sd_varlink_server_add_connection_pair(). Don't trigger an acquire from here. */ + struct ucred ucred; if (v->server->by_uid && - v->ucred_acquired && - uid_is_valid(v->ucred.uid)) { + json_stream_get_peer_ucred(&v->stream, &ucred) >= 0 && + uid_is_valid(ucred.uid)) { unsigned c; - c = PTR_TO_UINT(hashmap_get(v->server->by_uid, UID_TO_PTR(v->ucred.uid))); + c = PTR_TO_UINT(hashmap_get(v->server->by_uid, UID_TO_PTR(ucred.uid))); assert(c > 0); if (c == 1) - (void) hashmap_remove(v->server->by_uid, UID_TO_PTR(v->ucred.uid)); + (void) hashmap_remove(v->server->by_uid, UID_TO_PTR(ucred.uid)); else - (void) hashmap_replace(v->server->by_uid, UID_TO_PTR(v->ucred.uid), UINT_TO_PTR(c - 1)); + (void) hashmap_replace(v->server->by_uid, UID_TO_PTR(ucred.uid), UINT_TO_PTR(c - 1)); } assert(v->server->n_connections > 0); @@ -2194,12 +1528,12 @@ _public_ int sd_varlink_send(sd_varlink *v, const char *method, sd_json_variant if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = varlink_enqueue_json(v, m); + r = json_stream_enqueue(&v->stream, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); /* No state change here, this is one-way only after all */ - v->timestamp = now(CLOCK_MONOTONIC); + json_stream_mark_activity(&v->stream); return 0; } @@ -2241,13 +1575,13 @@ _public_ int sd_varlink_invoke(sd_varlink *v, const char *method, sd_json_varian if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = varlink_enqueue_json(v, m); + r = json_stream_enqueue(&v->stream, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); varlink_set_state(v, VARLINK_AWAITING_REPLY); v->n_pending++; - v->timestamp = now(CLOCK_MONOTONIC); + json_stream_mark_activity(&v->stream); return 0; } @@ -2292,13 +1626,13 @@ _public_ int sd_varlink_observe(sd_varlink *v, const char *method, sd_json_varia if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = varlink_enqueue_json(v, m); + r = json_stream_enqueue(&v->stream, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); varlink_set_state(v, VARLINK_AWAITING_REPLY_MORE); v->n_pending++; - v->timestamp = now(CLOCK_MONOTONIC); + json_stream_mark_activity(&v->stream); return 0; } @@ -2339,13 +1673,13 @@ static int varlink_call_internal(sd_varlink *v, sd_json_variant *request) { * that we can assign a new reply shortly. */ varlink_clear_current(v); - r = varlink_enqueue_json(v, request); + r = json_stream_enqueue(&v->stream, request); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); varlink_set_state(v, VARLINK_CALLING); v->n_pending++; - v->timestamp = now(CLOCK_MONOTONIC); + json_stream_mark_activity(&v->stream); while (v->state == VARLINK_CALLING) { r = sd_varlink_process(v); @@ -2441,44 +1775,41 @@ static int varlink_handle_upgrade_fds(sd_varlink *v, int *ret_input_fd, int *ret /* Ensure no post-upgrade data was consumed into our input buffer (we ensure this via MSG_PEEK or * byte-to-byte) and refuse the upgrade rather than silently losing the data. */ - if (v->input_buffer_size != 0) + if (json_stream_has_buffered_input(&v->stream)) return varlink_log_errno(v, SYNTHETIC_ERRNO(EPROTO), "Unexpected buffered data during protocol upgrade, refusing."); + _cleanup_close_ int input_fd = TAKE_FD(v->stream.input_fd), + output_fd = TAKE_FD(v->stream.output_fd); + /* Pass the connection fds to the caller, it owns them now. Reset to blocking mode * since callers of the upgraded protocol will generally expect normal blocking - * semantics. */ - r = fd_nonblock(v->input_fd, false); - if (r < 0) - return varlink_log_errno(v, r, "Failed to set input fd to blocking mode: %m"); - if (v->input_fd != v->output_fd) { - r = fd_nonblock(v->output_fd, false); + * semantics. For bidirectional sockets (input_fd == output_fd), dup the fd so that + * callers always get two independent fds they can close separately. */ + if (input_fd == output_fd) { + output_fd = fcntl(input_fd, F_DUPFD_CLOEXEC, 3); + if (output_fd < 0) + return varlink_log_errno(v, errno, "Failed to dup upgraded connection fd: %m"); + } else { + r = fd_nonblock(output_fd, false); if (r < 0) return varlink_log_errno(v, r, "Failed to set output fd to blocking mode: %m"); } - /* For bidirectional sockets (input_fd == output_fd), dup the fd so that callers - * always get two independent fds they can close separately. */ - if (v->input_fd == v->output_fd) { - v->output_fd = fcntl(v->input_fd, F_DUPFD_CLOEXEC, 3); - if (v->output_fd < 0) - return varlink_log_errno(v, errno, "Failed to dup upgraded connection fd: %m"); - } + r = fd_nonblock(input_fd, false); + if (r < 0) + return varlink_log_errno(v, r, "Failed to set input fd to blocking mode: %m"); /* Hand out requested fds, shut down unwanted directions. */ if (ret_input_fd) - *ret_input_fd = TAKE_FD(v->input_fd); - else { - (void) shutdown(v->input_fd, SHUT_RD); - v->input_fd = safe_close(v->input_fd); - } + *ret_input_fd = TAKE_FD(input_fd); + else + (void) shutdown(input_fd, SHUT_RD); if (ret_output_fd) - *ret_output_fd = TAKE_FD(v->output_fd); - else { - (void) shutdown(v->output_fd, SHUT_WR); - v->output_fd = safe_close(v->output_fd); - } + *ret_output_fd = TAKE_FD(output_fd); + else + (void) shutdown(output_fd, SHUT_WR); return 0; } @@ -2508,14 +1839,16 @@ _public_ int sd_varlink_call_and_upgrade( return varlink_log_errno(v, r, "Failed to build json message: %m"); v->protocol_upgrade = true; + json_stream_set_flags(&v->stream, JSON_STREAM_BOUNDED_READS, true); r = varlink_call_internal(v, m); if (r < 0) { v->protocol_upgrade = false; + json_stream_set_flags(&v->stream, JSON_STREAM_BOUNDED_READS, false); return r; } /* ensure we did not consume any data from the upgraded protocol */ - assert(v->input_buffer_size == 0); + assert(!json_stream_has_buffered_input(&v->stream)); sd_json_variant *e = sd_json_variant_by_key(v->current, "error"), *p = sd_json_variant_by_key(v->current, "parameters"); @@ -2555,6 +1888,7 @@ _public_ int sd_varlink_call_and_upgrade( finish: v->protocol_upgrade = false; + json_stream_set_flags(&v->stream, JSON_STREAM_BOUNDED_READS, false); assert(v->n_pending == 1); v->n_pending--; return r; @@ -2647,13 +1981,13 @@ _public_ int sd_varlink_collect_full( if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = varlink_enqueue_json(v, m); + r = json_stream_enqueue(&v->stream, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); varlink_set_state(v, VARLINK_COLLECTING); v->n_pending++; - v->timestamp = now(CLOCK_MONOTONIC); + json_stream_mark_activity(&v->stream); for (;;) { while (v->state == VARLINK_COLLECTING) { @@ -2815,24 +2149,23 @@ _public_ int sd_varlink_reply(sd_varlink *v, sd_json_variant *parameters) { if (more && v->sentinel) { if (v->previous) { - r = sd_json_variant_set_field_boolean(&v->previous->data, "continues", true); + r = sd_json_variant_set_field_boolean(json_stream_queue_item_get_data(v->previous), "continues", true); if (r < 0) return r; - r = varlink_enqueue_item(v, v->previous); + r = json_stream_enqueue_item(&v->stream, v->previous); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); } - v->previous = varlink_json_queue_item_new(m, v->pushed_fds, v->n_pushed_fds); - if (!v->previous) - return -ENOMEM; + r = json_stream_make_queue_item(&v->stream, m, &v->previous); + if (r < 0) + return r; - v->n_pushed_fds = 0; /* fds now belong to the queue entry */ return 1; } - r = varlink_enqueue_json(v, m); + r = json_stream_enqueue(&v->stream, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -2891,7 +2224,7 @@ _public_ int sd_varlink_reply_and_upgrade(sd_varlink *v, sd_json_variant *parame * client). In normal operation this cannot happen because the client waits for our reply before * sending raw data, and we set protocol_upgrade=true in dispatch to limit subsequent reads to * single bytes. But a misbehaving client could pipeline data early. */ - if (v->input_buffer_size > 0) + if (json_stream_has_buffered_input(&v->stream)) return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADMSG), "Unexpected buffered data from client during protocol upgrade."); @@ -2912,40 +2245,20 @@ _public_ int sd_varlink_reply_and_upgrade(sd_varlink *v, sd_json_variant *parame if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = varlink_enqueue_json(v, m); + r = json_stream_enqueue(&v->stream, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); /* Flush the reply to the socket before stealing the fds. The reply must be fully written * before the caller starts speaking the upgraded protocol. */ - for (;;) { - r = varlink_write(v); - if (r < 0) { - varlink_log_errno(v, r, "Failed to flush reply: %m"); - goto disconnect; - } - if (v->output_buffer_size == 0 && !v->output_queue) - break; - if (v->write_disconnected) { - r = varlink_log_errno(v, SYNTHETIC_ERRNO(ECONNRESET), - "Write disconnected during upgrade reply flush."); - goto disconnect; - } - - r = fd_wait_for_event(v->output_fd, POLLOUT, USEC_INFINITY); - if (ERRNO_IS_NEG_TRANSIENT(r)) - continue; - if (r < 0) { - varlink_log_errno(v, r, "Failed to wait for writable fd: %m"); - goto disconnect; - } - assert(r > 0); - - handle_revents(v, r); + r = json_stream_flush(&v->stream); + if (r < 0) { + varlink_log_errno(v, r, "Failed to flush reply before protocol upgrade: %m"); + goto disconnect; } /* Detach from the event loop before stealing the fds */ - varlink_detach_event_sources(v); + sd_varlink_detach_event(v); /* Now hand the original FDs over to the caller, from this point on we have nothing to do with the * connection anymore, it's up to the caller and we close the connection below */ @@ -2966,8 +2279,7 @@ _public_ int sd_varlink_reset_fds(sd_varlink *v) { * rollback the fds. Note that this is implicitly called whenever an error reply is sent, see * below. */ - close_many(v->pushed_fds, v->n_pushed_fds); - v->n_pushed_fds = 0; + json_stream_reset_pushed_fds(&v->stream); return 0; } @@ -2986,14 +2298,14 @@ _public_ int sd_varlink_error(sd_varlink *v, const char *error_id, sd_json_varia return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy."); if (v->previous) { - r = sd_json_variant_set_field_boolean(&v->previous->data, "continues", true); + r = sd_json_variant_set_field_boolean(json_stream_queue_item_get_data(v->previous), "continues", true); if (r < 0) return r; /* If we have a previous reply still ready make sure we queue it before the error. We only * ever set "previous" if we're in a streaming method so we pass more=true unconditionally * here as we know we're still going to queue an error afterwards. */ - r = varlink_enqueue_item(v, v->previous); + r = json_stream_enqueue_item(&v->stream, v->previous); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -3028,7 +2340,7 @@ _public_ int sd_varlink_error(sd_varlink *v, const char *error_id, sd_json_varia if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = varlink_enqueue_json(v, m); + r = json_stream_enqueue(&v->stream, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -3168,7 +2480,7 @@ _public_ int sd_varlink_notify(sd_varlink *v, sd_json_variant *parameters) { if (r < 0) return varlink_log_errno(v, r, "Failed to build json message: %m"); - r = varlink_enqueue_json(v, m); + r = json_stream_enqueue(&v->stream, m); if (r < 0) return varlink_log_errno(v, r, "Failed to enqueue json message: %m"); @@ -3262,99 +2574,38 @@ _public_ int sd_varlink_set_sentinel(sd_varlink *v, const char *error_id) { return 0; } -static int varlink_acquire_ucred(sd_varlink *v) { - int r; - - assert(v); - - if (v->ucred_acquired) - return 0; - - /* If we are connected asymmetrically, let's refuse, since it's not clear if caller wants to know - * peer on read or write fd */ - if (v->input_fd != v->output_fd) - return -EBADF; - - r = getpeercred(v->input_fd, &v->ucred); - if (r < 0) - return r; - - v->ucred_acquired = true; - return 0; -} - _public_ int sd_varlink_get_peer_uid(sd_varlink *v, uid_t *ret) { - int r; - assert_return(v, -EINVAL); assert_return(ret, -EINVAL); - r = varlink_acquire_ucred(v); - if (r < 0) - return varlink_log_errno(v, r, "Failed to acquire credentials: %m"); - - if (!uid_is_valid(v->ucred.uid)) - return varlink_log_errno(v, SYNTHETIC_ERRNO(ENODATA), "Peer UID is invalid."); - - *ret = v->ucred.uid; - return 0; + return json_stream_acquire_peer_uid(&v->stream, ret); } _public_ int sd_varlink_get_peer_gid(sd_varlink *v, gid_t *ret) { - int r; - assert_return(v, -EINVAL); assert_return(ret, -EINVAL); - r = varlink_acquire_ucred(v); - if (r < 0) - return varlink_log_errno(v, r, "Failed to acquire credentials: %m"); - - if (!gid_is_valid(v->ucred.gid)) - return varlink_log_errno(v, SYNTHETIC_ERRNO(ENODATA), "Peer GID is invalid."); - - *ret = v->ucred.gid; - return 0; + return json_stream_acquire_peer_gid(&v->stream, ret); } _public_ int sd_varlink_get_peer_pid(sd_varlink *v, pid_t *ret) { - int r; - assert_return(v, -EINVAL); assert_return(ret, -EINVAL); - r = varlink_acquire_ucred(v); - if (r < 0) - return varlink_log_errno(v, r, "Failed to acquire credentials: %m"); - - if (!pid_is_valid(v->ucred.pid)) - return varlink_log_errno(v, SYNTHETIC_ERRNO(ENODATA), "Peer uid is invalid."); - - *ret = v->ucred.pid; - return 0; + return json_stream_acquire_peer_pid(&v->stream, ret); } _public_ int sd_varlink_get_peer_pidfd(sd_varlink *v) { assert_return(v, -EINVAL); - if (v->peer_pidfd >= 0) - return v->peer_pidfd; - - if (v->input_fd != v->output_fd) - return -EBADF; - - v->peer_pidfd = getpeerpidfd(v->input_fd); - if (v->peer_pidfd < 0) - return varlink_log_errno(v, v->peer_pidfd, "Failed to acquire pidfd of peer: %m"); - - return v->peer_pidfd; + return json_stream_acquire_peer_pidfd(&v->stream); } _public_ int sd_varlink_set_relative_timeout(sd_varlink *v, uint64_t timeout) { assert_return(v, -EINVAL); /* If set to 0, reset to default value */ - v->timeout = timeout == 0 ? VARLINK_DEFAULT_TIMEOUT_USEC : timeout; + json_stream_set_timeout(&v->stream, timeout == 0 ? VARLINK_DEFAULT_TIMEOUT_USEC : timeout); return 0; } @@ -3367,33 +2618,13 @@ _public_ sd_varlink_server *sd_varlink_get_server(sd_varlink *v) { _public_ int sd_varlink_set_description(sd_varlink *v, const char *description) { assert_return(v, -EINVAL); - return free_and_strdup(&v->description, description); + return json_stream_set_description(&v->stream, description); } _public_ const char* sd_varlink_get_description(sd_varlink *v) { assert_return(v, NULL); - return v->description; -} - -static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) { - sd_varlink *v = ASSERT_PTR(userdata); - - assert(s); - - handle_revents(v, revents); - (void) sd_varlink_process(v); - - return 1; -} - -static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) { - sd_varlink *v = ASSERT_PTR(userdata); - - assert(s); - - (void) sd_varlink_process(v); - return 1; + return json_stream_get_description(&v->stream); } static int defer_callback(sd_event_source *s, void *userdata) { @@ -3405,47 +2636,6 @@ static int defer_callback(sd_event_source *s, void *userdata) { return 1; } -static int prepare_callback(sd_event_source *s, void *userdata) { - sd_varlink *v = ASSERT_PTR(userdata); - int r, e; - usec_t until; - bool have_timeout; - - assert(s); - - e = sd_varlink_get_events(v); - if (e < 0) - return e; - - if (v->input_event_source == v->output_event_source) - /* Same fd for input + output */ - r = sd_event_source_set_io_events(v->input_event_source, e); - else { - r = sd_event_source_set_io_events(v->input_event_source, e & EPOLLIN); - if (r >= 0) - r = sd_event_source_set_io_events(v->output_event_source, e & EPOLLOUT); - } - if (r < 0) - return varlink_log_errno(v, r, "Failed to set source events: %m"); - - r = sd_varlink_get_timeout(v, &until); - if (r < 0) - return r; - have_timeout = r > 0; - - if (have_timeout) { - r = sd_event_source_set_time(v->time_event_source, until); - if (r < 0) - return varlink_log_errno(v, r, "Failed to set source time: %m"); - } - - r = sd_event_source_set_enabled(v->time_event_source, have_timeout ? SD_EVENT_ON : SD_EVENT_OFF); - if (r < 0) - return varlink_log_errno(v, r, "Failed to enable event source: %m"); - - return 1; -} - static int quit_callback(sd_event_source *event, void *userdata) { sd_varlink *v = ASSERT_PTR(userdata); @@ -3461,27 +2651,15 @@ _public_ int sd_varlink_attach_event(sd_varlink *v, sd_event *e, int64_t priorit int r; assert_return(v, -EINVAL); - assert_return(!v->event, -EBUSY); - - if (e) - v->event = sd_event_ref(e); - else { - r = sd_event_default(&v->event); - if (r < 0) - return varlink_log_errno(v, r, "Failed to create event source: %m"); - } - - r = sd_event_add_time(v->event, &v->time_event_source, CLOCK_MONOTONIC, 0, 0, time_callback, v); - if (r < 0) - goto fail; + assert_return(!json_stream_get_event(&v->stream), -EBUSY); - r = sd_event_source_set_priority(v->time_event_source, priority); + r = json_stream_attach_event(&v->stream, e, priority); if (r < 0) - goto fail; + return r; - (void) sd_event_source_set_description(v->time_event_source, "varlink-time"); + sd_event *event = json_stream_get_event(&v->stream); - r = sd_event_add_exit(v->event, &v->quit_event_source, quit_callback, v); + r = sd_event_add_exit(event, &v->quit_event_source, quit_callback, v); if (r < 0) goto fail; @@ -3491,35 +2669,7 @@ _public_ int sd_varlink_attach_event(sd_varlink *v, sd_event *e, int64_t priorit (void) sd_event_source_set_description(v->quit_event_source, "varlink-quit"); - r = sd_event_add_io(v->event, &v->input_event_source, v->input_fd, 0, io_callback, v); - if (r < 0) - goto fail; - - r = sd_event_source_set_prepare(v->input_event_source, prepare_callback); - if (r < 0) - goto fail; - - r = sd_event_source_set_priority(v->input_event_source, priority); - if (r < 0) - goto fail; - - (void) sd_event_source_set_description(v->input_event_source, "varlink-input"); - - if (v->input_fd == v->output_fd) - v->output_event_source = sd_event_source_ref(v->input_event_source); - else { - r = sd_event_add_io(v->event, &v->output_event_source, v->output_fd, 0, io_callback, v); - if (r < 0) - goto fail; - - r = sd_event_source_set_priority(v->output_event_source, priority); - if (r < 0) - goto fail; - - (void) sd_event_source_set_description(v->output_event_source, "varlink-output"); - } - - r = sd_event_add_defer(v->event, &v->defer_event_source, defer_callback, v); + r = sd_event_add_defer(event, &v->defer_event_source, defer_callback, v); if (r < 0) goto fail; @@ -3541,38 +2691,28 @@ _public_ void sd_varlink_detach_event(sd_varlink *v) { if (!v) return; - varlink_detach_event_sources(v); - - v->event = sd_event_unref(v->event); + v->quit_event_source = sd_event_source_disable_unref(v->quit_event_source); + v->defer_event_source = sd_event_source_disable_unref(v->defer_event_source); + json_stream_detach_event(&v->stream); } _public_ sd_event* sd_varlink_get_event(sd_varlink *v) { assert_return(v, NULL); - return v->event; + return json_stream_get_event(&v->stream); } _public_ int sd_varlink_push_fd(sd_varlink *v, int fd) { - int i; - assert_return(v, -EINVAL); assert_return(fd >= 0, -EBADF); /* Takes an fd to send along with the *next* varlink message sent via this varlink connection. This * takes ownership of the specified fd. Use varlink_dup_fd() below to duplicate the fd first. */ - if (!v->allow_fd_passing_output) + if (!json_stream_flags_set(&v->stream, JSON_STREAM_ALLOW_FD_PASSING_OUTPUT)) return -EPERM; - if (v->n_pushed_fds >= SCM_MAX_FD) /* Kernel doesn't support more than 253 fds per message, refuse early hence */ - return -ENOBUFS; - - if (!GREEDY_REALLOC(v->pushed_fds, v->n_pushed_fds + 1)) - return -ENOMEM; - - i = (int) v->n_pushed_fds; - v->pushed_fds[v->n_pushed_fds++] = fd; - return i; + return json_stream_push_fd(&v->stream, fd); } _public_ int sd_varlink_push_dup_fd(sd_varlink *v, int fd) { @@ -3602,13 +2742,10 @@ _public_ int sd_varlink_peek_fd(sd_varlink *v, size_t i) { /* Returns one of the file descriptors that were received along with the current message. This does * not duplicate the fd nor invalidate it, it hence remains in our possession. */ - if (v->allow_fd_passing_input <= 0) + if (!json_stream_flags_set(&v->stream, JSON_STREAM_ALLOW_FD_PASSING_INPUT)) return -EPERM; - if (i >= v->n_input_fds) - return -ENXIO; - - return v->input_fds[i]; + return json_stream_peek_input_fd(&v->stream, i); } _public_ int sd_varlink_peek_dup_fd(sd_varlink *v, size_t i) { @@ -3628,113 +2765,42 @@ _public_ int sd_varlink_take_fd(sd_varlink *v, size_t i) { * we'll invalidate the reference to it under our possession. If called twice in a row will return * -EBADF */ - if (v->allow_fd_passing_input <= 0) + if (!json_stream_flags_set(&v->stream, JSON_STREAM_ALLOW_FD_PASSING_INPUT)) return -EPERM; - if (i >= v->n_input_fds) - return -ENXIO; - - return TAKE_FD(v->input_fds[i]); + return json_stream_take_input_fd(&v->stream, i); } _public_ int sd_varlink_get_n_fds(sd_varlink *v) { assert_return(v, -EINVAL); - if (v->allow_fd_passing_input <= 0) + if (!json_stream_flags_set(&v->stream, JSON_STREAM_ALLOW_FD_PASSING_INPUT)) return -EPERM; - return (int) v->n_input_fds; -} - -static int verify_unix_socket(sd_varlink *v) { - assert(v); - - /* Returns: - * • 0 if this is an AF_UNIX socket - * • -ENOTSOCK if this is not a socket at all - * • -ENOMEDIUM if this is a socket, but not an AF_UNIX socket - * - * Reminder: - * • v->af is < 0 if we haven't checked what kind of address family the thing is yet. - * • v->af == AF_UNSPEC if we checked but it's not a socket - * • otherwise: v->af contains the address family we determined */ - - if (v->af < 0) { - /* If we have distinct input + output fds, we don't consider ourselves to be connected via a regular - * AF_UNIX socket. */ - if (v->input_fd != v->output_fd) { - v->af = AF_UNSPEC; - return -ENOTSOCK; - } - - struct stat st; - - if (fstat(v->input_fd, &st) < 0) - return -errno; - if (!S_ISSOCK(st.st_mode)) { - v->af = AF_UNSPEC; - return -ENOTSOCK; - } - - v->af = socket_get_family(v->input_fd); - if (v->af < 0) - return v->af; - } - - return v->af == AF_UNIX ? 0 : - v->af == AF_UNSPEC ? -ENOTSOCK : -ENOMEDIUM; + return (int) json_stream_get_n_input_fds(&v->stream); } _public_ int sd_varlink_set_allow_fd_passing_input(sd_varlink *v, int b) { - int r; - assert_return(v, -EINVAL); - if (v->allow_fd_passing_input >= 0 && (v->allow_fd_passing_input > 0) == !!b) - return 0; - - r = verify_unix_socket(v); - if (r < 0) { - assert(v->allow_fd_passing_input <= 0); - - if (!b) { - v->allow_fd_passing_input = false; - return 0; - } - - return r; - } - - if (!v->server || FLAGS_SET(v->server->flags, SD_VARLINK_SERVER_FD_PASSING_INPUT_STRICT)) { - r = setsockopt_int(v->input_fd, SOL_SOCKET, SO_PASSRIGHTS, !!b); - if (r < 0 && !ERRNO_IS_NEG_NOT_SUPPORTED(r)) - log_debug_errno(r, "Failed to set SO_PASSRIGHTS socket option: %m"); - } + /* Server connections that haven't opted into FD_PASSING_INPUT_STRICT skip the + * per-connection SO_PASSRIGHTS setsockopt — the listening server already configured + * the socket option once at listen time. */ + bool with_sockopt = !v->server || FLAGS_SET(v->server->flags, SD_VARLINK_SERVER_FD_PASSING_INPUT_STRICT); - v->allow_fd_passing_input = !!b; - return 1; + return json_stream_set_allow_fd_passing_input(&v->stream, !!b, with_sockopt); } _public_ int sd_varlink_set_allow_fd_passing_output(sd_varlink *v, int b) { - int r; - assert_return(v, -EINVAL); - if (v->allow_fd_passing_output == !!b) - return 0; - - r = verify_unix_socket(v); - if (r < 0) - return r; - - v->allow_fd_passing_output = b; - return 1; + return json_stream_set_allow_fd_passing_output(&v->stream, !!b); } _public_ int sd_varlink_set_input_sensitive(sd_varlink *v) { assert_return(v, -EINVAL); - v->input_sensitive = true; + json_stream_set_flags(&v->stream, JSON_STREAM_INPUT_SENSITIVE, true); return 0; } @@ -3955,19 +3021,24 @@ _public_ int sd_varlink_server_add_connection_pair( v->server = sd_varlink_server_ref(server); sd_varlink_ref(v); - v->input_fd = input_fd; - v->output_fd = output_fd; + r = json_stream_attach_fds(&v->stream, input_fd, output_fd); + if (r < 0) + return r; + if (server->flags & SD_VARLINK_SERVER_INHERIT_USERDATA) v->userdata = server->userdata; - if (ucred_acquired) { - v->ucred = ucred; - v->ucred_acquired = true; - } + /* If the server might receive a protocol upgrade method, switch the input path to + * byte-bounded reads so we don't accidentally consume post-upgrade bytes. */ + if (FLAGS_SET(server->flags, SD_VARLINK_SERVER_UPGRADABLE)) + json_stream_set_flags(&v->stream, JSON_STREAM_BOUNDED_READS, true); + + if (ucred_acquired) + json_stream_set_peer_ucred(&v->stream, &ucred); _cleanup_free_ char *desc = NULL; if (asprintf(&desc, "%s-%i-%i", varlink_server_description(server), input_fd, output_fd) >= 0) - v->description = TAKE_PTR(desc); + json_stream_set_description(&v->stream, desc); (void) sd_varlink_set_allow_fd_passing_input(v, FLAGS_SET(server->flags, SD_VARLINK_SERVER_ALLOW_FD_PASSING_INPUT)); (void) sd_varlink_set_allow_fd_passing_output(v, FLAGS_SET(server->flags, SD_VARLINK_SERVER_ALLOW_FD_PASSING_OUTPUT)); @@ -3978,8 +3049,12 @@ _public_ int sd_varlink_server_add_connection_pair( r = sd_varlink_attach_event(v, server->event, server->event_priority); if (r < 0) { varlink_log_errno(v, r, "Failed to attach new connection: %m"); - TAKE_FD(v->input_fd); /* take the fd out of the connection again */ - TAKE_FD(v->output_fd); + /* Detach the fds from the connection so the caller (the connect callback) + * can decide what to do with them. The original fd value(s) the caller + * passed in are still owned by the caller; we just stop the connection + * from closing them on shutdown. */ + TAKE_FD(v->stream.input_fd); + TAKE_FD(v->stream.output_fd); sd_varlink_close(v); return r; } diff --git a/src/libsystemd/sd-varlink/varlink-internal.h b/src/libsystemd/sd-varlink/varlink-internal.h index 3dfe86d4487..966afd0b06c 100644 --- a/src/libsystemd/sd-varlink/varlink-internal.h +++ b/src/libsystemd/sd-varlink/varlink-internal.h @@ -5,6 +5,7 @@ #include "sd-varlink.h" +#include "json-stream.h" #include "list.h" #include "pidref.h" #include "sd-forward.h" @@ -70,85 +71,21 @@ typedef enum VarlinkState { VARLINK_PROCESSING_METHOD, \ VARLINK_PROCESSING_METHOD_MORE) -typedef struct VarlinkJsonQueueItem VarlinkJsonQueueItem; - -/* A queued message we shall write into the socket, along with the file descriptors to send at the same - * time. This queue item binds them together so that message/fd boundaries are maintained throughout the - * whole pipeline. */ -struct VarlinkJsonQueueItem { - LIST_FIELDS(VarlinkJsonQueueItem, queue); - sd_json_variant *data; - size_t n_fds; - int fds[]; -}; - typedef struct sd_varlink { unsigned n_ref; sd_varlink_server *server; VarlinkState state; - bool connecting; /* This boolean indicates whether the socket fd we are operating on is currently - * processing an asynchronous connect(). In that state we watch the socket for - * EPOLLOUT, but we refrain from calling read() or write() on the socket as that - * will trigger ENOTCONN. Note that this boolean is kept separate from the - * VarlinkState above on purpose: while the connect() is still not complete we - * already want to allow queuing of messages and similar. Thus it's nice to keep - * these two state concepts separate: the VarlinkState encodes what our own view of - * the connection is, i.e. whether we think it's a server, a client, and has - * something queued already, while 'connecting' tells us a detail about the - * transport used below, that should have no effect on how we otherwise accept and - * process operations from the user. - * - * Or to say this differently: VARLINK_STATE_IS_ALIVE(state) tells you whether the - * connection is good to use, even if it might not be fully connected - * yet. connecting=true then informs you that actually we are still connecting, and - * the connection is actually not established yet and thus any requests you enqueue - * now will still work fine but will be queued only, not sent yet, but that - * shouldn't stop you from using the connection, since eventually whatever you queue - * *will* be sent. - * - * Or to say this even differently: 'state' is a high-level ("application layer" - * high, if you so will) state, while 'conecting' is a low-level ("transport layer" - * low, if you so will) state, and while they are not entirely unrelated and - * sometimes propagate effects to each other they are only asynchronously connected - * at most. */ - unsigned n_pending; - - int input_fd; - int output_fd; - - char *input_buffer; /* valid data starts at input_buffer_index, ends at input_buffer_index+input_buffer_size */ - size_t input_buffer_index; - size_t input_buffer_size; - size_t input_buffer_unscanned; - - void *input_control_buffer; - size_t input_control_buffer_size; - - char *output_buffer; /* valid data starts at output_buffer_index, ends at output_buffer_index+output_buffer_size */ - size_t output_buffer_index; - size_t output_buffer_size; - - int *input_fds; /* file descriptors associated with the data in input_buffer (for fd passing) */ - size_t n_input_fds; - - int *output_fds; /* file descriptors associated with the data in output_buffer (for fd passing) */ - size_t n_output_fds; - /* Further messages to output not yet formatted into text, and thus not included in output_buffer - * yet. We keep them separate from output_buffer, to not violate fd message boundaries: we want that - * each fd that is sent is associated with its fds, and that fds cannot be accidentally associated - * with preceding or following messages. */ - LIST_HEAD(VarlinkJsonQueueItem, output_queue); - VarlinkJsonQueueItem *output_queue_tail; - size_t n_output_queue; + /* Transport layer: input/output buffers, fd passing, output queue, read/write/parse + * step functions, sd-event integration (input/output/time event sources, idle + * timeout, description, peer credentials). The varlink-level state machine and + * dispatch logic live in sd-varlink.c; everything else about moving bytes is + * delegated. */ + JsonStream stream; - /* The fds to associate with the next message that is about to be enqueued. The user first pushes the - * fds it intends to send via varlink_push_fd() into this queue, and then once the message data is - * submitted we'll combine the fds and the message data into one. */ - int *pushed_fds; - size_t n_pushed_fds; + unsigned n_pending; sd_varlink_reply_t reply_callback; @@ -157,39 +94,18 @@ typedef struct sd_varlink { sd_varlink_reply_flags_t current_reply_flags; sd_varlink_symbol *current_method; - VarlinkJsonQueueItem *previous; + JsonStreamQueueItem *previous; char *sentinel; - int peer_pidfd; - struct ucred ucred; - bool ucred_acquired:1; - - bool write_disconnected:1; - bool read_disconnected:1; - bool prefer_read:1; - bool prefer_write:1; - bool got_pollhup:1; - - bool protocol_upgrade:1; /* Whether a protocol_upgrade was requested */ - - bool output_buffer_sensitive:1; /* whether to erase the output buffer after writing it to the socket */ - bool input_sensitive:1; /* Whether incoming messages might be sensitive */ - - bool allow_fd_passing_output; - int allow_fd_passing_input; - - int af; /* address family if socket; AF_UNSPEC if not socket; negative if not known */ - - usec_t timestamp; - usec_t timeout; + /* Per-call protocol-upgrade marker: set when the *current* method call carries the + * SD_VARLINK_METHOD_UPGRADE flag. Validated by sd_varlink_reply_and_upgrade() to + * ensure the caller's contract is honored. The transport-layer "stop reading at the + * next message boundary" behavior is governed independently by the JsonStream's + * bounded_reads flag. */ + bool protocol_upgrade:1; void *userdata; - char *description; - sd_event *event; - sd_event_source *input_event_source; - sd_event_source *output_event_source; - sd_event_source *time_event_source; sd_event_source *quit_event_source; sd_event_source *defer_event_source; @@ -254,7 +170,7 @@ typedef struct sd_varlink_server { log_debug("%s: " fmt, varlink_server_description(s), ##__VA_ARGS__) static inline const char* varlink_description(sd_varlink *v) { - return (v ? v->description : NULL) ?: "varlink"; + return (v ? json_stream_get_description(&v->stream) : NULL) ?: "varlink"; } static inline const char* varlink_server_description(sd_varlink_server *s) { -- 2.47.3