From: Yu Watanabe Date: Sat, 26 Nov 2022 01:10:58 +0000 (+0900) Subject: sd-netlink: reimplement received message queue X-Git-Tag: v253-rc1~431^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e417c4ac44f0904827c2451be491482dcfb30120;p=thirdparty%2Fsystemd.git sd-netlink: reimplement received message queue By using OrderedSet and Hashmap, we can drop all memmove() calls. No functional changes, just refactoring. --- diff --git a/src/libsystemd/sd-netlink/netlink-internal.h b/src/libsystemd/sd-netlink/netlink-internal.h index 964f7c0016b..9ad4c03a3c9 100644 --- a/src/libsystemd/sd-netlink/netlink-internal.h +++ b/src/libsystemd/sd-netlink/netlink-internal.h @@ -7,6 +7,7 @@ #include "list.h" #include "netlink-types.h" +#include "ordered-set.h" #include "prioq.h" #include "time-util.h" @@ -72,11 +73,8 @@ struct sd_netlink { Hashmap *broadcast_group_refs; bool broadcast_group_dont_leave:1; /* until we can rely on 4.2 */ - sd_netlink_message **rqueue; - unsigned rqueue_size; - - sd_netlink_message **rqueue_partial; - unsigned rqueue_partial_size; + OrderedSet *rqueue; + Hashmap *rqueue_partial_by_serial; struct nlmsghdr *rbuffer; diff --git a/src/libsystemd/sd-netlink/netlink-socket.c b/src/libsystemd/sd-netlink/netlink-socket.c index 1480525c73f..95ac7bad1aa 100644 --- a/src/libsystemd/sd-netlink/netlink-socket.c +++ b/src/libsystemd/sd-netlink/netlink-socket.c @@ -241,57 +241,50 @@ static int socket_recv_message(int fd, void *buf, size_t buf_size, uint32_t *ret return (int) n; } +DEFINE_PRIVATE_HASH_OPS_WITH_VALUE_DESTRUCTOR( + netlink_message_hash_ops, + void, trivial_hash_func, trivial_compare_func, + sd_netlink_message, sd_netlink_message_unref); + static int netlink_queue_received_message(sd_netlink *nl, sd_netlink_message *m) { + int r; + assert(nl); assert(m); - if (nl->rqueue_size >= NETLINK_RQUEUE_MAX) + if (ordered_set_size(nl->rqueue) >= NETLINK_RQUEUE_MAX) return log_debug_errno(SYNTHETIC_ERRNO(ENOBUFS), - "sd-netlink: exhausted the read queue size (%d)", - NETLINK_RQUEUE_MAX); + "sd-netlink: exhausted the read queue size (%d)", NETLINK_RQUEUE_MAX); - if (!GREEDY_REALLOC(nl->rqueue, nl->rqueue_size + 1)) - return -ENOMEM; + r = ordered_set_ensure_put(&nl->rqueue, &netlink_message_hash_ops, m); + if (r < 0) + return r; - nl->rqueue[nl->rqueue_size++] = sd_netlink_message_ref(m); + sd_netlink_message_ref(m); return 0; } static int netlink_queue_partially_received_message(sd_netlink *nl, sd_netlink_message *m) { + uint32_t serial; + int r; + assert(nl); assert(m); assert(m->hdr->nlmsg_flags & NLM_F_MULTI); - if (nl->rqueue_partial_size >= NETLINK_RQUEUE_MAX) + if (hashmap_size(nl->rqueue_partial_by_serial) >= NETLINK_RQUEUE_MAX) return log_debug_errno(SYNTHETIC_ERRNO(ENOBUFS), - "sd-netlink: exhausted the partial read queue size (%d)", - NETLINK_RQUEUE_MAX); + "sd-netlink: exhausted the partial read queue size (%d)", NETLINK_RQUEUE_MAX); - if (!GREEDY_REALLOC(nl->rqueue_partial, nl->rqueue_partial_size + 1)) - return -ENOMEM; + serial = message_get_serial(m); + r = hashmap_ensure_put(&nl->rqueue_partial_by_serial, &netlink_message_hash_ops, UINT32_TO_PTR(serial), m); + if (r < 0) + return r; - nl->rqueue_partial[nl->rqueue_partial_size++] = sd_netlink_message_ref(m); + sd_netlink_message_ref(m); return 0; } -static sd_netlink_message *netlink_take_partial_message(sd_netlink *nl, uint32_t seqnum) { - assert(nl); - - for (unsigned i = 0; i < nl->rqueue_partial_size; i++) - if (message_get_serial(nl->rqueue_partial[i]) == seqnum) { - sd_netlink_message *found = nl->rqueue_partial[i]; - - /* remove the message form the partial read queue */ - memmove(nl->rqueue_partial + i, nl->rqueue_partial + i + 1, - sizeof(sd_netlink_message*) * (nl->rqueue_partial_size - i - 1)); - nl->rqueue_partial_size--; - - return found; - } - - return NULL; -} - /* On success, the number of bytes received is returned and *ret points to the received message * which has a valid header and the correct size. * If nothing useful was received 0 is returned. @@ -329,7 +322,7 @@ int socket_read_message(sd_netlink *nl) { if (nl->rbuffer->nlmsg_flags & NLM_F_MULTI) { multi_part = true; - first = netlink_take_partial_message(nl, nl->rbuffer->nlmsg_seq); + first = hashmap_remove(nl->rqueue_partial_by_serial, UINT32_TO_PTR(nl->rbuffer->nlmsg_seq)); } for (struct nlmsghdr *new_msg = nl->rbuffer; NLMSG_OK(new_msg, len) && !done; new_msg = NLMSG_NEXT(new_msg, len)) { diff --git a/src/libsystemd/sd-netlink/sd-netlink.c b/src/libsystemd/sd-netlink/sd-netlink.c index 7818a179c8f..1f4f8e90156 100644 --- a/src/libsystemd/sd-netlink/sd-netlink.c +++ b/src/libsystemd/sd-netlink/sd-netlink.c @@ -116,18 +116,11 @@ int sd_netlink_increase_rxbuf(sd_netlink *nl, size_t size) { static sd_netlink *netlink_free(sd_netlink *nl) { sd_netlink_slot *s; - unsigned i; assert(nl); - for (i = 0; i < nl->rqueue_size; i++) - sd_netlink_message_unref(nl->rqueue[i]); - free(nl->rqueue); - - for (i = 0; i < nl->rqueue_partial_size; i++) - sd_netlink_message_unref(nl->rqueue_partial[i]); - free(nl->rqueue_partial); - + ordered_set_free(nl->rqueue); + hashmap_free(nl->rqueue_partial_by_serial); free(nl->rbuffer); while ((s = nl->slots)) { @@ -175,29 +168,26 @@ int sd_netlink_send( return 1; } -static int dispatch_rqueue(sd_netlink *nl, sd_netlink_message **message) { +static int dispatch_rqueue(sd_netlink *nl, sd_netlink_message **ret) { + sd_netlink_message *m; int r; assert(nl); - assert(message); + assert(ret); - if (nl->rqueue_size <= 0) { + if (ordered_set_size(nl->rqueue) <= 0) { /* Try to read a new message */ r = socket_read_message(nl); - if (r == -ENOBUFS) { /* FIXME: ignore buffer overruns for now */ + if (r == -ENOBUFS) /* FIXME: ignore buffer overruns for now */ log_debug_errno(r, "sd-netlink: Got ENOBUFS from netlink socket, ignoring."); - return 1; - } - if (r <= 0) + else if (r < 0) return r; } /* Dispatch a queued message */ - *message = nl->rqueue[0]; - nl->rqueue_size--; - memmove(nl->rqueue, nl->rqueue + 1, sizeof(sd_netlink_message*) * nl->rqueue_size); - - return 1; + m = ordered_set_steal_first(nl->rqueue); + *ret = m; + return !!m; } static int process_timeout(sd_netlink *nl) { @@ -437,7 +427,7 @@ int sd_netlink_wait(sd_netlink *nl, uint64_t timeout_usec) { assert_return(nl, -EINVAL); assert_return(!netlink_pid_changed(nl), -ECHILD); - if (nl->rqueue_size > 0) + if (ordered_set_size(nl->rqueue) > 0) return 0; r = netlink_poll(nl, false, timeout_usec); @@ -538,23 +528,21 @@ int sd_netlink_read( timeout = calc_elapse(usec); for (;;) { + sd_netlink_message *m; usec_t left; - for (unsigned i = 0; i < nl->rqueue_size; i++) { + ORDERED_SET_FOREACH(m, nl->rqueue) { _cleanup_(sd_netlink_message_unrefp) sd_netlink_message *incoming = NULL; uint32_t received_serial; uint16_t type; - received_serial = message_get_serial(nl->rqueue[i]); + received_serial = message_get_serial(m); if (received_serial != serial) continue; - incoming = nl->rqueue[i]; - /* found a match, remove from rqueue and return it */ - memmove(nl->rqueue + i, nl->rqueue + i + 1, - sizeof(sd_netlink_message*) * (nl->rqueue_size - i - 1)); - nl->rqueue_size--; + ordered_set_remove(nl->rqueue, m); + incoming = TAKE_PTR(m); r = sd_netlink_message_get_errno(incoming); if (r < 0) @@ -625,7 +613,7 @@ int sd_netlink_get_events(sd_netlink *nl) { assert_return(nl, -EINVAL); assert_return(!netlink_pid_changed(nl), -ECHILD); - return nl->rqueue_size == 0 ? POLLIN : 0; + return ordered_set_size(nl->rqueue) == 0 ? POLLIN : 0; } int sd_netlink_get_timeout(sd_netlink *nl, uint64_t *timeout_usec) { @@ -635,7 +623,7 @@ int sd_netlink_get_timeout(sd_netlink *nl, uint64_t *timeout_usec) { assert_return(timeout_usec, -EINVAL); assert_return(!netlink_pid_changed(nl), -ECHILD); - if (nl->rqueue_size > 0) { + if (ordered_set_size(nl->rqueue) > 0) { *timeout_usec = 0; return 1; } @@ -647,7 +635,6 @@ int sd_netlink_get_timeout(sd_netlink *nl, uint64_t *timeout_usec) { } *timeout_usec = c->timeout; - return 1; }