]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
udev: use sd_notify to communicate between worker and manager processes
authorYu Watanabe <watanabe.yu+github@gmail.com>
Thu, 20 Mar 2025 08:50:05 +0000 (17:50 +0900)
committerYu Watanabe <watanabe.yu+github@gmail.com>
Wed, 2 Apr 2025 14:27:15 +0000 (23:27 +0900)
Let's replace home-grown notification from worker to manager process
with sd_notify.

src/udev/udev-manager.c
src/udev/udev-manager.h
src/udev/udev-worker.c
src/udev/udev-worker.h

index 7da211241d49e6eaa5da1d942d321b1c340d2cd1..1fbc2bd17cb93ffa4dbfd558148327c244feec85 100644 (file)
@@ -16,6 +16,7 @@
 #include "iovec-util.h"
 #include "list.h"
 #include "mkdir.h"
+#include "notify-recv.h"
 #include "process-util.h"
 #include "selinux-util.h"
 #include "signal-util.h"
@@ -153,7 +154,8 @@ Manager* manager_free(Manager *manager) {
         event_queue_cleanup(manager, EVENT_UNDEF);
 
         safe_close(manager->inotify_fd);
-        safe_close(manager->worker_notify_fd);
+
+        free(manager->worker_notify_socket_path);
 
         sd_device_monitor_unref(manager->monitor);
         udev_ctrl_unref(manager->ctrl);
@@ -421,11 +423,15 @@ static int worker_spawn(Manager *manager, Event *event) {
                         .monitor = TAKE_PTR(worker_monitor),
                         .properties = TAKE_PTR(manager->properties),
                         .rules = TAKE_PTR(manager->rules),
-                        .pipe_fd = TAKE_FD(manager->worker_notify_fd),
                         .inotify_fd = TAKE_FD(manager->inotify_fd),
                         .config = manager->config,
                 };
 
+                if (setenv("NOTIFY_SOCKET", manager->worker_notify_socket_path, /* overwrite = */ true) < 0) {
+                        log_error_errno(errno, "Failed to set $NOTIFY_SOCKET: %m");
+                        _exit(EXIT_FAILURE);
+                }
+
                 /* Worker process */
                 r = udev_worker_main(&w, event->dev);
                 log_close();
@@ -809,65 +815,41 @@ static int on_uevent(sd_device_monitor *monitor, sd_device *dev, void *userdata)
         return 1;
 }
 
