]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
sd-netlink: reimplement received message queue
authorYu Watanabe <watanabe.yu+github@gmail.com>
Sat, 26 Nov 2022 01:10:58 +0000 (10:10 +0900)
committerYu Watanabe <watanabe.yu+github@gmail.com>
Sat, 26 Nov 2022 02:28:27 +0000 (11:28 +0900)
By using OrderedSet and Hashmap, we can drop all memmove() calls.
No functional changes, just refactoring.

src/libsystemd/sd-netlink/netlink-internal.h
src/libsystemd/sd-netlink/netlink-socket.c
src/libsystemd/sd-netlink/sd-netlink.c

index 964f7c0016b02888c79f100d06c200bb44a48d41..9ad4c03a3c9e7a642fb4ae20a8ee826b14b31b3a 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "list.h"
 #include "netlink-types.h"
+#include "ordered-set.h"
 #include "prioq.h"
 #include "time-util.h"
 
@@ -72,11 +73,8 @@ struct sd_netlink {
         Hashmap *broadcast_group_refs;
         bool broadcast_group_dont_leave:1; /* until we can rely on 4.2 */
 
-        sd_netlink_message **rqueue;
-        unsigned rqueue_size;
-
-        sd_netlink_message **rqueue_partial;
-        unsigned rqueue_partial_size;
+        OrderedSet *rqueue;
+        Hashmap *rqueue_partial_by_serial;
 
         struct nlmsghdr *rbuffer;
 
index 1480525c73f826d62b543ce390594307947ab06e..95ac7bad1aac3c7e5e61fb6a63d30fb34004c887 100644 (file)
@@ -241,57 +241,50 @@ static int socket_recv_message(int fd, void *buf, size_t buf_size, uint32_t *ret
         return (int) n;
 }
 
+DEFINE_PRIVATE_HASH_OPS_WITH_VALUE_DESTRUCTOR(
+        netlink_message_hash_ops,
+        void, trivial_hash_func, trivial_compare_func,
+        sd_netlink_message, sd_netlink_message_unref);
+
 static int netlink_queue_received_message(sd_netlink *nl, sd_netlink_message *m) {
+        int r;
+
         assert(nl);
         assert(m);
 
-        if (nl->rqueue_size >= NETLINK_RQUEUE_MAX)
+        if (ordered_set_size(nl->rqueue) >= NETLINK_RQUEUE_MAX)
                 return log_debug_errno(SYNTHETIC_ERRNO(ENOBUFS),
-                                       "sd-netlink: exhausted the read queue size (%d)",
-                                       NETLINK_RQUEUE_MAX);
+                                       "sd-netlink: exhausted the read queue size (%d)", NETLINK_RQUEUE_MAX);
 
-        if (!GREEDY_REALLOC(nl->rqueue, nl->rqueue_size + 1))
-                return -ENOMEM;
+        r = ordered_set_ensure_put(&nl->rqueue, &netlink_message_hash_ops, m);
+        if (r < 0)
+                return r;
 
-        nl->rqueue[nl->rqueue_size++] = sd_netlink_message_ref(m);
+        sd_netlink_message_ref(m);
         return 0;
 }
 
 static int netlink_queue_partially_received_message(sd_netlink *nl, sd_netlink_message *m) {
+        uint32_t serial;
+        int r;
+
         assert(nl);
         assert(m);
         assert(m->hdr->nlmsg_flags & NLM_F_MULTI);
 
-        if (nl->rqueue_partial_size >= NETLINK_RQUEUE_MAX)
+        if (hashmap_size(nl->rqueue_partial_by_serial) >= NETLINK_RQUEUE_MAX)
                 return log_debug_errno(SYNTHETIC_ERRNO(ENOBUFS),
-                                       "sd-netlink: exhausted the partial read queue size (%d)",
-                                       NETLINK_RQUEUE_MAX);
+                                       "sd-netlink: exhausted the partial read queue size (%d)", NETLINK_RQUEUE_MAX);
 
-        if (!GREEDY_REALLOC(nl->rqueue_partial, nl->rqueue_partial_size + 1))
-                return -ENOMEM;
+        serial = message_get_serial(m);
+        r = hashmap_ensure_put(&nl->rqueue_partial_by_serial, &netlink_message_hash_ops, UINT32_TO_PTR(serial), m);
+        if (r < 0)
+                return r;
 
-        nl->rqueue_partial[nl->rqueue_partial_size++] = sd_netlink_message_ref(m);
+        sd_netlink_message_ref(m);
         return 0;
 }
 
