/* Client side states */
VARLINK_IDLE_CLIENT,
VARLINK_AWAITING_REPLY,
+ VARLINK_AWAITING_REPLY_MORE,
VARLINK_CALLING,
VARLINK_CALLED,
VARLINK_PROCESSING_REPLY,
VARLINK_PROCESSING_METHOD_MORE,
VARLINK_PROCESSING_METHOD_ONEWAY,
VARLINK_PROCESSED_METHOD,
- VARLINK_PROCESSED_METHOD_MORE,
VARLINK_PENDING_METHOD,
VARLINK_PENDING_METHOD_MORE,
IN_SET(state, \
VARLINK_IDLE_CLIENT, \
VARLINK_AWAITING_REPLY, \
+ VARLINK_AWAITING_REPLY_MORE, \
VARLINK_CALLING, \
VARLINK_CALLED, \
VARLINK_PROCESSING_REPLY, \
VARLINK_PROCESSING_METHOD_MORE, \
VARLINK_PROCESSING_METHOD_ONEWAY, \
VARLINK_PROCESSED_METHOD, \
- VARLINK_PROCESSED_METHOD_MORE, \
VARLINK_PENDING_METHOD, \
VARLINK_PENDING_METHOD_MORE)
static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
[VARLINK_IDLE_CLIENT] = "idle-client",
[VARLINK_AWAITING_REPLY] = "awaiting-reply",
+ [VARLINK_AWAITING_REPLY_MORE] = "awaiting-reply-more",
[VARLINK_CALLING] = "calling",
[VARLINK_CALLED] = "called",
[VARLINK_PROCESSING_REPLY] = "processing-reply",
[VARLINK_PROCESSING_METHOD_MORE] = "processing-method-more",
[VARLINK_PROCESSING_METHOD_ONEWAY] = "processing-method-oneway",
[VARLINK_PROCESSED_METHOD] = "processed-method",
- [VARLINK_PROCESSED_METHOD_MORE] = "processed-method-more",
[VARLINK_PENDING_METHOD] = "pending-method",
[VARLINK_PENDING_METHOD_MORE] = "pending-method-more",
[VARLINK_PENDING_DISCONNECT] = "pending-disconnect",
assert(ret);
- /* Here use new0 as the below structured initializer is nested. */
- v = new0(Varlink, 1);
+ v = new(Varlink, 1);
if (!v)
return -ENOMEM;
if (v->fd < 0)
return -errno;
+ v->fd = fd_move_above_stdio(v->fd);
+
if (connect(v->fd, &sockaddr.sa, SOCKADDR_UN_LEN(sockaddr.un)) < 0) {
if (!IN_SET(errno, EAGAIN, EINPROGRESS))
return -errno;
assert(v);
v->io_event_source = sd_event_source_disable_unref(v->io_event_source);
-
v->time_event_source = sd_event_source_disable_unref(v->time_event_source);
-
v->quit_event_source = sd_event_source_disable_unref(v->quit_event_source);
-
v->defer_event_source = sd_event_source_disable_unref(v->defer_event_source);
}
goto disconnect;
/* If we are waiting for incoming data but the read side is shut down, disconnect. */
- if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected)
+ if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected)
goto disconnect;
/* Similar, if are a client that hasn't written anything yet but the write side is dead, also
assert(v);
- if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER))
+ if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER))
return 0;
if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
return 0;
static int varlink_test_timeout(Varlink *v) {
assert(v);
- if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING))
+ if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
return 0;
if (v->timeout == USEC_INFINITY)
return 0;
assert(v);
- if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING))
+ if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
return 0;
if (!v->current)
return 0;
goto invalid;
}
+ /* Replies with 'continue' set are only OK if we set 'more' when the method call was initiated */
+ if (v->state != VARLINK_AWAITING_REPLY_MORE && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
+ goto invalid;
+
+ /* An error is final */
if (error && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
goto invalid;
if (r < 0)
goto invalid;
- if (v->state == VARLINK_AWAITING_REPLY) {
+ if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE)) {
varlink_set_state(v, VARLINK_PROCESSING_REPLY);
if (v->reply_callback) {
v->current = json_variant_unref(v->current);
if (v->state == VARLINK_PROCESSING_REPLY) {
+
assert(v->n_pending > 0);
- v->n_pending--;
- varlink_set_state(v, v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY);
+ if (!FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
+ v->n_pending--;
+
+ varlink_set_state(v,
+ FLAGS_SET(flags, VARLINK_REPLY_CONTINUES) ? VARLINK_AWAITING_REPLY_MORE :
+ v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY);
}
} else {
assert(v->state == VARLINK_CALLING);
-
- if (FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
- goto invalid;
-
varlink_set_state(v, VARLINK_CALLED);
}
varlink_set_state(v, VARLINK_PENDING_METHOD);
break;
- case VARLINK_PROCESSED_METHOD_MORE: /* One reply for a "more" message was sent, more to come */
case VARLINK_PROCESSING_METHOD_MORE: /* No reply for a "more" message was sent, more to come */
varlink_set_state(v, VARLINK_PENDING_METHOD_MORE);
break;
usec_t t;
assert_return(v, -EINVAL);
- assert_return(!v->server, -ENOTTY);
if (v->state == VARLINK_DISCONNECTED)
return -ENOTCONN;
return EPOLLOUT;
if (!v->read_disconnected &&
- IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) &&
+ IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) &&
!v->current &&
v->input_buffer_unscanned <= 0)
ret |= EPOLLIN;
if (v->state == VARLINK_DISCONNECTED)
return -ENOTCONN;
- if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING) &&
+ if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING) &&
v->timeout != USEC_INFINITY) {
if (ret)
*ret = usec_add(v->timestamp, v->timeout);
r = json_variant_format(m, 0, &text);
if (r < 0)
return r;
+ assert(text[r] == '\0');
if (v->output_buffer_size + r + 1 > VARLINK_BUFFER_MAX)
return -ENOBUFS;
if (v->state == VARLINK_DISCONNECTED)
return -ENOTCONN;
+
+ /* We allow enqueuing multiple method calls at once! */
if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
return -EBUSY;
if (v->state == VARLINK_DISCONNECTED)
return -ENOTCONN;
+
+ /* We allow enqueing multiple method calls at once! */
if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY))
return -EBUSY;
return varlink_invoke(v, method, parameters);
}
+int varlink_observe(Varlink *v, const char *method, JsonVariant *parameters) {
+ _cleanup_(json_variant_unrefp) JsonVariant *m = NULL;
+ int r;
+
+ assert_return(v, -EINVAL);
+ assert_return(method, -EINVAL);
+
+ if (v->state == VARLINK_DISCONNECTED)
+ return -ENOTCONN;
+ /* Note that we don't allow enqueuing multiple method calls when we are in more/continues mode! We
+ * thus insist on an idle client here. */
+ if (v->state != VARLINK_IDLE_CLIENT)
+ return -EBUSY;
+
+ r = varlink_sanitize_parameters(¶meters);
+ if (r < 0)
+ return r;
+
+ r = json_build(&m, JSON_BUILD_OBJECT(
+ JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
+ JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
+ JSON_BUILD_PAIR("more", JSON_BUILD_BOOLEAN(true))));
+ if (r < 0)
+ return r;
+
+ r = varlink_enqueue_json(v, m);
+ if (r < 0)
+ return r;
+
+
+ varlink_set_state(v, VARLINK_AWAITING_REPLY_MORE);
+ v->n_pending++;
+ v->timestamp = now(CLOCK_MONOTONIC);
+
+ return 0;
+}
+
+int varlink_observeb(Varlink *v, const char *method, ...) {
+ _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
+ va_list ap;
+ int r;
+
+ assert_return(v, -EINVAL);
+
+ va_start(ap, method);
+ r = json_buildv(¶meters, ap);
+ va_end(ap);
+
+ if (r < 0)
+ return r;
+
+ return varlink_observe(v, method, parameters);
+}
+
int varlink_call(
Varlink *v,
const char *method,
Varlink *v = userdata;
int r, e;
usec_t until;
+ bool have_timeout;
assert(s);
assert(v);
r = varlink_get_timeout(v, &until);
if (r < 0)
return r;
- if (r > 0) {
+ have_timeout = r > 0;
+
+ if (have_timeout) {
r = sd_event_source_set_time(v->time_event_source, until);
if (r < 0)
return r;
}
- r = sd_event_source_set_enabled(v->time_event_source, r > 0 ? SD_EVENT_ON : SD_EVENT_OFF);
+ r = sd_event_source_set_enabled(v->time_event_source, have_timeout ? SD_EVENT_ON : SD_EVENT_OFF);
if (r < 0)
return r;
return r;
}
-
void varlink_detach_event(Varlink *v) {
if (!v)
return;
varlink_set_state(v, VARLINK_IDLE_SERVER);
- r = varlink_attach_event(v, server->event, server->event_priority);
- if (r < 0) {
- varlink_log_errno(v, r, "Failed to attach new connection: %m");
- v->fd = -1; /* take the fd out of the connection again */
- varlink_close(v);
- return r;
+ if (server->event) {
+ r = varlink_attach_event(v, server->event, server->event_priority);
+ if (r < 0) {
+ varlink_log_errno(v, r, "Failed to attach new connection: %m");
+ v->fd = -1; /* take the fd out of the connection again */
+ varlink_close(v);
+ return r;
+ }
}
if (ret)
if (fd < 0)
return -errno;
+ fd = fd_move_above_stdio(fd);
+
(void) sockaddr_un_unlink(&sockaddr.un);
RUN_WITH_UMASK(~m & 0777)
int varlink_server_bind_method_many_internal(VarlinkServer *s, ...) {
va_list ap;
- int r;
+ int r = 0;
assert_return(s, -EINVAL);
r = varlink_server_bind_method(s, method, callback);
if (r < 0)
- return r;
+ break;
}
+ va_end(ap);
- return 0;
+ return r;
}
int varlink_server_bind_connect(VarlinkServer *s, VarlinkConnect callback) {
}
unsigned varlink_server_connections_max(VarlinkServer *s) {
- struct rlimit rl;
+ int dts;
/* If a server is specified, return the setting for that server, otherwise the default value */
if (s)
return s->connections_max;
- assert_se(getrlimit(RLIMIT_NOFILE, &rl) >= 0);
+ dts = getdtablesize();
+ assert_se(dts > 0);
/* Make sure we never use up more than ¾th of RLIMIT_NOFILE for IPC */
- if (VARLINK_DEFAULT_CONNECTIONS_MAX > rl.rlim_cur / 4 * 3)
- return rl.rlim_cur / 4 * 3;
+ if (VARLINK_DEFAULT_CONNECTIONS_MAX > (unsigned) dts / 4 * 3)
+ return dts / 4 * 3;
return VARLINK_DEFAULT_CONNECTIONS_MAX;
}