#include "version.h"
#define WORKER_NUM_MAX 2048U
+#define EVENT_RETRY_INTERVAL_USEC (200 * USEC_PER_MSEC)
+#define EVENT_RETRY_TIMEOUT_USEC (3 * USEC_PER_MINUTE)
static bool arg_debug = false;
static int arg_daemonize = false;
sd_device_action_t action;
uint64_t seqnum;
uint64_t blocker_seqnum;
+ usec_t retry_again_next_usec;
+ usec_t retry_again_timeout_usec;
sd_event_source *timeout_warning_event;
sd_event_source *timeout_event;
} Worker;
/* passed from worker to main process */
-typedef struct WorkerMessage {
-} WorkerMessage;
+typedef enum EventResult {
+ EVENT_RESULT_SUCCESS,
+ EVENT_RESULT_FAILED,
+ EVENT_RESULT_TRY_AGAIN, /* when the block device is locked by another process. */
+ _EVENT_RESULT_MAX,
+ _EVENT_RESULT_INVALID = -EINVAL,
+} EventResult;
static Event *event_free(Event *event) {
if (!event)
"Failed to broadcast event to libudev listeners, ignoring: %m");
}
-static int worker_send_message(int fd) {
- WorkerMessage message = {};
+static int worker_send_result(Manager *manager, EventResult result) {
+ assert(manager);
+ assert(manager->worker_watch[WRITE_END] >= 0);
- return loop_write(fd, &message, sizeof(message), false);
+ return loop_write(manager->worker_watch[WRITE_END], &result, sizeof(result), false);
}
static int worker_lock_block_device(sd_device *dev, int *ret_fd) {
if (!udev_event)
return -ENOMEM;
+ /* If this is a block device and the device is locked currently via the BSD advisory locks,
+ * someone else is using it exclusively. We don't run our udev rules now to not interfere.
+ * Instead of processing the event, we requeue the event and will try again after a delay.
+ *
+ * The user-facing side of this: https://systemd.io/BLOCK_DEVICE_LOCKING */
r = worker_lock_block_device(dev, &fd_lock);
- if (r == -EAGAIN) {
- /* So this is a block device and the device is locked currently via the BSD advisory locks —
- * someone else is exclusively using it. This means we don't run our udev rules now, to not
- * interfere. However we want to know when the device is unlocked again, and retrigger the
- * device again then, so that the rules are run eventually. For that we use IN_CLOSE_WRITE
- * inotify watches (which isn't exactly the same as waiting for the BSD locks to release, but
- * not totally off, as long as unlock+close() is done together, as it usually is).
- *
- * (The user-facing side of this: https://systemd.io/BLOCK_DEVICE_LOCKING)
- *
- * There's a bit of a chicken and egg problem here for this however: inotify watching is
- * supposed to be enabled via an option set via udev rules (OPTIONS+="watch"). If we skip the
- * udev rules here however (as we just said we do), we would thus never see that specific
- * udev rule, and thus never turn on inotify watching. But in order to catch up eventually
- * and run them we need the inotify watching: hence a classic chicken and egg problem.
- *
- * Our way out here: if we see the block device locked, unconditionally watch the device via
- * inotify, regardless of any explicit request via OPTIONS+="watch". Thus, a device that is
- * currently locked via the BSD file locks will be treated as if we ran a single udev rule
- * only for it: the one that turns on inotify watching for it. If we eventually see the
- * inotify IN_CLOSE_WRITE event, and then run the rules after all and we then realize that
- * this wasn't actually requested (i.e. no OPTIONS+="watch" set) we'll simply turn off the
- * watching again (see below). Effectively this means: inotify watching is now enabled either
- * a) when the udev rules say so, or b) while the device is locked.
- *
- * Worst case scenario hence: in the (unlikely) case someone locked the device and we clash
- * with that we might do inotify watching for a brief moment for a device where we actually
- * weren't supposed to. But that shouldn't be too bad, in particular as BSD locks being taken
- * on a block device is kinda an indication that the inotify logic is desired too, to some
- * degree — they go hand-in-hand after all. */
-
- log_device_debug(dev, "Block device is currently locked, installing watch to wait until the lock is released.");
- (void) udev_watch_begin(manager->inotify_fd, dev);
-
- /* Now the watch is installed, let's lock the device again, maybe in the meantime things changed */
- r = worker_lock_block_device(dev, &fd_lock);
- }
if (r < 0)
return r;
static int worker_device_monitor_handler(sd_device_monitor *monitor, sd_device *dev, void *userdata) {
Manager *manager = userdata;
+ EventResult result;
int r;
assert(dev);
assert(manager);
r = worker_process_device(manager, dev);
- if (r == -EAGAIN)
- /* if we couldn't acquire the flock(), then proceed quietly */
- log_device_debug_errno(dev, r, "Device currently locked, not processing.");
- else {
- if (r < 0)
- log_device_warning_errno(dev, r, "Failed to process device, ignoring: %m");
+ if (r == -EAGAIN) {
+ /* if we couldn't acquire the flock(), then requeue the event */
+ result = EVENT_RESULT_TRY_AGAIN;
+ log_device_debug_errno(dev, r, "Block device is currently locked, requeueing the event.");
+ } else if (r < 0) {
+ result = EVENT_RESULT_FAILED;
+ log_device_warning_errno(dev, r, "Failed to process device, ignoring: %m");
+ } else
+ result = EVENT_RESULT_SUCCESS;
+ if (result != EVENT_RESULT_TRY_AGAIN)
/* send processed event back to libudev listeners */
device_broadcast(monitor, dev);
- }
/* send udevd the result of the event execution */
- r = worker_send_message(manager->worker_watch[WRITE_END]);
+ r = worker_send_result(manager, result);
if (r < 0)
log_device_warning_errno(dev, r, "Failed to send signal to main daemon, ignoring: %m");
assert(event->manager);
assert(event->blocker_seqnum <= event->seqnum);
+ if (event->retry_again_next_usec > 0) {
+ usec_t now_usec;
+
+ r = sd_event_now(event->manager->event, clock_boottime_or_monotonic(), &now_usec);
+ if (r < 0)
+ return r;
+
+ if (event->retry_again_next_usec <= now_usec)
+ return true;
+ }
+
if (event->blocker_seqnum == event->seqnum)
/* we have checked previously and no blocker found */
return false;
return 0;
}
+static int event_requeue(Event *event) {
+ usec_t now_usec;
+ int r;
+
+ assert(event);
+ assert(event->manager);
+ assert(event->manager->event);
+
+ event->timeout_warning_event = sd_event_source_disable_unref(event->timeout_warning_event);
+ event->timeout_event = sd_event_source_disable_unref(event->timeout_event);
+
+ /* add a short delay to suppress busy loop */
+ r = sd_event_now(event->manager->event, clock_boottime_or_monotonic(), &now_usec);
+ if (r < 0)
+ return log_device_warning_errno(event->dev, r,
+ "Failed to get current time, "
+ "skipping event (SEQNUM=%"PRIu64", ACTION=%s): %m",
+ event->seqnum, strna(device_action_to_string(event->action)));
+
+ if (event->retry_again_timeout_usec > 0 && event->retry_again_timeout_usec <= now_usec)
+ return log_device_warning_errno(event->dev, SYNTHETIC_ERRNO(ETIMEDOUT),
+ "The underlying block device is locked by a process more than %s, "
+ "skipping event (SEQNUM=%"PRIu64", ACTION=%s).",
+ FORMAT_TIMESPAN(EVENT_RETRY_TIMEOUT_USEC, USEC_PER_MINUTE),
+ event->seqnum, strna(device_action_to_string(event->action)));
+
+ event->retry_again_next_usec = usec_add(now_usec, EVENT_RETRY_INTERVAL_USEC);
+ if (event->retry_again_timeout_usec == 0)
+ event->retry_again_timeout_usec = usec_add(now_usec, EVENT_RETRY_TIMEOUT_USEC);
+
+ if (event->worker && event->worker->event == event)
+ event->worker->event = NULL;
+ event->worker = NULL;
+
+ event->state = EVENT_QUEUED;
+ return 0;
+}
+
static int event_queue_insert(Manager *manager, sd_device *dev) {
sd_device_action_t action;
uint64_t seqnum;
assert(manager);
for (;;) {
- WorkerMessage msg;
- struct iovec iovec = {
- .iov_base = &msg,
- .iov_len = sizeof(msg),
- };
+ EventResult result;
+ struct iovec iovec = IOVEC_MAKE(&result, sizeof(result));
CMSG_BUFFER_TYPE(CMSG_SPACE(sizeof(struct ucred))) control;
struct msghdr msghdr = {
.msg_iov = &iovec,
cmsg_close_all(&msghdr);
- if (size != sizeof(WorkerMessage)) {
+ if (size != sizeof(EventResult)) {
log_warning("Ignoring worker message with invalid size %zi bytes", size);
continue;
}
worker->state = WORKER_IDLE;
/* worker returned */
+ if (result == EVENT_RESULT_TRY_AGAIN &&
+ event_requeue(worker->event) < 0)
+ device_broadcast(manager->monitor, worker->event->dev);
+
+ /* When event_requeue() succeeds, worker->event is NULL, and event_free() handles NULL gracefully. */
event_free(worker->event);
}
assert(manager);
- if (!LIST_IS_EMPTY(manager->events))
+ if (!LIST_IS_EMPTY(manager->events)) {
+ /* Try to process pending events if idle workers exist. Why is this necessary?
+ * When a worker finished an event and became idle, even if there was a pending event,
+ * the corresponding device might have been locked and the processing of the event
+ * delayed for a while, preventing the worker from processing the event immediately.
+ * Now, the device may be unlocked. Let's try again! */
+ event_queue_start(manager);
return 1;
+ }
/* There are no queued events. Let's remove /run/udev/queue and clean up the idle processes. */