#include "cache.h"
#include "entry.h"
#include "parallel-checkout.h"
#include "streaming.h"

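/*
 * Basic framework for parallel checkout: eligible cache entries are
 * enqueued here from the checkout_entry() path instead of being written
 * immediately, and the queue is later flushed in one batch by
 * run_parallel_checkout().
 */
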
enum pc_item_status {
	PC_ITEM_PENDING = 0,
	PC_ITEM_WRITTEN,
	/*
	 * The entry could not be written because there was another file
	 * already present in its path or leading directories. Since
	 * checkout_entry_ca() removes such files from the working tree before
	 * enqueueing the entry for parallel checkout, it means that there was
	 * a path collision among the entries being written.
	 */
	PC_ITEM_COLLIDED,
	PC_ITEM_FAILED,
};

struct parallel_checkout_item {
	/* pointer to a istate->cache[] entry. Not owned by us. */
	struct cache_entry *ce;
	struct conv_attrs ca;
	struct stat st;
	enum pc_item_status status;
};

struct parallel_checkout {
	enum pc_status status;
	struct parallel_checkout_item *items; /* The parallel checkout queue. */
	size_t nr, alloc;
};

static struct parallel_checkout parallel_checkout;

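/*
 * The expected calling sequence, as a sketch (the real caller is the
 * checkout machinery around checkout_entry(); the loop below is
 * illustrative only):
 *
 *	init_parallel_checkout();
 *
 *	for (each cache_entry `ce` with conv_attrs `ca` to be written)
 *		if (enqueue_checkout(ce, ca))
 *			check out the entry sequentially, as before;
 *
 *	run_parallel_checkout(state); // writes the queue and releases it
 */
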
enum pc_status parallel_checkout_status(void)
{
	return parallel_checkout.status;
}

void init_parallel_checkout(void)
{
	if (parallel_checkout.status != PC_UNINITIALIZED)
		BUG("parallel checkout already initialized");

	parallel_checkout.status = PC_ACCEPTING_ENTRIES;
}

static void finish_parallel_checkout(void)
{
	if (parallel_checkout.status == PC_UNINITIALIZED)
		BUG("cannot finish parallel checkout: not initialized yet");

	free(parallel_checkout.items);
	memset(&parallel_checkout, 0, sizeof(parallel_checkout));
}

static int is_eligible_for_parallel_checkout(const struct cache_entry *ce,
					     const struct conv_attrs *ca)
{
	enum conv_attrs_classification c;

	/*
	 * Symlinks cannot be checked out in parallel as, in case of path
	 * collision, they could racily replace leading directories of other
	 * entries being checked out. Submodules are checked out in child
	 * processes, which have their own parallel checkout queues.
	 */
	if (!S_ISREG(ce->ce_mode))
		return 0;

	c = classify_conv_attrs(ca);
	switch (c) {
	case CA_CLASS_INCORE:
		return 1;

	case CA_CLASS_INCORE_FILTER:
		/*
		 * It would be safe to allow concurrent instances of
		 * single-file smudge filters, like rot13, but we should not
		 * assume that all filters are parallel-process safe. So we
		 * don't allow this.
		 */
		return 0;

	case CA_CLASS_INCORE_PROCESS:
		/*
		 * The parallel queue and the delayed queue are not compatible,
		 * so they must be kept completely separated. And we can't tell
		 * if a long-running process will delay its response without
		 * actually asking it to perform the filtering. Therefore, this
		 * type of filter is not allowed in parallel checkout.
		 *
		 * Furthermore, there should only be one instance of the
		 * long-running process filter as we don't know how it is
		 * managing its own concurrency. So, spreading the entries that
		 * require such a filter among the parallel workers would
		 * require a lot more inter-process communication. We would
		 * probably have to designate a single process to interact with
		 * the filter and send all the necessary data to it, for each
		 * entry.
		 */
		return 0;

	case CA_CLASS_STREAMABLE:
		return 1;

	default:
		BUG("unsupported conv_attrs classification '%d'", c);
	}
}

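/*
 * Try to add the given entry to the parallel checkout queue. A non-zero
 * return means the entry was not queued (either the queue is not accepting
 * entries right now, or the entry is not eligible); in that case, the
 * caller is expected to check the entry out sequentially, as usual.
 */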
int enqueue_checkout(struct cache_entry *ce, struct conv_attrs *ca)
{
	struct parallel_checkout_item *pc_item;

	if (parallel_checkout.status != PC_ACCEPTING_ENTRIES ||
	    !is_eligible_for_parallel_checkout(ce, ca))
		return -1;

	ALLOC_GROW(parallel_checkout.items, parallel_checkout.nr + 1,
		   parallel_checkout.alloc);
	pc_item = &parallel_checkout.items[parallel_checkout.nr++];
	pc_item->ce = ce;
	memcpy(&pc_item->ca, ca, sizeof(pc_item->ca));
	pc_item->status = PC_ITEM_PENDING;

	return 0;
}

static int handle_results(struct checkout *state)
{
	int ret = 0;
	size_t i;
	int have_pending = 0;

	/*
	 * We first update the successfully written entries with the collected
	 * stat() data, so that they can be found by mark_colliding_entries(),
	 * in the next loop, when necessary.
	 */
	for (i = 0; i < parallel_checkout.nr; i++) {
		struct parallel_checkout_item *pc_item = &parallel_checkout.items[i];
		if (pc_item->status == PC_ITEM_WRITTEN)
			update_ce_after_write(state, pc_item->ce, &pc_item->st);
	}

	for (i = 0; i < parallel_checkout.nr; i++) {
		struct parallel_checkout_item *pc_item = &parallel_checkout.items[i];

		switch (pc_item->status) {
		case PC_ITEM_WRITTEN:
			/* Already handled */
			break;
		case PC_ITEM_COLLIDED:
			/*
			 * The entry could not be checked out due to a path
			 * collision with another entry. Since there can only
			 * be one entry of each colliding group on the disk, we
			 * could skip trying to check out this one and move on.
			 * However, this would leave the unwritten entries with
			 * null stat() fields on the index, which could
			 * potentially slow down subsequent operations that
			 * require refreshing it: git would not be able to
			 * trust st_size and would have to go to the filesystem
			 * to see if the contents match (see ie_modified()).
			 *
			 * Instead, let's pay the overhead only once, now, and
			 * call checkout_entry_ca() again for this file, to
			 * have its stat() data stored in the index. This also
			 * has the benefit of adding this entry and its
			 * colliding pair to the collision report message.
			 * Additionally, this overwriting behavior is consistent
			 * with what the sequential checkout does, so it doesn't
			 * add any extra overhead.
			 */
			ret |= checkout_entry_ca(pc_item->ce, &pc_item->ca,
						 state, NULL, NULL);
			break;
		case PC_ITEM_PENDING:
			have_pending = 1;
			/* fall through */
		case PC_ITEM_FAILED:
			ret = -1;
			break;
		default:
			BUG("unknown checkout item status in parallel checkout");
		}
	}

	if (have_pending)
		error("parallel checkout finished with pending entries");

	return ret;
}

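/*
 * Rewind and truncate the file behind fd, so that a failed streaming
 * attempt can be retried through the regular in-core write path, starting
 * from an empty file.
 */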
static int reset_fd(int fd, const char *path)
{
	if (lseek(fd, 0, SEEK_SET) != 0)
		return error_errno("failed to rewind descriptor of '%s'", path);
	if (ftruncate(fd, 0))
		return error_errno("failed to truncate file '%s'", path);
	return 0;
}

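/*
 * Write the blob contents of pc_item to fd. The fast path streams the blob
 * directly from the object store through a stream filter; if no filter is
 * available, or if streaming fails midway, the blob is read into memory,
 * converted with convert_to_working_tree_ca(), and written out with
 * write_in_full().
 */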
static int write_pc_item_to_fd(struct parallel_checkout_item *pc_item, int fd,
			       const char *path)
{
	int ret;
	struct stream_filter *filter;
	struct strbuf buf = STRBUF_INIT;
	char *blob;
	unsigned long size;
	ssize_t wrote;

	/* Sanity check */
	assert(is_eligible_for_parallel_checkout(pc_item->ce, &pc_item->ca));

	filter = get_stream_filter_ca(&pc_item->ca, &pc_item->ce->oid);
	if (filter) {
		if (stream_blob_to_fd(fd, &pc_item->ce->oid, filter, 1)) {
			/* On error, reset fd to try writing without streaming */
			if (reset_fd(fd, path))
				return -1;
		} else {
			return 0;
		}
	}

	blob = read_blob_entry(pc_item->ce, &size);
	if (!blob)
		return error("cannot read object %s '%s'",
			     oid_to_hex(&pc_item->ce->oid), pc_item->ce->name);

	/*
	 * checkout metadata is used to give context for external process
	 * filters. Files requiring such filters are not eligible for parallel
	 * checkout, so pass NULL.
	 */
	ret = convert_to_working_tree_ca(&pc_item->ca, pc_item->ce->name,
					 blob, size, &buf, NULL);

	if (ret) {
		size_t newsize;
		free(blob);
		blob = strbuf_detach(&buf, &newsize);
		size = newsize;
	}

	wrote = write_in_full(fd, blob, size);
	free(blob);
	if (wrote < 0)
		return error("unable to write file '%s'", path);

	return 0;
}

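/*
 * Close *fd if it is still open, and reset it to -1 so that subsequent
 * calls become no-ops. Returns the result of close(2), or 0 if there was
 * nothing left to close.
 */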
static int close_and_clear(int *fd)
{
	int ret = 0;

	if (*fd >= 0) {
		ret = close(*fd);
		*fd = -1;
	}

	return ret;
}

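/*
 * Check out a single queued item. Path collisions are detected in two ways:
 * the leading directories are re-validated with has_dirs_only_path(), and
 * the file itself is created with O_CREAT | O_EXCL, so that a colliding
 * path fails with EEXIST or EISDIR instead of being silently overwritten.
 */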
static void write_pc_item(struct parallel_checkout_item *pc_item,
			  struct checkout *state)
{
	unsigned int mode = (pc_item->ce->ce_mode & 0100) ? 0777 : 0666;
	int fd = -1, fstat_done = 0;
	struct strbuf path = STRBUF_INIT;
	const char *dir_sep;

	strbuf_add(&path, state->base_dir, state->base_dir_len);
	strbuf_add(&path, pc_item->ce->name, pc_item->ce->ce_namelen);

	dir_sep = find_last_dir_sep(path.buf);

	/*
	 * The leading dirs should have been already created by now. But, in
	 * case of path collisions, one of the dirs could have been replaced by
	 * a symlink (checked out after we enqueued this entry for parallel
	 * checkout). Thus, we must check the leading dirs again.
	 */
	if (dir_sep && !has_dirs_only_path(path.buf, dir_sep - path.buf,
					   state->base_dir_len)) {
		pc_item->status = PC_ITEM_COLLIDED;
		goto out;
	}

	fd = open(path.buf, O_WRONLY | O_CREAT | O_EXCL, mode);

	if (fd < 0) {
		if (errno == EEXIST || errno == EISDIR) {
			/*
			 * Errors which probably represent a path collision.
			 * Suppress the error message and mark the item to be
			 * retried later, sequentially. ENOTDIR and ENOENT are
			 * also interesting, but the above has_dirs_only_path()
			 * call should have already caught these cases.
			 */
			pc_item->status = PC_ITEM_COLLIDED;
		} else {
			error_errno("failed to open file '%s'", path.buf);
			pc_item->status = PC_ITEM_FAILED;
		}
		goto out;
	}

	if (write_pc_item_to_fd(pc_item, fd, path.buf)) {
		/* Error was already reported. */
		pc_item->status = PC_ITEM_FAILED;
		close_and_clear(&fd);
		goto out;
	}

	fstat_done = fstat_checkout_output(fd, state, &pc_item->st);

	if (close_and_clear(&fd)) {
		error_errno("unable to close file '%s'", path.buf);
		pc_item->status = PC_ITEM_FAILED;
		goto out;
	}

	if (state->refresh_cache && !fstat_done &&
	    lstat(path.buf, &pc_item->st) < 0) {
		error_errno("unable to stat just-written file '%s'", path.buf);
		pc_item->status = PC_ITEM_FAILED;
		goto out;
	}

	pc_item->status = PC_ITEM_WRITTEN;

out:
	strbuf_release(&path);
}

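/*
 * For now, the queued items are written sequentially, in the main process.
 * This keeps the enqueue/flush framework's behavior identical to the
 * classic sequential checkout; a later step can replace this loop with
 * dispatching to parallel worker processes.
 */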
static void write_items_sequentially(struct checkout *state)
{
	size_t i;

	for (i = 0; i < parallel_checkout.nr; i++)
		write_pc_item(&parallel_checkout.items[i], state);
}

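/*
 * Flush the parallel checkout queue: write every enqueued entry, merge the
 * results back into the index via handle_results(), and release the queue.
 * Returns 0 on success, non-zero if any entry could not be checked out.
 */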
int run_parallel_checkout(struct checkout *state)
{
	int ret;

	if (parallel_checkout.status != PC_ACCEPTING_ENTRIES)
		BUG("cannot run parallel checkout: uninitialized or already running");

	parallel_checkout.status = PC_RUNNING;

	write_items_sequentially(state);
	ret = handle_results(state);

	finish_parallel_checkout();
	return ret;
}