struct eb_root idle_conn_srv = EB_ROOT;
struct task *idle_conn_task __read_mostly = NULL;
struct list servers_list = LIST_HEAD_INIT(servers_list);
+static struct task *server_atomic_sync_task = NULL;
+static event_hdl_async_equeue server_atomic_sync_queue;
/* SERVER DELETE(n)->ADD global tracker:
 * This is meant to provide srv->rid (revision id) value.
 */
/* Update server's addr:svc_port tuple in INET context
*
- * Must be called with server lock held
+ * Must be called under thread isolation to ensure consistent readings across
+ * all threads (addr:svc_port might be read without srv lock being held).
*/
void _srv_set_inetaddr(struct server *srv, const struct sockaddr_storage *addr, unsigned int svc_port)
{
memcpy(&srv->addr, addr, sizeof(struct sockaddr_storage));
srv->svc_port = svc_port;
}
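/* Minimal usage sketch (variable names such as "new_port" are only
 * illustrative): a caller that is not already isolated is expected to wrap
 * the update in an isolation window, e.g.:
 *
 *	if (!thread_isolated())
 *		thread_isolate_full();
 *	_srv_set_inetaddr(srv, &new_addr, new_port);
 *	thread_release();
 *
 * In practice server_atomic_sync() below is the caller: it enters isolation
 * on demand and only releases it once a whole batch of events is processed.
 */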
+/*
+ * Function executed by server_atomic_sync_task to perform atomic updates on
+ * compatible server struct members that are not guarded by any lock, since
+ * they are not supposed to change often yet may be read from sensitive
+ * code paths.
+ *
+ * Some updates may require thread isolation: we start without isolation
+ * and only enter it when we encounter an event that requires it. Once such
+ * an event is processed, we keep the isolation until the whole batch of
+ * events has been handled, and only release it when we are done, as it
+ * would be very costly to acquire isolation multiple times in a row.
+ * The task limits itself to a number of events per run to prevent
+ * thread contention (see: "tune.events.max-events-at-once").
+ *
+ * TODO: if we find out that enforcing isolation is too costly, we may
+ * consider adding thread_isolate_try_full(timeout) or equivalent to the
+ * thread API so that we can do our best not to block harmless threads
+ * for too long if one or multiple threads are still heavily busy. This
+ * would mean that the task would be capable of rescheduling itself to
+ * start again on the current event if it failed to acquire thread
+ * isolation. This would also imply that the event_hdl API allows us
+ * to peek at an event without popping it from the queue first (the event
+ * would only be removed once it has been successfully processed).
+ */
+static void srv_set_addr_desc(struct server *s, int reattach);
+static struct task *server_atomic_sync(struct task *task, void *context, unsigned int state)
+{
+ unsigned int remain = event_hdl_tune.max_events_at_once; // to limit max number of events per batch
+ struct event_hdl_async_event *event;
+
+ /* check for new server events that we care about */
+ while (1) {
+ if (!remain) {
+ /* STOP: we've already spent all our budget here, and
+ * considering we may be under isolation, we cannot
+ * keep blocking other threads any longer.
+ *
+ * Reschedule the task to finish where we left off if
+ * there are remaining events in the queue. The budget is
+ * checked before popping so that no event is dropped or
+ * leaked when we stop early.
+ */
+ if (!event_hdl_async_equeue_isempty(&server_atomic_sync_queue))
+ task_wakeup(task, TASK_WOKEN_OTHER);
+ break;
+ }
+
+ event = event_hdl_async_equeue_pop(&server_atomic_sync_queue);
+ if (!event)
+ break; /* queue is empty, nothing left to do */
+
+ if (event_hdl_sub_type_equal(event->type, EVENT_HDL_SUB_END)) {
+ /* ending event: no more events to come */
+ event_hdl_async_free_event(event);
+ task_destroy(task);
+ task = NULL;
+ break;
+ }
+ remain--;
+
+ /* new event to process */
+ if (event_hdl_sub_type_equal(event->type, EVENT_HDL_SUB_SERVER_INETADDR)) {
+ struct sockaddr_storage new_addr;
+ struct event_hdl_cb_data_server_inetaddr *data = event->data;
+ struct proxy *px;
+ struct server *srv;
+
+ /* server ip:port changed, we must atomically update data members
+ * to prevent invalid reads by other threads.
+ */
+
+ /* check if related server still exists */
+ px = proxy_find_by_id(data->server.safe.proxy_uuid, PR_CAP_BE, 0);
+ if (!px) {
+ /* proxy is gone: drop (and free) the event */
+ event_hdl_async_free_event(event);
+ continue;
+ }
+ srv = findserver_unique_id(px, data->server.safe.puid, data->server.safe.rid);
+ if (!srv) {
+ /* server is gone: drop (and free) the event */
+ event_hdl_async_free_event(event);
+ continue;
+ }
+
+ /* prepare new addr based on event cb data */
+ memset(&new_addr, 0, sizeof(new_addr));
+ new_addr.ss_family = data->safe.next.family;
+ switch (new_addr.ss_family) {
+ case AF_INET:
+ ((struct sockaddr_in *)&new_addr)->sin_addr.s_addr =
+ data->safe.next.addr.v4.s_addr;
+ break;
+ case AF_INET6:
+ memcpy(&((struct sockaddr_in6 *)&new_addr)->sin6_addr,
+ &data->safe.next.addr.v6,
+ sizeof(struct in6_addr));
+ break;
+ default:
+ /* should not happen */
+ break;
+ }
+ /*
+ * This requires thread isolation, which is safe since we're the only
+ * task working for the current subscription and we don't hold locks
+ * or resources that other threads may depend on to complete a running
+ * cycle. Note that we do it this way because we assume that this event
+ * is rather rare.
+ */
+ if (!thread_isolated())
+ thread_isolate_full();
+
+ /* apply new addr:port combination */
+ _srv_set_inetaddr(srv, &new_addr, data->safe.next.svc_port);
+
+ /* propagate the changes */
+ if (data->safe.purge_conn) /* force connection cleanup on the given server? */
+ srv_cleanup_connections(srv);
+ srv_set_dyncookie(srv);
+ srv_set_addr_desc(srv, 1);
+ }
+ event_hdl_async_free_event(event);
+ }
+
+ /* some events may have required thread isolation:
+ * now that we are done, we must leave thread isolation before
+ * returning
+ */
+ if (thread_isolated())
+ thread_release();
+
+ return task;
+}
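+
+/* Note: the per-run event budget used above comes from the global
+ * "tune.events.max-events-at-once" keyword. A configuration sketch
+ * (the value below is only an example):
+ *
+ *	global
+ *		tune.events.max-events-at-once 50
+ */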
+
+/* Try to start the atomic server sync task.
+ *
+ * Returns ERR_NONE on success and a combination of ERR_* codes on failure.
+ */
+static int server_atomic_sync_start()
+{
+ struct event_hdl_sub_type subscriptions = EVENT_HDL_SUB_NONE;
+
+ if (server_atomic_sync_task)
+ return ERR_NONE; // nothing to do
+ server_atomic_sync_task = task_new_anywhere();
+ if (!server_atomic_sync_task)
+ goto fail;
+ server_atomic_sync_task->process = server_atomic_sync;
+ event_hdl_async_equeue_init(&server_atomic_sync_queue);
+
+ /* task created, now subscribe to relevant server events in the global list */
+ subscriptions = event_hdl_sub_type_add(subscriptions, EVENT_HDL_SUB_SERVER_INETADDR);
+ if (!event_hdl_subscribe(NULL, subscriptions,
+ EVENT_HDL_ASYNC_TASK(&server_atomic_sync_queue,
+ server_atomic_sync_task,
+ NULL,
+ NULL)))
+ goto fail;
+
+ return ERR_NONE;
+
+ fail:
+ task_destroy(server_atomic_sync_task);
+ server_atomic_sync_task = NULL;
+ return ERR_ALERT | ERR_FATAL;
+}
+REGISTER_POST_CHECK(server_atomic_sync_start);
+
/* fill the common server event data struct members;
 * must be called with the server lock held or under thread isolation
*/
_srv_event_hdl_prepare(&cb_data.common, s, 0);
_srv_event_hdl_prepare_inetaddr(&cb_data.addr, &s->addr, s->svc_port, &new_addr, s->svc_port, 0);
- /* apply the new IP address */
- _srv_set_inetaddr(s, &new_addr, s->svc_port);
-
+ /* server_atomic_sync_task will apply the changes for us */
_srv_event_hdl_publish(EVENT_HDL_SUB_SERVER_INETADDR, cb_data, s);
- srv_set_dyncookie(s);
- srv_set_addr_desc(s, 1);
-
return 0;
}
((port_change) ? new_port : s->svc_port),
1);
- /* apply new ip and port */
- _srv_set_inetaddr(s,
- ((ip_change) ? &sa : &s->addr),
- ((port_change) ? new_port : s->svc_port));
-
+ /* server_atomic_sync_task will apply the changes for us */
_srv_event_hdl_publish(EVENT_HDL_SUB_SERVER_INETADDR, cb_data, s);
-
- /* force connection cleanup on the given server */
- srv_cleanup_connections(s);
- srv_set_dyncookie(s);
- srv_set_addr_desc(s, 1);
}
if (updater)
chunk_appendf(msg, " by '%s'", updater);