#include "udev-watch.h"
#include "udev-worker.h"
-#define EVENT_RETRY_INTERVAL_USEC (200 * USEC_PER_MSEC)
-#define EVENT_RETRY_TIMEOUT_USEC (3 * USEC_PER_MINUTE)
+#define EVENT_REQUEUE_INTERVAL_USEC (200 * USEC_PER_MSEC)
+#define EVENT_REQUEUE_TIMEOUT_USEC (3 * USEC_PER_MINUTE)
typedef enum EventState {
EVENT_UNDEF,
EVENT_QUEUED,
EVENT_RUNNING,
+ EVENT_LOCKED,
} EventState;
typedef struct Event {
const char *devnode;
/* Used when the device is locked by another program. */
- usec_t retry_again_next_usec;
- usec_t retry_again_timeout_usec;
- sd_event_source *retry_event_source;
+ usec_t requeue_next_usec;
+ usec_t requeue_timeout_usec;
+ unsigned locked_event_prioq_index;
+ char *whole_disk;
+ LIST_FIELDS(Event, same_disk);
bool dependencies_built;
Set *blocker_events;
event->dependencies_built = false;
}
+/* Detach 'event' from the locked-event bookkeeping keyed by its whole disk:
+ * unlink it from the per-disk 'same_disk' list, keep the corresponding
+ * manager->locked_events_by_disk entry consistent, and free the owned
+ * whole_disk string. No-op if the event never recorded a whole disk. */
+static void event_unset_whole_disk(Event *event) {
+ Manager *manager = ASSERT_PTR(ASSERT_PTR(event)->manager);
+
+ if (!event->whole_disk)
+ return;
+
+ /* The hashmap value is the head of the same_disk list, so the head needs
+ * special handling; non-head events are just unlinked. */
+ if (event->same_disk_prev)
+ /* If this is not the first event, then simply remove this event. */
+ event->same_disk_prev->same_disk_next = event->same_disk_next;
+ else if (event->same_disk_next)
+ /* If this is the first event, replace with the next event. */
+ assert_se(hashmap_replace(manager->locked_events_by_disk, event->same_disk_next->whole_disk, event->same_disk_next) >= 0);
+ else
+ /* Otherwise, remove the entry. */
+ assert_se(hashmap_remove(manager->locked_events_by_disk, event->whole_disk) == event);
+
+ if (event->same_disk_next)
+ event->same_disk_next->same_disk_prev = event->same_disk_prev;
+
+ event->same_disk_prev = event->same_disk_next = NULL;
+
+ event->whole_disk = mfree(event->whole_disk);
+}
+
static Event* event_free(Event *event) {
if (!event)
return NULL;
if (event->manager) {
+ event_unset_whole_disk(event);
+ prioq_remove(event->manager->locked_events_by_time, event, &event->locked_event_prioq_index);
+
if (event->manager->last_event == event)
event->manager->last_event = event->event_prev;
LIST_REMOVE(event, event->manager->events, event);
sd_device_unref(event->dev);
- sd_event_source_unref(event->retry_event_source);
-
return mfree(event);
}
while (manager->events)
event_free(manager->events);
+ prioq_free(manager->locked_events_by_time);
+ hashmap_free(manager->locked_events_by_disk);
+ sd_event_source_unref(manager->requeue_locked_events_timer_event_source);
+
safe_close(manager->inotify_fd);
free(manager->worker_notify_socket_path);
/* Kill all workers with SIGTERM, and disable unnecessary timer event source. */
manager_kill_workers(manager, SIGTERM);
manager->kill_workers_event = sd_event_source_disable_unref(manager->kill_workers_event);
+
+ (void) event_source_disable(manager->requeue_locked_events_timer_event_source);
}
void notify_ready(Manager *manager) {
log_device_uevent(event->dev, "Device ready for processing");
- (void) event_source_disable(event->retry_event_source);
-
Worker *worker;
HASHMAP_FOREACH(worker, manager->workers) {
if (worker->state != WORKER_IDLE)
int r;
assert(event);
- assert(event->manager);
-
- if (event->retry_again_next_usec > 0) {
- assert(event->dependencies_built);
-
- usec_t now_usec;
- r = sd_event_now(event->manager->event, CLOCK_BOOTTIME, &now_usec);
- if (r < 0)
- return r;
-
- if (event->retry_again_next_usec > now_usec)
- return true;
- }
r = event_build_dependencies(event);
if (r < 0)
return 0;
}
-static int on_event_retry(sd_event_source *s, uint64_t usec, void *userdata) {
-        /* This does nothing. The on_post() callback will start the event if there exists an idle worker. */
+/* Timer callback armed by manager_requeue_locked_events(); intentionally a no-op
+ * body — its only purpose is to wake up the event loop. */
+static int on_requeue_locked_events(sd_event_source *s, uint64_t usec, void *userdata) {
+ /* This does nothing. The on_post() callback will requeue locked events. */
return 1;
}
-static int event_requeue(Event *event) {
-        usec_t now_usec;
+/* Move all locked events whose requeue time has arrived back to EVENT_QUEUED.
+ * If some events are not yet due, (re-)arm the timer for the earliest one; if
+ * none remain, disable the timer. Returns 0 on success, negative errno on failure. */
+static int manager_requeue_locked_events(Manager *manager) {
+ usec_t now_usec = 0; /* 0 means "not fetched yet"; the clock is read lazily below. */
int r;
- assert(event);
- assert(event->manager);
- assert(event->manager->event);
+ assert(manager);
+
+ for (;;) {
+ /* Events are ordered by requeue_next_usec, so peek gives the earliest due one. */
+ Event *event = prioq_peek(manager->locked_events_by_time);
+ if (!event)
+ return event_source_disable(manager->requeue_locked_events_timer_event_source);
+
+ if (now_usec == 0) {
+ r = sd_event_now(manager->event, CLOCK_MONOTONIC, &now_usec);
+ if (r < 0)
+ return r;
+ }
+
+ if (event->requeue_next_usec > now_usec)
+ /* The earliest event is not due yet; wake up again at its requeue time. */
+ return event_reset_time(
+ manager->event,
+ &manager->requeue_locked_events_timer_event_source,
+ CLOCK_MONOTONIC,
+ event->requeue_next_usec,
+ USEC_PER_SEC,
+ on_requeue_locked_events,
+ /* userdata = */ NULL,
+ EVENT_PRIORITY_REQUEUE_EVENT,
+ "requeue-locked-events",
+ /* force_reset = */ true);
+
+ /* Due: take it off the prioq and per-disk map, and make it runnable again. */
+ assert_se(prioq_pop(manager->locked_events_by_time) == event);
+ event_unset_whole_disk(event);
+ event->state = EVENT_QUEUED;
+ }
+}
+
+/* Immediately requeue all locked events that wait on the whole disk backing 'dev'.
+ * Returns 0 on success (including when no event waits on that disk), negative errno on failure. */
+int manager_requeue_locked_events_by_device(Manager *manager, sd_device *dev) {
+ int r;
+
+ /* When a new event for a block device is queued or we get an inotify event, assume that the
+ * device is not locked anymore. The assumption may not be true, but that should not cause any
+ * issues, as in that case events will be requeued soon. */
+
+ if (hashmap_isempty(manager->locked_events_by_disk))
+ return 0;
+
+ /* r == 0 means 'dev' has no whole block device; errors are propagated to the caller. */
+ const char *devname;
+ r = udev_get_whole_disk(dev, NULL, &devname);
+ if (r <= 0)
+ return r;
+
+ Event *first = hashmap_remove(manager->locked_events_by_disk, devname);
+ if (!first)
+ return 0;
+ /* Drain the whole same_disk chain; each event also leaves the time-ordered prioq. */
+ Event *event;
+ while ((event = LIST_POP(same_disk, first))) {
+ assert_se(prioq_remove(manager->locked_events_by_time, event, &event->locked_event_prioq_index) > 0);
+ event->whole_disk = mfree(event->whole_disk);
+ event->state = EVENT_QUEUED;
+ }
+
+ return 0;
+}
+
+/* Prioq comparator: order locked events by next requeue time, earliest first. */
+static int locked_event_compare(const Event *x, const Event *y) {
+ return CMP(x->requeue_next_usec, y->requeue_next_usec);
+}
+
+/* Called when a worker reported TRY_AGAIN, i.e. the event's block device is locked
+ * by another process. Registers the event as EVENT_LOCKED: records which whole disk
+ * it waits for (hashmap + same_disk list) and when to requeue it (prioq). Returns 0
+ * on success, negative errno on failure, in which case the caller drops the event. */
+static int event_enter_locked(Event *event, const char *whole_disk) {
+ Manager *manager = ASSERT_PTR(ASSERT_PTR(event)->manager);
sd_device *dev = ASSERT_PTR(event->dev);
+ usec_t now_usec;
+ int r;
/* add a short delay to suppress busy loop */
-        r = sd_event_now(event->manager->event, CLOCK_BOOTTIME, &now_usec);
+ r = sd_event_now(manager->event, CLOCK_MONOTONIC, &now_usec);
if (r < 0)
return log_device_warning_errno(
dev, r,
"Failed to get current time, skipping event (SEQNUM=%"PRIu64", ACTION=%s): %m",
event->seqnum, strna(device_action_to_string(event->action)));
-        if (event->retry_again_timeout_usec > 0 && event->retry_again_timeout_usec <= now_usec)
+ if (event->requeue_timeout_usec > 0 && event->requeue_timeout_usec <= now_usec)
return log_device_warning_errno(
dev, SYNTHETIC_ERRNO(ETIMEDOUT),
"The underlying block device is locked by a process more than %s, skipping event (SEQNUM=%"PRIu64", ACTION=%s).",
-                        FORMAT_TIMESPAN(EVENT_RETRY_TIMEOUT_USEC, USEC_PER_MINUTE),
+ FORMAT_TIMESPAN(EVENT_REQUEUE_TIMEOUT_USEC, USEC_PER_MINUTE),
event->seqnum, strna(device_action_to_string(event->action)));
-        event->retry_again_next_usec = usec_add(now_usec, EVENT_RETRY_INTERVAL_USEC);
-        if (event->retry_again_timeout_usec == 0)
-                event->retry_again_timeout_usec = usec_add(now_usec, EVENT_RETRY_TIMEOUT_USEC);
+ /* Next attempt after a fixed interval; the give-up deadline is set only once,
+ * the first time the event enters the locked state. */
+ event->requeue_next_usec = usec_add(now_usec, EVENT_REQUEUE_INTERVAL_USEC);
+ if (event->requeue_timeout_usec == 0)
+ event->requeue_timeout_usec = usec_add(now_usec, EVENT_REQUEUE_TIMEOUT_USEC);
-        r = event_reset_time_relative(
-                event->manager->event,
-                &event->retry_event_source,
-                CLOCK_MONOTONIC,
-                EVENT_RETRY_INTERVAL_USEC,
-                /* accuracy = */ 0,
-                on_event_retry,
-                /* userdata = */ NULL,
-                EVENT_PRIORITY_RETRY_EVENT,
-                "retry-event",
-                /* force_reset = */ true);
-        if (r < 0)
+ /* The worker must tell us which whole disk it failed to lock; without that
+ * we cannot track when the event may be retried. */
+ if (isempty(whole_disk))
return log_device_warning_errno(
-                        dev, r,
-                        "Failed to reset timer event source for retrying event, skipping event (SEQNUM=%"PRIu64", ACTION=%s): %m",
+ dev, SYNTHETIC_ERRNO(EBADMSG),
+ "Unexpected notify message received, skipping event (SEQNUM=%"PRIu64", ACTION=%s): %m",
event->seqnum, strna(device_action_to_string(event->action)));
-        event->state = EVENT_QUEUED;
-        return 0;
-}
-
-int event_queue_assume_block_device_unlocked(Manager *manager, sd_device *dev) {
-        const char *devname;
-        int r;
-
-        /* When a new event for a block device is queued or we get an inotify event, assume that the
-         * device is not locked anymore. The assumption may not be true, but that should not cause any
-         * issues, as in that case events will be requeued soon. */
-
-        r = udev_get_whole_disk(dev, NULL, &devname);
-        if (r <= 0)
-                return r;
-
-        LIST_FOREACH(event, event, manager->events) {
-                const char *event_devname;
-
-                if (event->state != EVENT_QUEUED)
-                        continue;
-
-                if (event->retry_again_next_usec == 0)
-                        continue;
+ /* Own a copy of the disk name; it becomes both the hashmap key and event->whole_disk. */
+ _cleanup_free_ char *whole_disk_copy = strdup(whole_disk);
+ if (!whole_disk_copy)
+ return log_oom();
-                if (udev_get_whole_disk(event->dev, NULL, &event_devname) <= 0)
-                        continue;
-
-                if (!streq(devname, event_devname))
-                        continue;
+ /* Chain events that wait on the same disk; the list head is the hashmap value. */
+ Event *first = hashmap_get(manager->locked_events_by_disk, whole_disk_copy);
+ LIST_PREPEND(same_disk, first, event);
-                event->retry_again_next_usec = 0;
+ r = hashmap_ensure_replace(&manager->locked_events_by_disk, &path_hash_ops, whole_disk_copy, first);
+ if (r < 0) {
+ /* Roll back the prepend so the list matches the map again. */
+ LIST_REMOVE(same_disk, first, event);
+ return log_oom();
}
+ event->whole_disk = TAKE_PTR(whole_disk_copy);
+
+ /* Also index the event by its requeue time so due events can be popped cheaply.
+ * On failure, event_free()'s event_unset_whole_disk() undoes the map entry. */
+ r = prioq_ensure_put(&manager->locked_events_by_time, locked_event_compare, event, &event->locked_event_prioq_index);
+ if (r < 0)
+ return log_oom();
+ event->state = EVENT_LOCKED;
return 0;
}
.devpath_old = devpath_old,
.devnode = devnode,
.state = EVENT_QUEUED,
+ .locked_event_prioq_index = PRIOQ_IDX_NULL,
};
LIST_INSERT_AFTER(event, manager->events, manager->last_event, event);
return 1;
}
- (void) event_queue_assume_block_device_unlocked(manager, dev);
-
+ (void) manager_requeue_locked_events_by_device(manager, dev);
return 1;
}
_cleanup_(event_freep) Event *event = worker_detach_event(worker);
if (strv_contains(l, "TRY_AGAIN=1")) {
- /* Worker cannot lock the device. Requeue the event. */
- r = event_requeue(event);
+ /* Worker cannot lock the device. */
+ r = event_enter_locked(event, strv_find_startswith(l, "WHOLE_DISK="));
if (r < 0) {
(void) device_add_errno(event->dev, r);
(void) device_broadcast_on_error(event->dev, manager->monitor);
return on_post_exit(manager);
if (manager->events) {
+ (void) manager_requeue_locked_events(manager);
+
/* Try to process pending events if idle workers exist. Why is this necessary?
* When a worker finished an event and became idle, even if there was a pending event,
* the corresponding device might have been locked and the processing of the event