--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+
+#include <poll.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "sd-event.h"
+#include "sd-json.h"
+
+#include "alloc-util.h"
+#include "errno-util.h"
+#include "fd-util.h"
+#include "io-util.h"
+#include "iovec-util.h"
+#include "json-stream.h"
+#include "list.h"
+#include "log.h"
+#include "memory-util.h"
+#include "process-util.h"
+#include "socket-util.h"
+#include "string-util.h"
+#include "time-util.h"
+#include "user-util.h"
+
+#define JSON_STREAM_BUFFER_MAX_DEFAULT (16U * 1024U * 1024U)
+#define JSON_STREAM_READ_SIZE_DEFAULT (64U * 1024U)
+#define JSON_STREAM_QUEUE_MAX_DEFAULT (64U * 1024U)
+#define JSON_STREAM_FDS_MAX (16U * 1024U)
+
+struct JsonStreamQueueItem {
+ LIST_FIELDS(JsonStreamQueueItem, queue);
+ sd_json_variant *data;
+ size_t n_fds;
+ int fds[];
+};
+
+static const char* json_stream_description(const JsonStream *s) {
+ return (s ? s->description : NULL) ?: "json-stream";
+}
+
+/* Returns the size of the framing delimiter in bytes: strlen(delimiter) for multi-char
+ * delimiters (e.g. "\r\n"), or 1 for the default NUL-byte delimiter (delimiter == NULL). */
+static size_t json_stream_delimiter_size(const JsonStream *s) {
+ return strlen_ptr(s->delimiter) ?: 1;
+}
+
+static usec_t json_stream_now(const JsonStream *s) {
+ usec_t t;
+
+ if (s->event && sd_event_now(s->event, CLOCK_MONOTONIC, &t) >= 0)
+ return t;
+
+ return now(CLOCK_MONOTONIC);
+}
+
+#define json_stream_log(s, fmt, ...) \
+ log_debug("%s: " fmt, json_stream_description(s), ##__VA_ARGS__)
+
+#define json_stream_log_errno(s, error, fmt, ...) \
+ log_debug_errno((error), "%s: " fmt, json_stream_description(s), ##__VA_ARGS__)
+
+sd_json_variant** json_stream_queue_item_get_data(JsonStreamQueueItem *q) {
+ assert(q);
+ return &q->data;
+}
+
+JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q) {
+ if (!q)
+ return NULL;
+
+ sd_json_variant_unref(q->data);
+ close_many(q->fds, q->n_fds);
+
+ return mfree(q);
+}
+
+static JsonStreamQueueItem* json_stream_queue_item_new(sd_json_variant *m, const int fds[], size_t n_fds) {
+ JsonStreamQueueItem *q;
+
+ assert(m);
+ assert(fds || n_fds == 0);
+
+ size_t sz = sizeof(int);
+ if (!MUL_SAFE(&sz, sz, n_fds) ||
+ !INC_SAFE(&sz, offsetof(JsonStreamQueueItem, fds)))
+ return NULL;
+
+ q = malloc(sz);
+ if (!q)
+ return NULL;
+
+ *q = (JsonStreamQueueItem) {
+ .data = sd_json_variant_ref(m),
+ .n_fds = n_fds,
+ };
+
+ memcpy_safe(q->fds, fds, n_fds * sizeof(int));
+
+ return TAKE_PTR(q);
+}
+
+int json_stream_init(JsonStream *s, const JsonStreamParams *params) {
+ assert(s);
+ assert(params);
+ assert(params->phase);
+ assert(params->dispatch);
+
+ char *delimiter = NULL;
+ if (params->delimiter) {
+ delimiter = strdup(params->delimiter);
+ if (!delimiter)
+ return -ENOMEM;
+ }
+
+ *s = (JsonStream) {
+ .delimiter = delimiter,
+ .buffer_max = params->buffer_max > 0 ? params->buffer_max : JSON_STREAM_BUFFER_MAX_DEFAULT,
+ .read_chunk = params->read_chunk > 0 ? params->read_chunk : JSON_STREAM_READ_SIZE_DEFAULT,
+ .queue_max = params->queue_max > 0 ? params->queue_max : JSON_STREAM_QUEUE_MAX_DEFAULT,
+ .phase_cb = params->phase,
+ .dispatch_cb = params->dispatch,
+ .userdata = params->userdata,
+ .input_fd = -EBADF,
+ .output_fd = -EBADF,
+ .timeout = USEC_INFINITY,
+ .last_activity = USEC_INFINITY,
+ .ucred = UCRED_INVALID,
+ .peer_pidfd = -EBADF,
+ .af = -1,
+ };
+
+ return 0;
+}
+
+static void json_stream_clear(JsonStream *s) {
+ if (!s)
+ return;
+
+ json_stream_detach_event(s);
+
+ s->delimiter = mfree(s->delimiter);
+ s->description = mfree(s->description);
+
+ if (s->input_fd != s->output_fd) {
+ s->input_fd = safe_close(s->input_fd);
+ s->output_fd = safe_close(s->output_fd);
+ } else
+ s->output_fd = s->input_fd = safe_close(s->input_fd);
+
+ s->peer_pidfd = safe_close(s->peer_pidfd);
+ s->ucred_acquired = false;
+ s->af = -1;
+
+ close_many(s->input_fds, s->n_input_fds);
+ s->input_fds = mfree(s->input_fds);
+ s->n_input_fds = 0;
+
+ s->input_buffer = FLAGS_SET(s->flags, JSON_STREAM_INPUT_SENSITIVE) ? erase_and_free(s->input_buffer) : mfree(s->input_buffer);
+ s->input_buffer_index = s->input_buffer_size = s->input_buffer_unscanned = 0;
+
+ s->output_buffer = FLAGS_SET(s->flags, JSON_STREAM_OUTPUT_BUFFER_SENSITIVE) ? erase_and_free(s->output_buffer) : mfree(s->output_buffer);
+ s->output_buffer_index = s->output_buffer_size = 0;
+ s->flags &= ~JSON_STREAM_OUTPUT_BUFFER_SENSITIVE;
+
+ s->input_control_buffer = mfree(s->input_control_buffer);
+ s->input_control_buffer_size = 0;
+
+ close_many(s->output_fds, s->n_output_fds);
+ s->output_fds = mfree(s->output_fds);
+ s->n_output_fds = 0;
+
+ close_many(s->pushed_fds, s->n_pushed_fds);
+ s->pushed_fds = mfree(s->pushed_fds);
+ s->n_pushed_fds = 0;
+
+ LIST_CLEAR(queue, s->output_queue, json_stream_queue_item_free);
+ s->output_queue_tail = NULL;
+ s->n_output_queue = 0;
+}
+
+void json_stream_done(JsonStream *s) {
+ if (!s)
+ return;
+
+ json_stream_clear(s);
+}
+
+int json_stream_set_description(JsonStream *s, const char *description) {
+ assert(s);
+ return free_and_strdup(&s->description, description);
+}
+
+const char* json_stream_get_description(const JsonStream *s) {
+ assert(s);
+ return s->description;
+}
+
+int json_stream_connect_address(JsonStream *s, const char *address) {
+ union sockaddr_union sockaddr;
+ int r;
+
+ assert(s);
+ assert(address);
+
+ _cleanup_close_ int sock_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
+ if (sock_fd < 0)
+ return json_stream_log_errno(s, errno, "Failed to create AF_UNIX socket: %m");
+
+ sock_fd = fd_move_above_stdio(sock_fd);
+
+ r = sockaddr_un_set_path(&sockaddr.un, address);
+ if (r < 0) {
+ if (r != -ENAMETOOLONG)
+ return json_stream_log_errno(s, r, "Failed to set socket address '%s': %m", address);
+
+ /* Path too long to fit into sockaddr_un, connect via O_PATH instead. */
+ r = connect_unix_path(sock_fd, AT_FDCWD, address);
+ } else
+ r = RET_NERRNO(connect(sock_fd, &sockaddr.sa, r));
+
+ if (r < 0) {
+ if (!IN_SET(r, -EAGAIN, -EINPROGRESS))
+ return json_stream_log_errno(s, r, "Failed to connect to %s: %m", address);
+
+ /* The connect() is being processed in the background. As long as that's the
+ * case the socket is in a special state: we can poll it for POLLOUT, but
+ * write()s before POLLOUT will fail with ENOTCONN (rather than EAGAIN). Since
+ * ENOTCONN can mean two different things (not yet connected vs. already
+ * disconnected), we track this as a separate flag. */
+ s->flags |= JSON_STREAM_CONNECTING;
+ }
+
+ int fd = TAKE_FD(sock_fd);
+ return json_stream_attach_fds(s, fd, fd);
+}
+
+int json_stream_attach_fds(JsonStream *s, int input_fd, int output_fd) {
+ struct stat st;
+
+ assert(s);
+
+ /* NB: input_fd and output_fd are donated to the JsonStream instance! */
+
+ if (s->input_fd != s->output_fd) {
+ safe_close(s->input_fd);
+ safe_close(s->output_fd);
+ } else
+ safe_close(s->input_fd);
+
+ s->input_fd = input_fd;
+ s->output_fd = output_fd;
+ s->flags &= ~(JSON_STREAM_PREFER_READ|JSON_STREAM_PREFER_WRITE);
+
+ /* Detect non-socket fds up front so the read/write paths use read()/write() for
+ * non-socket fds and send()/recv() for sockets (mostly for MSG_NOSIGNAL). */
+ if (input_fd >= 0) {
+ if (fstat(input_fd, &st) < 0)
+ return -errno;
+ if (!S_ISSOCK(st.st_mode))
+ s->flags |= JSON_STREAM_PREFER_READ;
+ }
+
+ if (output_fd >= 0 && output_fd != input_fd) {
+ if (fstat(output_fd, &st) < 0)
+ return -errno;
+ if (!S_ISSOCK(st.st_mode))
+ s->flags |= JSON_STREAM_PREFER_WRITE;
+ } else if (FLAGS_SET(s->flags, JSON_STREAM_PREFER_READ))
+ s->flags |= JSON_STREAM_PREFER_WRITE;
+
+ return 0;
+}
+
+int json_stream_connect_fd_pair(JsonStream *s, int input_fd, int output_fd) {
+ int r;
+
+ assert(s);
+ assert(input_fd >= 0);
+ assert(output_fd >= 0);
+
+ r = fd_nonblock(input_fd, true);
+ if (r < 0)
+ return json_stream_log_errno(s, r, "Failed to make input fd %d nonblocking: %m", input_fd);
+
+ if (input_fd != output_fd) {
+ r = fd_nonblock(output_fd, true);
+ if (r < 0)
+ return json_stream_log_errno(s, r, "Failed to make output fd %d nonblocking: %m", output_fd);
+ }
+
+ return json_stream_attach_fds(s, input_fd, output_fd);
+}
+
+bool json_stream_flags_set(const JsonStream *s, JsonStreamFlags flags) {
+ assert(s);
+ assert((flags & ~(JSON_STREAM_BOUNDED_READS|JSON_STREAM_INPUT_SENSITIVE|JSON_STREAM_ALLOW_FD_PASSING_INPUT|JSON_STREAM_ALLOW_FD_PASSING_OUTPUT)) == 0);
+
+ return FLAGS_SET(s->flags, flags);
+}
+
+/* Multiple flags may be passed — all are set or cleared together. */
+void json_stream_set_flags(JsonStream *s, JsonStreamFlags flags, bool b) {
+ assert(s);
+ assert((flags & ~(JSON_STREAM_BOUNDED_READS|JSON_STREAM_INPUT_SENSITIVE)) == 0);
+
+ SET_FLAG(s->flags, flags, b);
+}
+
+bool json_stream_has_buffered_input(const JsonStream *s) {
+ assert(s);
+ return s->input_buffer_size > 0;
+}
+
+/* Query the consumer's current phase. The callback is mandatory (asserted at construction
+ * time), so we can call it unconditionally. */
+static JsonStreamPhase json_stream_current_phase(const JsonStream *s) {
+ assert(s);
+ return s->phase_cb(s->userdata);
+}
+
+/* Both READING and AWAITING_REPLY mean "we want POLLIN and would lose if the read side
+ * died" — they only differ in whether the idle timeout is in force. */
+static bool phase_is_reading(JsonStreamPhase p) {
+ return IN_SET(p, JSON_STREAM_PHASE_READING, JSON_STREAM_PHASE_AWAITING_REPLY);
+}
+
+bool json_stream_should_disconnect(const JsonStream *s) {
+ assert(s);
+
+ /* Carefully decide when the consumer should initiate a teardown. We err on the side
+ * of staying around so half-open connections can flush remaining data and reads can
+ * surface buffered messages before we tear everything down. */
+
+ /* Wait until any in-flight async connect() completes — there's nothing reasonable
+ * to do until we know whether the socket is connected or not. */
+ if (FLAGS_SET(s->flags, JSON_STREAM_CONNECTING))
+ return false;
+
+ /* Still bytes to write and we can write? Stay around so the flush can complete. */
+ if (s->output_buffer_size > 0 && !FLAGS_SET(s->flags, JSON_STREAM_WRITE_DISCONNECTED))
+ return false;
+
+ /* Both sides gone already? Then there's no point in lingering. */
+ if (FLAGS_SET(s->flags, JSON_STREAM_READ_DISCONNECTED|JSON_STREAM_WRITE_DISCONNECTED))
+ return true;
+
+ JsonStreamPhase phase = json_stream_current_phase(s);
+
+ /* Caller is waiting for input but the read side is shut down — we'll never see
+ * another message. */
+ if (phase_is_reading(phase) && FLAGS_SET(s->flags, JSON_STREAM_READ_DISCONNECTED))
+ return true;
+
+ /* Idle client whose write side has died, or we saw POLLHUP. We explicitly check for
+ * POLLHUP because we likely won't notice the write side being down via send() if we
+ * never wrote anything in the first place. */
+ if (phase == JSON_STREAM_PHASE_IDLE_CLIENT &&
+ (s->flags & (JSON_STREAM_WRITE_DISCONNECTED|JSON_STREAM_GOT_POLLHUP)))
+ return true;
+
+ /* Caller has more output to send but the peer hung up, and we're either out of
+ * bytes or already saw a write error. Nothing left to do. */
+ if (phase == JSON_STREAM_PHASE_PENDING_OUTPUT &&
+ (FLAGS_SET(s->flags, JSON_STREAM_WRITE_DISCONNECTED) || s->output_buffer_size == 0) &&
+ FLAGS_SET(s->flags, JSON_STREAM_GOT_POLLHUP))
+ return true;
+
+ return false;
+}
+
+int json_stream_get_events(const JsonStream *s) {
+ int ret = 0;
+
+ assert(s);
+
+ /* While an asynchronous connect() is still in flight we only ask for POLLOUT, which
+ * tells us once the connection is fully established. We must not read or write before
+ * that. */
+ if (FLAGS_SET(s->flags, JSON_STREAM_CONNECTING))
+ return POLLOUT;
+
+ if (phase_is_reading(json_stream_current_phase(s)) &&
+ !FLAGS_SET(s->flags, JSON_STREAM_READ_DISCONNECTED) &&
+ s->input_buffer_unscanned == 0)
+ ret |= POLLIN;
+
+ if (!FLAGS_SET(s->flags, JSON_STREAM_WRITE_DISCONNECTED) && (s->output_queue || s->output_buffer_size > 0))
+ ret |= POLLOUT;
+
+ return ret;
+}
+
+static void json_stream_handle_revents(JsonStream *s, int revents) {
+ assert(s);
+
+ if (FLAGS_SET(s->flags, JSON_STREAM_CONNECTING)) {
+ /* If we have seen POLLOUT or POLLHUP on a socket we are asynchronously waiting a
+ * connect() to complete on, we know we are ready. We don't read the connection
+ * error here though — we'll get it on the next read() or write(). */
+ if ((revents & (POLLOUT|POLLHUP)) == 0)
+ return;
+
+ json_stream_log(s, "Asynchronous connection completed.");
+ s->flags &= ~JSON_STREAM_CONNECTING;
+ return;
+ }
+
+ /* Note that we don't care much about POLLIN/POLLOUT here, we'll just try reading and
+ * writing what we can. However, we do care about POLLHUP to detect connection
+ * termination even if we momentarily don't want to read nor write anything. */
+ if (FLAGS_SET(revents, POLLHUP)) {
+ json_stream_log(s, "Got POLLHUP from socket.");
+ s->flags |= JSON_STREAM_GOT_POLLHUP;
+ }
+}
+
+int json_stream_wait(JsonStream *s, usec_t timeout) {
+ int events, r;
+
+ assert(s);
+
+ events = json_stream_get_events(s);
+ if (events < 0)
+ return events;
+
+ /* MIN the caller's timeout with our own deadline (if any) so that we wake up to
+ * fire the idle timeout. */
+ usec_t deadline = json_stream_get_timeout(s);
+ if (deadline != USEC_INFINITY)
+ timeout = MIN(timeout, usec_sub_unsigned(deadline, now(CLOCK_MONOTONIC)));
+
+ struct pollfd pollfd[2];
+ size_t n_poll_fd = 0;
+
+ if (s->input_fd == s->output_fd) {
+ pollfd[n_poll_fd++] = (struct pollfd) {
+ .fd = s->input_fd,
+ .events = events,
+ };
+ } else {
+ pollfd[n_poll_fd++] = (struct pollfd) {
+ .fd = s->input_fd,
+ .events = events & POLLIN,
+ };
+ pollfd[n_poll_fd++] = (struct pollfd) {
+ .fd = s->output_fd,
+ .events = events & POLLOUT,
+ };
+ }
+
+ r = ppoll_usec(pollfd, n_poll_fd, timeout);
+ if (ERRNO_IS_NEG_TRANSIENT(r))
+ /* Treat EINTR as not a timeout, but also nothing happened, and the caller gets
+ * a chance to call back into us. */
+ return 1;
+ if (r <= 0)
+ return r;
+
+ int revents = 0;
+ FOREACH_ARRAY(p, pollfd, n_poll_fd)
+ revents |= p->revents;
+
+ json_stream_handle_revents(s, revents);
+ return 1;
+}
+
+/* ===== Timeout management ===== */
+
+static usec_t json_stream_get_deadline(const JsonStream *s) {
+ assert(s);
+
+ return usec_add(s->last_activity, s->timeout);
+}
+
+usec_t json_stream_get_timeout(const JsonStream *s) {
+ assert(s);
+
+ /* The deadline is in force only when the consumer is in PHASE_AWAITING_REPLY. In
+ * other phases (idle server, between operations) we ignore the cached deadline even
+ * if it's still set from a previous operation. */
+ if (json_stream_current_phase(s) != JSON_STREAM_PHASE_AWAITING_REPLY)
+ return USEC_INFINITY;
+
+ return json_stream_get_deadline(s);
+}
+
+static void json_stream_rearm_time_source(JsonStream *s) {
+ int r;
+
+ assert(s);
+
+ if (!s->time_event_source)
+ return;
+
+ usec_t deadline = json_stream_get_timeout(s);
+ if (deadline == USEC_INFINITY) {
+ (void) sd_event_source_set_enabled(s->time_event_source, SD_EVENT_OFF);
+ return;
+ }
+
+ r = sd_event_source_set_time(s->time_event_source, deadline);
+ if (r < 0) {
+ json_stream_log_errno(s, r, "Failed to set time source deadline: %m");
+ return;
+ }
+
+ (void) sd_event_source_set_enabled(s->time_event_source, SD_EVENT_ON);
+}
+
+void json_stream_set_timeout(JsonStream *s, usec_t timeout) {
+ assert(s);
+
+ s->timeout = timeout;
+
+ /* If the configured timeout changes mid-flight, rearm the time source so the new
+ * deadline takes effect immediately rather than waiting for the next mark_activity
+ * or successful write. */
+ json_stream_rearm_time_source(s);
+}
+
+void json_stream_mark_activity(JsonStream *s) {
+ assert(s);
+
+ s->last_activity = json_stream_now(s);
+ json_stream_rearm_time_source(s);
+}
+
+static int json_stream_acquire_peer_ucred(JsonStream *s, struct ucred *ret) {
+ int r;
+
+ assert(s);
+ assert(ret);
+
+ if (!s->ucred_acquired) {
+ /* Peer credentials only make sense for a bidirectional socket. */
+ if (s->input_fd != s->output_fd)
+ return -EBADF;
+
+ r = getpeercred(s->input_fd, &s->ucred);
+ if (r < 0)
+ return r;
+
+ s->ucred_acquired = true;
+ }
+
+ *ret = s->ucred;
+ return 0;
+}
+
+int json_stream_acquire_peer_uid(JsonStream *s, uid_t *ret) {
+ struct ucred ucred;
+ int r;
+
+ assert(s);
+ assert(ret);
+
+ r = json_stream_acquire_peer_ucred(s, &ucred);
+ if (r < 0)
+ return json_stream_log_errno(s, r, "Failed to acquire credentials: %m");
+
+ if (!uid_is_valid(ucred.uid))
+ return json_stream_log_errno(s, SYNTHETIC_ERRNO(ENODATA), "Peer UID is invalid.");
+
+ *ret = ucred.uid;
+ return 0;
+}
+
+int json_stream_acquire_peer_gid(JsonStream *s, gid_t *ret) {
+ struct ucred ucred;
+ int r;
+
+ assert(s);
+ assert(ret);
+
+ r = json_stream_acquire_peer_ucred(s, &ucred);
+ if (r < 0)
+ return json_stream_log_errno(s, r, "Failed to acquire credentials: %m");
+
+ if (!gid_is_valid(ucred.gid))
+ return json_stream_log_errno(s, SYNTHETIC_ERRNO(ENODATA), "Peer GID is invalid.");
+
+ *ret = ucred.gid;
+ return 0;
+}
+
+int json_stream_acquire_peer_pid(JsonStream *s, pid_t *ret) {
+ struct ucred ucred;
+ int r;
+
+ assert(s);
+ assert(ret);
+
+ r = json_stream_acquire_peer_ucred(s, &ucred);
+ if (r < 0)
+ return json_stream_log_errno(s, r, "Failed to acquire credentials: %m");
+
+ if (!pid_is_valid(ucred.pid))
+ return json_stream_log_errno(s, SYNTHETIC_ERRNO(ENODATA), "Peer PID is invalid.");
+
+ *ret = ucred.pid;
+ return 0;
+}
+
+int json_stream_get_peer_ucred(const JsonStream *s, struct ucred *ret) {
+ assert(s);
+ assert(ret);
+
+ if (!s->ucred_acquired)
+ return -ENODATA;
+
+ *ret = s->ucred;
+ return 0;
+}
+
+void json_stream_set_peer_ucred(JsonStream *s, const struct ucred *ucred) {
+ assert(s);
+ assert(ucred);
+
+ s->ucred = *ucred;
+ s->ucred_acquired = true;
+}
+
+int json_stream_acquire_peer_pidfd(JsonStream *s) {
+ assert(s);
+
+ if (s->peer_pidfd >= 0)
+ return s->peer_pidfd;
+
+ if (s->input_fd != s->output_fd)
+ return json_stream_log_errno(s, SYNTHETIC_ERRNO(EBADF), "Failed to acquire pidfd of peer: separate input/output fds");
+
+ s->peer_pidfd = getpeerpidfd(s->input_fd);
+ if (s->peer_pidfd < 0)
+ return json_stream_log_errno(s, s->peer_pidfd, "Failed to acquire pidfd of peer: %m");
+
+ return s->peer_pidfd;
+}
+
+static int json_stream_verify_unix_socket(JsonStream *s) {
+ assert(s);
+
+ /* Returns:
+ * • 0 if this is an AF_UNIX socket
+ * • -ENOTSOCK if this is not a socket at all
+ * • -ENOMEDIUM if this is a socket, but not an AF_UNIX socket
+ *
+ * The result is cached after the first call. af < 0 = unchecked, af == AF_UNSPEC =
+ * checked but not a socket, otherwise af is the resolved address family. */
+
+ if (s->af < 0) {
+ /* If we have distinct input + output fds, we don't consider ourselves to be
+ * connected via a regular AF_UNIX socket. */
+ if (s->input_fd != s->output_fd) {
+ s->af = AF_UNSPEC;
+ return -ENOTSOCK;
+ }
+
+ struct stat st;
+
+ if (fstat(s->input_fd, &st) < 0)
+ return -errno;
+ if (!S_ISSOCK(st.st_mode)) {
+ s->af = AF_UNSPEC;
+ return -ENOTSOCK;
+ }
+
+ s->af = socket_get_family(s->input_fd);
+ if (s->af < 0)
+ return s->af;
+ }
+
+ if (s->af == AF_UNIX)
+ return 0;
+ if (s->af == AF_UNSPEC)
+ return -ENOTSOCK;
+
+ return -ENOMEDIUM;
+}
+
+int json_stream_set_allow_fd_passing_input(JsonStream *s, bool enabled, bool with_sockopt) {
+ int r;
+
+ assert(s);
+
+ if (FLAGS_SET(s->flags, JSON_STREAM_ALLOW_FD_PASSING_INPUT) == enabled)
+ return 0;
+
+ r = json_stream_verify_unix_socket(s);
+ if (r < 0) {
+ /* If the caller is disabling, accept the verify failure silently — we just
+ * leave the flag as it was (or set it to false if currently true). */
+ if (!enabled) {
+ s->flags &= ~JSON_STREAM_ALLOW_FD_PASSING_INPUT;
+ return 0;
+ }
+ return r;
+ }
+
+ if (with_sockopt) {
+ r = setsockopt_int(s->input_fd, SOL_SOCKET, SO_PASSRIGHTS, enabled);
+ if (r < 0 && !ERRNO_IS_NEG_NOT_SUPPORTED(r))
+ json_stream_log_errno(s, r, "Failed to set SO_PASSRIGHTS socket option: %m");
+ }
+
+ SET_FLAG(s->flags, JSON_STREAM_ALLOW_FD_PASSING_INPUT, enabled);
+ return 1;
+}
+
+int json_stream_set_allow_fd_passing_output(JsonStream *s, bool enabled) {
+ int r;
+
+ assert(s);
+
+ if (FLAGS_SET(s->flags, JSON_STREAM_ALLOW_FD_PASSING_OUTPUT) == enabled)
+ return 0;
+
+ r = json_stream_verify_unix_socket(s);
+ if (r < 0)
+ return r;
+
+ SET_FLAG(s->flags, JSON_STREAM_ALLOW_FD_PASSING_OUTPUT, enabled);
+ return 1;
+}
+
+/* ===== sd-event integration ===== */
+
+static int json_stream_io_callback(sd_event_source *source, int fd, uint32_t revents, void *userdata) {
+ JsonStream *s = ASSERT_PTR(userdata);
+ int r;
+
+ json_stream_handle_revents(s, revents);
+
+ r = s->dispatch_cb(s->userdata);
+ if (r < 0)
+ json_stream_log_errno(s, r, "Dispatch callback failed, ignoring: %m");
+
+ return 1;
+}
+
+static int json_stream_time_callback(sd_event_source *source, uint64_t usec, void *userdata) {
+ JsonStream *s = ASSERT_PTR(userdata);
+ int r;
+
+ /* Disable the source: it must not fire again until activity is marked. The consumer
+ * notices the timeout by comparing now() to json_stream_get_timeout() in its dispatch
+ * callback. */
+ (void) sd_event_source_set_enabled(s->time_event_source, SD_EVENT_OFF);
+
+ r = s->dispatch_cb(s->userdata);
+ if (r < 0)
+ json_stream_log_errno(s, r, "Dispatch callback failed, ignoring: %m");
+
+ return 1;
+}
+
+static int json_stream_prepare_callback(sd_event_source *source, void *userdata) {
+ JsonStream *s = ASSERT_PTR(userdata);
+ int r, e;
+
+ e = json_stream_get_events(s);
+ if (e < 0)
+ return e;
+
+ if (s->input_event_source == s->output_event_source)
+ /* Same fd for input + output */
+ r = sd_event_source_set_io_events(s->input_event_source, e);
+ else {
+ r = sd_event_source_set_io_events(s->input_event_source, e & POLLIN);
+ if (r >= 0)
+ r = sd_event_source_set_io_events(s->output_event_source, e & POLLOUT);
+ }
+ if (r < 0)
+ return json_stream_log_errno(s, r, "Failed to set io events: %m");
+
+ /* Rearm the timeout on every prepare cycle so that phase transitions (e.g. entering
+ * AWAITING_REPLY) are picked up without requiring the consumer to explicitly call
+ * mark_activity at every state change. */
+ json_stream_rearm_time_source(s);
+
+ return 1;
+}
+
+void json_stream_detach_event(JsonStream *s) {
+ if (!s)
+ return;
+
+ s->input_event_source = sd_event_source_disable_unref(s->input_event_source);
+ s->output_event_source = sd_event_source_disable_unref(s->output_event_source);
+ s->time_event_source = sd_event_source_disable_unref(s->time_event_source);
+ s->event = sd_event_unref(s->event);
+}
+
+sd_event* json_stream_get_event(const JsonStream *s) {
+ assert(s);
+ return s->event;
+}
+
+int json_stream_attach_event(JsonStream *s, sd_event *event, int64_t priority) {
+ int r;
+
+ assert(s);
+ assert(!s->event);
+ assert(s->input_fd >= 0);
+ assert(s->output_fd >= 0);
+
+ if (event)
+ s->event = sd_event_ref(event);
+ else {
+ r = sd_event_default(&s->event);
+ if (r < 0)
+ return json_stream_log_errno(s, r, "Failed to acquire default event loop: %m");
+ }
+
+ r = sd_event_add_io(s->event, &s->input_event_source, s->input_fd, 0, json_stream_io_callback, s);
+ if (r < 0)
+ goto fail;
+
+ r = sd_event_source_set_prepare(s->input_event_source, json_stream_prepare_callback);
+ if (r < 0)
+ goto fail;
+
+ r = sd_event_source_set_priority(s->input_event_source, priority);
+ if (r < 0)
+ goto fail;
+
+ (void) sd_event_source_set_description(s->input_event_source, "json-stream-input");
+
+ if (s->input_fd == s->output_fd)
+ s->output_event_source = sd_event_source_ref(s->input_event_source);
+ else {
+ r = sd_event_add_io(s->event, &s->output_event_source, s->output_fd, 0, json_stream_io_callback, s);
+ if (r < 0)
+ goto fail;
+
+ r = sd_event_source_set_priority(s->output_event_source, priority);
+ if (r < 0)
+ goto fail;
+
+ (void) sd_event_source_set_description(s->output_event_source, "json-stream-output");
+ }
+
+ r = sd_event_add_time(s->event, &s->time_event_source, CLOCK_MONOTONIC, /* usec= */ 0, /* accuracy= */ 0,
+ json_stream_time_callback, s);
+ if (r < 0)
+ goto fail;
+
+ r = sd_event_source_set_priority(s->time_event_source, priority);
+ if (r < 0)
+ goto fail;
+
+ (void) sd_event_source_set_description(s->time_event_source, "json-stream-time");
+
+ /* Initially disabled — only enabled by mark_activity once a timeout is configured. */
+ (void) sd_event_source_set_enabled(s->time_event_source, SD_EVENT_OFF);
+ json_stream_rearm_time_source(s);
+
+ return 0;
+
+fail:
+ json_stream_log_errno(s, r, "Failed to attach event source: %m");
+ json_stream_detach_event(s);
+ return r;
+}
+
+int json_stream_flush(JsonStream *s) {
+ int ret = 0, r;
+
+ assert(s);
+
+ for (;;) {
+ if (s->output_buffer_size == 0 && !s->output_queue)
+ break;
+ if (FLAGS_SET(s->flags, JSON_STREAM_WRITE_DISCONNECTED))
+ return -ECONNRESET;
+
+ r = json_stream_write(s);
+ if (r < 0)
+ return r;
+ if (r > 0) {
+ ret = 1;
+ continue;
+ }
+
+ r = json_stream_wait(s, USEC_INFINITY);
+ if (ERRNO_IS_NEG_TRANSIENT(r))
+ continue;
+ if (r < 0)
+ return json_stream_log_errno(s, r, "Poll failed on fd: %m");
+ assert(r > 0);
+ }
+
+ return ret;
+}
+
+int json_stream_push_fd(JsonStream *s, int fd) {
+ int i;
+
+ assert(s);
+ assert(fd >= 0);
+
+ if (s->n_pushed_fds >= SCM_MAX_FD) /* Kernel doesn't support more than 253 fds per message */
+ return -ENOBUFS;
+
+ if (!GREEDY_REALLOC(s->pushed_fds, s->n_pushed_fds + 1))
+ return -ENOMEM;
+
+ i = (int) s->n_pushed_fds;
+ s->pushed_fds[s->n_pushed_fds++] = fd;
+ return i;
+}
+
+void json_stream_reset_pushed_fds(JsonStream *s) {
+ assert(s);
+
+ close_many(s->pushed_fds, s->n_pushed_fds);
+ s->n_pushed_fds = 0;
+}
+
+int json_stream_peek_input_fd(const JsonStream *s, size_t i) {
+ assert(s);
+
+ if (i >= s->n_input_fds)
+ return -ENXIO;
+
+ return s->input_fds[i];
+}
+
+int json_stream_take_input_fd(JsonStream *s, size_t i) {
+ assert(s);
+
+ if (i >= s->n_input_fds)
+ return -ENXIO;
+
+ return TAKE_FD(s->input_fds[i]);
+}
+
+size_t json_stream_get_n_input_fds(const JsonStream *s) {
+ assert(s);
+ return s->n_input_fds;
+}
+
+void json_stream_close_input_fds(JsonStream *s) {
+ assert(s);
+
+ close_many(s->input_fds, s->n_input_fds);
+ s->input_fds = mfree(s->input_fds);
+ s->n_input_fds = 0;
+}
+
+/* ===== Output formatting ===== */
+
+static int json_stream_format_json(JsonStream *s, sd_json_variant *m) {
+ _cleanup_(erase_and_freep) char *text = NULL;
+ ssize_t sz, r;
+
+ assert(s);
+ assert(m);
+
+ sz = sd_json_variant_format(m, /* flags= */ 0, &text);
+ if (sz < 0)
+ return sz;
+ assert(text[sz] == '\0');
+
+ size_t dsz = json_stream_delimiter_size(s);
+
+ /* Append the framing delimiter after the formatted JSON. For varlink (delimiter ==
+ * NULL) this keeps the trailing NUL already placed by sd_json_variant_format(); for
+ * multi-char delimiters (e.g. "\r\n") we grow the buffer and copy them in. */
+ if (s->delimiter) {
+ if (!GREEDY_REALLOC(text, sz + dsz))
+ return -ENOMEM;
+ memcpy(text + sz, s->delimiter, dsz);
+ }
+
+ if (s->output_buffer_size + sz + dsz > s->buffer_max)
+ return -ENOBUFS;
+
+ if (DEBUG_LOGGING) {
+ _cleanup_(erase_and_freep) char *censored_text = NULL;
+
+ /* Suppress sensitive fields in the debug output */
+ r = sd_json_variant_format(m, SD_JSON_FORMAT_CENSOR_SENSITIVE, &censored_text);
+ if (r >= 0)
+ json_stream_log(s, "Sending message: %s", censored_text);
+ }
+
+ if (s->output_buffer_size == 0) {
+ if (FLAGS_SET(s->flags, JSON_STREAM_OUTPUT_BUFFER_SENSITIVE)) {
+ s->output_buffer = erase_and_free(s->output_buffer);
+ s->flags &= ~JSON_STREAM_OUTPUT_BUFFER_SENSITIVE;
+ }
+
+ free_and_replace(s->output_buffer, text);
+
+ s->output_buffer_size = sz + dsz;
+ s->output_buffer_index = 0;
+
+ } else if (!FLAGS_SET(s->flags, JSON_STREAM_OUTPUT_BUFFER_SENSITIVE) && s->output_buffer_index == 0) {
+ if (!GREEDY_REALLOC(s->output_buffer, s->output_buffer_size + sz + dsz))
+ return -ENOMEM;
+
+ memcpy(s->output_buffer + s->output_buffer_size, text, sz + dsz);
+ s->output_buffer_size += sz + dsz;
+ } else {
+ const size_t new_size = s->output_buffer_size + sz + dsz;
+
+ char *n = new(char, new_size);
+ if (!n)
+ return -ENOMEM;
+
+ memcpy(mempcpy(n, s->output_buffer + s->output_buffer_index, s->output_buffer_size), text, sz + dsz);
+
+ if (FLAGS_SET(s->flags, JSON_STREAM_OUTPUT_BUFFER_SENSITIVE))
+ s->output_buffer = erase_and_free(s->output_buffer);
+ else
+ free(s->output_buffer);
+ s->output_buffer = n;
+ s->output_buffer_size = new_size;
+ s->output_buffer_index = 0;
+ }
+
+ if (sd_json_variant_is_sensitive_recursive(m))
+ s->flags |= JSON_STREAM_OUTPUT_BUFFER_SENSITIVE;
+ else
+ text = mfree(text); /* Skip the erase_and_free() destructor declared above */
+
+ return 0;
+}
+
+static int json_stream_format_queue(JsonStream *s) {
+ int r;
+
+ assert(s);
+
+ /* Drain entries out of the output queue and format them into the output buffer. Stop
+ * if there are unwritten output_fds, since adding more would corrupt the fd boundary. */
+
+ while (s->output_queue) {
+ assert(s->n_output_queue > 0);
+
+ if (s->n_output_fds > 0)
+ return 0;
+
+ JsonStreamQueueItem *q = s->output_queue;
+ _cleanup_free_ int *array = NULL;
+
+ if (q->n_fds > 0) {
+ array = newdup(int, q->fds, q->n_fds);
+ if (!array)
+ return -ENOMEM;
+ }
+
+ r = json_stream_format_json(s, q->data);
+ if (r < 0)
+ return r;
+
+ free_and_replace(s->output_fds, array);
+ s->n_output_fds = q->n_fds;
+ q->n_fds = 0;
+
+ LIST_REMOVE(queue, s->output_queue, q);
+ if (!s->output_queue)
+ s->output_queue_tail = NULL;
+ s->n_output_queue--;
+
+ json_stream_queue_item_free(q);
+ }
+
+ return 0;
+}
+
+int json_stream_enqueue_item(JsonStream *s, JsonStreamQueueItem *q) {
+ assert(s);
+ assert(q);
+
+ if (s->n_output_queue >= s->queue_max)
+ return -ENOBUFS;
+
+ LIST_INSERT_AFTER(queue, s->output_queue, s->output_queue_tail, q);
+ s->output_queue_tail = q;
+ s->n_output_queue++;
+ return 0;
+}
+
+int json_stream_enqueue(JsonStream *s, sd_json_variant *m) {
+ JsonStreamQueueItem *q;
+
+ assert(s);
+ assert(m);
+
+ /* Fast path: no fds pending and no items currently queued — append directly into the
+ * output buffer to avoid the queue allocation. */
+ if (s->n_pushed_fds == 0 && !s->output_queue)
+ return json_stream_format_json(s, m);
+
+ if (s->n_output_queue >= s->queue_max)
+ return -ENOBUFS;
+
+ q = json_stream_queue_item_new(m, s->pushed_fds, s->n_pushed_fds);
+ if (!q)
+ return -ENOMEM;
+
+ s->n_pushed_fds = 0; /* fds belong to the queue entry now */
+
+ assert_se(json_stream_enqueue_item(s, q) >= 0);
+ return 0;
+}
+
+int json_stream_make_queue_item(JsonStream *s, sd_json_variant *m, JsonStreamQueueItem **ret) {
+ JsonStreamQueueItem *q;
+
+ assert(s);
+ assert(m);
+ assert(ret);
+
+ q = json_stream_queue_item_new(m, s->pushed_fds, s->n_pushed_fds);
+ if (!q)
+ return -ENOMEM;
+
+ s->n_pushed_fds = 0; /* fds belong to the queue entry now */
+
+ *ret = q;
+ return 0;
+}
+
+/* ===== Write side ===== */
+
+int json_stream_write(JsonStream *s) {
+ ssize_t n;
+ int r;
+
+ assert(s);
+
+ if (FLAGS_SET(s->flags, JSON_STREAM_CONNECTING))
+ return 0;
+ if (FLAGS_SET(s->flags, JSON_STREAM_WRITE_DISCONNECTED))
+ return 0;
+
+ /* Drain the deferred queue into the output buffer if possible */
+ r = json_stream_format_queue(s);
+ if (r < 0)
+ return r;
+
+ if (s->output_buffer_size == 0)
+ return 0;
+
+ assert(s->output_fd >= 0);
+
+ if (s->n_output_fds > 0) {
+ struct iovec iov = {
+ .iov_base = s->output_buffer + s->output_buffer_index,
+ .iov_len = s->output_buffer_size,
+ };
+ struct msghdr mh = {
+ .msg_iov = &iov,
+ .msg_iovlen = 1,
+ .msg_controllen = CMSG_SPACE(sizeof(int) * s->n_output_fds),
+ };
+
+ mh.msg_control = alloca0(mh.msg_controllen);
+
+ struct cmsghdr *control = CMSG_FIRSTHDR(&mh);
+ control->cmsg_len = CMSG_LEN(sizeof(int) * s->n_output_fds);
+ control->cmsg_level = SOL_SOCKET;
+ control->cmsg_type = SCM_RIGHTS;
+ memcpy(CMSG_DATA(control), s->output_fds, sizeof(int) * s->n_output_fds);
+
+ n = sendmsg(s->output_fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
+ } else if (FLAGS_SET(s->flags, JSON_STREAM_PREFER_WRITE))
+ n = write(s->output_fd, s->output_buffer + s->output_buffer_index, s->output_buffer_size);
+ else
+ n = send(s->output_fd, s->output_buffer + s->output_buffer_index, s->output_buffer_size, MSG_DONTWAIT|MSG_NOSIGNAL);
+ if (n < 0) {
+ if (ERRNO_IS_TRANSIENT(errno))
+ return 0;
+
+ if (ERRNO_IS_DISCONNECT(errno)) {
+ s->flags |= JSON_STREAM_WRITE_DISCONNECTED;
+ return 1;
+ }
+
+ return -errno;
+ }
+
+ if (FLAGS_SET(s->flags, JSON_STREAM_OUTPUT_BUFFER_SENSITIVE))
+ explicit_bzero_safe(s->output_buffer + s->output_buffer_index, n);
+
+ s->output_buffer_size -= n;
+
+ if (s->output_buffer_size == 0) {
+ s->output_buffer_index = 0;
+ s->flags &= ~JSON_STREAM_OUTPUT_BUFFER_SENSITIVE;
+ } else
+ s->output_buffer_index += n;
+
+ close_many(s->output_fds, s->n_output_fds);
+ s->n_output_fds = 0;
+
+ /* Refresh activity timestamp on real progress (and rearm the time source if attached
+ * to an event loop). */
+ s->last_activity = json_stream_now(s);
+ json_stream_rearm_time_source(s);
+
+ return 1;
+}
+
+/* ===== Read side ===== */
+
+/* In bounded-reads mode, peek at the socket data to find the delimiter and return a read
+ * size that won't consume past it. This prevents over-reading data that belongs to whatever
+ * protocol the socket is being handed off to. Falls back to byte-by-byte for non-socket fds
+ * where MSG_PEEK is not available. */
+static ssize_t json_stream_peek_message_boundary(JsonStream *s, void *p, size_t rs) {
+ assert(s);
+
+ if (!FLAGS_SET(s->flags, JSON_STREAM_BOUNDED_READS))
+ return rs;
+
+ if (FLAGS_SET(s->flags, JSON_STREAM_PREFER_READ))
+ return 1;
+
+ ssize_t peeked = recv(s->input_fd, p, rs, MSG_PEEK|MSG_DONTWAIT);
+ if (peeked < 0) {
+ if (!ERRNO_IS_TRANSIENT(errno))
+ return -errno;
+
+ /* Transient error: shouldn't happen but fall back to byte-by-byte */
+ return 1;
+ }
+ /* EOF: the real recv() will also see it; what we return here doesn't matter */
+ if (peeked == 0)
+ return rs;
+
+ size_t dsz = json_stream_delimiter_size(s);
+ void *delim = memmem_safe(p, peeked, s->delimiter ?: "\0", dsz);
+ if (delim)
+ return (ssize_t) ((char*) delim - (char*) p) + dsz;
+
+ return peeked;
+}
+
+int json_stream_read(JsonStream *s) {
+ struct iovec iov;
+ struct msghdr mh;
+ ssize_t rs;
+ ssize_t n;
+ void *p;
+
+ assert(s);
+
+ if (FLAGS_SET(s->flags, JSON_STREAM_CONNECTING))
+ return 0;
+ if (s->input_buffer_unscanned > 0)
+ return 0;
+ if (FLAGS_SET(s->flags, JSON_STREAM_READ_DISCONNECTED))
+ return 0;
+
+ if (s->input_buffer_size >= s->buffer_max)
+ return -ENOBUFS;
+
+ assert(s->input_fd >= 0);
+
+ if (MALLOC_SIZEOF_SAFE(s->input_buffer) <= s->input_buffer_index + s->input_buffer_size) {
+ size_t add;
+
+ add = MIN(s->buffer_max - s->input_buffer_size, s->read_chunk);
+
+ if (!FLAGS_SET(s->flags, JSON_STREAM_INPUT_SENSITIVE) && s->input_buffer_index == 0) {
+ if (!GREEDY_REALLOC(s->input_buffer, s->input_buffer_size + add))
+ return -ENOMEM;
+ } else {
+ char *b;
+
+ b = new(char, s->input_buffer_size + add);
+ if (!b)
+ return -ENOMEM;
+
+ memcpy(b, s->input_buffer + s->input_buffer_index, s->input_buffer_size);
+
+ if (FLAGS_SET(s->flags, JSON_STREAM_INPUT_SENSITIVE))
+ s->input_buffer = erase_and_free(s->input_buffer);
+ else
+ free(s->input_buffer);
+ s->input_buffer = b;
+ s->input_buffer_index = 0;
+ }
+ }
+
+ p = s->input_buffer + s->input_buffer_index + s->input_buffer_size;
+
+ rs = MALLOC_SIZEOF_SAFE(s->input_buffer) - (s->input_buffer_index + s->input_buffer_size);
+
+ /* If a protocol upgrade may follow, ensure we don't consume any post-upgrade bytes by
+ * limiting the read to the next delimiter. Uses MSG_PEEK on sockets, single-byte reads
+ * otherwise. */
+ rs = json_stream_peek_message_boundary(s, p, rs);
+ if (rs < 0)
+ return json_stream_log_errno(s, (int) rs, "Failed to peek message boundary: %m");
+
+ if (FLAGS_SET(s->flags, JSON_STREAM_ALLOW_FD_PASSING_INPUT)) {
+ iov = IOVEC_MAKE(p, rs);
+
+ if (!s->input_control_buffer) {
+ s->input_control_buffer_size = CMSG_SPACE(sizeof(int) * JSON_STREAM_FDS_MAX);
+ s->input_control_buffer = malloc(s->input_control_buffer_size);
+ if (!s->input_control_buffer)
+ return -ENOMEM;
+ }
+
+ mh = (struct msghdr) {
+ .msg_iov = &iov,
+ .msg_iovlen = 1,
+ .msg_control = s->input_control_buffer,
+ .msg_controllen = s->input_control_buffer_size,
+ };
+
+ n = recvmsg_safe(s->input_fd, &mh, MSG_DONTWAIT|MSG_CMSG_CLOEXEC);
+ } else if (FLAGS_SET(s->flags, JSON_STREAM_PREFER_READ))
+ n = RET_NERRNO(read(s->input_fd, p, rs));
+ else
+ n = RET_NERRNO(recv(s->input_fd, p, rs, MSG_DONTWAIT));
+ if (ERRNO_IS_NEG_TRANSIENT(n))
+ return 0;
+ if (ERRNO_IS_NEG_DISCONNECT(n)) {
+ s->flags |= JSON_STREAM_READ_DISCONNECTED;
+ return 1;
+ }
+ if (n < 0)
+ return n;
+ if (n == 0) { /* EOF */
+ if (FLAGS_SET(s->flags, JSON_STREAM_ALLOW_FD_PASSING_INPUT))
+ cmsg_close_all(&mh);
+
+ s->flags |= JSON_STREAM_READ_DISCONNECTED;
+ return 1;
+ }
+
+ if (FLAGS_SET(s->flags, JSON_STREAM_ALLOW_FD_PASSING_INPUT)) {
+ struct cmsghdr *cmsg;
+
+ cmsg = cmsg_find(&mh, SOL_SOCKET, SCM_RIGHTS, (socklen_t) -1);
+ if (cmsg) {
+ size_t add;
+
+ /* fds are only allowed with the first byte of a message; receiving them
+ * mid-stream is a protocol violation. */
+ if (s->input_buffer_size != 0) {
+ cmsg_close_all(&mh);
+ return -EPROTO;
+ }
+
+ add = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
+ if (add > INT_MAX - s->n_input_fds) {
+ cmsg_close_all(&mh);
+ return -EBADF;
+ }
+
+ if (!GREEDY_REALLOC(s->input_fds, s->n_input_fds + add)) {
+ cmsg_close_all(&mh);
+ return -ENOMEM;
+ }
+
+ memcpy_safe(s->input_fds + s->n_input_fds, CMSG_TYPED_DATA(cmsg, int), add * sizeof(int));
+ s->n_input_fds += add;
+ }
+ }
+
+ s->input_buffer_size += n;
+ s->input_buffer_unscanned += n;
+
+ return 1;
+}
+
+/* ===== Parse ===== */
+
+int json_stream_parse(JsonStream *s, sd_json_variant **ret) {
+ char *begin, *e;
+ size_t sz;
+ int r;
+
+ assert(s);
+ assert(ret);
+
+ if (s->input_buffer_unscanned == 0) {
+ *ret = NULL;
+ return 0;
+ }
+
+ assert(s->input_buffer_unscanned <= s->input_buffer_size);
+ assert(s->input_buffer_index + s->input_buffer_size <= MALLOC_SIZEOF_SAFE(s->input_buffer));
+
+ begin = s->input_buffer + s->input_buffer_index;
+
+ size_t dsz = json_stream_delimiter_size(s);
+ e = memmem_safe(begin + s->input_buffer_size - s->input_buffer_unscanned, s->input_buffer_unscanned, s->delimiter ?: "\0", dsz);
+ if (!e) {
+ s->input_buffer_unscanned = 0;
+ *ret = NULL;
+ return 0;
+ }
+
+ sz = e - begin + dsz;
+
+ /* For non-NUL delimiters (e.g. "\r\n" for QMP) sd_json_parse() needs a NUL-terminated
+ * string; overwrite the first delimiter byte with NUL in place. For NUL delimiters
+ * this is a no-op since the byte is already '\0'. */
+ if (s->delimiter)
+ *e = '\0';
+
+ r = sd_json_parse(begin, SD_JSON_PARSE_MUST_BE_OBJECT, ret, /* reterr_line= */ NULL, /* reterr_column= */ NULL);
+ if (FLAGS_SET(s->flags, JSON_STREAM_INPUT_SENSITIVE))
+ explicit_bzero_safe(begin, sz);
+ if (r < 0) {
+ /* Unrecoverable parse failure: drop all buffered data. */
+ s->input_buffer_index = s->input_buffer_size = s->input_buffer_unscanned = 0;
+ return json_stream_log_errno(s, r, "Failed to parse JSON object: %m");
+ }
+
+ if (DEBUG_LOGGING) {
+ _cleanup_(erase_and_freep) char *censored_text = NULL;
+
+ /* Suppress sensitive fields in the debug output */
+ r = sd_json_variant_format(*ret, /* flags= */ SD_JSON_FORMAT_CENSOR_SENSITIVE, &censored_text);
+ if (r >= 0)
+ json_stream_log(s, "Received message: %s", censored_text);
+ }
+
+ s->input_buffer_size -= sz;
+
+ if (s->input_buffer_size == 0)
+ s->input_buffer_index = 0;
+ else
+ s->input_buffer_index += sz;
+
+ s->input_buffer_unscanned = s->input_buffer_size;
+ return 1;
+}
--- /dev/null
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+#pragma once
+
+#include <sys/socket.h>
+
+#include "sd-forward.h"
+
+#include "list.h"
+
+/* JsonStream provides the transport layer used by sd-varlink (and other consumers like
+ * the QMP client) for exchanging length-delimited JSON messages over a pair of file
+ * descriptors. It owns the input/output buffers, the file-descriptor passing machinery
+ * (SCM_RIGHTS), the deferred output queue, and the read/write/parse step functions. It
+ * does not implement any state machine, dispatch, callback or event-source plumbing —
+ * those concerns belong to the consumer. */
+
+typedef struct JsonStreamQueueItem JsonStreamQueueItem;
+
+typedef enum JsonStreamFlags {
+ JSON_STREAM_BOUNDED_READS = 1u << 0,
+ JSON_STREAM_INPUT_SENSITIVE = 1u << 1,
+ JSON_STREAM_ALLOW_FD_PASSING_INPUT = 1u << 2,
+ JSON_STREAM_ALLOW_FD_PASSING_OUTPUT = 1u << 3,
+ JSON_STREAM_CONNECTING = 1u << 4,
+ JSON_STREAM_GOT_POLLHUP = 1u << 5,
+ JSON_STREAM_WRITE_DISCONNECTED = 1u << 6,
+ JSON_STREAM_READ_DISCONNECTED = 1u << 7,
+ JSON_STREAM_PREFER_READ = 1u << 8,
+ JSON_STREAM_PREFER_WRITE = 1u << 9,
+ JSON_STREAM_OUTPUT_BUFFER_SENSITIVE = 1u << 10,
+} JsonStreamFlags;
+
+/* What the consumer's high-level state machine is currently doing — used by the various
+ * "what should I do right now?" APIs (get_events, wait, should_disconnect) to decide
+ * whether to ask for read events, whether transport death matters, and whether the idle
+ * timeout deadline is currently in force. */
+typedef enum JsonStreamPhase {
+ JSON_STREAM_PHASE_READING, /* waiting for the next inbound message, no deadline */
+ JSON_STREAM_PHASE_AWAITING_REPLY, /* waiting for a reply with the idle timeout deadline */
+ JSON_STREAM_PHASE_IDLE_CLIENT, /* idle client, no in-flight call */
+ JSON_STREAM_PHASE_PENDING_OUTPUT, /* has more output queued, waiting to send */
+ JSON_STREAM_PHASE_OTHER, /* none of the above */
+} JsonStreamPhase;
+
+/* Consumer hooks supplied at construction time:
+ * • phase — queried by get_events / wait / should_disconnect / attach_event's prepare
+ * callback whenever the consumer's current phase is needed.
+ * • dispatch — invoked by attach_event's io and time callbacks after the stream has
+ * consumed the revents, so the consumer can drive its state machine
+ * forward. Should return 0 on success or a negative errno; the stream logs
+ * the failure and continues running. */
+typedef JsonStreamPhase (*json_stream_phase_t)(void *userdata);
+typedef int (*json_stream_dispatch_t)(void *userdata);
+
+typedef struct JsonStreamParams {
+ const char *delimiter; /* message delimiter; NULL → single NUL byte (varlink), e.g. "\r\n" for QMP */
+ size_t buffer_max; /* maximum bytes buffered before -ENOBUFS; 0 = 16 MiB default */
+ size_t read_chunk; /* per-read chunk size; 0 = 64 KiB default */
+ size_t queue_max; /* maximum number of queued output items; 0 = 64 Ki default */
+
+ /* Consumer hooks (see typedefs above). */
+ json_stream_phase_t phase;
+ json_stream_dispatch_t dispatch;
+ void *userdata;
+} JsonStreamParams;
+
+typedef struct JsonStream {
+ char *delimiter; /* message delimiter; NULL → NUL byte (varlink), e.g. "\r\n" for QMP */
+ size_t buffer_max;
+ size_t read_chunk;
+ size_t queue_max;
+
+ char *description;
+
+ int input_fd;
+ int output_fd;
+
+ usec_t timeout; /* relative; USEC_INFINITY = no timeout */
+ usec_t last_activity; /* CLOCK_MONOTONIC */
+
+ /* Cached peer credentials */
+ struct ucred ucred;
+ bool ucred_acquired;
+ int peer_pidfd;
+
+ /* Cached socket address family. -1 = unchecked, AF_UNSPEC = checked-not-socket,
+ * otherwise the resolved family. */
+ int af;
+
+ sd_event *event;
+ sd_event_source *input_event_source;
+ sd_event_source *output_event_source;
+ sd_event_source *time_event_source;
+
+ json_stream_phase_t phase_cb;
+ json_stream_dispatch_t dispatch_cb;
+ void *userdata;
+
+ char *input_buffer;
+ size_t input_buffer_index;
+ size_t input_buffer_size;
+ size_t input_buffer_unscanned;
+
+ void *input_control_buffer;
+ size_t input_control_buffer_size;
+
+ char *output_buffer;
+ size_t output_buffer_index;
+ size_t output_buffer_size;
+
+ int *input_fds;
+ size_t n_input_fds;
+
+ int *output_fds;
+ size_t n_output_fds;
+
+ LIST_HEAD(JsonStreamQueueItem, output_queue);
+ JsonStreamQueueItem *output_queue_tail;
+ size_t n_output_queue;
+
+ int *pushed_fds;
+ size_t n_pushed_fds;
+
+ JsonStreamFlags flags;
+} JsonStream;
+
+int json_stream_init(JsonStream *s, const JsonStreamParams *params);
+void json_stream_done(JsonStream *s);
+
+/* Optional description used as the prefix for the stream's debug log lines (sent/received
+ * messages, POLLHUP detection, async connect completion, etc.). The string is duped. */
+int json_stream_set_description(JsonStream *s, const char *description);
+const char* json_stream_get_description(const JsonStream *s);
+
+/* fd ownership */
+int json_stream_attach_fds(JsonStream *s, int input_fd, int output_fd);
+
+/* Open an AF_UNIX SOCK_STREAM socket and connect to the given filesystem path, attaching
+ * the resulting fd to the stream. Handles paths too long for sockaddr_un by routing through
+ * O_PATH (connect_unix_path()). If the connect() returns EAGAIN/EINPROGRESS the stream's
+ * connecting state is set so that the consumer waits for POLLOUT before treating the
+ * connection as established. Returns 0 on success or successfully started async connect,
+ * negative errno on failure. */
+int json_stream_connect_address(JsonStream *s, const char *address);
+
+/* Adopt a pre-connected pair of fds, ensuring both are non-blocking. Equivalent to
+ * json_stream_attach_fds() but does the fd_nonblock() dance up front, so the caller can
+ * pass in fds without having to know whether they were already configured. */
+int json_stream_connect_fd_pair(JsonStream *s, int input_fd, int output_fd);
+
+bool json_stream_flags_set(const JsonStream *s, JsonStreamFlags flags);
+void json_stream_set_flags(JsonStream *s, JsonStreamFlags flags, bool b);
+
+/* Combines the transport-level disconnect signals (write/read disconnected, buffered
+ * output, POLLHUP, async connect) with the consumer's current phase (queried via the
+ * registered get_phase callback) to answer "should the consumer initiate teardown right
+ * now?". The decision logic mirrors what the original varlink transport did but stays
+ * generic enough for other JSON-line consumers. */
+bool json_stream_should_disconnect(const JsonStream *s);
+
+/* Enable/disable fd passing. These verify the underlying fd is an AF_UNIX socket and
+ * (for input) optionally set SO_PASSRIGHTS. */
+int json_stream_set_allow_fd_passing_input(JsonStream *s, bool enabled, bool with_sockopt);
+int json_stream_set_allow_fd_passing_output(JsonStream *s, bool enabled);
+
+/* Output: enqueue a JSON variant. Fast path concatenates into the output buffer; if
+ * pushed_fds are present or the queue is non-empty the message is queued instead, so that
+ * fd-to-message boundaries are preserved. */
+int json_stream_enqueue(JsonStream *s, sd_json_variant *m);
+
+/* Allocate a queue item carrying `m` and the currently pushed fds. The pushed fds are
+ * transferred to the new item; on success n_pushed_fds is reset to 0. The caller may
+ * later submit the item via json_stream_enqueue_item() or free it. */
+int json_stream_make_queue_item(JsonStream *s, sd_json_variant *m, JsonStreamQueueItem **ret);
+int json_stream_enqueue_item(JsonStream *s, JsonStreamQueueItem *q);
+JsonStreamQueueItem* json_stream_queue_item_free(JsonStreamQueueItem *q);
+DEFINE_TRIVIAL_CLEANUP_FUNC(JsonStreamQueueItem*, json_stream_queue_item_free);
+sd_json_variant** json_stream_queue_item_get_data(JsonStreamQueueItem *q);
+
+/* fd push/peek/take */
+int json_stream_push_fd(JsonStream *s, int fd);
+void json_stream_reset_pushed_fds(JsonStream *s);
+
+int json_stream_peek_input_fd(const JsonStream *s, size_t i);
+int json_stream_take_input_fd(JsonStream *s, size_t i);
+size_t json_stream_get_n_input_fds(const JsonStream *s);
+
+/* Close and free all currently received input fds (used after consuming a message). */
+void json_stream_close_input_fds(JsonStream *s);
+
+/* I/O steps. Same return-value contract as the original varlink_{write,read,parse_message}:
+ * 1 = made progress (call again),
+ * 0 = nothing to do (wait for I/O),
+ * <0 = error. */
+int json_stream_write(JsonStream *s);
+int json_stream_read(JsonStream *s);
+
+/* Extract the next complete JSON message from the input buffer (delimited per
+ * params.delimiter). Returns 1 with *ret set on success, 0 if no full message is
+ * available yet (with *ret == NULL), <0 on parse error. The buffer slot occupied by the
+ * parsed message is erased if input_sensitive was set. */
+int json_stream_parse(JsonStream *s, sd_json_variant **ret);
+
+/* Status accessors used by the consumer's state machine. */
+bool json_stream_has_buffered_input(const JsonStream *s);
+
+/* Compute the poll events the consumer should wait for. The stream queries the consumer's
+ * phase via the registered get_phase callback. In JSON_STREAM_PHASE_READING the stream asks
+ * for POLLIN (provided the input buffer is empty and the read side is still alive); POLLOUT
+ * is added whenever there's pending output. When connecting we only ask for POLLOUT to
+ * learn when the non-blocking connect() completes. */
+int json_stream_get_events(const JsonStream *s);
+
+/* Block on poll() for the configured fds for at most `timeout` µs. Internally updates the
+ * connecting / got_pollhup state based on the seen revents.
+ * 1 = some event was observed (call us again),
+ * 0 = timeout,
+ * <0 = error (negative errno from ppoll_usec). */
+int json_stream_wait(JsonStream *s, usec_t timeout);
+
+/* Block until the output buffer is fully drained (or the write side disconnects).
+ * 1 = some bytes were written during the flush,
+ * 0 = nothing to flush,
+ * -ECONNRESET if the write side became disconnected before everything could be sent,
+ * <0 on other I/O errors. */
+int json_stream_flush(JsonStream *s);
+
+/* Peer credential helpers. All refuse if the stream uses different input/output fds, since
+ * peer credentials are only meaningful for a bidirectional socket.
+ * • acquire_peer_uid/gid/pid/pidfd() query the kernel on first use, cache the result,
+ * and log failures (using the stream's description). They each return 0 on success
+ * with the value in *ret, or a negative errno on failure (kernel error or invalid
+ * field).
+ * • get_peer_ucred() returns the *already-cached* ucred (set via a prior acquire or via
+ * set_peer_ucred()) without triggering a kernel query — returns -ENODATA if nothing is
+ * cached. Used by consumers that want to react to a previously-known ucred without
+ * forcing a fresh query (e.g. teardown bookkeeping). */
+int json_stream_acquire_peer_uid(JsonStream *s, uid_t *ret);
+int json_stream_acquire_peer_gid(JsonStream *s, gid_t *ret);
+int json_stream_acquire_peer_pid(JsonStream *s, pid_t *ret);
+int json_stream_acquire_peer_pidfd(JsonStream *s);
+int json_stream_get_peer_ucred(const JsonStream *s, struct ucred *ret);
+void json_stream_set_peer_ucred(JsonStream *s, const struct ucred *ucred);
+
+/* Per-operation idle timeout. The deadline is computed as last_activity + timeout.
+ * Successful writes refresh last_activity automatically; the consumer should also call
+ * json_stream_mark_activity() at operation start (e.g. when initiating a method call) to
+ * reset the deadline.
+ *
+ * When the deadline elapses the time event source attached via json_stream_attach_event()
+ * fires and the consumer's dispatch callback is invoked. The consumer detects the timeout
+ * by comparing now(CLOCK_MONOTONIC) against json_stream_get_timeout(). */
+void json_stream_set_timeout(JsonStream *s, usec_t timeout);
+void json_stream_mark_activity(JsonStream *s);
+
+/* Returns the absolute deadline (in CLOCK_MONOTONIC microseconds) currently in force for
+ * the consumer's phase, or USEC_INFINITY if no timeout applies (no timeout configured, no
+ * activity yet, or the current phase isn't AWAITING_REPLY). */
+usec_t json_stream_get_timeout(const JsonStream *s);
+
+/* sd-event integration. JsonStream owns the input/output io event sources and the time
+ * event source for its idle timeout, and installs its own internal prepare and io callbacks
+ * on them. The hooks (get_phase, io_dispatch) supplied via JsonStreamParams at construction
+ * are wired up automatically. */
+int json_stream_attach_event(JsonStream *s, sd_event *event, int64_t priority);
+void json_stream_detach_event(JsonStream *s);
+sd_event* json_stream_get_event(const JsonStream *s);
#include "format-util.h"
#include "glyph-util.h"
#include "hashmap.h"
-#include "io-util.h"
-#include "iovec-util.h"
#include "json-util.h"
#include "list.h"
#include "log.h"
#define VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX 1024U
#define VARLINK_DEFAULT_TIMEOUT_USEC (45U*USEC_PER_SEC)
-#define VARLINK_BUFFER_MAX (16U*1024U*1024U)
-#define VARLINK_READ_SIZE (64U*1024U)
#define VARLINK_COLLECT_MAX 1024U
-#define VARLINK_QUEUE_MAX (64U*1024U)
static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
[VARLINK_IDLE_CLIENT] = "idle-client",
DEFINE_PRIVATE_STRING_TABLE_LOOKUP_TO_STRING(varlink_state, VarlinkState);
-static int varlink_format_queue(sd_varlink *v);
static void varlink_server_test_exit_on_idle(sd_varlink_server *s);
-static VarlinkJsonQueueItem* varlink_json_queue_item_free(VarlinkJsonQueueItem *q) {
- if (!q)
- return NULL;
-
- sd_json_variant_unref(q->data);
- close_many(q->fds, q->n_fds);
-
- return mfree(q);
-}
-
-static VarlinkJsonQueueItem* varlink_json_queue_item_new(sd_json_variant *m, const int fds[], size_t n_fds) {
- VarlinkJsonQueueItem *q;
-
- assert(m);
- assert(fds || n_fds == 0);
-
- q = malloc(offsetof(VarlinkJsonQueueItem, fds) + sizeof(int) * n_fds);
- if (!q)
- return NULL;
-
- *q = (VarlinkJsonQueueItem) {
- .data = sd_json_variant_ref(m),
- .n_fds = n_fds,
- };
-
- memcpy_safe(q->fds, fds, n_fds * sizeof(int));
-
- return TAKE_PTR(q);
-}
-
static void varlink_set_state(sd_varlink *v, VarlinkState state) {
assert(v);
assert(state >= 0 && state < _VARLINK_STATE_MAX);
v->state = state;
}
+/* Map the varlink state machine onto the generic transport-level "phase". The transport
+ * uses this to decide whether to ask for POLLIN, whether the connection is salvageable
+ * after a read/write disconnect, and whether the idle timeout deadline is in force. */
+static JsonStreamPhase varlink_phase(void *userdata) {
+ sd_varlink *v = ASSERT_PTR(userdata);
+
+ /* Client side reading a reply with the per-call deadline in force. */
+ if (IN_SET(v->state,
+ VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE,
+ VARLINK_CALLING, VARLINK_COLLECTING) &&
+ !v->current)
+ return JSON_STREAM_PHASE_AWAITING_REPLY;
+
+ /* Server side reading the next request — no deadline applies. */
+ if (v->state == VARLINK_IDLE_SERVER && !v->current)
+ return JSON_STREAM_PHASE_READING;
+
+ if (v->state == VARLINK_IDLE_CLIENT)
+ return JSON_STREAM_PHASE_IDLE_CLIENT;
+
+ if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
+ return JSON_STREAM_PHASE_PENDING_OUTPUT;
+
+ return JSON_STREAM_PHASE_OTHER;
+}
+
+static int varlink_dispatch(void *userdata) {
+ sd_varlink *v = ASSERT_PTR(userdata);
+ return sd_varlink_process(v);
+}
+
static int varlink_new(sd_varlink **ret) {
- sd_varlink *v;
+ _cleanup_(sd_varlink_unrefp) sd_varlink *v = NULL;
+ int r;
assert(ret);
*v = (sd_varlink) {
.n_ref = 1,
- .input_fd = -EBADF,
- .output_fd = -EBADF,
-
.state = _VARLINK_STATE_INVALID,
-
- .ucred = UCRED_INVALID,
-
- .peer_pidfd = -EBADF,
-
- .timestamp = USEC_INFINITY,
- .timeout = VARLINK_DEFAULT_TIMEOUT_USEC,
-
- .allow_fd_passing_input = -1,
-
- .af = -1,
-
.exec_pidref = PIDREF_NULL,
};
- *ret = v;
+ r = json_stream_init(
+ &v->stream,
+ &(JsonStreamParams) {
+ .phase = varlink_phase,
+ .dispatch = varlink_dispatch,
+ .userdata = v,
+ });
+ if (r < 0)
+ return r;
+
+ json_stream_set_timeout(&v->stream, VARLINK_DEFAULT_TIMEOUT_USEC);
+
+ *ret = TAKE_PTR(v);
return 0;
}
_public_ int sd_varlink_connect_address(sd_varlink **ret, const char *address) {
_cleanup_(sd_varlink_unrefp) sd_varlink *v = NULL;
- union sockaddr_union sockaddr;
int r;
assert_return(ret, -EINVAL);
if (r < 0)
return log_debug_errno(r, "Failed to create varlink object: %m");
- v->input_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
- if (v->input_fd < 0)
- return log_debug_errno(errno, "Failed to create AF_UNIX socket: %m");
-
- v->output_fd = v->input_fd = fd_move_above_stdio(v->input_fd);
- v->af = AF_UNIX;
-
- r = sockaddr_un_set_path(&sockaddr.un, address);
- if (r < 0) {
- if (r != -ENAMETOOLONG)
- return log_debug_errno(r, "Failed to set socket address '%s': %m", address);
-
- /* This is a file system path, and too long to fit into sockaddr_un. Let's connect via O_PATH
- * to this socket. */
-
- r = connect_unix_path(v->input_fd, AT_FDCWD, address);
- } else
- r = RET_NERRNO(connect(v->input_fd, &sockaddr.sa, r));
-
- if (r < 0) {
- if (!IN_SET(r, -EAGAIN, -EINPROGRESS))
- return log_debug_errno(r, "Failed to connect to %s: %m", address);
-
- v->connecting = true; /* We are asynchronously connecting, i.e. the connect() is being
- * processed in the background. As long as that's the case the socket
- * is in a special state: it's there, we can poll it for EPOLLOUT, but
- * if we attempt to write() to it before we see EPOLLOUT we'll get
- * ENOTCONN (and not EAGAIN, like we would for a normal connected
- * socket that isn't writable at the moment). Since ENOTCONN on write()
- * hence can mean two different things (i.e. connection not complete
- * yet vs. already disconnected again), we store as a boolean whether
- * we are still in connect(). */
- }
+ r = json_stream_connect_address(&v->stream, address);
+ if (r < 0)
+ return r;
varlink_set_state(v, VARLINK_IDLE_CLIENT);
if (r < 0)
return log_debug_errno(r, "Failed to create varlink object: %m");
- v->output_fd = v->input_fd = TAKE_FD(pair[0]);
- v->af = AF_UNIX;
+ int conn_fd = TAKE_FD(pair[0]);
+ r = json_stream_attach_fds(&v->stream, conn_fd, conn_fd);
+ if (r < 0)
+ return r;
+
v->exec_pidref = TAKE_PIDREF(pidref);
varlink_set_state(v, VARLINK_IDLE_CLIENT);
if (r < 0)
return log_debug_errno(r, "Failed to create varlink object: %m");
- v->output_fd = v->input_fd = TAKE_FD(pair[0]);
- v->af = AF_UNIX;
+ int conn_fd = TAKE_FD(pair[0]);
+ r = json_stream_attach_fds(&v->stream, conn_fd, conn_fd);
+ if (r < 0)
+ return r;
+
v->exec_pidref = TAKE_PIDREF(pidref);
varlink_set_state(v, VARLINK_IDLE_CLIENT);
if (r < 0)
return log_debug_errno(r, "Failed to create varlink object: %m");
- v->input_fd = TAKE_FD(output_pipe[0]);
- v->output_fd = TAKE_FD(input_pipe[1]);
- v->af = AF_UNSPEC;
+ r = json_stream_attach_fds(&v->stream, TAKE_FD(output_pipe[0]), TAKE_FD(input_pipe[1]));
+ if (r < 0)
+ return r;
+
v->exec_pidref = TAKE_PIDREF(pidref);
varlink_set_state(v, VARLINK_IDLE_CLIENT);
}
_public_ int sd_varlink_connect_fd_pair(sd_varlink **ret, int input_fd, int output_fd, const struct ucred *override_ucred) {
- sd_varlink *v;
+ _cleanup_(sd_varlink_unrefp) sd_varlink *v = NULL;
int r;
assert_return(ret, -EINVAL);
assert_return(input_fd >= 0, -EBADF);
assert_return(output_fd >= 0, -EBADF);
- r = fd_nonblock(input_fd, true);
- if (r < 0)
- return log_debug_errno(r, "Failed to make input fd %d nonblocking: %m", input_fd);
-
- if (input_fd != output_fd) {
- r = fd_nonblock(output_fd, true);
- if (r < 0)
- return log_debug_errno(r, "Failed to make output fd %d nonblocking: %m", output_fd);
- }
-
r = varlink_new(&v);
if (r < 0)
return log_debug_errno(r, "Failed to create varlink object: %m");
- v->input_fd = input_fd;
- v->output_fd = output_fd;
- v->af = -1;
+ r = json_stream_connect_fd_pair(&v->stream, input_fd, output_fd);
+ if (r < 0)
+ return r;
- if (override_ucred) {
- v->ucred = *override_ucred;
- v->ucred_acquired = true;
- }
+ if (override_ucred)
+ json_stream_set_peer_ucred(&v->stream, override_ucred);
varlink_set_state(v, VARLINK_IDLE_CLIENT);
* varlink_connect_address() above, as there we do handle asynchronous connections ourselves and
* avoid doing write() on it before we saw EPOLLOUT for the first time. */
- *ret = v;
+ *ret = TAKE_PTR(v);
return 0;
}
return sd_varlink_connect_fd_pair(ret, fd, fd, /* override_ucred= */ NULL);
}
-static void varlink_detach_event_sources(sd_varlink *v) {
- assert(v);
-
- v->input_event_source = sd_event_source_disable_unref(v->input_event_source);
- v->output_event_source = sd_event_source_disable_unref(v->output_event_source);
- v->time_event_source = sd_event_source_disable_unref(v->time_event_source);
- v->quit_event_source = sd_event_source_disable_unref(v->quit_event_source);
- v->defer_event_source = sd_event_source_disable_unref(v->defer_event_source);
-}
-
static void varlink_clear_current(sd_varlink *v) {
assert(v);
v->current_method = NULL;
v->current_reply_flags = 0;
- close_many(v->input_fds, v->n_input_fds);
- v->input_fds = mfree(v->input_fds);
- v->n_input_fds = 0;
+ json_stream_close_input_fds(&v->stream);
- v->previous = varlink_json_queue_item_free(v->previous);
+ v->previous = json_stream_queue_item_free(v->previous);
if (v->sentinel != POINTER_MAX)
v->sentinel = mfree(v->sentinel);
else
static void varlink_clear(sd_varlink *v) {
assert(v);
- varlink_detach_event_sources(v);
-
- if (v->input_fd != v->output_fd) {
- v->input_fd = safe_close(v->input_fd);
- v->output_fd = safe_close(v->output_fd);
- } else
- v->output_fd = v->input_fd = safe_close(v->input_fd);
+ /* Detach event sources first so the kernel no longer has epoll watches on the
+ * stream's fds, then free the stream — json_stream_done() closes the input/output
+ * fds, the cached peer_pidfd, the received input fds, the queued output fds, and
+ * the pushed fds. */
+ sd_varlink_detach_event(v);
varlink_clear_current(v);
- v->input_buffer = v->input_sensitive ? erase_and_free(v->input_buffer) : mfree(v->input_buffer);
- v->output_buffer = v->output_buffer_sensitive ? erase_and_free(v->output_buffer) : mfree(v->output_buffer);
-
- v->input_control_buffer = mfree(v->input_control_buffer);
- v->input_control_buffer_size = 0;
-
- close_many(v->output_fds, v->n_output_fds);
- v->output_fds = mfree(v->output_fds);
- v->n_output_fds = 0;
-
- close_many(v->pushed_fds, v->n_pushed_fds);
- v->pushed_fds = mfree(v->pushed_fds);
- v->n_pushed_fds = 0;
-
- LIST_CLEAR(queue, v->output_queue, varlink_json_queue_item_free);
- v->output_queue_tail = NULL;
- v->n_output_queue = 0;
-
- v->event = sd_event_unref(v->event);
+ json_stream_done(&v->stream);
pidref_done_sigterm_wait(&v->exec_pidref);
-
- v->peer_pidfd = safe_close(v->peer_pidfd);
}
static sd_varlink* varlink_destroy(sd_varlink *v) {
varlink_clear(v);
- free(v->description);
return mfree(v);
}
static int varlink_test_disconnect(sd_varlink *v) {
assert(v);
- /* Tests whether we the connection has been terminated. We are careful to not stop processing it
- * prematurely, since we want to handle half-open connections as well as possible and want to flush
- * out and read data before we close down if we can. */
-
/* Already disconnected? */
if (!VARLINK_STATE_IS_ALIVE(v->state))
return 0;
- /* Wait until connection setup is complete, i.e. until asynchronous connect() completes */
- if (v->connecting)
- return 0;
-
- /* Still something to write and we can write? Stay around */
- if (v->output_buffer_size > 0 && !v->write_disconnected)
+ if (!json_stream_should_disconnect(&v->stream))
return 0;
- /* Both sides gone already? Then there's no need to stick around */
- if (v->read_disconnected && v->write_disconnected)
- goto disconnect;
-
- /* If we are waiting for incoming data but the read side is shut down, disconnect. */
- if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING, VARLINK_IDLE_SERVER) && v->read_disconnected)
- goto disconnect;
-
- /* Similar, if are a client that hasn't written anything yet but the write side is dead, also
- * disconnect. We also explicitly check for POLLHUP here since we likely won't notice the write side
- * being down if we never wrote anything. */
- if (v->state == VARLINK_IDLE_CLIENT && (v->write_disconnected || v->got_pollhup))
- goto disconnect;
-
- /* We are on the server side and still want to send out more replies, but we saw POLLHUP already, and
- * either got no buffered bytes to write anymore or already saw a write error. In that case we should
- * shut down the varlink link. */
- if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE) && (v->write_disconnected || v->output_buffer_size == 0) && v->got_pollhup)
- goto disconnect;
-
- return 0;
-
-disconnect:
varlink_set_state(v, VARLINK_PENDING_DISCONNECT);
return 1;
}
static int varlink_write(sd_varlink *v) {
- ssize_t n;
- int r;
-
assert(v);
if (!VARLINK_STATE_IS_ALIVE(v->state))
return 0;
- if (v->connecting) /* Writing while we are still wait for a non-blocking connect() to complete will
- * result in ENOTCONN, hence exit early here */
- return 0;
- if (v->write_disconnected)
- return 0;
-
- /* If needed let's convert some output queue json variants into text form */
- r = varlink_format_queue(v);
- if (r < 0)
- return r;
-
- if (v->output_buffer_size == 0)
- return 0;
-
- assert(v->output_fd >= 0);
-
- if (v->n_output_fds > 0) { /* If we shall send fds along, we must use sendmsg() */
- struct iovec iov = {
- .iov_base = v->output_buffer + v->output_buffer_index,
- .iov_len = v->output_buffer_size,
- };
- struct msghdr mh = {
- .msg_iov = &iov,
- .msg_iovlen = 1,
- .msg_controllen = CMSG_SPACE(sizeof(int) * v->n_output_fds),
- };
-
- mh.msg_control = alloca0(mh.msg_controllen);
-
- struct cmsghdr *control = CMSG_FIRSTHDR(&mh);
- control->cmsg_len = CMSG_LEN(sizeof(int) * v->n_output_fds);
- control->cmsg_level = SOL_SOCKET;
- control->cmsg_type = SCM_RIGHTS;
- memcpy(CMSG_DATA(control), v->output_fds, sizeof(int) * v->n_output_fds);
-
- n = sendmsg(v->output_fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
- } else {
- /* We generally prefer recv()/send() (mostly because of MSG_NOSIGNAL) but also want to be compatible
- * with non-socket IO, hence fall back automatically.
- *
- * Use a local variable to help gcc figure out that we set 'n' in all cases. */
- bool prefer_write = v->prefer_write;
- if (!prefer_write) {
- n = send(v->output_fd, v->output_buffer + v->output_buffer_index, v->output_buffer_size, MSG_DONTWAIT|MSG_NOSIGNAL);
- if (n < 0 && errno == ENOTSOCK)
- prefer_write = v->prefer_write = true;
- }
- if (prefer_write)
- n = write(v->output_fd, v->output_buffer + v->output_buffer_index, v->output_buffer_size);
- }
- if (n < 0) {
- if (errno == EAGAIN)
- return 0;
-
- if (ERRNO_IS_DISCONNECT(errno)) {
- /* If we get informed about a disconnect on write, then let's remember that, but not
- * act on it just yet. Let's wait for read() to report the issue first. */
- v->write_disconnected = true;
- return 1;
- }
-
- return -errno;
- }
-
- if (v->output_buffer_sensitive)
- explicit_bzero_safe(v->output_buffer + v->output_buffer_index, n);
-
- v->output_buffer_size -= n;
-
- if (v->output_buffer_size == 0) {
- v->output_buffer_index = 0;
- v->output_buffer_sensitive = false; /* We can reset the sensitive flag once the buffer is empty */
- } else
- v->output_buffer_index += n;
-
- close_many(v->output_fds, v->n_output_fds);
- v->n_output_fds = 0;
-
- v->timestamp = now(CLOCK_MONOTONIC);
- return 1;
-}
-
-#define VARLINK_FDS_MAX (16U*1024U)
-
-static bool varlink_may_protocol_upgrade(sd_varlink *v) {
- return v->protocol_upgrade || (v->server && FLAGS_SET(v->server->flags, SD_VARLINK_SERVER_UPGRADABLE));
-}
-
-/* When a protocol upgrade might happen, peek at the socket data to find the \0 message
- * boundary and return a read size that won't consume past it. This prevents over-reading
- * raw post-upgrade data into the varlink input buffer. Falls back to byte-by-byte for
- * non-socket fds where MSG_PEEK is not available. */
-static ssize_t varlink_peek_upgrade_boundary(sd_varlink *v, void *p, size_t rs) {
- assert(v);
-
- if (!varlink_may_protocol_upgrade(v))
- return rs;
-
- if (v->prefer_read)
- return 1;
-
- ssize_t peeked = recv(v->input_fd, p, rs, MSG_PEEK|MSG_DONTWAIT);
- if (peeked < 0) {
- if (errno == ENOTSOCK) {
- v->prefer_read = true;
- return 1; /* Not a socket, fall back to byte-to-byte */
- } else if (!ERRNO_IS_TRANSIENT(errno))
- return -errno;
-
- /* Transient error, this should not happen but fall back to byte-to-byte */
- return 1;
- }
- /* EOF, the real recv() will also get it so what we return does not matter */
- if (peeked == 0)
- return rs;
-
- void *nul_chr = memchr(p, 0, peeked);
- if (nul_chr)
- return (ssize_t) ((char*) nul_chr - (char*) p) + 1;
- return peeked;
+ return json_stream_write(&v->stream);
}
static int varlink_read(sd_varlink *v) {
- struct iovec iov;
- struct msghdr mh;
- ssize_t rs;
- ssize_t n;
- void *p;
-
assert(v);
if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING, VARLINK_IDLE_SERVER))
return 0;
- if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
- return 0;
if (v->current)
return 0;
- if (v->input_buffer_unscanned > 0)
- return 0;
- if (v->read_disconnected)
- return 0;
-
- if (v->input_buffer_size >= VARLINK_BUFFER_MAX)
- return -ENOBUFS;
-
- assert(v->input_fd >= 0);
-
- if (MALLOC_SIZEOF_SAFE(v->input_buffer) <= v->input_buffer_index + v->input_buffer_size) {
- size_t add;
-
- add = MIN(VARLINK_BUFFER_MAX - v->input_buffer_size, VARLINK_READ_SIZE);
-
- if (v->input_buffer_index == 0) {
-
- if (!GREEDY_REALLOC(v->input_buffer, v->input_buffer_size + add))
- return -ENOMEM;
-
- } else {
- char *b;
-
- b = new(char, v->input_buffer_size + add);
- if (!b)
- return -ENOMEM;
-
- memcpy(b, v->input_buffer + v->input_buffer_index, v->input_buffer_size);
-
- free_and_replace(v->input_buffer, b);
- v->input_buffer_index = 0;
- }
- }
-
- p = v->input_buffer + v->input_buffer_index + v->input_buffer_size;
-
- rs = MALLOC_SIZEOF_SAFE(v->input_buffer) - (v->input_buffer_index + v->input_buffer_size);
-
- /* When a protocol upgrade is requested we can't consume any post-upgrade data from the socket
- * buffer. Use MSG_PEEK to find the \0 message boundary and only consume up to it. For non-socket
- * fds (pipes) MSG_PEEK is not available, so fall back to byte-by-byte reading. */
- rs = varlink_peek_upgrade_boundary(v, p, rs);
- if (rs < 0)
- return varlink_log_errno(v, rs, "Failed to peek upgrade boundary: %m");
-
- if (v->allow_fd_passing_input > 0) {
- iov = IOVEC_MAKE(p, rs);
-
- /* Allocate the fd buffer on the heap, since we need a lot of space potentially */
- if (!v->input_control_buffer) {
- v->input_control_buffer_size = CMSG_SPACE(sizeof(int) * VARLINK_FDS_MAX);
- v->input_control_buffer = malloc(v->input_control_buffer_size);
- if (!v->input_control_buffer)
- return -ENOMEM;
- }
-
- mh = (struct msghdr) {
- .msg_iov = &iov,
- .msg_iovlen = 1,
- .msg_control = v->input_control_buffer,
- .msg_controllen = v->input_control_buffer_size,
- };
-
- n = recvmsg_safe(v->input_fd, &mh, MSG_DONTWAIT|MSG_CMSG_CLOEXEC);
- } else {
- bool prefer_read = v->prefer_read;
- if (!prefer_read) {
- n = recv(v->input_fd, p, rs, MSG_DONTWAIT);
- if (n < 0)
- n = -errno;
- if (n == -ENOTSOCK)
- prefer_read = v->prefer_read = true;
- }
- if (prefer_read) {
- n = read(v->input_fd, p, rs);
- if (n < 0)
- n = -errno;
- }
- }
- if (ERRNO_IS_NEG_TRANSIENT(n))
- return 0;
- if (ERRNO_IS_NEG_DISCONNECT(n)) {
- v->read_disconnected = true;
- return 1;
- }
- if (n < 0)
- return n;
- if (n == 0) { /* EOF */
-
- if (v->allow_fd_passing_input > 0)
- cmsg_close_all(&mh);
- v->read_disconnected = true;
- return 1;
- }
-
- if (v->allow_fd_passing_input > 0) {
- struct cmsghdr *cmsg;
-
- cmsg = cmsg_find(&mh, SOL_SOCKET, SCM_RIGHTS, (socklen_t) -1);
- if (cmsg) {
- size_t add;
-
- /* We only allow file descriptors to be passed along with the first byte of a
- * message. If they are passed with any other byte this is a protocol violation. */
- if (v->input_buffer_size != 0) {
- cmsg_close_all(&mh);
- return -EPROTO;
- }
-
- add = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
- if (add > INT_MAX - v->n_input_fds) {
- cmsg_close_all(&mh);
- return -EBADF;
- }
-
- if (!GREEDY_REALLOC(v->input_fds, v->n_input_fds + add)) {
- cmsg_close_all(&mh);
- return -ENOMEM;
- }
-
- memcpy_safe(v->input_fds + v->n_input_fds, CMSG_TYPED_DATA(cmsg, int), add * sizeof(int));
- v->n_input_fds += add;
- }
- }
-
- v->input_buffer_size += n;
- v->input_buffer_unscanned += n;
-
- return 1;
+ return json_stream_read(&v->stream);
}
static int varlink_parse_message(sd_varlink *v) {
- const char *e;
- char *begin;
- size_t sz;
int r;
assert(v);
if (v->current)
return 0;
- if (v->input_buffer_unscanned <= 0)
- return 0;
-
- assert(v->input_buffer_unscanned <= v->input_buffer_size);
- assert(v->input_buffer_index + v->input_buffer_size <= MALLOC_SIZEOF_SAFE(v->input_buffer));
-
- begin = v->input_buffer + v->input_buffer_index;
- e = memchr(begin + v->input_buffer_size - v->input_buffer_unscanned, 0, v->input_buffer_unscanned);
- if (!e) {
- v->input_buffer_unscanned = 0;
- return 0;
- }
-
- sz = e - begin + 1;
-
- r = sd_json_parse(begin, SD_JSON_PARSE_MUST_BE_OBJECT, &v->current, /* reterr_line= */ NULL, /* reterr_column= */ NULL);
- if (v->input_sensitive)
- explicit_bzero_safe(begin, sz);
- if (r < 0) {
- /* If we encounter a parse failure flush all data. We cannot possibly recover from this,
- * hence drop all buffered data now. */
- v->input_buffer_index = v->input_buffer_size = v->input_buffer_unscanned = 0;
- return varlink_log_errno(v, r, "Failed to parse JSON object: %m");
- }
+ r = json_stream_parse(&v->stream, &v->current);
+ if (r <= 0)
+ return r;
- if (v->input_sensitive) {
+ if (json_stream_flags_set(&v->stream, JSON_STREAM_INPUT_SENSITIVE)) {
/* Mark the parameters subfield as sensitive right-away, if that's requested */
sd_json_variant *parameters = sd_json_variant_by_key(v->current, "parameters");
if (parameters)
sd_json_variant_sensitive(parameters);
}
- if (DEBUG_LOGGING) {
- _cleanup_(erase_and_freep) char *censored_text = NULL;
-
- /* Suppress sensitive fields in the debug output */
- r = sd_json_variant_format(v->current, /* flags= */ SD_JSON_FORMAT_CENSOR_SENSITIVE, &censored_text);
- if (r < 0)
- return r;
-
- varlink_log(v, "Received message: %s", censored_text);
- }
-
- v->input_buffer_size -= sz;
-
- if (v->input_buffer_size == 0)
- v->input_buffer_index = 0;
- else
- v->input_buffer_index += sz;
-
- v->input_buffer_unscanned = v->input_buffer_size;
return 1;
}
static int varlink_test_timeout(sd_varlink *v) {
assert(v);
- if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING))
- return 0;
- if (v->timeout == USEC_INFINITY)
- return 0;
-
- if (now(CLOCK_MONOTONIC) < usec_add(v->timestamp, v->timeout))
+ usec_t deadline = json_stream_get_timeout(&v->stream);
+ if (deadline == USEC_INFINITY || now(CLOCK_MONOTONIC) < deadline)
return 0;
varlink_set_state(v, VARLINK_PENDING_TIMEOUT);
-
return 1;
}
r = sd_varlink_idl_format(interface, &text);
if (r < 0)
- return r;
-
- return sd_varlink_replybo(
- link,
- SD_JSON_BUILD_PAIR_STRING("description", text));
-}
-
-static int varlink_format_json(sd_varlink *v, sd_json_variant *m) {
- _cleanup_(erase_and_freep) char *text = NULL;
- int sz, r;
-
- assert(v);
- assert(m);
-
- sz = sd_json_variant_format(m, /* flags= */ 0, &text);
- if (sz < 0)
- return sz;
- assert(text[sz] == '\0');
-
- if (v->output_buffer_size + sz + 1 > VARLINK_BUFFER_MAX)
- return -ENOBUFS;
-
- if (DEBUG_LOGGING) {
- _cleanup_(erase_and_freep) char *censored_text = NULL;
-
- /* Suppress sensitive fields in the debug output */
- r = sd_json_variant_format(m, SD_JSON_FORMAT_CENSOR_SENSITIVE, &censored_text);
- if (r < 0)
- return r;
-
- varlink_log(v, "Sending message: %s", censored_text);
- }
-
- if (v->output_buffer_size == 0) {
-
- free_and_replace(v->output_buffer, text);
-
- v->output_buffer_size = sz + 1;
- v->output_buffer_index = 0;
-
- } else if (v->output_buffer_index == 0) {
-
- if (!GREEDY_REALLOC(v->output_buffer, v->output_buffer_size + sz + 1))
- return -ENOMEM;
-
- memcpy(v->output_buffer + v->output_buffer_size, text, sz + 1);
- v->output_buffer_size += sz + 1;
- } else {
- char *n;
- const size_t new_size = v->output_buffer_size + sz + 1;
-
- n = new(char, new_size);
- if (!n)
- return -ENOMEM;
-
- memcpy(mempcpy(n, v->output_buffer + v->output_buffer_index, v->output_buffer_size), text, sz + 1);
-
- free_and_replace(v->output_buffer, n);
- v->output_buffer_size = new_size;
- v->output_buffer_index = 0;
- }
-
- if (sd_json_variant_is_sensitive_recursive(m))
- v->output_buffer_sensitive = true; /* Propagate sensitive flag */
- else
- text = mfree(text); /* No point in the erase_and_free() destructor declared above */
-
- return 0;
-}
-
-static int varlink_format_queue(sd_varlink *v) {
- int r;
-
- assert(v);
-
- /* Takes entries out of the output queue and formats them into the output buffer. But only if this
- * would not corrupt our fd message boundaries */
-
- while (v->output_queue) {
- assert(v->n_output_queue > 0);
-
- if (v->n_output_fds > 0) /* unwritten fds? if we'd add more we'd corrupt the fd message boundaries, hence wait */
- return 0;
-
- VarlinkJsonQueueItem *q = v->output_queue;
- _cleanup_free_ int *array = NULL;
-
- if (q->n_fds > 0) {
- array = newdup(int, q->fds, q->n_fds);
- if (!array)
- return -ENOMEM;
- }
-
- r = varlink_format_json(v, q->data);
- if (r < 0)
- return r;
-
- /* Take possession of the queue element's fds */
- free_and_replace(v->output_fds, array);
- v->n_output_fds = q->n_fds;
- q->n_fds = 0;
-
- LIST_REMOVE(queue, v->output_queue, q);
- if (!v->output_queue)
- v->output_queue_tail = NULL;
- v->n_output_queue--;
-
- varlink_json_queue_item_free(q);
- }
-
- return 0;
-}
-
-static int varlink_enqueue_item(sd_varlink *v, VarlinkJsonQueueItem *q) {
- assert(v);
- assert(q);
-
- if (v->n_output_queue >= VARLINK_QUEUE_MAX)
- return -ENOBUFS;
-
- LIST_INSERT_AFTER(queue, v->output_queue, v->output_queue_tail, q);
- v->output_queue_tail = q;
- v->n_output_queue++;
- return 0;
-}
-
-static int varlink_enqueue_json(sd_varlink *v, sd_json_variant *m) {
- VarlinkJsonQueueItem *q;
-
- assert(v);
- assert(m);
-
- /* If there are no file descriptors to be queued and no queue entries yet we can shortcut things and
- * append this entry directly to the output buffer */
- if (v->n_pushed_fds == 0 && !v->output_queue)
- return varlink_format_json(v, m);
-
- if (v->n_output_queue >= VARLINK_QUEUE_MAX)
- return -ENOBUFS;
-
- /* Otherwise add a queue entry for this */
- q = varlink_json_queue_item_new(m, v->pushed_fds, v->n_pushed_fds);
- if (!q)
- return -ENOMEM;
-
- v->n_pushed_fds = 0; /* fds now belong to the queue entry */
-
- /* We already checked the precondition ourselves so this call cannot fail. */
- assert_se(varlink_enqueue_item(v, q) >= 0);
+ return r;
- return 0;
+ return sd_varlink_replybo(
+ link,
+ SD_JSON_BUILD_PAIR_STRING("description", text));
}
static int varlink_dispatch_method(sd_varlink *v) {
(flags & SD_VARLINK_METHOD_ONEWAY) ? VARLINK_PROCESSING_METHOD_ONEWAY :
VARLINK_PROCESSING_METHOD);
- v->protocol_upgrade = FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE);
-
assert(v->server);
+ /* Reset the per-call upgrade marker on every dispatch — a previous method's
+ * UPGRADE flag must not bleed into this one. The transport-level bounded reads
+ * stay active for SD_VARLINK_SERVER_UPGRADABLE servers regardless. */
+ v->protocol_upgrade = FLAGS_SET(flags, SD_VARLINK_METHOD_UPGRADE);
+ json_stream_set_flags(
+ &v->stream,
+ JSON_STREAM_BOUNDED_READS,
+ v->protocol_upgrade || FLAGS_SET(v->server->flags, SD_VARLINK_SERVER_UPGRADABLE));
+
/* First consult user supplied method implementations */
callback = hashmap_get(v->server->methods, method);
if (!callback) {
r = sd_varlink_error_errno(v, r);
} else if (v->sentinel) {
if (v->previous) {
- r = varlink_enqueue_item(v, v->previous);
+ r = json_stream_enqueue_item(&v->stream, v->previous);
if (r >= 0) {
TAKE_PTR(v->previous);
varlink_set_state(v, VARLINK_PROCESSED_METHOD);
return 0;
}
-static void handle_revents(sd_varlink *v, int revents) {
- assert(v);
-
- if (v->connecting) {
- /* If we have seen POLLOUT or POLLHUP on a socket we are asynchronously waiting a connect()
- * to complete on, we know we are ready. We don't read the connection error here though,
- * we'll get the error on the next read() or write(). */
- if ((revents & (POLLOUT|POLLHUP)) == 0)
- return;
-
- varlink_log(v, "Asynchronous connection completed.");
- v->connecting = false;
- } else {
- /* Note that we don't care much about POLLIN/POLLOUT here, we'll just try reading and writing
- * what we can. However, we do care about POLLHUP to detect connection termination even if we
- * momentarily don't want to read nor write anything. */
-
- if (!FLAGS_SET(revents, POLLHUP))
- return;
-
- varlink_log(v, "Got POLLHUP from socket.");
- v->got_pollhup = true;
- }
-}
-
_public_ int sd_varlink_wait(sd_varlink *v, uint64_t timeout) {
- int r, events;
- usec_t t;
-
assert_return(v, -EINVAL);
if (v->state == VARLINK_DISCONNECTED)
return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
- r = sd_varlink_get_timeout(v, &t);
- if (r < 0)
- return r;
- if (t != USEC_INFINITY)
- t = usec_sub_unsigned(t, now(CLOCK_MONOTONIC));
-
- t = MIN(t, timeout);
-
- events = sd_varlink_get_events(v);
- if (events < 0)
- return events;
-
- struct pollfd pollfd[2];
- size_t n_poll_fd = 0;
-
- if (v->input_fd == v->output_fd) {
- pollfd[n_poll_fd++] = (struct pollfd) {
- .fd = v->input_fd,
- .events = events,
- };
- } else {
- pollfd[n_poll_fd++] = (struct pollfd) {
- .fd = v->input_fd,
- .events = events & POLLIN,
- };
- pollfd[n_poll_fd++] = (struct pollfd) {
- .fd = v->output_fd,
- .events = events & POLLOUT,
- };
- };
-
- r = ppoll_usec(pollfd, n_poll_fd, t);
- if (ERRNO_IS_NEG_TRANSIENT(r)) /* Treat EINTR as not a timeout, but also nothing happened, and
- * the caller gets a chance to call back into us */
- return 1;
- if (r <= 0)
- return r;
-
- /* Merge the seen events into one */
- int revents = 0;
- FOREACH_ARRAY(p, pollfd, n_poll_fd)
- revents |= p->revents;
-
- handle_revents(v, revents);
- return 1;
+ return json_stream_wait(&v->stream, timeout);
}
_public_ int sd_varlink_is_idle(sd_varlink *v) {
}
_public_ int sd_varlink_get_fd(sd_varlink *v) {
-
assert_return(v, -EINVAL);
if (v->state == VARLINK_DISCONNECTED)
return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
- if (v->input_fd != v->output_fd)
+
+ int input_fd = v->stream.input_fd;
+ int output_fd = v->stream.output_fd;
+
+ if (input_fd != output_fd)
return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADF), "Separate file descriptors for input/output set.");
- if (v->input_fd < 0)
+ if (input_fd < 0)
return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADF), "No valid fd.");
- return v->input_fd;
+ return input_fd;
}
_public_ int sd_varlink_get_input_fd(sd_varlink *v) {
-
assert_return(v, -EINVAL);
if (v->state == VARLINK_DISCONNECTED)
return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
- if (v->input_fd < 0)
+
+ int input_fd = v->stream.input_fd;
+ if (input_fd < 0)
return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADF), "No valid input fd.");
- return v->input_fd;
+ return input_fd;
}
_public_ int sd_varlink_get_output_fd(sd_varlink *v) {
-
assert_return(v, -EINVAL);
if (v->state == VARLINK_DISCONNECTED)
return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
- if (v->output_fd < 0)
+
+ int output_fd = v->stream.output_fd;
+ if (output_fd < 0)
return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADF), "No valid output fd.");
- return v->output_fd;
+ return output_fd;
}
_public_ int sd_varlink_get_events(sd_varlink *v) {
- int ret = 0;
-
assert_return(v, -EINVAL);
if (v->state == VARLINK_DISCONNECTED)
return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
- if (v->connecting) /* When processing an asynchronous connect(), we only wait for EPOLLOUT, which
- * tells us that the connection is now complete. Before that we should neither
- * write() or read() from the fd. */
- return EPOLLOUT;
-
- if (!v->read_disconnected &&
- IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING, VARLINK_IDLE_SERVER) &&
- !v->current &&
- v->input_buffer_unscanned <= 0)
- ret |= EPOLLIN;
-
- if (!v->write_disconnected &&
- (v->output_queue ||
- v->output_buffer_size > 0))
- ret |= EPOLLOUT;
-
- return ret;
+ return json_stream_get_events(&v->stream);
}
_public_ int sd_varlink_get_timeout(sd_varlink *v, uint64_t *ret) {
if (v->state == VARLINK_DISCONNECTED)
return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
- if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING) &&
- v->timeout != USEC_INFINITY) {
- if (ret)
- *ret = usec_add(v->timestamp, v->timeout);
- return 1;
- } else {
- if (ret)
- *ret = USEC_INFINITY;
- return 0;
- }
+ usec_t deadline = json_stream_get_timeout(&v->stream);
+
+ if (ret)
+ *ret = deadline;
+
+ return deadline != USEC_INFINITY;
}
_public_ int sd_varlink_flush(sd_varlink *v) {
- int ret = 0, r;
-
assert_return(v, -EINVAL);
if (v->state == VARLINK_DISCONNECTED)
return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
- for (;;) {
- if (v->output_buffer_size == 0 && !v->output_queue)
- break;
- if (v->write_disconnected)
- return -ECONNRESET;
-
- r = varlink_write(v);
- if (r < 0)
- return r;
- if (r > 0) {
- ret = 1;
- continue;
- }
-
- r = fd_wait_for_event(v->output_fd, POLLOUT, USEC_INFINITY);
- if (ERRNO_IS_NEG_TRANSIENT(r))
- continue;
- if (r < 0)
- return varlink_log_errno(v, r, "Poll failed on fd: %m");
- assert(r > 0);
-
- handle_revents(v, r);
- }
-
- return ret;
+ return json_stream_flush(&v->stream);
}
static void varlink_detach_server(sd_varlink *v) {
if (!v->server)
return;
+ /* Only touch by_uid for connections we already counted in count_connection() —
+ * those are exactly the ones for which the ucred was acquired or injected during
+ * sd_varlink_server_add_connection_pair(). Don't trigger an acquire from here. */
+ struct ucred ucred;
if (v->server->by_uid &&
- v->ucred_acquired &&
- uid_is_valid(v->ucred.uid)) {
+ json_stream_get_peer_ucred(&v->stream, &ucred) >= 0 &&
+ uid_is_valid(ucred.uid)) {
unsigned c;
- c = PTR_TO_UINT(hashmap_get(v->server->by_uid, UID_TO_PTR(v->ucred.uid)));
+ c = PTR_TO_UINT(hashmap_get(v->server->by_uid, UID_TO_PTR(ucred.uid)));
assert(c > 0);
if (c == 1)
- (void) hashmap_remove(v->server->by_uid, UID_TO_PTR(v->ucred.uid));
+ (void) hashmap_remove(v->server->by_uid, UID_TO_PTR(ucred.uid));
else
- (void) hashmap_replace(v->server->by_uid, UID_TO_PTR(v->ucred.uid), UINT_TO_PTR(c - 1));
+ (void) hashmap_replace(v->server->by_uid, UID_TO_PTR(ucred.uid), UINT_TO_PTR(c - 1));
}
assert(v->server->n_connections > 0);
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = varlink_enqueue_json(v, m);
+ r = json_stream_enqueue(&v->stream, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
/* No state change here, this is one-way only after all */
- v->timestamp = now(CLOCK_MONOTONIC);
+ json_stream_mark_activity(&v->stream);
return 0;
}
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = varlink_enqueue_json(v, m);
+ r = json_stream_enqueue(&v->stream, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
varlink_set_state(v, VARLINK_AWAITING_REPLY);
v->n_pending++;
- v->timestamp = now(CLOCK_MONOTONIC);
+ json_stream_mark_activity(&v->stream);
return 0;
}
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = varlink_enqueue_json(v, m);
+ r = json_stream_enqueue(&v->stream, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
varlink_set_state(v, VARLINK_AWAITING_REPLY_MORE);
v->n_pending++;
- v->timestamp = now(CLOCK_MONOTONIC);
+ json_stream_mark_activity(&v->stream);
return 0;
}
* that we can assign a new reply shortly. */
varlink_clear_current(v);
- r = varlink_enqueue_json(v, request);
+ r = json_stream_enqueue(&v->stream, request);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
varlink_set_state(v, VARLINK_CALLING);
v->n_pending++;
- v->timestamp = now(CLOCK_MONOTONIC);
+ json_stream_mark_activity(&v->stream);
while (v->state == VARLINK_CALLING) {
r = sd_varlink_process(v);
/* Ensure no post-upgrade data was consumed into our input buffer (we ensure this via MSG_PEEK or
* byte-to-byte) and refuse the upgrade rather than silently losing the data. */
- if (v->input_buffer_size != 0)
+ if (json_stream_has_buffered_input(&v->stream))
return varlink_log_errno(v, SYNTHETIC_ERRNO(EPROTO),
"Unexpected buffered data during protocol upgrade, refusing.");
+ _cleanup_close_ int input_fd = TAKE_FD(v->stream.input_fd),
+ output_fd = TAKE_FD(v->stream.output_fd);
+
/* Pass the connection fds to the caller, it owns them now. Reset to blocking mode
* since callers of the upgraded protocol will generally expect normal blocking
- * semantics. */
- r = fd_nonblock(v->input_fd, false);
- if (r < 0)
- return varlink_log_errno(v, r, "Failed to set input fd to blocking mode: %m");
- if (v->input_fd != v->output_fd) {
- r = fd_nonblock(v->output_fd, false);
+ * semantics. For bidirectional sockets (input_fd == output_fd), dup the fd so that
+ * callers always get two independent fds they can close separately. */
+ if (input_fd == output_fd) {
+ output_fd = fcntl(input_fd, F_DUPFD_CLOEXEC, 3);
+ if (output_fd < 0)
+ return varlink_log_errno(v, errno, "Failed to dup upgraded connection fd: %m");
+ } else {
+ r = fd_nonblock(output_fd, false);
if (r < 0)
return varlink_log_errno(v, r, "Failed to set output fd to blocking mode: %m");
}
- /* For bidirectional sockets (input_fd == output_fd), dup the fd so that callers
- * always get two independent fds they can close separately. */
- if (v->input_fd == v->output_fd) {
- v->output_fd = fcntl(v->input_fd, F_DUPFD_CLOEXEC, 3);
- if (v->output_fd < 0)
- return varlink_log_errno(v, errno, "Failed to dup upgraded connection fd: %m");
- }
+ r = fd_nonblock(input_fd, false);
+ if (r < 0)
+ return varlink_log_errno(v, r, "Failed to set input fd to blocking mode: %m");
/* Hand out requested fds, shut down unwanted directions. */
if (ret_input_fd)
- *ret_input_fd = TAKE_FD(v->input_fd);
- else {
- (void) shutdown(v->input_fd, SHUT_RD);
- v->input_fd = safe_close(v->input_fd);
- }
+ *ret_input_fd = TAKE_FD(input_fd);
+ else
+ (void) shutdown(input_fd, SHUT_RD);
if (ret_output_fd)
- *ret_output_fd = TAKE_FD(v->output_fd);
- else {
- (void) shutdown(v->output_fd, SHUT_WR);
- v->output_fd = safe_close(v->output_fd);
- }
+ *ret_output_fd = TAKE_FD(output_fd);
+ else
+ (void) shutdown(output_fd, SHUT_WR);
return 0;
}
return varlink_log_errno(v, r, "Failed to build json message: %m");
v->protocol_upgrade = true;
+ json_stream_set_flags(&v->stream, JSON_STREAM_BOUNDED_READS, true);
r = varlink_call_internal(v, m);
if (r < 0) {
v->protocol_upgrade = false;
+ json_stream_set_flags(&v->stream, JSON_STREAM_BOUNDED_READS, false);
return r;
}
/* ensure we did not consume any data from the upgraded protocol */
- assert(v->input_buffer_size == 0);
+ assert(!json_stream_has_buffered_input(&v->stream));
sd_json_variant *e = sd_json_variant_by_key(v->current, "error"),
*p = sd_json_variant_by_key(v->current, "parameters");
finish:
v->protocol_upgrade = false;
+ json_stream_set_flags(&v->stream, JSON_STREAM_BOUNDED_READS, false);
assert(v->n_pending == 1);
v->n_pending--;
return r;
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = varlink_enqueue_json(v, m);
+ r = json_stream_enqueue(&v->stream, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
varlink_set_state(v, VARLINK_COLLECTING);
v->n_pending++;
- v->timestamp = now(CLOCK_MONOTONIC);
+ json_stream_mark_activity(&v->stream);
for (;;) {
while (v->state == VARLINK_COLLECTING) {
if (more && v->sentinel) {
if (v->previous) {
- r = sd_json_variant_set_field_boolean(&v->previous->data, "continues", true);
+ r = sd_json_variant_set_field_boolean(json_stream_queue_item_get_data(v->previous), "continues", true);
if (r < 0)
return r;
- r = varlink_enqueue_item(v, v->previous);
+ r = json_stream_enqueue_item(&v->stream, v->previous);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
}
- v->previous = varlink_json_queue_item_new(m, v->pushed_fds, v->n_pushed_fds);
- if (!v->previous)
- return -ENOMEM;
+ r = json_stream_make_queue_item(&v->stream, m, &v->previous);
+ if (r < 0)
+ return r;
- v->n_pushed_fds = 0; /* fds now belong to the queue entry */
return 1;
}
- r = varlink_enqueue_json(v, m);
+ r = json_stream_enqueue(&v->stream, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
* client). In normal operation this cannot happen because the client waits for our reply before
* sending raw data, and we set protocol_upgrade=true in dispatch to limit subsequent reads to
* single bytes. But a misbehaving client could pipeline data early. */
- if (v->input_buffer_size > 0)
+ if (json_stream_has_buffered_input(&v->stream))
return varlink_log_errno(v, SYNTHETIC_ERRNO(EBADMSG),
"Unexpected buffered data from client during protocol upgrade.");
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = varlink_enqueue_json(v, m);
+ r = json_stream_enqueue(&v->stream, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
/* Flush the reply to the socket before stealing the fds. The reply must be fully written
* before the caller starts speaking the upgraded protocol. */
- for (;;) {
- r = varlink_write(v);
- if (r < 0) {
- varlink_log_errno(v, r, "Failed to flush reply: %m");
- goto disconnect;
- }
- if (v->output_buffer_size == 0 && !v->output_queue)
- break;
- if (v->write_disconnected) {
- r = varlink_log_errno(v, SYNTHETIC_ERRNO(ECONNRESET),
- "Write disconnected during upgrade reply flush.");
- goto disconnect;
- }
-
- r = fd_wait_for_event(v->output_fd, POLLOUT, USEC_INFINITY);
- if (ERRNO_IS_NEG_TRANSIENT(r))
- continue;
- if (r < 0) {
- varlink_log_errno(v, r, "Failed to wait for writable fd: %m");
- goto disconnect;
- }
- assert(r > 0);
-
- handle_revents(v, r);
+ r = json_stream_flush(&v->stream);
+ if (r < 0) {
+ varlink_log_errno(v, r, "Failed to flush reply before protocol upgrade: %m");
+ goto disconnect;
}
/* Detach from the event loop before stealing the fds */
- varlink_detach_event_sources(v);
+ sd_varlink_detach_event(v);
/* Now hand the original FDs over to the caller, from this point on we have nothing to do with the
* connection anymore, it's up to the caller and we close the connection below */
* rollback the fds. Note that this is implicitly called whenever an error reply is sent, see
* below. */
- close_many(v->pushed_fds, v->n_pushed_fds);
- v->n_pushed_fds = 0;
+ json_stream_reset_pushed_fds(&v->stream);
return 0;
}
return varlink_log_errno(v, SYNTHETIC_ERRNO(EBUSY), "Connection busy.");
if (v->previous) {
- r = sd_json_variant_set_field_boolean(&v->previous->data, "continues", true);
+ r = sd_json_variant_set_field_boolean(json_stream_queue_item_get_data(v->previous), "continues", true);
if (r < 0)
return r;
/* If we have a previous reply still ready make sure we queue it before the error. We only
* ever set "previous" if we're in a streaming method so we pass more=true unconditionally
* here as we know we're still going to queue an error afterwards. */
- r = varlink_enqueue_item(v, v->previous);
+ r = json_stream_enqueue_item(&v->stream, v->previous);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = varlink_enqueue_json(v, m);
+ r = json_stream_enqueue(&v->stream, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = varlink_enqueue_json(v, m);
+ r = json_stream_enqueue(&v->stream, m);
if (r < 0)
return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
return 0;
}
-static int varlink_acquire_ucred(sd_varlink *v) {
- int r;
-
- assert(v);
-
- if (v->ucred_acquired)
- return 0;
-
- /* If we are connected asymmetrically, let's refuse, since it's not clear if caller wants to know
- * peer on read or write fd */
- if (v->input_fd != v->output_fd)
- return -EBADF;
-
- r = getpeercred(v->input_fd, &v->ucred);
- if (r < 0)
- return r;
-
- v->ucred_acquired = true;
- return 0;
-}
-
_public_ int sd_varlink_get_peer_uid(sd_varlink *v, uid_t *ret) {
- int r;
-
assert_return(v, -EINVAL);
assert_return(ret, -EINVAL);
- r = varlink_acquire_ucred(v);
- if (r < 0)
- return varlink_log_errno(v, r, "Failed to acquire credentials: %m");
-
- if (!uid_is_valid(v->ucred.uid))
- return varlink_log_errno(v, SYNTHETIC_ERRNO(ENODATA), "Peer UID is invalid.");
-
- *ret = v->ucred.uid;
- return 0;
+ return json_stream_acquire_peer_uid(&v->stream, ret);
}
_public_ int sd_varlink_get_peer_gid(sd_varlink *v, gid_t *ret) {
- int r;
-
assert_return(v, -EINVAL);
assert_return(ret, -EINVAL);
- r = varlink_acquire_ucred(v);
- if (r < 0)
- return varlink_log_errno(v, r, "Failed to acquire credentials: %m");
-
- if (!gid_is_valid(v->ucred.gid))
- return varlink_log_errno(v, SYNTHETIC_ERRNO(ENODATA), "Peer GID is invalid.");
-
- *ret = v->ucred.gid;
- return 0;
+ return json_stream_acquire_peer_gid(&v->stream, ret);
}
_public_ int sd_varlink_get_peer_pid(sd_varlink *v, pid_t *ret) {
- int r;
-
assert_return(v, -EINVAL);
assert_return(ret, -EINVAL);
- r = varlink_acquire_ucred(v);
- if (r < 0)
- return varlink_log_errno(v, r, "Failed to acquire credentials: %m");
-
- if (!pid_is_valid(v->ucred.pid))
- return varlink_log_errno(v, SYNTHETIC_ERRNO(ENODATA), "Peer uid is invalid.");
-
- *ret = v->ucred.pid;
- return 0;
+ return json_stream_acquire_peer_pid(&v->stream, ret);
}
_public_ int sd_varlink_get_peer_pidfd(sd_varlink *v) {
assert_return(v, -EINVAL);
- if (v->peer_pidfd >= 0)
- return v->peer_pidfd;
-
- if (v->input_fd != v->output_fd)
- return -EBADF;
-
- v->peer_pidfd = getpeerpidfd(v->input_fd);
- if (v->peer_pidfd < 0)
- return varlink_log_errno(v, v->peer_pidfd, "Failed to acquire pidfd of peer: %m");
-
- return v->peer_pidfd;
+ return json_stream_acquire_peer_pidfd(&v->stream);
}
_public_ int sd_varlink_set_relative_timeout(sd_varlink *v, uint64_t timeout) {
assert_return(v, -EINVAL);
/* If set to 0, reset to default value */
- v->timeout = timeout == 0 ? VARLINK_DEFAULT_TIMEOUT_USEC : timeout;
+ json_stream_set_timeout(&v->stream, timeout == 0 ? VARLINK_DEFAULT_TIMEOUT_USEC : timeout);
return 0;
}
_public_ int sd_varlink_set_description(sd_varlink *v, const char *description) {
assert_return(v, -EINVAL);
- return free_and_strdup(&v->description, description);
+ return json_stream_set_description(&v->stream, description);
}
_public_ const char* sd_varlink_get_description(sd_varlink *v) {
assert_return(v, NULL);
- return v->description;
-}
-
-static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
- sd_varlink *v = ASSERT_PTR(userdata);
-
- assert(s);
-
- handle_revents(v, revents);
- (void) sd_varlink_process(v);
-
- return 1;
-}
-
-static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
- sd_varlink *v = ASSERT_PTR(userdata);
-
- assert(s);
-
- (void) sd_varlink_process(v);
- return 1;
+ return json_stream_get_description(&v->stream);
}
static int defer_callback(sd_event_source *s, void *userdata) {
return 1;
}
-static int prepare_callback(sd_event_source *s, void *userdata) {
- sd_varlink *v = ASSERT_PTR(userdata);
- int r, e;
- usec_t until;
- bool have_timeout;
-
- assert(s);
-
- e = sd_varlink_get_events(v);
- if (e < 0)
- return e;
-
- if (v->input_event_source == v->output_event_source)
- /* Same fd for input + output */
- r = sd_event_source_set_io_events(v->input_event_source, e);
- else {
- r = sd_event_source_set_io_events(v->input_event_source, e & EPOLLIN);
- if (r >= 0)
- r = sd_event_source_set_io_events(v->output_event_source, e & EPOLLOUT);
- }
- if (r < 0)
- return varlink_log_errno(v, r, "Failed to set source events: %m");
-
- r = sd_varlink_get_timeout(v, &until);
- if (r < 0)
- return r;
- have_timeout = r > 0;
-
- if (have_timeout) {
- r = sd_event_source_set_time(v->time_event_source, until);
- if (r < 0)
- return varlink_log_errno(v, r, "Failed to set source time: %m");
- }
-
- r = sd_event_source_set_enabled(v->time_event_source, have_timeout ? SD_EVENT_ON : SD_EVENT_OFF);
- if (r < 0)
- return varlink_log_errno(v, r, "Failed to enable event source: %m");
-
- return 1;
-}
-
static int quit_callback(sd_event_source *event, void *userdata) {
sd_varlink *v = ASSERT_PTR(userdata);
int r;
assert_return(v, -EINVAL);
- assert_return(!v->event, -EBUSY);
-
- if (e)
- v->event = sd_event_ref(e);
- else {
- r = sd_event_default(&v->event);
- if (r < 0)
- return varlink_log_errno(v, r, "Failed to create event source: %m");
- }
-
- r = sd_event_add_time(v->event, &v->time_event_source, CLOCK_MONOTONIC, 0, 0, time_callback, v);
- if (r < 0)
- goto fail;
+ assert_return(!json_stream_get_event(&v->stream), -EBUSY);
- r = sd_event_source_set_priority(v->time_event_source, priority);
+ r = json_stream_attach_event(&v->stream, e, priority);
if (r < 0)
- goto fail;
+ return r;
- (void) sd_event_source_set_description(v->time_event_source, "varlink-time");
+ sd_event *event = json_stream_get_event(&v->stream);
- r = sd_event_add_exit(v->event, &v->quit_event_source, quit_callback, v);
+ r = sd_event_add_exit(event, &v->quit_event_source, quit_callback, v);
if (r < 0)
goto fail;
(void) sd_event_source_set_description(v->quit_event_source, "varlink-quit");
- r = sd_event_add_io(v->event, &v->input_event_source, v->input_fd, 0, io_callback, v);
- if (r < 0)
- goto fail;
-
- r = sd_event_source_set_prepare(v->input_event_source, prepare_callback);
- if (r < 0)
- goto fail;
-
- r = sd_event_source_set_priority(v->input_event_source, priority);
- if (r < 0)
- goto fail;
-
- (void) sd_event_source_set_description(v->input_event_source, "varlink-input");
-
- if (v->input_fd == v->output_fd)
- v->output_event_source = sd_event_source_ref(v->input_event_source);
- else {
- r = sd_event_add_io(v->event, &v->output_event_source, v->output_fd, 0, io_callback, v);
- if (r < 0)
- goto fail;
-
- r = sd_event_source_set_priority(v->output_event_source, priority);
- if (r < 0)
- goto fail;
-
- (void) sd_event_source_set_description(v->output_event_source, "varlink-output");
- }
-
- r = sd_event_add_defer(v->event, &v->defer_event_source, defer_callback, v);
+ r = sd_event_add_defer(event, &v->defer_event_source, defer_callback, v);
if (r < 0)
goto fail;
if (!v)
return;
- varlink_detach_event_sources(v);
-
- v->event = sd_event_unref(v->event);
+ v->quit_event_source = sd_event_source_disable_unref(v->quit_event_source);
+ v->defer_event_source = sd_event_source_disable_unref(v->defer_event_source);
+ json_stream_detach_event(&v->stream);
}
_public_ sd_event* sd_varlink_get_event(sd_varlink *v) {
assert_return(v, NULL);
- return v->event;
+ return json_stream_get_event(&v->stream);
}
_public_ int sd_varlink_push_fd(sd_varlink *v, int fd) {
- int i;
-
assert_return(v, -EINVAL);
assert_return(fd >= 0, -EBADF);
/* Takes an fd to send along with the *next* varlink message sent via this varlink connection. This
* takes ownership of the specified fd. Use varlink_dup_fd() below to duplicate the fd first. */
- if (!v->allow_fd_passing_output)
+ if (!json_stream_flags_set(&v->stream, JSON_STREAM_ALLOW_FD_PASSING_OUTPUT))
return -EPERM;
- if (v->n_pushed_fds >= SCM_MAX_FD) /* Kernel doesn't support more than 253 fds per message, refuse early hence */
- return -ENOBUFS;
-
- if (!GREEDY_REALLOC(v->pushed_fds, v->n_pushed_fds + 1))
- return -ENOMEM;
-
- i = (int) v->n_pushed_fds;
- v->pushed_fds[v->n_pushed_fds++] = fd;
- return i;
+ return json_stream_push_fd(&v->stream, fd);
}
_public_ int sd_varlink_push_dup_fd(sd_varlink *v, int fd) {
/* Returns one of the file descriptors that were received along with the current message. This does
* not duplicate the fd nor invalidate it, it hence remains in our possession. */
- if (v->allow_fd_passing_input <= 0)
+ if (!json_stream_flags_set(&v->stream, JSON_STREAM_ALLOW_FD_PASSING_INPUT))
return -EPERM;
- if (i >= v->n_input_fds)
- return -ENXIO;
-
- return v->input_fds[i];
+ return json_stream_peek_input_fd(&v->stream, i);
}
_public_ int sd_varlink_peek_dup_fd(sd_varlink *v, size_t i) {
* we'll invalidate the reference to it under our possession. If called twice in a row will return
* -EBADF */
- if (v->allow_fd_passing_input <= 0)
+ if (!json_stream_flags_set(&v->stream, JSON_STREAM_ALLOW_FD_PASSING_INPUT))
return -EPERM;
- if (i >= v->n_input_fds)
- return -ENXIO;
-
- return TAKE_FD(v->input_fds[i]);
+ return json_stream_take_input_fd(&v->stream, i);
}
_public_ int sd_varlink_get_n_fds(sd_varlink *v) {
assert_return(v, -EINVAL);
- if (v->allow_fd_passing_input <= 0)
+ if (!json_stream_flags_set(&v->stream, JSON_STREAM_ALLOW_FD_PASSING_INPUT))
return -EPERM;
- return (int) v->n_input_fds;
-}
-
-static int verify_unix_socket(sd_varlink *v) {
- assert(v);
-
- /* Returns:
- * • 0 if this is an AF_UNIX socket
- * • -ENOTSOCK if this is not a socket at all
- * • -ENOMEDIUM if this is a socket, but not an AF_UNIX socket
- *
- * Reminder:
- * • v->af is < 0 if we haven't checked what kind of address family the thing is yet.
- * • v->af == AF_UNSPEC if we checked but it's not a socket
- * • otherwise: v->af contains the address family we determined */
-
- if (v->af < 0) {
- /* If we have distinct input + output fds, we don't consider ourselves to be connected via a regular
- * AF_UNIX socket. */
- if (v->input_fd != v->output_fd) {
- v->af = AF_UNSPEC;
- return -ENOTSOCK;
- }
-
- struct stat st;
-
- if (fstat(v->input_fd, &st) < 0)
- return -errno;
- if (!S_ISSOCK(st.st_mode)) {
- v->af = AF_UNSPEC;
- return -ENOTSOCK;
- }
-
- v->af = socket_get_family(v->input_fd);
- if (v->af < 0)
- return v->af;
- }
-
- return v->af == AF_UNIX ? 0 :
- v->af == AF_UNSPEC ? -ENOTSOCK : -ENOMEDIUM;
+ return (int) json_stream_get_n_input_fds(&v->stream);
}
_public_ int sd_varlink_set_allow_fd_passing_input(sd_varlink *v, int b) {
- int r;
-
assert_return(v, -EINVAL);
- if (v->allow_fd_passing_input >= 0 && (v->allow_fd_passing_input > 0) == !!b)
- return 0;
-
- r = verify_unix_socket(v);
- if (r < 0) {
- assert(v->allow_fd_passing_input <= 0);
-
- if (!b) {
- v->allow_fd_passing_input = false;
- return 0;
- }
-
- return r;
- }
-
- if (!v->server || FLAGS_SET(v->server->flags, SD_VARLINK_SERVER_FD_PASSING_INPUT_STRICT)) {
- r = setsockopt_int(v->input_fd, SOL_SOCKET, SO_PASSRIGHTS, !!b);
- if (r < 0 && !ERRNO_IS_NEG_NOT_SUPPORTED(r))
- log_debug_errno(r, "Failed to set SO_PASSRIGHTS socket option: %m");
- }
+ /* Server connections that haven't opted into FD_PASSING_INPUT_STRICT skip the
+ * per-connection SO_PASSRIGHTS setsockopt — the listening server already configured
+ * the socket option once at listen time. */
+ bool with_sockopt = !v->server || FLAGS_SET(v->server->flags, SD_VARLINK_SERVER_FD_PASSING_INPUT_STRICT);
- v->allow_fd_passing_input = !!b;
- return 1;
+ return json_stream_set_allow_fd_passing_input(&v->stream, !!b, with_sockopt);
}
_public_ int sd_varlink_set_allow_fd_passing_output(sd_varlink *v, int b) {
- int r;
-
assert_return(v, -EINVAL);
- if (v->allow_fd_passing_output == !!b)
- return 0;
-
- r = verify_unix_socket(v);
- if (r < 0)
- return r;
-
- v->allow_fd_passing_output = b;
- return 1;
+ return json_stream_set_allow_fd_passing_output(&v->stream, !!b);
}
_public_ int sd_varlink_set_input_sensitive(sd_varlink *v) {
assert_return(v, -EINVAL);
- v->input_sensitive = true;
+ json_stream_set_flags(&v->stream, JSON_STREAM_INPUT_SENSITIVE, true);
return 0;
}
v->server = sd_varlink_server_ref(server);
sd_varlink_ref(v);
- v->input_fd = input_fd;
- v->output_fd = output_fd;
+ r = json_stream_attach_fds(&v->stream, input_fd, output_fd);
+ if (r < 0)
+ return r;
+
if (server->flags & SD_VARLINK_SERVER_INHERIT_USERDATA)
v->userdata = server->userdata;
- if (ucred_acquired) {
- v->ucred = ucred;
- v->ucred_acquired = true;
- }
+ /* If the server might receive a protocol upgrade method, switch the input path to
+ * byte-bounded reads so we don't accidentally consume post-upgrade bytes. */
+ if (FLAGS_SET(server->flags, SD_VARLINK_SERVER_UPGRADABLE))
+ json_stream_set_flags(&v->stream, JSON_STREAM_BOUNDED_READS, true);
+
+ if (ucred_acquired)
+ json_stream_set_peer_ucred(&v->stream, &ucred);
_cleanup_free_ char *desc = NULL;
if (asprintf(&desc, "%s-%i-%i", varlink_server_description(server), input_fd, output_fd) >= 0)
- v->description = TAKE_PTR(desc);
+ json_stream_set_description(&v->stream, desc);
(void) sd_varlink_set_allow_fd_passing_input(v, FLAGS_SET(server->flags, SD_VARLINK_SERVER_ALLOW_FD_PASSING_INPUT));
(void) sd_varlink_set_allow_fd_passing_output(v, FLAGS_SET(server->flags, SD_VARLINK_SERVER_ALLOW_FD_PASSING_OUTPUT));
r = sd_varlink_attach_event(v, server->event, server->event_priority);
if (r < 0) {
varlink_log_errno(v, r, "Failed to attach new connection: %m");
- TAKE_FD(v->input_fd); /* take the fd out of the connection again */
- TAKE_FD(v->output_fd);
+ /* Detach the fds from the connection so the caller (the connect callback)
+ * can decide what to do with them. The original fd value(s) the caller
+ * passed in are still owned by the caller; we just stop the connection
+ * from closing them on shutdown. */
+ TAKE_FD(v->stream.input_fd);
+ TAKE_FD(v->stream.output_fd);
sd_varlink_close(v);
return r;
}