-static sd_netlink_message *netlink_take_partial_message(sd_netlink *nl, uint32_t seqnum) {
-        assert(nl);
-
-        for (unsigned i = 0; i < nl->rqueue_partial_size; i++)
-                if (message_get_serial(nl->rqueue_partial[i]) == seqnum) {
-                        sd_netlink_message *found = nl->rqueue_partial[i];
-
-                        /* remove the message form the partial read queue */
-                        memmove(nl->rqueue_partial + i, nl->rqueue_partial + i + 1,
-                                sizeof(sd_netlink_message*) * (nl->rqueue_partial_size - i - 1));
-                        nl->rqueue_partial_size--;
-
-                        return found;
-                }
-
-        return NULL;
-}
-
 /* On success, the number of bytes received is returned and *ret points to the received message
  * which has a valid header and the correct size.
  * If nothing useful was received 0 is returned.
@@ -329,7 +322,7 @@ int socket_read_message(sd_netlink *nl) {
 
         if (nl->rbuffer->nlmsg_flags & NLM_F_MULTI) {
                 multi_part = true;
-                first = netlink_take_partial_message(nl, nl->rbuffer->nlmsg_seq);
+                first = hashmap_remove(nl->rqueue_partial_by_serial, UINT32_TO_PTR(nl->rbuffer->nlmsg_seq));
         }
 
         for (struct nlmsghdr *new_msg = nl->rbuffer; NLMSG_OK(new_msg, len) && !done; new_msg = NLMSG_NEXT(new_msg, len)) {
index 7818a179c8fea7952c80a2251049a3163925b54f..1f4f8e90156832fb32dd464bfc376a30bc68063c 100644 (file)
@@ -116,18 +116,11 @@ int sd_netlink_increase_rxbuf(sd_netlink *nl, size_t size) {
 
 static sd_netlink *netlink_free(sd_netlink *nl) {
         sd_netlink_slot *s;
-        unsigned i;
 
         assert(nl);
 
-        for (i = 0; i < nl->rqueue_size; i++)
-                sd_netlink_message_unref(nl->rqueue[i]);
-        free(nl->rqueue);
-
-        for (i = 0; i < nl->rqueue_partial_size; i++)
-                sd_netlink_message_unref(nl->rqueue_partial[i]);
-        free(nl->rqueue_partial);
-
+        ordered_set_free(nl->rqueue);
+        hashmap_free(nl->rqueue_partial_by_serial);
         free(nl->rbuffer);
 
         while ((s = nl->slots)) {
@@ -175,29 +168,26 @@ int sd_netlink_send(
         return 1;
 }
 
-static int dispatch_rqueue(sd_netlink *nl, sd_netlink_message **message) {
+static int dispatch_rqueue(sd_netlink *nl, sd_netlink_message **ret) {
+        sd_netlink_message *m;
         int r;
 
         assert(nl);
-        assert(message);
+        assert(ret);
 
-        if (nl->rqueue_size <= 0) {
+        if (ordered_set_size(nl->rqueue) <= 0) {
                 /* Try to read a new message */
                 r = socket_read_message(nl);
-                if (r == -ENOBUFS) /* FIXME: ignore buffer overruns for now */
+                if (r == -ENOBUFS) /* FIXME: ignore buffer overruns for now */
                         log_debug_errno(r, "sd-netlink: Got ENOBUFS from netlink socket, ignoring.");
-                        return 1;
-                }
-                if (r <= 0)
+                else if (r < 0)
                         return r;
         }
 
         /* Dispatch a queued message */
-        *message = nl->rqueue[0];
-        nl->rqueue_size--;
-        memmove(nl->rqueue, nl->rqueue + 1, sizeof(sd_netlink_message*) * nl->rqueue_size);
-
-        return 1;
+        m = ordered_set_steal_first(nl->rqueue);
+        *ret = m;
+        return !!m;
 }
 
 static int process_timeout(sd_netlink *nl) {
@@ -437,7 +427,7 @@ int sd_netlink_wait(sd_netlink *nl, uint64_t timeout_usec) {
         assert_return(nl, -EINVAL);
         assert_return(!netlink_pid_changed(nl), -ECHILD);
 
-        if (nl->rqueue_size > 0)
+        if (ordered_set_size(nl->rqueue) > 0)
                 return 0;
 
         r = netlink_poll(nl, false, timeout_usec);
@@ -538,23 +528,21 @@ int sd_netlink_read(
         timeout = calc_elapse(usec);
 
         for (;;) {
+                sd_netlink_message *m;
                 usec_t left;
 
-                for (unsigned i = 0; i < nl->rqueue_size; i++) {
+                ORDERED_SET_FOREACH(m, nl->rqueue) {
                         _cleanup_(sd_netlink_message_unrefp) sd_netlink_message *incoming = NULL;
                         uint32_t received_serial;
                         uint16_t type;
 
-                        received_serial = message_get_serial(nl->rqueue[i]);
+                        received_serial = message_get_serial(m);
                         if (received_serial != serial)
                                 continue;
 
-                        incoming = nl->rqueue[i];
-
                         /* found a match, remove from rqueue and return it */
-                        memmove(nl->rqueue + i, nl->rqueue + i + 1,
-                                sizeof(sd_netlink_message*) * (nl->rqueue_size - i - 1));
-                        nl->rqueue_size--;
+                        ordered_set_remove(nl->rqueue, m);
+                        incoming = TAKE_PTR(m);
 
                         r = sd_netlink_message_get_errno(incoming);
                         if (r < 0)
@@ -625,7 +613,7 @@ int sd_netlink_get_events(sd_netlink *nl) {
         assert_return(nl, -EINVAL);
         assert_return(!netlink_pid_changed(nl), -ECHILD);
 
-        return nl->rqueue_size == 0 ? POLLIN : 0;
+        return ordered_set_size(nl->rqueue) == 0 ? POLLIN : 0;
 }
 
 int sd_netlink_get_timeout(sd_netlink *nl, uint64_t *timeout_usec) {
@@ -635,7 +623,7 @@ int sd_netlink_get_timeout(sd_netlink *nl, uint64_t *timeout_usec) {
         assert_return(timeout_usec, -EINVAL);
         assert_return(!netlink_pid_changed(nl), -ECHILD);
 
-        if (nl->rqueue_size > 0) {
+        if (ordered_set_size(nl->rqueue) > 0) {
                 *timeout_usec = 0;
                 return 1;
         }
@@ -647,7 +635,6 @@ int sd_netlink_get_timeout(sd_netlink *nl, uint64_t *timeout_usec) {
         }
 
         *timeout_usec = c->timeout;
-
         return 1;
 }