VARLINK_AWAITING_REPLY_MORE,
VARLINK_CALLING,
VARLINK_CALLED,
+ VARLINK_COLLECTING,
+ VARLINK_COLLECTING_REPLY,
VARLINK_PROCESSING_REPLY,
/* Server side states */
VARLINK_AWAITING_REPLY_MORE, \
VARLINK_CALLING, \
VARLINK_CALLED, \
+ VARLINK_COLLECTING, \
+ VARLINK_COLLECTING_REPLY, \
VARLINK_PROCESSING_REPLY, \
VARLINK_IDLE_SERVER, \
VARLINK_PROCESSING_METHOD, \
VarlinkReply reply_callback;
JsonVariant *current;
+ JsonVariant *current_collected;
VarlinkReplyFlags current_reply_flags;
VarlinkSymbol *current_method;
bool exit_on_idle;
};
-typedef struct VarlinkCollectContext {
- JsonVariant *parameters;
- const char *error_id;
- VarlinkReplyFlags flags;
-} VarlinkCollectContext ;
-
static const char* const varlink_state_table[_VARLINK_STATE_MAX] = {
[VARLINK_IDLE_CLIENT] = "idle-client",
[VARLINK_AWAITING_REPLY] = "awaiting-reply",
[VARLINK_AWAITING_REPLY_MORE] = "awaiting-reply-more",
[VARLINK_CALLING] = "calling",
[VARLINK_CALLED] = "called",
+ [VARLINK_COLLECTING] = "collecting",
+ [VARLINK_COLLECTING_REPLY] = "collecting-reply",
[VARLINK_PROCESSING_REPLY] = "processing-reply",
[VARLINK_IDLE_SERVER] = "idle-server",
[VARLINK_PROCESSING_METHOD] = "processing-method",
/* Clears the currently processed incoming message */
v->current = json_variant_unref(v->current);
+ v->current_collected = json_variant_unref(v->current_collected);
v->current_method = NULL;
v->current_reply_flags = 0;
goto disconnect;
/* If we are waiting for incoming data but the read side is shut down, disconnect. */
- if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected)
+ if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING, VARLINK_IDLE_SERVER) && v->read_disconnected)
goto disconnect;
/* Similar, if are a client that hasn't written anything yet but the write side is dead, also
assert(v);
- if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER))
+ if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING, VARLINK_IDLE_SERVER))
return 0;
if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */
return 0;
static int varlink_test_timeout(Varlink *v) {
assert(v);
- if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
+ if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING))
return 0;
if (v->timeout == USEC_INFINITY)
return 0;
assert(v);
- if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING))
+ if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING))
return 0;
if (!v->current)
return 0;
}
/* Replies with 'continue' set are only OK if we set 'more' when the method call was initiated */
- if (v->state != VARLINK_AWAITING_REPLY_MORE && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
+ if (!IN_SET(v->state, VARLINK_AWAITING_REPLY_MORE, VARLINK_COLLECTING) && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES))
goto invalid;
/* An error is final */
FLAGS_SET(flags, VARLINK_REPLY_CONTINUES) ? VARLINK_AWAITING_REPLY_MORE :
v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY);
}
- } else {
+ } else if (v->state == VARLINK_COLLECTING)
+ varlink_set_state(v, VARLINK_COLLECTING_REPLY);
+ else {
assert(v->state == VARLINK_CALLING);
varlink_set_state(v, VARLINK_CALLED);
}
return EPOLLOUT;
if (!v->read_disconnected &&
- IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) &&
+ IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING, VARLINK_IDLE_SERVER) &&
!v->current &&
v->input_buffer_unscanned <= 0)
ret |= EPOLLIN;
if (v->state == VARLINK_DISCONNECTED)
return varlink_log_errno(v, SYNTHETIC_ERRNO(ENOTCONN), "Not connected.");
- if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING) &&
+ if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_COLLECTING) &&
v->timeout != USEC_INFINITY) {
if (ret)
*ret = usec_add(v->timestamp, v->timeout);
return varlink_call_and_log(v, method, parameters, ret_parameters);
}
-static void varlink_collect_context_free(VarlinkCollectContext *cc) {
- assert(cc);
-
- json_variant_unref(cc->parameters);
- free((char *)cc->error_id);
-}
-
-static int collect_callback(
- Varlink *v,
- JsonVariant *parameters,
- const char *error_id,
- VarlinkReplyFlags flags,
- void *userdata) {
-
- VarlinkCollectContext *context = ASSERT_PTR(userdata);
- int r;
-
- assert(v);
-
- context->flags = flags;
- /* If we hit an error, we will drop all collected replies and just return the error_id and flags in varlink_collect() */
- if (error_id) {
- context->error_id = error_id;
-
- json_variant_unref(context->parameters);
- context->parameters = json_variant_ref(parameters);
-
- return 0;
- }
-
- if (json_variant_elements(context->parameters) >= VARLINK_COLLECT_MAX)
- return varlink_log_errno(v, SYNTHETIC_ERRNO(E2BIG), "Number of reply messages grew too large (%zu) while collecting.", json_variant_elements(context->parameters));
-
- r = json_variant_append_array(&context->parameters, parameters);
- if (r < 0)
- return varlink_log_errno(v, r, "Failed to append JSON object to array: %m");
-
- return 1;
-}
-
-int varlink_collect(
+int varlink_collect_full(
Varlink *v,
const char *method,
JsonVariant *parameters,
const char **ret_error_id,
VarlinkReplyFlags *ret_flags) {
- _cleanup_(varlink_collect_context_free) VarlinkCollectContext context = {};
+ _cleanup_(json_variant_unrefp) JsonVariant *m = NULL, *collected = NULL;
int r;
assert_return(v, -EINVAL);
* that we can assign a new reply shortly. */
varlink_clear_current(v);
- r = varlink_bind_reply(v, collect_callback);
+ r = varlink_sanitize_parameters(¶meters);
if (r < 0)
- return varlink_log_errno(v, r, "Failed to bind collect callback");
+ return varlink_log_errno(v, r, "Failed to sanitize parameters: %m");
- varlink_set_userdata(v, &context);
- r = varlink_observe(v, method, parameters);
+ r = json_build(&m, JSON_BUILD_OBJECT(
+ JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)),
+ JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)),
+ JSON_BUILD_PAIR("more", JSON_BUILD_BOOLEAN(true))));
if (r < 0)
- return varlink_log_errno(v, r, "Failed to collect varlink method: %m");
-
- while (v->state == VARLINK_AWAITING_REPLY_MORE) {
+ return varlink_log_errno(v, r, "Failed to build json message: %m");
- r = varlink_process(v);
- if (r < 0)
- return r;
+ r = varlink_enqueue_json(v, m);
+ if (r < 0)
+ return varlink_log_errno(v, r, "Failed to enqueue json message: %m");
- /* If we get an error from any of the replies, return immediately with just the error_id and flags*/
- if (context.error_id) {
+ varlink_set_state(v, VARLINK_COLLECTING);
+ v->n_pending++;
+ v->timestamp = now(CLOCK_MONOTONIC);
- /* If caller doesn't ask for the error string, then let's return an error code in case of failure */
- if (!ret_error_id)
- return varlink_error_to_errno(context.error_id, context.parameters);
+ for (;;) {
+ while (v->state == VARLINK_COLLECTING) {
+ r = varlink_process(v);
+ if (r < 0)
+ return r;
+ if (r > 0)
+ continue;
- if (ret_parameters)
- *ret_parameters = TAKE_PTR(context.parameters);
- if (ret_error_id)
- *ret_error_id = TAKE_PTR(context.error_id);
- if (ret_flags)
- *ret_flags = context.flags;
- return 0;
+ r = varlink_wait(v, USEC_INFINITY);
+ if (r < 0)
+ return r;
}
- if (r > 0)
- continue;
+ switch (v->state) {
- r = varlink_wait(v, USEC_INFINITY);
- if (r < 0)
- return r;
- }
+ case VARLINK_COLLECTING_REPLY: {
+ assert(v->current);
- switch (v->state) {
+ JsonVariant *e = json_variant_by_key(v->current, "error"),
+ *p = json_variant_by_key(v->current, "parameters");
- case VARLINK_IDLE_CLIENT:
- break;
+ if (e) {
+ if (!ret_error_id)
+ return varlink_error_to_errno(json_variant_string(e), p);
- case VARLINK_PENDING_DISCONNECT:
- case VARLINK_DISCONNECTED:
- return varlink_log_errno(v, SYNTHETIC_ERRNO(ECONNRESET), "Connection was closed.");
+ if (ret_parameters)
+ *ret_parameters = p;
+ if (ret_error_id)
+ *ret_error_id = e ? json_variant_string(e) : NULL;
+ if (ret_flags)
+ *ret_flags = v->current_reply_flags;
- case VARLINK_PENDING_TIMEOUT:
- return varlink_log_errno(v, SYNTHETIC_ERRNO(ETIME), "Connection timed out.");
+ return 1;
+ }
- default:
- assert_not_reached();
- }
+ if (json_variant_elements(collected) >= VARLINK_COLLECT_MAX)
+ return varlink_log_errno(v, SYNTHETIC_ERRNO(E2BIG), "Number of reply messages grew too large (%zu) while collecting.", json_variant_elements(collected));
- if (!ret_error_id && context.error_id)
- return varlink_error_to_errno(context.error_id, context.parameters);
+ r = json_variant_append_array(&collected, p);
+ if (r < 0)
+ return varlink_log_errno(v, r, "Failed to append JSON object to array: %m");
- if (ret_parameters)
- *ret_parameters = TAKE_PTR(context.parameters);
- if (ret_error_id)
- *ret_error_id = TAKE_PTR(context.error_id);
- if (ret_flags)
- *ret_flags = context.flags;
- return 1;
+ if (FLAGS_SET(v->current_reply_flags, VARLINK_REPLY_CONTINUES)) {
+ /* There's more to collect, continue */
+ varlink_clear_current(v);
+ varlink_set_state(v, VARLINK_COLLECTING);
+ continue;
+ }
+
+ varlink_set_state(v, VARLINK_IDLE_CLIENT);
+ assert(v->n_pending == 1);
+ v->n_pending--;
+
+ if (ret_parameters)
+ /* Install the collection array in the connection object, so that we can hand
+ * out a pointer to it without passing over ownership, to make it work more
+ * alike regular method call replies */
+ *ret_parameters = v->current_collected = TAKE_PTR(collected);
+ if (ret_error_id)
+ *ret_error_id = NULL;
+ if (ret_flags)
+ *ret_flags = v->current_reply_flags;
+
+ return 1;
+ }
+
+ case VARLINK_PENDING_DISCONNECT:
+ case VARLINK_DISCONNECTED:
+ return varlink_log_errno(v, SYNTHETIC_ERRNO(ECONNRESET), "Connection was closed.");
+
+ case VARLINK_PENDING_TIMEOUT:
+ return varlink_log_errno(v, SYNTHETIC_ERRNO(ETIME), "Connection timed out.");
+
+ default:
+ assert_not_reached();
+ }
+ }
}
int varlink_collectb(
const char *method,
JsonVariant **ret_parameters,
const char **ret_error_id,
- VarlinkReplyFlags *ret_flags, ...) {
+ ...) {
_cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL;
va_list ap;
assert_return(v, -EINVAL);
- va_start(ap, ret_flags);
+ va_start(ap, ret_error_id);
r = json_buildv(¶meters, ap);
va_end(ap);
if (r < 0)
return varlink_log_errno(v, r, "Failed to build json message: %m");
- return varlink_collect(v, method, parameters, ret_parameters, ret_error_id, ret_flags);
+ return varlink_collect_full(v, method, parameters, ret_parameters, ret_error_id, NULL);
}
int varlink_reply(Varlink *v, JsonVariant *parameters) {