*nl = (sd_netlink) {
.n_ref = 1,
- .fd = -1,
+ .fd = -EBADF,
.sockaddr.nl.nl_family = AF_NETLINK,
.original_pid = getpid_cached(),
.protocol = -1,
* while the socket sticks around we might get confused by replies from earlier runs coming
* in late — which is pretty likely if we'd start our sequence numbers always from 1. Hence,
* let's start with a value based on the system clock. This should make collisions much less
- * likely (though still theoretically possible). We use a 32 bit µs counter starting at boot
+ * likely (though still theoretically possible). We use a 32 bit μs counter starting at boot
* for this (and explicitly exclude the zero, see above). This counter will wrap around after
* a bit more than 1h, but that's hopefully OK as the kernel shouldn't take that long to
* reply to our requests.
*
* We only pick the initial start value this way. For each message we simply increase the
- * sequence number by 1. This means we could enqueue 1 netlink message per µs without risking
+ * sequence number by 1. This means we could enqueue 1 netlink message per μs without risking
* collisions, which should be OK.
*
* Note this means the serials will be in the range 1…UINT32_MAX here.
r = socket_bind(nl);
if (r < 0) {
- nl->fd = -1; /* on failure, the caller remains owner of the fd, hence don't close it here */
+ nl->fd = -EBADF; /* on failure, the caller remains owner of the fd, hence don't close it here */
nl->protocol = -1;
return r;
}
static sd_netlink *netlink_free(sd_netlink *nl) {
sd_netlink_slot *s;
- unsigned i;
assert(nl);
- for (i = 0; i < nl->rqueue_size; i++)
- sd_netlink_message_unref(nl->rqueue[i]);
- free(nl->rqueue);
-
- for (i = 0; i < nl->rqueue_partial_size; i++)
- sd_netlink_message_unref(nl->rqueue_partial[i]);
- free(nl->rqueue_partial);
-
+ ordered_set_free(nl->rqueue);
+ hashmap_free(nl->rqueue_by_serial);
+ hashmap_free(nl->rqueue_partial_by_serial);
free(nl->rbuffer);
while ((s = nl->slots)) {
return 1;
}
-static int dispatch_rqueue(sd_netlink *nl, sd_netlink_message **message) {
+static int dispatch_rqueue(sd_netlink *nl, sd_netlink_message **ret) {
+ sd_netlink_message *m;
int r;
assert(nl);
- assert(message);
+ assert(ret);
- if (nl->rqueue_size <= 0) {
+ if (ordered_set_isempty(nl->rqueue)) {
/* Try to read a new message */
r = socket_read_message(nl);
- if (r == -ENOBUFS) { /* FIXME: ignore buffer overruns for now */
+ if (r == -ENOBUFS) /* FIXME: ignore buffer overruns for now */
log_debug_errno(r, "sd-netlink: Got ENOBUFS from netlink socket, ignoring.");
- return 1;
- }
- if (r <= 0)
+ else if (r < 0)
return r;
}
/* Dispatch a queued message */
- *message = nl->rqueue[0];
- nl->rqueue_size--;
- memmove(nl->rqueue, nl->rqueue + 1, sizeof(sd_netlink_message*) * nl->rqueue_size);
-
- return 1;
+ m = ordered_set_steal_first(nl->rqueue);
+ if (m)
+ sd_netlink_message_unref(hashmap_remove_value(nl->rqueue_by_serial, UINT32_TO_PTR(message_get_serial(m)), m));
+ *ret = m;
+ return !!m;
}
static int process_timeout(sd_netlink *nl) {
return r;
assert_se(prioq_pop(nl->reply_callbacks_prioq) == c);
- c->timeout = 0;
hashmap_remove(nl->reply_callbacks, UINT32_TO_PTR(c->serial));
slot = container_of(c, sd_netlink_slot, reply_callback);
if (!c)
return 0;
- if (c->timeout != 0) {
+ if (c->timeout != USEC_INFINITY)
prioq_remove(nl->reply_callbacks_prioq, c, &c->prioq_idx);
- c->timeout = 0;
- }
r = sd_netlink_message_get_type(m, &type);
if (r < 0)
return r;
}
-static usec_t calc_elapse(uint64_t usec) {
- if (usec == UINT64_MAX)
- return 0;
+static usec_t timespan_to_timestamp(usec_t usec) {
+ static bool default_timeout_set = false;
+ static usec_t default_timeout;
+ int r;
+
+ if (usec == 0) {
+ if (!default_timeout_set) {
+ const char *e;
+
+ default_timeout_set = true;
+ default_timeout = NETLINK_DEFAULT_TIMEOUT_USEC;
+
+ e = secure_getenv("SYSTEMD_NETLINK_DEFAULT_TIMEOUT");
+ if (e) {
+ r = parse_sec(e, &default_timeout);
+ if (r < 0)
+ log_debug_errno(r, "sd-netlink: Failed to parse $SYSTEMD_NETLINK_DEFAULT_TIMEOUT environment variable, ignoring: %m");
+ }
+ }
- if (usec == 0)
- usec = NETLINK_DEFAULT_TIMEOUT_USEC;
+ usec = default_timeout;
+ }
return usec_add(now(CLOCK_MONOTONIC), usec);
}
assert_return(nl, -EINVAL);
assert_return(!netlink_pid_changed(nl), -ECHILD);
- if (nl->rqueue_size > 0)
+ if (!ordered_set_isempty(nl->rqueue))
return 0;
r = netlink_poll(nl, false, timeout_usec);
- if (r < 0 && ERRNO_IS_TRANSIENT(r)) /* Convert EINTR to "something happened" and give user a chance to run some code before calling back into us */
+ if (ERRNO_IS_NEG_TRANSIENT(r)) /* Convert EINTR to "something happened" and give user a chance to run some code before calling back into us */
return 1;
return r;
}
static int timeout_compare(const void *a, const void *b) {
const struct reply_callback *x = a, *y = b;
- if (x->timeout != 0 && y->timeout == 0)
- return -1;
+ return CMP(x->timeout, y->timeout);
+}
- if (x->timeout == 0 && y->timeout != 0)
- return 1;
+size_t netlink_get_reply_callback_count(sd_netlink *nl) {
+ assert(nl);
- return CMP(x->timeout, y->timeout);
+ return hashmap_size(nl->reply_callbacks);
}
int sd_netlink_call_async(
assert_return(!netlink_pid_changed(nl), -ECHILD);
if (hashmap_size(nl->reply_callbacks) >= REPLY_CALLBACKS_MAX)
- return -ERANGE;
+ return -EXFULL;
r = hashmap_ensure_allocated(&nl->reply_callbacks, &trivial_hash_ops);
if (r < 0)
return r;
slot->reply_callback.callback = callback;
- slot->reply_callback.timeout = calc_elapse(usec);
+ slot->reply_callback.timeout = timespan_to_timestamp(usec);
k = sd_netlink_send(nl, m, &slot->reply_callback.serial);
if (k < 0)
if (r < 0)
return r;
- if (slot->reply_callback.timeout != 0) {
+ if (slot->reply_callback.timeout != USEC_INFINITY) {
r = prioq_put(nl->reply_callbacks_prioq, &slot->reply_callback, &slot->reply_callback.prioq_idx);
if (r < 0) {
(void) hashmap_remove(nl->reply_callbacks, UINT32_TO_PTR(slot->reply_callback.serial));
assert_return(nl, -EINVAL);
assert_return(!netlink_pid_changed(nl), -ECHILD);
- timeout = calc_elapse(usec);
+ timeout = timespan_to_timestamp(usec);
for (;;) {
+ _cleanup_(sd_netlink_message_unrefp) sd_netlink_message *m = NULL;
usec_t left;
- for (unsigned i = 0; i < nl->rqueue_size; i++) {
- _cleanup_(sd_netlink_message_unrefp) sd_netlink_message *incoming = NULL;
- uint32_t received_serial;
+ m = hashmap_remove(nl->rqueue_by_serial, UINT32_TO_PTR(serial));
+ if (m) {
uint16_t type;
- received_serial = message_get_serial(nl->rqueue[i]);
- if (received_serial != serial)
- continue;
-
- incoming = nl->rqueue[i];
-
/* found a match, remove from rqueue and return it */
- memmove(nl->rqueue + i, nl->rqueue + i + 1,
- sizeof(sd_netlink_message*) * (nl->rqueue_size - i - 1));
- nl->rqueue_size--;
+ sd_netlink_message_unref(ordered_set_remove(nl->rqueue, m));
- r = sd_netlink_message_get_errno(incoming);
+ r = sd_netlink_message_get_errno(m);
if (r < 0)
return r;
- r = sd_netlink_message_get_type(incoming, &type);
+ r = sd_netlink_message_get_type(m, &type);
if (r < 0)
return r;
}
if (ret)
- *ret = TAKE_PTR(incoming);
+ *ret = TAKE_PTR(m);
return 1;
}
/* received message, so try to process straight away */
continue;
- if (timeout > 0) {
+ if (timeout != USEC_INFINITY) {
usec_t n;
n = now(CLOCK_MONOTONIC);
assert_return(nl, -EINVAL);
assert_return(!netlink_pid_changed(nl), -ECHILD);
- return nl->rqueue_size == 0 ? POLLIN : 0;
+ return ordered_set_isempty(nl->rqueue) ? POLLIN : 0;
}
int sd_netlink_get_timeout(sd_netlink *nl, uint64_t *timeout_usec) {
assert_return(timeout_usec, -EINVAL);
assert_return(!netlink_pid_changed(nl), -ECHILD);
- if (nl->rqueue_size > 0) {
+ if (!ordered_set_isempty(nl->rqueue)) {
*timeout_usec = 0;
return 1;
}
}
*timeout_usec = c->timeout;
-
return 1;
}
return 0;
}
+sd_event* sd_netlink_get_event(sd_netlink *nl) {
+ assert_return(nl, NULL);
+
+ return nl->event;
+}
+
int netlink_add_match_internal(
sd_netlink *nl,
sd_netlink_slot **ret_slot,