#include "cache.h"
#include "entry.h"
#include "parallel-checkout.h"
#include "streaming.h"

enum pc_item_status {
	PC_ITEM_PENDING = 0,
	PC_ITEM_WRITTEN,
	/*
	 * The entry could not be written because there was another file
	 * already present in its path or leading directories. Since
	 * checkout_entry_ca() removes such files from the working tree before
	 * enqueueing the entry for parallel checkout, it means that there was
	 * a path collision among the entries being written.
	 */
	PC_ITEM_COLLIDED,
	PC_ITEM_FAILED,
};

struct parallel_checkout_item {
	/* pointer to an istate->cache[] entry. Not owned by us. */
	struct cache_entry *ce;
	struct conv_attrs ca;
	struct stat st;
	enum pc_item_status status;
};

struct parallel_checkout {
	enum pc_status status;
	struct parallel_checkout_item *items; /* The parallel checkout queue. */
	size_t nr, alloc;
};

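/*
 * State of the current parallel checkout session. The status field moves
 * from PC_UNINITIALIZED to PC_ACCEPTING_ENTRIES in init_parallel_checkout(),
 * to PC_RUNNING in run_parallel_checkout(), and the whole struct is zeroed
 * again by finish_parallel_checkout().
 */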
static struct parallel_checkout parallel_checkout;

enum pc_status parallel_checkout_status(void)
{
	return parallel_checkout.status;
}

void init_parallel_checkout(void)
{
	if (parallel_checkout.status != PC_UNINITIALIZED)
		BUG("parallel checkout already initialized");

	parallel_checkout.status = PC_ACCEPTING_ENTRIES;
}

static void finish_parallel_checkout(void)
{
	if (parallel_checkout.status == PC_UNINITIALIZED)
		BUG("cannot finish parallel checkout: not initialized yet");

	free(parallel_checkout.items);
	memset(&parallel_checkout, 0, sizeof(parallel_checkout));
}

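/*
 * Check whether the given cache entry can be safely written in parallel,
 * based on its file mode and on the classification of its conversion
 * attributes.
 */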
static int is_eligible_for_parallel_checkout(const struct cache_entry *ce,
					     const struct conv_attrs *ca)
{
	enum conv_attrs_classification c;

	/*
	 * Symlinks cannot be checked out in parallel as, in case of path
	 * collision, they could racily replace leading directories of other
	 * entries being checked out. Submodules are checked out in child
	 * processes, which have their own parallel checkout queues.
	 */
	if (!S_ISREG(ce->ce_mode))
		return 0;

	c = classify_conv_attrs(ca);
	switch (c) {
	case CA_CLASS_INCORE:
		return 1;

	case CA_CLASS_INCORE_FILTER:
		/*
		 * It would be safe to allow concurrent instances of
		 * single-file smudge filters, like rot13, but we should not
		 * assume that all filters are parallel-process safe. So we
		 * don't allow this.
		 */
		return 0;

	case CA_CLASS_INCORE_PROCESS:
		/*
		 * The parallel queue and the delayed queue are not compatible,
		 * so they must be kept completely separated. And we can't tell
		 * if a long-running process will delay its response without
		 * actually asking it to perform the filtering. Therefore, this
		 * type of filter is not allowed in parallel checkout.
		 *
		 * Furthermore, there should only be one instance of the
		 * long-running process filter as we don't know how it is
		 * managing its own concurrency. So, spreading the entries that
		 * require such a filter among the parallel workers would
		 * require a lot more inter-process communication. We would
		 * probably have to designate a single process to interact with
		 * the filter and send all the necessary data to it, for each
		 * entry.
		 */
		return 0;

	case CA_CLASS_STREAMABLE:
		return 1;

	default:
		BUG("unsupported conv_attrs classification '%d'", c);
	}
}

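/*
 * Try to add the given entry to the parallel checkout queue. Returns 0 if the
 * entry was enqueued, or -1 if it cannot be handled in parallel (the queue is
 * not accepting entries or the entry is not eligible), in which case the
 * caller should check it out through the regular sequential path.
 */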
int enqueue_checkout(struct cache_entry *ce, struct conv_attrs *ca)
{
	struct parallel_checkout_item *pc_item;

	if (parallel_checkout.status != PC_ACCEPTING_ENTRIES ||
	    !is_eligible_for_parallel_checkout(ce, ca))
		return -1;

	ALLOC_GROW(parallel_checkout.items, parallel_checkout.nr + 1,
		   parallel_checkout.alloc);

	pc_item = &parallel_checkout.items[parallel_checkout.nr++];
	pc_item->ce = ce;
	memcpy(&pc_item->ca, ca, sizeof(pc_item->ca));
	pc_item->status = PC_ITEM_PENDING;

	return 0;
}

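/*
 * Collect the results of all enqueued items: propagate the stat() data of
 * successfully written entries to the index, retry collided entries through
 * the sequential checkout_entry_ca() path, and report items that failed or
 * were left pending. Returns 0 if every entry was written; nonzero otherwise.
 */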
static int handle_results(struct checkout *state)
{
	int ret = 0;
	size_t i;
	int have_pending = 0;

	/*
	 * We first update the successfully written entries with the collected
	 * stat() data, so that they can be found by mark_colliding_entries(),
	 * in the next loop, when necessary.
	 */
	for (i = 0; i < parallel_checkout.nr; i++) {
		struct parallel_checkout_item *pc_item = &parallel_checkout.items[i];
		if (pc_item->status == PC_ITEM_WRITTEN)
			update_ce_after_write(state, pc_item->ce, &pc_item->st);
	}

	for (i = 0; i < parallel_checkout.nr; i++) {
		struct parallel_checkout_item *pc_item = &parallel_checkout.items[i];

		switch(pc_item->status) {
		case PC_ITEM_WRITTEN:
			/* Already handled */
			break;
		case PC_ITEM_COLLIDED:
			/*
			 * The entry could not be checked out due to a path
			 * collision with another entry. Since there can only
			 * be one entry of each colliding group on the disk, we
			 * could skip trying to check out this one and move on.
			 * However, this would leave the unwritten entries with
			 * null stat() fields in the index, which could
			 * potentially slow down subsequent operations that
			 * require refreshing it: git would not be able to
			 * trust st_size and would have to go to the filesystem
			 * to see if the contents match (see ie_modified()).
			 *
			 * Instead, let's pay the overhead only once, now, and
			 * call checkout_entry_ca() again for this file, to
			 * have its stat() data stored in the index. This also
			 * has the benefit of adding this entry and its
			 * colliding pair to the collision report message.
			 * Additionally, this overwriting behavior is consistent
			 * with what the sequential checkout does, so it doesn't
			 * add any extra overhead.
			 */
			ret |= checkout_entry_ca(pc_item->ce, &pc_item->ca,
						 state, NULL, NULL);
			break;
		case PC_ITEM_PENDING:
			have_pending = 1;
			/* fall through */
		case PC_ITEM_FAILED:
			ret = -1;
			break;
		default:
			BUG("unknown checkout item status in parallel checkout");
		}
	}

	if (have_pending)
		error("parallel checkout finished with pending entries");

	return ret;
}

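/*
 * Rewind and truncate the file referenced by fd, so that a failed streaming
 * attempt can be retried through the non-streaming code path.
 */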
static int reset_fd(int fd, const char *path)
{
	if (lseek(fd, 0, SEEK_SET) != 0)
		return error_errno("failed to rewind descriptor of '%s'", path);
	if (ftruncate(fd, 0))
		return error_errno("failed to truncate file '%s'", path);
	return 0;
}

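/*
 * Write the blob contents of the given item to the already opened file
 * descriptor: stream it directly when a stream filter is available and,
 * otherwise (or if streaming fails), read the blob in-core, apply the
 * conversion, and write it out. Returns 0 on success or a negative value on
 * error (an error message is printed by the failing step).
 */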
static int write_pc_item_to_fd(struct parallel_checkout_item *pc_item, int fd,
			       const char *path)
{
	int ret;
	struct stream_filter *filter;
	struct strbuf buf = STRBUF_INIT;
	char *blob;
	unsigned long size;
	ssize_t wrote;

	/* Sanity check */
	assert(is_eligible_for_parallel_checkout(pc_item->ce, &pc_item->ca));

	filter = get_stream_filter_ca(&pc_item->ca, &pc_item->ce->oid);
	if (filter) {
		if (stream_blob_to_fd(fd, &pc_item->ce->oid, filter, 1)) {
			/* On error, reset fd to try writing without streaming */
			if (reset_fd(fd, path))
				return -1;
		} else {
			return 0;
		}
	}

	blob = read_blob_entry(pc_item->ce, &size);
	if (!blob)
		return error("cannot read object %s '%s'",
			     oid_to_hex(&pc_item->ce->oid), pc_item->ce->name);

	/*
	 * checkout metadata is used to give context for external process
	 * filters. Files requiring such filters are not eligible for parallel
	 * checkout, so pass NULL.
	 */
	ret = convert_to_working_tree_ca(&pc_item->ca, pc_item->ce->name,
					 blob, size, &buf, NULL);

	if (ret) {
		size_t newsize;
		free(blob);
		blob = strbuf_detach(&buf, &newsize);
		size = newsize;
	}

	wrote = write_in_full(fd, blob, size);
	free(blob);
	if (wrote < 0)
		return error("unable to write file '%s'", path);

	return 0;
}

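/* Close the file descriptor, if still open, and reset it to -1. */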
static int close_and_clear(int *fd)
{
	int ret = 0;

	if (*fd >= 0) {
		ret = close(*fd);
		*fd = -1;
	}

	return ret;
}

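/*
 * Check out a single enqueued item, recording its resulting status and, on
 * success, the stat() data later used to update the index. Path collisions
 * detected here (a missing leading directory or an already existing file at
 * the destination) are not reported as errors: the item is marked
 * PC_ITEM_COLLIDED so that handle_results() can retry it sequentially.
 */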
static void write_pc_item(struct parallel_checkout_item *pc_item,
			  struct checkout *state)
{
	unsigned int mode = (pc_item->ce->ce_mode & 0100) ? 0777 : 0666;
	int fd = -1, fstat_done = 0;
	struct strbuf path = STRBUF_INIT;
	const char *dir_sep;

	strbuf_add(&path, state->base_dir, state->base_dir_len);
	strbuf_add(&path, pc_item->ce->name, pc_item->ce->ce_namelen);

	dir_sep = find_last_dir_sep(path.buf);

	/*
	 * The leading dirs should have already been created by now. But, in
	 * case of path collisions, one of the dirs could have been replaced by
	 * a symlink (checked out after we enqueued this entry for parallel
	 * checkout). Thus, we must check the leading dirs again.
	 */
	if (dir_sep && !has_dirs_only_path(path.buf, dir_sep - path.buf,
					   state->base_dir_len)) {
		pc_item->status = PC_ITEM_COLLIDED;
		goto out;
	}

	fd = open(path.buf, O_WRONLY | O_CREAT | O_EXCL, mode);

	if (fd < 0) {
		if (errno == EEXIST || errno == EISDIR) {
			/*
			 * Errors which probably represent a path collision.
			 * Suppress the error message and mark the item to be
			 * retried later, sequentially. ENOTDIR and ENOENT are
			 * also interesting, but the above has_dirs_only_path()
			 * call should have already caught these cases.
			 */
			pc_item->status = PC_ITEM_COLLIDED;
		} else {
			error_errno("failed to open file '%s'", path.buf);
			pc_item->status = PC_ITEM_FAILED;
		}
		goto out;
	}

	if (write_pc_item_to_fd(pc_item, fd, path.buf)) {
		/* Error was already reported. */
		pc_item->status = PC_ITEM_FAILED;
		close_and_clear(&fd);
		unlink(path.buf);
		goto out;
	}

	fstat_done = fstat_checkout_output(fd, state, &pc_item->st);

	if (close_and_clear(&fd)) {
		error_errno("unable to close file '%s'", path.buf);
		pc_item->status = PC_ITEM_FAILED;
		goto out;
	}

	if (state->refresh_cache && !fstat_done && lstat(path.buf, &pc_item->st) < 0) {
		error_errno("unable to stat just-written file '%s'", path.buf);
		pc_item->status = PC_ITEM_FAILED;
		goto out;
	}

	pc_item->status = PC_ITEM_WRITTEN;

out:
	strbuf_release(&path);
}

static void write_items_sequentially(struct checkout *state)
{
	size_t i;

	for (i = 0; i < parallel_checkout.nr; i++)
		write_pc_item(&parallel_checkout.items[i], state);
}

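/*
 * Write all enqueued entries and collect the results. Note that the writing
 * itself is still done sequentially, via write_items_sequentially(); this
 * function only drives the queue and the result handling. A typical calling
 * sequence, as suggested by the status checks in this file, would be:
 *
 *	init_parallel_checkout();
 *	... enqueue_checkout() for each eligible cache entry ...
 *	ret = run_parallel_checkout(state);
 *
 * where `state` is the caller's `struct checkout`.
 */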
int run_parallel_checkout(struct checkout *state)
{
	int ret;

	if (parallel_checkout.status != PC_ACCEPTING_ENTRIES)
		BUG("cannot run parallel checkout: uninitialized or already running");

	parallel_checkout.status = PC_RUNNING;

	write_items_sequentially(state);
	ret = handle_results(state);

	finish_parallel_checkout();
	return ret;
}