From 378ec56beba161abbef6e2c87d9bc2ac43c355f3 Mon Sep 17 00:00:00 2001 From: Patrick Steinhardt Date: Sun, 23 Nov 2025 19:59:42 +0100 Subject: [PATCH] streaming: refactor interface to be object-database-centric Refactor the streaming interface to be centered around object databases instead of centered around the repository. Rename the functions accordingly. Signed-off-by: Patrick Steinhardt Signed-off-by: Junio C Hamano --- archive-tar.c | 6 +++--- archive-zip.c | 12 ++++++------ builtin/index-pack.c | 8 ++++---- builtin/pack-objects.c | 14 +++++++------- object-file.c | 8 ++++---- streaming.c | 44 +++++++++++++++++++++--------------------- streaming.h | 30 +++++++++++++++++++++++----- 7 files changed, 71 insertions(+), 51 deletions(-) diff --git a/archive-tar.c b/archive-tar.c index dc1eda09e0..4d87b28504 100644 --- a/archive-tar.c +++ b/archive-tar.c @@ -135,16 +135,16 @@ static int stream_blocked(struct repository *r, const struct object_id *oid) char buf[BLOCKSIZE]; ssize_t readlen; - st = open_istream(r, oid, &type, &sz, NULL); + st = odb_read_stream_open(r->objects, oid, &type, &sz, NULL); if (!st) return error(_("cannot stream blob %s"), oid_to_hex(oid)); for (;;) { - readlen = read_istream(st, buf, sizeof(buf)); + readlen = odb_read_stream_read(st, buf, sizeof(buf)); if (readlen <= 0) break; do_write_blocked(buf, readlen); } - close_istream(st); + odb_read_stream_close(st); if (!readlen) finish_record(); return readlen; diff --git a/archive-zip.c b/archive-zip.c index 40a9c93ff9..c44684aebc 100644 --- a/archive-zip.c +++ b/archive-zip.c @@ -348,8 +348,8 @@ static int write_zip_entry(struct archiver_args *args, if (!buffer) { enum object_type type; - stream = open_istream(args->repo, oid, &type, &size, - NULL); + stream = odb_read_stream_open(args->repo->objects, oid, + &type, &size, NULL); if (!stream) return error(_("cannot stream blob %s"), oid_to_hex(oid)); @@ -429,7 +429,7 @@ static int write_zip_entry(struct archiver_args *args, ssize_t readlen; for (;;) { - readlen = read_istream(stream, buf, sizeof(buf)); + readlen = odb_read_stream_read(stream, buf, sizeof(buf)); if (readlen <= 0) break; crc = crc32(crc, buf, readlen); @@ -439,7 +439,7 @@ static int write_zip_entry(struct archiver_args *args, buf, readlen); write_or_die(1, buf, readlen); } - close_istream(stream); + odb_read_stream_close(stream); if (readlen) return readlen; @@ -462,7 +462,7 @@ static int write_zip_entry(struct archiver_args *args, zstream.avail_out = sizeof(compressed); for (;;) { - readlen = read_istream(stream, buf, sizeof(buf)); + readlen = odb_read_stream_read(stream, buf, sizeof(buf)); if (readlen <= 0) break; crc = crc32(crc, buf, readlen); @@ -486,7 +486,7 @@ static int write_zip_entry(struct archiver_args *args, } } - close_istream(stream); + odb_read_stream_close(stream); if (readlen) return readlen; diff --git a/builtin/index-pack.c b/builtin/index-pack.c index 5f90f12f92..fb76ef0f4c 100644 --- a/builtin/index-pack.c +++ b/builtin/index-pack.c @@ -779,7 +779,7 @@ static int compare_objects(const unsigned char *buf, unsigned long size, } while (size) { - ssize_t len = read_istream(data->st, data->buf, size); + ssize_t len = odb_read_stream_read(data->st, data->buf, size); if (len == 0) die(_("SHA1 COLLISION FOUND WITH %s !"), oid_to_hex(&data->entry->idx.oid)); @@ -807,15 +807,15 @@ static int check_collison(struct object_entry *entry) memset(&data, 0, sizeof(data)); data.entry = entry; - data.st = open_istream(the_repository, &entry->idx.oid, &type, &size, - NULL); + data.st = odb_read_stream_open(the_repository->objects, &entry->idx.oid, + &type, &size, NULL); if (!data.st) return -1; if (size != entry->size || type != entry->type) die(_("SHA1 COLLISION FOUND WITH %s !"), oid_to_hex(&entry->idx.oid)); unpack_data(entry, compare_objects, &data); - close_istream(data.st); + odb_read_stream_close(data.st); free(data.buf); return 0; } diff --git a/builtin/pack-objects.c b/builtin/pack-objects.c index c693d948e1..1353c2384c 100644 --- a/builtin/pack-objects.c +++ b/builtin/pack-objects.c @@ -417,7 +417,7 @@ static unsigned long write_large_blob_data(struct odb_read_stream *st, struct ha for (;;) { ssize_t readlen; int zret = Z_OK; - readlen = read_istream(st, ibuf, sizeof(ibuf)); + readlen = odb_read_stream_read(st, ibuf, sizeof(ibuf)); if (readlen == -1) die(_("unable to read %s"), oid_to_hex(oid)); @@ -520,8 +520,8 @@ static unsigned long write_no_reuse_object(struct hashfile *f, struct object_ent if (oe_type(entry) == OBJ_BLOB && oe_size_greater_than(&to_pack, entry, repo_settings_get_big_file_threshold(the_repository)) && - (st = open_istream(the_repository, &entry->idx.oid, &type, - &size, NULL)) != NULL) + (st = odb_read_stream_open(the_repository->objects, &entry->idx.oid, + &type, &size, NULL)) != NULL) buf = NULL; else { buf = odb_read_object(the_repository->objects, @@ -577,7 +577,7 @@ static unsigned long write_no_reuse_object(struct hashfile *f, struct object_ent dheader[--pos] = 128 | (--ofs & 127); if (limit && hdrlen + sizeof(dheader) - pos + datalen + hashsz >= limit) { if (st) - close_istream(st); + odb_read_stream_close(st); free(buf); return 0; } @@ -591,7 +591,7 @@ static unsigned long write_no_reuse_object(struct hashfile *f, struct object_ent */ if (limit && hdrlen + hashsz + datalen + hashsz >= limit) { if (st) - close_istream(st); + odb_read_stream_close(st); free(buf); return 0; } @@ -601,7 +601,7 @@ static unsigned long write_no_reuse_object(struct hashfile *f, struct object_ent } else { if (limit && hdrlen + datalen + hashsz >= limit) { if (st) - close_istream(st); + odb_read_stream_close(st); free(buf); return 0; } @@ -609,7 +609,7 @@ static unsigned long write_no_reuse_object(struct hashfile *f, struct object_ent } if (st) { datalen = write_large_blob_data(st, f, &entry->idx.oid); - close_istream(st); + odb_read_stream_close(st); } else { hashwrite(f, buf, datalen); free(buf); diff --git a/object-file.c b/object-file.c index 8c67847fea..9ba40a848c 100644 --- a/object-file.c +++ b/object-file.c @@ -139,7 +139,7 @@ int stream_object_signature(struct repository *r, const struct object_id *oid) char hdr[MAX_HEADER_LEN]; int hdrlen; - st = open_istream(r, oid, &obj_type, &size, NULL); + st = odb_read_stream_open(r->objects, oid, &obj_type, &size, NULL); if (!st) return -1; @@ -151,10 +151,10 @@ int stream_object_signature(struct repository *r, const struct object_id *oid) git_hash_update(&c, hdr, hdrlen); for (;;) { char buf[1024 * 16]; - ssize_t readlen = read_istream(st, buf, sizeof(buf)); + ssize_t readlen = odb_read_stream_read(st, buf, sizeof(buf)); if (readlen < 0) { - close_istream(st); + odb_read_stream_close(st); return -1; } if (!readlen) @@ -162,7 +162,7 @@ int stream_object_signature(struct repository *r, const struct object_id *oid) git_hash_update(&c, buf, readlen); } git_hash_final_oid(&real_oid, &c); - close_istream(st); + odb_read_stream_close(st); return !oideq(oid, &real_oid) ? -1 : 0; } diff --git a/streaming.c b/streaming.c index 3140728a70..06993a751c 100644 --- a/streaming.c +++ b/streaming.c @@ -35,7 +35,7 @@ static int close_istream_filtered(struct odb_read_stream *_fs) { struct odb_filtered_read_stream *fs = (struct odb_filtered_read_stream *)_fs; free_stream_filter(fs->filter); - return close_istream(fs->upstream); + return odb_read_stream_close(fs->upstream); } static ssize_t read_istream_filtered(struct odb_read_stream *_fs, char *buf, @@ -87,7 +87,7 @@ static ssize_t read_istream_filtered(struct odb_read_stream *_fs, char *buf, /* refill the input from the upstream */ if (!fs->input_finished) { - fs->i_end = read_istream(fs->upstream, fs->ibuf, FILTER_BUFFER); + fs->i_end = odb_read_stream_read(fs->upstream, fs->ibuf, FILTER_BUFFER); if (fs->i_end < 0) return -1; if (fs->i_end) @@ -149,7 +149,7 @@ static ssize_t read_istream_incore(struct odb_read_stream *_st, char *buf, size_ } static int open_istream_incore(struct odb_read_stream **out, - struct repository *r, + struct object_database *odb, const struct object_id *oid) { struct object_info oi = OBJECT_INFO_INIT; @@ -163,7 +163,7 @@ static int open_istream_incore(struct odb_read_stream **out, oi.typep = &stream.base.type; oi.sizep = &stream.base.size; oi.contentp = (void **)&stream.buf; - ret = odb_read_object_info_extended(r->objects, oid, &oi, + ret = odb_read_object_info_extended(odb, oid, &oi, OBJECT_INFO_DIE_IF_CORRUPT); if (ret) return ret; @@ -180,47 +180,47 @@ static int open_istream_incore(struct odb_read_stream **out, *****************************************************************************/ static int istream_source(struct odb_read_stream **out, - struct repository *r, + struct object_database *odb, const struct object_id *oid) { struct odb_source *source; - if (!packfile_store_read_object_stream(out, r->objects->packfiles, oid)) + if (!packfile_store_read_object_stream(out, odb->packfiles, oid)) return 0; - odb_prepare_alternates(r->objects); - for (source = r->objects->sources; source; source = source->next) + odb_prepare_alternates(odb); + for (source = odb->sources; source; source = source->next) if (!odb_source_loose_read_object_stream(out, source, oid)) return 0; - return open_istream_incore(out, r, oid); + return open_istream_incore(out, odb, oid); } /**************************************************************** * Users of streaming interface ****************************************************************/ -int close_istream(struct odb_read_stream *st) +int odb_read_stream_close(struct odb_read_stream *st) { int r = st->close(st); free(st); return r; } -ssize_t read_istream(struct odb_read_stream *st, void *buf, size_t sz) +ssize_t odb_read_stream_read(struct odb_read_stream *st, void *buf, size_t sz) { return st->read(st, buf, sz); } -struct odb_read_stream *open_istream(struct repository *r, - const struct object_id *oid, - enum object_type *type, - unsigned long *size, - struct stream_filter *filter) +struct odb_read_stream *odb_read_stream_open(struct object_database *odb, + const struct object_id *oid, + enum object_type *type, + unsigned long *size, + struct stream_filter *filter) { struct odb_read_stream *st; - const struct object_id *real = lookup_replace_object(r, oid); - int ret = istream_source(&st, r, real); + const struct object_id *real = lookup_replace_object(odb->repo, oid); + int ret = istream_source(&st, odb, real); if (ret) return NULL; @@ -229,7 +229,7 @@ struct odb_read_stream *open_istream(struct repository *r, /* Add "&& !is_null_stream_filter(filter)" for performance */ struct odb_read_stream *nst = attach_stream_filter(st, filter); if (!nst) { - close_istream(st); + odb_read_stream_close(st); return NULL; } st = nst; @@ -252,7 +252,7 @@ int odb_stream_blob_to_fd(struct object_database *odb, ssize_t kept = 0; int result = -1; - st = open_istream(odb->repo, oid, &type, &sz, filter); + st = odb_read_stream_open(odb, oid, &type, &sz, filter); if (!st) { if (filter) free_stream_filter(filter); @@ -263,7 +263,7 @@ int odb_stream_blob_to_fd(struct object_database *odb, for (;;) { char buf[1024 * 16]; ssize_t wrote, holeto; - ssize_t readlen = read_istream(st, buf, sizeof(buf)); + ssize_t readlen = odb_read_stream_read(st, buf, sizeof(buf)); if (readlen < 0) goto close_and_exit; @@ -294,6 +294,6 @@ int odb_stream_blob_to_fd(struct object_database *odb, result = 0; close_and_exit: - close_istream(st); + odb_read_stream_close(st); return result; } diff --git a/streaming.h b/streaming.h index acfdef1598..7cb55213b7 100644 --- a/streaming.h +++ b/streaming.h @@ -24,11 +24,31 @@ struct odb_read_stream { unsigned long size; /* inflated size of full object */ }; -struct odb_read_stream *open_istream(struct repository *, const struct object_id *, - enum object_type *, unsigned long *, - struct stream_filter *); -int close_istream(struct odb_read_stream *); -ssize_t read_istream(struct odb_read_stream *, void *, size_t); +/* + * Create a new object stream for the given object database. Populates the type + * and size pointers with the object's info. An optional filter can be used to + * transform the object's content. + * + * Returns the stream on success, a `NULL` pointer otherwise. + */ +struct odb_read_stream *odb_read_stream_open(struct object_database *odb, + const struct object_id *oid, + enum object_type *type, + unsigned long *size, + struct stream_filter *filter); + +/* + * Close the given read stream and release all resources associated with it. + * Returns 0 on success, a negative error code otherwise. + */ +int odb_read_stream_close(struct odb_read_stream *stream); + +/* + * Read data from the stream into the buffer. Returns 0 on EOF and the number + * of bytes read on success. Returns a negative error code in case reading from + * the stream fails. + */ +ssize_t odb_read_stream_read(struct odb_read_stream *stream, void *buf, size_t len); /* * Look up the object by its ID and write the full contents to the file -- 2.47.3