* system event listener thread so that we have the IPC handle
* before we need it.
*/
- if (ipc_server_run_async(&state->ipc_server_data,
- state->path_ipc.buf, &ipc_opts,
- handle_client, state))
+ if (ipc_server_init_async(&state->ipc_server_data,
+ state->path_ipc.buf, &ipc_opts,
+ handle_client, state))
return error_errno(
_("could not start IPC thread pool on '%s'"),
state->path_ipc.buf);
+ ipc_server_start_async(&state->ipc_server_data);
+
/*
* Start the fsmonitor listener thread to collect filesystem
* events.
struct ipc_server_data *server_data = NULL;
int ret;
- ret = ipc_server_run_async(&server_data, path, opts,
- application_cb, application_data);
+ ret = ipc_server_init_async(&server_data, path, opts,
+ application_cb, application_data);
if (ret)
return ret;
+ ipc_server_start_async(server_data);
ret = ipc_server_await(server_data);
ipc_server_free(server_data);
int back_pos;
int front_pos;
+ int started;
int shutdown_requested;
int is_stopped;
};
/*
* Start IPC server in a pool of background threads.
*/
-int ipc_server_run_async(struct ipc_server_data **returned_server_data,
- const char *path, const struct ipc_server_opts *opts,
- ipc_server_application_cb *application_cb,
- void *application_data)
+int ipc_server_init_async(struct ipc_server_data **returned_server_data,
+ const char *path, const struct ipc_server_opts *opts,
+ ipc_server_application_cb *application_cb,
+ void *application_data)
{
struct unix_ss_socket *server_socket = NULL;
struct ipc_server_data *server_data;
server_data->accept_thread->fd_send_shutdown = sv[0];
server_data->accept_thread->fd_wait_shutdown = sv[1];
+ /*
+ * Hold work-available mutex so that no work can start until
+ * we unlock it.
+ */
+ pthread_mutex_lock(&server_data->work_available_mutex);
+
if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
accept_thread_proc, server_data->accept_thread))
die_errno(_("could not start accept_thread '%s'"), path);
return 0;
}
+void ipc_server_start_async(struct ipc_server_data *server_data)
+{
+ if (!server_data || server_data->started)
+ return;
+
+ server_data->started = 1;
+ pthread_mutex_unlock(&server_data->work_available_mutex);
+}
+
/*
* Gently tell the IPC server treads to shutdown.
* Can be run on any thread.
trace2_region_enter("ipc-server", "server-stop-async", NULL);
- pthread_mutex_lock(&server_data->work_available_mutex);
+ /* If we haven't started yet, we are already holding lock. */
+ if (server_data->started)
+ pthread_mutex_lock(&server_data->work_available_mutex);
server_data->shutdown_requested = 1;
HANDLE hEventStopRequested;
struct ipc_server_thread_data *thread_list;
int is_stopped;
+
+ pthread_mutex_t startup_barrier;
+ int started;
};
enum connect_result {
return ret;
}
+static void wait_for_startup_barrier(struct ipc_server_data *server_data)
+{
+ /*
+ * Temporarily hold the startup_barrier mutex before starting,
+ * which lets us know that it's OK to start serving requests.
+ */
+ pthread_mutex_lock(&server_data->startup_barrier);
+ pthread_mutex_unlock(&server_data->startup_barrier);
+}
+
/*
* Thread proc for an IPC server worker thread. It handles a series of
* connections from clients. It cleans and reuses the hPipe between each
memset(&oConnect, 0, sizeof(oConnect));
oConnect.hEvent = hEventConnected;
+ wait_for_startup_barrier(server_thread_data->server_data);
+
for (;;) {
cr = wait_for_connection(server_thread_data, &oConnect);
return hPipe;
}
-int ipc_server_run_async(struct ipc_server_data **returned_server_data,
- const char *path, const struct ipc_server_opts *opts,
- ipc_server_application_cb *application_cb,
- void *application_data)
+int ipc_server_init_async(struct ipc_server_data **returned_server_data,
+ const char *path, const struct ipc_server_opts *opts,
+ ipc_server_application_cb *application_cb,
+ void *application_data)
{
struct ipc_server_data *server_data;
wchar_t wpath[MAX_PATH];
strbuf_addstr(&server_data->buf_path, path);
wcscpy(server_data->wpath, wpath);
+ /*
+ * Hold the startup_barrier lock so that no threads will progress
+ * until ipc_server_start_async() is called.
+ */
+ pthread_mutex_init(&server_data->startup_barrier, NULL);
+ pthread_mutex_lock(&server_data->startup_barrier);
+
if (nr_threads < 1)
nr_threads = 1;
return 0;
}
+void ipc_server_start_async(struct ipc_server_data *server_data)
+{
+ if (!server_data || server_data->started)
+ return;
+
+ server_data->started = 1;
+ pthread_mutex_unlock(&server_data->startup_barrier);
+}
+
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
if (!server_data)
* We DO NOT attempt to force them to drop an active connection.
*/
SetEvent(server_data->hEventStopRequested);
+
+ /*
+ * If we haven't yet told the threads they are allowed to run,
+ * do so now, so they can receive the shutdown event.
+ */
+ ipc_server_start_async(server_data);
+
return 0;
}
free(std);
}
+ pthread_mutex_destroy(&server_data->startup_barrier);
+
free(server_data);
}
* When a client IPC message is received, the `application_cb` will be
* called (possibly on a random thread) to handle the message and
* optionally compose a reply message.
+ *
+ * This initializes all threads but no actual work will be done until
+ * ipc_server_start_async() is called.
+ */
+int ipc_server_init_async(struct ipc_server_data **returned_server_data,
+ const char *path, const struct ipc_server_opts *opts,
+ ipc_server_application_cb *application_cb,
+ void *application_data);
+
+/*
+ * Let an async server start running. This needs to be called only once
+ * after initialization.
*/
-int ipc_server_run_async(struct ipc_server_data **returned_server_data,
- const char *path, const struct ipc_server_opts *opts,
- ipc_server_application_cb *application_cb,
- void *application_data);
+void ipc_server_start_async(struct ipc_server_data *server_data);
/*
* Gently signal the IPC server pool to shutdown. No new client