]> git.ipfire.org Git - thirdparty/git.git/commitdiff
fsmonitor--daemon: implement handle_client callback
authorJeff Hostetler <jeffhost@microsoft.com>
Fri, 25 Mar 2022 18:03:01 +0000 (18:03 +0000)
committerJunio C Hamano <gitster@pobox.com>
Fri, 25 Mar 2022 23:04:16 +0000 (16:04 -0700)
Teach fsmonitor--daemon to respond to IPC requests from client
Git processes and respond with a list of modified pathnames
relative to the provided token.

Signed-off-by: Jeff Hostetler <jeffhost@microsoft.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
builtin/fsmonitor--daemon.c

index 69312119b07e59b13f96e571a7d9780a48f36bc1..eafaafb45b15453b648def42aaf73dc3d1f01eff 100644 (file)
@@ -7,6 +7,7 @@
 #include "fsmonitor--daemon.h"
 #include "simple-ipc.h"
 #include "khash.h"
+#include "pkt-line.h"
 
 static const char * const builtin_fsmonitor__daemon_usage[] = {
        N_("git fsmonitor--daemon start [<options>]"),
@@ -364,6 +365,310 @@ void fsmonitor_force_resync(struct fsmonitor_daemon_state *state)
        pthread_mutex_unlock(&state->main_lock);
 }
 