-static int on_worker(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
+static int on_worker_notify(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
         Manager *manager = ASSERT_PTR(userdata);
+        int r;
 
-        for (;;) {
-                EventResult result;
-                struct iovec iovec = IOVEC_MAKE(&result, sizeof(result));
-                CMSG_BUFFER_TYPE(CMSG_SPACE(sizeof(struct ucred))) control;
-                struct msghdr msghdr = {
-                        .msg_iov = &iovec,
-                        .msg_iovlen = 1,
-                        .msg_control = &control,
-                        .msg_controllen = sizeof(control),
-                };
-                ssize_t size;
-                struct ucred *ucred;
-                Worker *worker;
-
-                size = recvmsg_safe(fd, &msghdr, MSG_DONTWAIT);
-                if (size == -EINTR)
-                        continue;
-                if (size == -EAGAIN)
-                        /* nothing more to read */
-                        break;
-                if (size < 0)
-                        return log_error_errno(size, "Failed to receive message: %m");
-
-                cmsg_close_all(&msghdr);
-
-                if (size != sizeof(result)) {
-                        log_warning("Ignoring worker message with invalid size %zi bytes", size);
-                        continue;
-                }
+        assert(fd >= 0);
 
-                ucred = CMSG_FIND_DATA(&msghdr, SOL_SOCKET, SCM_CREDENTIALS, struct ucred);
-                if (!ucred || ucred->pid <= 0) {
-                        log_warning("Ignoring worker message without valid PID");
-                        continue;
-                }
+        _cleanup_(pidref_done) PidRef sender = PIDREF_NULL;
+        _cleanup_strv_free_ char **l = NULL;
+        r = notify_recv_strv(fd, &l, /* ret_ucred= */ NULL, &sender);
+        if (r == -EAGAIN)
+                return 0;
+        if (r < 0)
+                return r;
 
-                /* lookup worker who sent the signal */
-                worker = hashmap_get(manager->workers, PID_TO_PTR(ucred->pid));
-                if (!worker) {
-                        log_debug("Worker ["PID_FMT"] returned, but is no longer tracked", ucred->pid);
-                        continue;
-                }
+        /* lookup worker who sent the signal */
+        Worker *worker = hashmap_get(manager->workers, PID_TO_PTR(sender.pid));
+        if (!worker) {
+                log_warning("Received notify datagram of unknown process ["PID_FMT"], ignoring.", sender.pid);
+                return 0;
+        }
 
-                if (worker->state == WORKER_KILLING) {
-                        worker->state = WORKER_KILLED;
-                        (void) kill(worker->pid, SIGTERM);
-                } else if (worker->state != WORKER_KILLED)
-                        worker->state = WORKER_IDLE;
+        if (strv_contains(l, "TRY_AGAIN=1"))
+                /* Worker cannot lock the device. Requeue the event. */
+                event_requeue(worker->event);
+        else
+                event_free(worker->event);
 
-                if (result == EVENT_RESULT_TRY_AGAIN)
-                        event_requeue(worker->event);
-                else
-                        event_free(worker->event);
-        }
+        /* Update the state of the worker. */
+        if (worker->state == WORKER_KILLING) {
+                worker->state = WORKER_KILLED;
+                (void) kill(worker->pid, SIGTERM);
+        } else if (worker->state != WORKER_KILLED)
+                worker->state = WORKER_IDLE;
 
-        return 1;
+        return 0;
 }
 
 static int synthesize_change_one(sd_device *dev, sd_device *target) {
@@ -1154,7 +1136,6 @@ Manager* manager_new(void) {
 
         *manager = (Manager) {
                 .inotify_fd = -EBADF,
-                .worker_notify_fd = -EBADF,
                 .config_by_udev_conf = UDEV_CONFIG_INIT,
                 .config_by_command = UDEV_CONFIG_INIT,
                 .config_by_kernel = UDEV_CONFIG_INIT,
@@ -1290,40 +1271,21 @@ static int manager_start_inotify(Manager *manager) {
         return 0;
 }
 
-static int manager_start_worker_event(Manager *manager) {
-        _cleanup_(sd_event_source_unrefp) sd_event_source *s = NULL;
-        _cleanup_close_pair_ int pair[2] = EBADF_PAIR;
+static int manager_start_worker_notify(Manager *manager) {
         int r;
 
         assert(manager);
         assert(manager->event);
 
-        /* unnamed socket from workers to the main daemon */
-        r = socketpair(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0, pair);
-        if (r < 0)
-                return log_error_errno(errno, "Failed to create socketpair for communicating with workers: %m");
-
-        r = setsockopt_int(pair[READ_END], SOL_SOCKET, SO_PASSCRED, true);
-        if (r < 0)
-                return log_error_errno(r, "Failed to enable SO_PASSCRED: %m");
-
-        r = sd_event_add_io(manager->event, &s, pair[READ_END], EPOLLIN, on_worker, manager);
-        if (r < 0)
-                return log_error_errno(r, "Failed to create worker event source: %m");
-
-        (void) sd_event_source_set_description(s, "manager-worker-event");
-
-        r = sd_event_source_set_io_fd_own(s, true);
-        if (r < 0)
-                return log_error_errno(r, "Failed to make worker event source own file descriptor: %m");
-
-        TAKE_FD(pair[READ_END]);
-
-        r = sd_event_source_set_floating(s, true);
+        r = notify_socket_prepare(
+                        manager->event,
+                        SD_EVENT_PRIORITY_NORMAL,
+                        on_worker_notify,
+                        manager,
+                        &manager->worker_notify_socket_path);
         if (r < 0)
-                return log_error_errno(r, "Failed to make worker event source floating: %m");
+                return log_error_errno(r, "Failed to prepare worker notification socket: %m");
 
-        manager->worker_notify_fd = TAKE_FD(pair[WRITE_END]);
         return 0;
 }
 
@@ -1400,7 +1362,7 @@ int manager_main(Manager *manager) {
         if (r < 0)
                 return r;
 
-        r = manager_start_worker_event(manager);
+        r = manager_start_worker_notify(manager);
         if (r < 0)
                 return r;
 
index 168156053315e5892490fd221e2528b6eceacc28..dab271abaa825fe78b4655003c340c708650de32 100644 (file)
@@ -30,7 +30,8 @@ typedef struct Manager {
         sd_device_monitor *monitor;
         UdevCtrl *ctrl;
         sd_varlink_server *varlink_server;
-        int worker_notify_fd;
+
+        char *worker_notify_socket_path;
 
         /* used by udev-watch */
         int inotify_fd;
index 8f54bf9281f06e2e81b155e304dea1d9b89f3f79..2686fcbe10217733e4362cc7c84191bde65ea4fd 100644 (file)
@@ -4,6 +4,8 @@
 #include <sys/ioctl.h>
 #include <sys/mount.h>
 
+#include "sd-daemon.h"
+
 #include "alloc-util.h"
 #include "blockdev-util.h"
 #include "common-signal.h"
@@ -12,7 +14,6 @@
 #include "device-util.h"
 #include "errno-util.h"
 #include "fd-util.h"
-#include "io-util.h"
 #include "path-util.h"
 #include "process-util.h"
 #include "signal-util.h"
@@ -34,7 +35,6 @@ void udev_worker_done(UdevWorker *worker) {
         sd_device_monitor_unref(worker->monitor);
         hashmap_free(worker->properties);
         udev_rules_free(worker->rules);
-        safe_close(worker->pipe_fd);
 }
 
 int udev_get_whole_disk(sd_device *dev, sd_device **ret_device, const char **ret_devname) {
@@ -192,8 +192,17 @@ static int worker_process_device(UdevWorker *worker, sd_device *dev) {
          *
          * The user-facing side of this: https://systemd.io/BLOCK_DEVICE_LOCKING */
         r = worker_lock_whole_disk(dev, &fd_lock);
-        if (r == -EAGAIN)
-                return EVENT_RESULT_TRY_AGAIN;
+        if (r == -EAGAIN) {
+                log_device_debug(dev, "Block device is currently locked, requeueing the event.");
+
+                r = sd_notify(/* unset_environment = */ false, "TRY_AGAIN=1");
+                if (r < 0) {
+                        log_device_warning_errno(dev, r, "Failed to send notification message to manager process: %m");
+                        (void) sd_event_exit(worker->event, r);
+                }
+
+                return 0;
+        }
         if (r < 0)
                 return r;
 
@@ -237,42 +246,52 @@ static int worker_process_device(UdevWorker *worker, sd_device *dev) {
         }
 
         log_device_uevent(dev, "Device processed");
-        return 0;
-}
 
-static int worker_send_result(UdevWorker *worker, EventResult result) {
-        assert(worker);
-        assert(worker->pipe_fd >= 0);
+        /* send processed event to libudev listeners */
+        r = device_monitor_send(worker->monitor, NULL, dev);
+        if (r < 0) {
+                log_device_warning_errno(dev, r, "Failed to broadcast event to libudev listeners: %m");
+                (void) sd_event_exit(worker->event, r);
+                return 0;
+        }
 
-        return loop_write(worker->pipe_fd, &result, sizeof(result));
+        r = sd_notify(/* unset_environment = */ false, "PROCESSED=1");
+        if (r < 0) {
+                log_device_warning_errno(dev, r, "Failed to send notification message to manager process: %m");
+                (void) sd_event_exit(worker->event, r);
+        }
+
+        return 0;
 }
 
 static int worker_device_monitor_handler(sd_device_monitor *monitor, sd_device *dev, void *userdata) {
         UdevWorker *worker = ASSERT_PTR(userdata);
         int r;
 
+        assert(monitor);
         assert(dev);
 
         r = worker_process_device(worker, dev);
-        if (r == EVENT_RESULT_TRY_AGAIN)
-                /* if we couldn't acquire the flock(), then requeue the event */
-                log_device_debug(dev, "Block device is currently locked, requeueing the event.");
-        else {
-                if (r < 0) {
-                        log_device_warning_errno(dev, r, "Failed to process device, ignoring: %m");
-                        (void) device_add_errno(dev, r);
-                }
+        if (r < 0) {
+                log_device_warning_errno(dev, r, "Failed to process device, ignoring: %m");
+                (void) device_add_errno(dev, r);
 
-                /* send processed event back to libudev listeners */
+                /* broadcast (possibly partially processed) event to libudev listeners */
                 int k = device_monitor_send(monitor, NULL, dev);
-                if (k < 0)
-                        log_device_warning_errno(dev, k, "Failed to broadcast event to libudev listeners, ignoring: %m");
-        }
+                if (k < 0) {
+                        log_device_warning_errno(dev, k, "Failed to broadcast event to libudev listeners: %m");
+                        (void) sd_event_exit(worker->event, k);
+                        return 0;
+                }
 
-        /* send udevd the result of the event execution */
-        r = worker_send_result(worker, r);
-        if (r < 0)
-                log_device_warning_errno(dev, r, "Failed to send signal to main daemon, ignoring: %m");
+                const char *e = errno_to_name(r);
+                r = sd_notifyf(/* unset_environment = */ false, "ERRNO=%i%s%s", -r, e ? "\nERRNO_NAME=" : "", strempty(e));
+                if (r < 0) {
+                        log_device_warning_errno(dev, r, "Failed to send notification message to manager process, ignoring: %m");
+                        (void) sd_event_exit(worker->event, r);
+                        return 0;
+                }
+        }
 
         /* Reset the log level, as it might be changed by "OPTIONS=log_level=". */
         log_set_max_level(worker->config.log_level);
index 8e66e2f6b5e62c4a026f5825fd141ed2cab5fc7f..521141c170df29ca1112fd835662a86748f9f5eb 100644 (file)
@@ -25,26 +25,11 @@ typedef struct UdevWorker {
         Hashmap *properties;
         UdevRules *rules;
 
-        int pipe_fd;
         int inotify_fd; /* Do not close! */
 
         UdevConfig config;
 } UdevWorker;
 
-/* passed from worker to main process */
-typedef enum EventResult {
-        EVENT_RESULT_NERRNO_MIN       = -ERRNO_MAX,
-        EVENT_RESULT_NERRNO_MAX       = -1,
-        EVENT_RESULT_SUCCESS          = 0,
-        EVENT_RESULT_EXIT_STATUS_BASE = 0,
-        EVENT_RESULT_EXIT_STATUS_MAX  = 255,
-        EVENT_RESULT_TRY_AGAIN        = 256, /* when the block device is locked by another process. */
-        EVENT_RESULT_SIGNAL_BASE      = 257,
-        EVENT_RESULT_SIGNAL_MAX       = EVENT_RESULT_SIGNAL_BASE + _NSIG,
-        _EVENT_RESULT_MAX,
-        _EVENT_RESULT_INVALID         = -EINVAL,
-} EventResult;
-
 void udev_worker_done(UdevWorker *worker);
 int udev_worker_main(UdevWorker *worker, sd_device *dev);