X-Git-Url: http://git.ipfire.org/?a=blobdiff_plain;f=src%2Fshared%2Fvarlink.c;h=ee4fb9e843961105ec478fc4fe133cc8f7a3f451;hb=62092b2fae149956a804136256b1d69f53451bac;hp=7719a7d0214eca39afb6d8dca72159a703baa6be;hpb=e6042f682f9ff29674964d147721f7bd3735aa66;p=thirdparty%2Fsystemd.git diff --git a/src/shared/varlink.c b/src/shared/varlink.c index 7719a7d0214..ee4fb9e8439 100644 --- a/src/shared/varlink.c +++ b/src/shared/varlink.c @@ -29,6 +29,7 @@ typedef enum VarlinkState { /* Client side states */ VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY, + VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_CALLED, VARLINK_PROCESSING_REPLY, @@ -39,7 +40,6 @@ typedef enum VarlinkState { VARLINK_PROCESSING_METHOD_MORE, VARLINK_PROCESSING_METHOD_ONEWAY, VARLINK_PROCESSED_METHOD, - VARLINK_PROCESSED_METHOD_MORE, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE, @@ -63,6 +63,7 @@ typedef enum VarlinkState { IN_SET(state, \ VARLINK_IDLE_CLIENT, \ VARLINK_AWAITING_REPLY, \ + VARLINK_AWAITING_REPLY_MORE, \ VARLINK_CALLING, \ VARLINK_CALLED, \ VARLINK_PROCESSING_REPLY, \ @@ -71,7 +72,6 @@ typedef enum VarlinkState { VARLINK_PROCESSING_METHOD_MORE, \ VARLINK_PROCESSING_METHOD_ONEWAY, \ VARLINK_PROCESSED_METHOD, \ - VARLINK_PROCESSED_METHOD_MORE, \ VARLINK_PENDING_METHOD, \ VARLINK_PENDING_METHOD_MORE) @@ -185,6 +185,7 @@ struct VarlinkServer { static const char* const varlink_state_table[_VARLINK_STATE_MAX] = { [VARLINK_IDLE_CLIENT] = "idle-client", [VARLINK_AWAITING_REPLY] = "awaiting-reply", + [VARLINK_AWAITING_REPLY_MORE] = "awaiting-reply-more", [VARLINK_CALLING] = "calling", [VARLINK_CALLED] = "called", [VARLINK_PROCESSING_REPLY] = "processing-reply", @@ -193,7 +194,6 @@ static const char* const varlink_state_table[_VARLINK_STATE_MAX] = { [VARLINK_PROCESSING_METHOD_MORE] = "processing-method-more", [VARLINK_PROCESSING_METHOD_ONEWAY] = "processing-method-oneway", [VARLINK_PROCESSED_METHOD] = "processed-method", - [VARLINK_PROCESSED_METHOD_MORE] = "processed-method-more", [VARLINK_PENDING_METHOD] = "pending-method", [VARLINK_PENDING_METHOD_MORE] = "pending-method-more", [VARLINK_PENDING_DISCONNECT] = "pending-disconnect", @@ -246,8 +246,7 @@ static int varlink_new(Varlink **ret) { assert(ret); - /* Here use new0 as the below structured initializer is nested. */ - v = new0(Varlink, 1); + v = new(Varlink, 1); if (!v) return -ENOMEM; @@ -288,6 +287,8 @@ int varlink_connect_address(Varlink **ret, const char *address) { if (v->fd < 0) return -errno; + v->fd = fd_move_above_stdio(v->fd); + if (connect(v->fd, &sockaddr.sa, SOCKADDR_UN_LEN(sockaddr.un)) < 0) { if (!IN_SET(errno, EAGAIN, EINPROGRESS)) return -errno; @@ -342,11 +343,8 @@ static void varlink_detach_event_sources(Varlink *v) { assert(v); v->io_event_source = sd_event_source_disable_unref(v->io_event_source); - v->time_event_source = sd_event_source_disable_unref(v->time_event_source); - v->quit_event_source = sd_event_source_disable_unref(v->quit_event_source); - v->defer_event_source = sd_event_source_disable_unref(v->defer_event_source); } @@ -406,7 +404,7 @@ static int varlink_test_disconnect(Varlink *v) { goto disconnect; /* If we are waiting for incoming data but the read side is shut down, disconnect. */ - if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected) + if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) && v->read_disconnected) goto disconnect; /* Similar, if are a client that hasn't written anything yet but the write side is dead, also @@ -479,7 +477,7 @@ static int varlink_read(Varlink *v) { assert(v); - if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER)) + if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER)) return 0; if (v->connecting) /* read() on a socket while we are in connect() will fail with EINVAL, hence exit early here */ return 0; @@ -597,7 +595,7 @@ static int varlink_parse_message(Varlink *v) { static int varlink_test_timeout(Varlink *v) { assert(v); - if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING)) + if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING)) return 0; if (v->timeout == USEC_INFINITY) return 0; @@ -674,7 +672,7 @@ static int varlink_dispatch_reply(Varlink *v) { assert(v); - if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING)) + if (!IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING)) return 0; if (!v->current) return 0; @@ -716,6 +714,11 @@ static int varlink_dispatch_reply(Varlink *v) { goto invalid; } + /* Replies with 'continue' set are only OK if we set 'more' when the method call was initiated */ + if (v->state != VARLINK_AWAITING_REPLY_MORE && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES)) + goto invalid; + + /* An error is final */ if (error && FLAGS_SET(flags, VARLINK_REPLY_CONTINUES)) goto invalid; @@ -723,7 +726,7 @@ static int varlink_dispatch_reply(Varlink *v) { if (r < 0) goto invalid; - if (v->state == VARLINK_AWAITING_REPLY) { + if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE)) { varlink_set_state(v, VARLINK_PROCESSING_REPLY); if (v->reply_callback) { @@ -735,17 +738,18 @@ static int varlink_dispatch_reply(Varlink *v) { v->current = json_variant_unref(v->current); if (v->state == VARLINK_PROCESSING_REPLY) { + assert(v->n_pending > 0); - v->n_pending--; - varlink_set_state(v, v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY); + if (!FLAGS_SET(flags, VARLINK_REPLY_CONTINUES)) + v->n_pending--; + + varlink_set_state(v, + FLAGS_SET(flags, VARLINK_REPLY_CONTINUES) ? VARLINK_AWAITING_REPLY_MORE : + v->n_pending == 0 ? VARLINK_IDLE_CLIENT : VARLINK_AWAITING_REPLY); } } else { assert(v->state == VARLINK_CALLING); - - if (FLAGS_SET(flags, VARLINK_REPLY_CONTINUES)) - goto invalid; - varlink_set_state(v, VARLINK_CALLED); } @@ -879,7 +883,6 @@ static int varlink_dispatch_method(Varlink *v) { varlink_set_state(v, VARLINK_PENDING_METHOD); break; - case VARLINK_PROCESSED_METHOD_MORE: /* One reply for a "more" message was sent, more to come */ case VARLINK_PROCESSING_METHOD_MORE: /* No reply for a "more" message was sent, more to come */ varlink_set_state(v, VARLINK_PENDING_METHOD_MORE); break; @@ -1003,7 +1006,6 @@ int varlink_wait(Varlink *v, usec_t timeout) { usec_t t; assert_return(v, -EINVAL); - assert_return(!v->server, -ENOTTY); if (v->state == VARLINK_DISCONNECTED) return -ENOTCONN; @@ -1075,7 +1077,7 @@ int varlink_get_events(Varlink *v) { return EPOLLOUT; if (!v->read_disconnected && - IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING, VARLINK_IDLE_SERVER) && + IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING, VARLINK_IDLE_SERVER) && !v->current && v->input_buffer_unscanned <= 0) ret |= EPOLLIN; @@ -1093,7 +1095,7 @@ int varlink_get_timeout(Varlink *v, usec_t *ret) { if (v->state == VARLINK_DISCONNECTED) return -ENOTCONN; - if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_CALLING) && + if (IN_SET(v->state, VARLINK_AWAITING_REPLY, VARLINK_AWAITING_REPLY_MORE, VARLINK_CALLING) && v->timeout != USEC_INFINITY) { if (ret) *ret = usec_add(v->timestamp, v->timeout); @@ -1212,6 +1214,7 @@ static int varlink_enqueue_json(Varlink *v, JsonVariant *m) { r = json_variant_format(m, 0, &text); if (r < 0) return r; + assert(text[r] == '\0'); if (v->output_buffer_size + r + 1 > VARLINK_BUFFER_MAX) return -ENOBUFS; @@ -1235,15 +1238,16 @@ static int varlink_enqueue_json(Varlink *v, JsonVariant *m) { } else { char *n; + const size_t new_size = v->output_buffer_size + r + 1; - n = new(char, v->output_buffer_size + r + 1); + n = new(char, new_size); if (!n) return -ENOMEM; memcpy(mempcpy(n, v->output_buffer + v->output_buffer_index, v->output_buffer_size), text, r + 1); free_and_replace(v->output_buffer, n); - v->output_buffer_size += r + 1; + v->output_buffer_allocated = v->output_buffer_size = new_size; v->output_buffer_index = 0; } @@ -1259,6 +1263,8 @@ int varlink_send(Varlink *v, const char *method, JsonVariant *parameters) { if (v->state == VARLINK_DISCONNECTED) return -ENOTCONN; + + /* We allow enqueuing multiple method calls at once! */ if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY)) return -EBUSY; @@ -1308,6 +1314,8 @@ int varlink_invoke(Varlink *v, const char *method, JsonVariant *parameters) { if (v->state == VARLINK_DISCONNECTED) return -ENOTCONN; + + /* We allow enqueing multiple method calls at once! */ if (!IN_SET(v->state, VARLINK_IDLE_CLIENT, VARLINK_AWAITING_REPLY)) return -EBUSY; @@ -1349,6 +1357,60 @@ int varlink_invokeb(Varlink *v, const char *method, ...) { return varlink_invoke(v, method, parameters); } +int varlink_observe(Varlink *v, const char *method, JsonVariant *parameters) { + _cleanup_(json_variant_unrefp) JsonVariant *m = NULL; + int r; + + assert_return(v, -EINVAL); + assert_return(method, -EINVAL); + + if (v->state == VARLINK_DISCONNECTED) + return -ENOTCONN; + /* Note that we don't allow enqueuing multiple method calls when we are in more/continues mode! We + * thus insist on an idle client here. */ + if (v->state != VARLINK_IDLE_CLIENT) + return -EBUSY; + + r = varlink_sanitize_parameters(¶meters); + if (r < 0) + return r; + + r = json_build(&m, JSON_BUILD_OBJECT( + JSON_BUILD_PAIR("method", JSON_BUILD_STRING(method)), + JSON_BUILD_PAIR("parameters", JSON_BUILD_VARIANT(parameters)), + JSON_BUILD_PAIR("more", JSON_BUILD_BOOLEAN(true)))); + if (r < 0) + return r; + + r = varlink_enqueue_json(v, m); + if (r < 0) + return r; + + + varlink_set_state(v, VARLINK_AWAITING_REPLY_MORE); + v->n_pending++; + v->timestamp = now(CLOCK_MONOTONIC); + + return 0; +} + +int varlink_observeb(Varlink *v, const char *method, ...) { + _cleanup_(json_variant_unrefp) JsonVariant *parameters = NULL; + va_list ap; + int r; + + assert_return(v, -EINVAL); + + va_start(ap, method); + r = json_buildv(¶meters, ap); + va_end(ap); + + if (r < 0) + return r; + + return varlink_observe(v, method, parameters); +} + int varlink_call( Varlink *v, const char *method, @@ -1769,6 +1831,7 @@ static int prepare_callback(sd_event_source *s, void *userdata) { Varlink *v = userdata; int r, e; usec_t until; + bool have_timeout; assert(s); assert(v); @@ -1784,13 +1847,15 @@ static int prepare_callback(sd_event_source *s, void *userdata) { r = varlink_get_timeout(v, &until); if (r < 0) return r; - if (r > 0) { + have_timeout = r > 0; + + if (have_timeout) { r = sd_event_source_set_time(v->time_event_source, until); if (r < 0) return r; } - r = sd_event_source_set_enabled(v->time_event_source, r > 0 ? SD_EVENT_ON : SD_EVENT_OFF); + r = sd_event_source_set_enabled(v->time_event_source, have_timeout ? SD_EVENT_ON : SD_EVENT_OFF); if (r < 0) return r; @@ -1874,7 +1939,6 @@ fail: return r; } - void varlink_detach_event(Varlink *v) { if (!v) return; @@ -2052,12 +2116,14 @@ int varlink_server_add_connection(VarlinkServer *server, int fd, Varlink **ret) varlink_set_state(v, VARLINK_IDLE_SERVER); - r = varlink_attach_event(v, server->event, server->event_priority); - if (r < 0) { - varlink_log_errno(v, r, "Failed to attach new connection: %m"); - v->fd = -1; /* take the fd out of the connection again */ - varlink_close(v); - return r; + if (server->event) { + r = varlink_attach_event(v, server->event, server->event_priority); + if (r < 0) { + varlink_log_errno(v, r, "Failed to attach new connection: %m"); + v->fd = -1; /* take the fd out of the connection again */ + varlink_close(v); + return r; + } } if (ret) @@ -2156,6 +2222,8 @@ int varlink_server_listen_address(VarlinkServer *s, const char *address, mode_t if (fd < 0) return -errno; + fd = fd_move_above_stdio(fd); + (void) sockaddr_un_unlink(&sockaddr.un); RUN_WITH_UMASK(~m & 0777) @@ -2303,7 +2371,7 @@ int varlink_server_bind_method(VarlinkServer *s, const char *method, VarlinkMeth int varlink_server_bind_method_many_internal(VarlinkServer *s, ...) { va_list ap; - int r; + int r = 0; assert_return(s, -EINVAL); @@ -2320,10 +2388,11 @@ int varlink_server_bind_method_many_internal(VarlinkServer *s, ...) { r = varlink_server_bind_method(s, method, callback); if (r < 0) - return r; + break; } + va_end(ap); - return 0; + return r; } int varlink_server_bind_connect(VarlinkServer *s, VarlinkConnect callback) { @@ -2337,17 +2406,18 @@ int varlink_server_bind_connect(VarlinkServer *s, VarlinkConnect callback) { } unsigned varlink_server_connections_max(VarlinkServer *s) { - struct rlimit rl; + int dts; /* If a server is specified, return the setting for that server, otherwise the default value */ if (s) return s->connections_max; - assert_se(getrlimit(RLIMIT_NOFILE, &rl) >= 0); + dts = getdtablesize(); + assert_se(dts > 0); /* Make sure we never use up more than ¾th of RLIMIT_NOFILE for IPC */ - if (VARLINK_DEFAULT_CONNECTIONS_MAX > rl.rlim_cur / 4 * 3) - return rl.rlim_cur / 4 * 3; + if (VARLINK_DEFAULT_CONNECTIONS_MAX > (unsigned) dts / 4 * 3) + return dts / 4 * 3; return VARLINK_DEFAULT_CONNECTIONS_MAX; }