+/*
+ * Format an opaque token string to send to the client.
+ */
+static void with_lock__format_response_token(
+       struct strbuf *response_token,
+       const struct strbuf *response_token_id,
+       const struct fsmonitor_batch *batch)
+{
+       /* assert current thread holding state->main_lock */
+
+       strbuf_reset(response_token);
+       strbuf_addf(response_token, "builtin:%s:%"PRIu64,
+                   response_token_id->buf, batch->batch_seq_nr);
+}
+
+/*
+ * Parse an opaque token from the client.
+ * Returns -1 on error.
+ */
+static int fsmonitor_parse_client_token(const char *buf_token,
+                                       struct strbuf *requested_token_id,
+                                       uint64_t *seq_nr)
+{
+       const char *p;
+       char *p_end;
+
+       strbuf_reset(requested_token_id);
+       *seq_nr = 0;
+
+       if (!skip_prefix(buf_token, "builtin:", &p))
+               return -1;
+
+       while (*p && *p != ':')
+               strbuf_addch(requested_token_id, *p++);
+       if (!*p++)
+               return -1;
+
+       *seq_nr = (uint64_t)strtoumax(p, &p_end, 10);
+       if (*p_end)
+               return -1;
+
+       return 0;
+}
+
+KHASH_INIT(str, const char *, int, 0, kh_str_hash_func, kh_str_hash_equal)
+
+static int do_handle_client(struct fsmonitor_daemon_state *state,
+                           const char *command,
+                           ipc_server_reply_cb *reply,
+                           struct ipc_server_reply_data *reply_data)
+{
+       struct fsmonitor_token_data *token_data = NULL;
+       struct strbuf response_token = STRBUF_INIT;
+       struct strbuf requested_token_id = STRBUF_INIT;
+       struct strbuf payload = STRBUF_INIT;
+       uint64_t requested_oldest_seq_nr = 0;
+       uint64_t total_response_len = 0;
+       const char *p;
+       const struct fsmonitor_batch *batch_head;
+       const struct fsmonitor_batch *batch;
+       intmax_t count = 0, duplicates = 0;
+       kh_str_t *shown;
+       int hash_ret;
+       int do_trivial = 0;
+       int do_flush = 0;
+
+       /*
+        * We expect `command` to be of the form:
+        *
+        * <command> := quit NUL
+        *            | flush NUL
+        *            | <V1-time-since-epoch-ns> NUL
+        *            | <V2-opaque-fsmonitor-token> NUL
+        */
+
+       if (!strcmp(command, "quit")) {
+               /*
+                * A client has requested over the socket/pipe that the
+                * daemon shutdown.
+                *
+                * Tell the IPC thread pool to shutdown (which completes
+                * the await in the main thread (which can stop the
+                * fsmonitor listener thread)).
+                *
+                * There is no reply to the client.
+                */
+               return SIMPLE_IPC_QUIT;
+
+       } else if (!strcmp(command, "flush")) {
+               /*
+                * Flush all of our cached data and generate a new token
+                * just like if we lost sync with the filesystem.
+                *
+                * Then send a trivial response using the new token.
+                */
+               do_flush = 1;
+               do_trivial = 1;
+
+       } else if (!skip_prefix(command, "builtin:", &p)) {
+               /* assume V1 timestamp or garbage */
+
+               char *p_end;
+
+               strtoumax(command, &p_end, 10);
+               trace_printf_key(&trace_fsmonitor,
+                                ((*p_end) ?
+                                 "fsmonitor: invalid command line '%s'" :
+                                 "fsmonitor: unsupported V1 protocol '%s'"),
+                                command);
+               do_trivial = 1;
+
+       } else {
+               /* We have "builtin:*" */
+               if (fsmonitor_parse_client_token(command, &requested_token_id,
+                                                &requested_oldest_seq_nr)) {
+                       trace_printf_key(&trace_fsmonitor,
+                                        "fsmonitor: invalid V2 protocol token '%s'",
+                                        command);
+                       do_trivial = 1;
+
+               } else {
+                       /*
+                        * We have a V2 valid token:
+                        *     "builtin:<token_id>:<seq_nr>"
+                        */
+               }
+       }
+
+       pthread_mutex_lock(&state->main_lock);
+
+       if (!state->current_token_data)
+               BUG("fsmonitor state does not have a current token");
+
+       if (do_flush)
+               with_lock__do_force_resync(state);
+
+       /*
+        * We mark the current head of the batch list as "pinned" so
+        * that the listener thread will treat this item as read-only
+        * (and prevent any more paths from being added to it) from
+        * now on.
+        */
+       token_data = state->current_token_data;
+       batch_head = token_data->batch_head;
+       ((struct fsmonitor_batch *)batch_head)->pinned_time = time(NULL);
+
+       /*
+        * FSMonitor Protocol V2 requires that we send a response header
+        * with a "new current token" and then all of the paths that changed
+        * since the "requested token".  We send the seq_nr of the just-pinned
+        * head batch so that future requests from a client will be relative
+        * to it.
+        */
+       with_lock__format_response_token(&response_token,
+                                        &token_data->token_id, batch_head);
+
+       reply(reply_data, response_token.buf, response_token.len + 1);
+       total_response_len += response_token.len + 1;
+
+       trace2_data_string("fsmonitor", the_repository, "response/token",
+                          response_token.buf);
+       trace_printf_key(&trace_fsmonitor, "response token: %s",
+                        response_token.buf);
+
+       if (!do_trivial) {
+               if (strcmp(requested_token_id.buf, token_data->token_id.buf)) {
+                       /*
+                        * The client last spoke to a different daemon
+                        * instance -OR- the daemon had to resync with
+                        * the filesystem (and lost events), so reject.
+                        */
+                       trace2_data_string("fsmonitor", the_repository,
+                                          "response/token", "different");
+                       do_trivial = 1;
+
+               } else if (requested_oldest_seq_nr <
+                          token_data->batch_tail->batch_seq_nr) {
+                       /*
+                        * The client wants older events than we have for
+                        * this token_id.  This means that the end of our
+                        * batch list was truncated and we cannot give the
+                        * client a complete snapshot relative to their
+                        * request.
+                        */
+                       trace_printf_key(&trace_fsmonitor,
+                                        "client requested truncated data");
+                       do_trivial = 1;
+               }
+       }
+
+       if (do_trivial) {
+               pthread_mutex_unlock(&state->main_lock);
+
+               reply(reply_data, "/", 2);
+
+               trace2_data_intmax("fsmonitor", the_repository,
+                                  "response/trivial", 1);
+
+               goto cleanup;
+       }
+
+       /*
+        * We're going to hold onto a pointer to the current
+        * token-data while we walk the list of batches of files.
+        * During this time, we will NOT be under the lock.
+        * So we ref-count it.
+        *
+        * This allows the listener thread to continue prepending
+        * new batches of items to the token-data (which we'll ignore).
+        *
+        * AND it allows the listener thread to do a token-reset
+        * (and install a new `current_token_data`).
+        */
+       token_data->client_ref_count++;
+
+       pthread_mutex_unlock(&state->main_lock);
+
+       /*
+        * The client request is relative to the token that they sent,
+        * so walk the batch list backwards from the current head back
+        * to the batch (sequence number) they named.
+        *
+        * We use khash to de-dup the list of pathnames.
+        *
+        * NEEDSWORK: each batch contains a list of interned strings,
+        * so we only need to do pointer comparisons here to build the
+        * hash table.  Currently, we're still comparing the string
+        * values.
+        */
+       shown = kh_init_str();
+       for (batch = batch_head;
+            batch && batch->batch_seq_nr > requested_oldest_seq_nr;
+            batch = batch->next) {
+               size_t k;
+
+               for (k = 0; k < batch->nr; k++) {
+                       const char *s = batch->interned_paths[k];
+                       size_t s_len;
+
+                       if (kh_get_str(shown, s) != kh_end(shown))
+                               duplicates++;
+                       else {
+                               kh_put_str(shown, s, &hash_ret);
+
+                               trace_printf_key(&trace_fsmonitor,
+                                                "send[%"PRIuMAX"]: %s",
+                                                count, s);
+
+                               /* Each path gets written with a trailing NUL */
+                               s_len = strlen(s) + 1;
+
+                               if (payload.len + s_len >=
+                                   LARGE_PACKET_DATA_MAX) {
+                                       reply(reply_data, payload.buf,
+                                             payload.len);
+                                       total_response_len += payload.len;
+                                       strbuf_reset(&payload);
+                               }
+
+                               strbuf_add(&payload, s, s_len);
+                               count++;
+                       }
+               }
+       }
+
+       if (payload.len) {
+               reply(reply_data, payload.buf, payload.len);
+               total_response_len += payload.len;
+       }
+
+       kh_release_str(shown);
+
+       pthread_mutex_lock(&state->main_lock);
+
+       if (token_data->client_ref_count > 0)
+               token_data->client_ref_count--;
+
+       if (token_data->client_ref_count == 0) {
+               if (token_data != state->current_token_data) {
+                       /*
+                        * The listener thread did a token-reset while we were
+                        * walking the batch list.  Therefore, this token is
+                        * stale and can be discarded completely.  If we are
+                        * the last reader thread using this token, we own
+                        * that work.
+                        */
+                       fsmonitor_free_token_data(token_data);
+               }
+       }
+
+       pthread_mutex_unlock(&state->main_lock);
+
+       trace2_data_intmax("fsmonitor", the_repository, "response/length", total_response_len);
+       trace2_data_intmax("fsmonitor", the_repository, "response/count/files", count);
+       trace2_data_intmax("fsmonitor", the_repository, "response/count/duplicates", duplicates);
+
+cleanup:
+       strbuf_release(&response_token);
+       strbuf_release(&requested_token_id);
+       strbuf_release(&payload);
+
+       return 0;
+}
+
 static ipc_server_application_cb handle_client;
 
 static int handle_client(void *data,
@@ -371,7 +676,7 @@ static int handle_client(void *data,
                         ipc_server_reply_cb *reply,
                         struct ipc_server_reply_data *reply_data)
 {
-       /* struct fsmonitor_daemon_state *state = data; */
+       struct fsmonitor_daemon_state *state = data;
        int result;
 
        /*
@@ -382,10 +687,12 @@ static int handle_client(void *data,
        if (command_len != strlen(command))
                BUG("FSMonitor assumes text messages");
 
+       trace_printf_key(&trace_fsmonitor, "requested token: %s", command);
+
        trace2_region_enter("fsmonitor", "handle_client", the_repository);
        trace2_data_string("fsmonitor", the_repository, "request", command);
 
-       result = 0; /* TODO Do something here. */
+       result = do_handle_client(state, command, reply, reply_data);
 
        trace2_region_leave("fsmonitor", "handle_client", the_repository);