From: Sean Purcell Date: Mon, 10 Apr 2017 23:22:35 +0000 (-0700) Subject: Seekable compression demo X-Git-Tag: v1.3.0~1^2~47^2~13 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c3ba15e48f89071b2f2ab5aa137b5c5774709e3a;p=thirdparty%2Fzstd.git Seekable compression demo --- diff --git a/doc/zstd_manual.html b/doc/zstd_manual.html index a66f0a49c..a47b7142a 100644 --- a/doc/zstd_manual.html +++ b/doc/zstd_manual.html @@ -28,6 +28,9 @@
  • Buffer-less streaming compression (synchronous mode)
  • Buffer-less streaming decompression (synchronous mode)
  • Block functions
  • +
  • Seekable Format
  • +
  • Seekable compression - HowTo
  • +
  • Seekable decompression - HowTo

  • Introduction

    @@ -660,5 +663,109 @@ size_t ZSTD_compressBlock  (ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, cons
     size_t ZSTD_decompressBlock(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize);
     size_t ZSTD_insertBlock(ZSTD_DCtx* dctx, const void* blockStart, size_t blockSize);  /**< insert block into `dctx` history. Useful for uncompressed blocks */
     

    +

    Seekable Format

    +  The seekable format splits the compressed data into a series of "chunks",
    +  each compressed individually so that decompression of a section in the
    +  middle of an archive only requires zstd to decompress at most a chunk's
    +  worth of extra data, instead of the entire archive.
    +
    + +

    Seekable compression - HowTo

      A ZSTD_seekable_CStream object is required to tracking streaming operation.
    +  Use ZSTD_seekable_createCStream() and ZSTD_seekable_freeCStream() to create/
    +  release resources.
    +
    +  Streaming objects are reusable to avoid allocation and deallocation,
    +  to start a new compression operation call ZSTD_seekable_initCStream() on the
    +  compressor.
    +
    +  Data streamed to the seekable compressor will automatically be split into
    +  chunks of size `maxChunkSize` (provided in ZSTD_seekable_initCStream()),
    +  or if none is provided, will be cut off whenver ZSTD_endChunk() is called
    +  or when the default maximum chunk size is reached (approximately 4GB).
    +
    +  Use ZSTD_seekable_initCStream() to initialize a ZSTD_seekable_CStream object
    +  for a new compression operation.
    +  `maxChunkSize` indicates the size at which to automatically start a new
    +  seekable frame.  `maxChunkSize == 0` implies the default maximum size.
    +  @return : a size hint for input to provide for compression, or an error code
    +            checkable with ZSTD_isError()
    +
    +  Use ZSTD_seekable_compressStream() repetitively to consume input stream.
    +  The function will automatically update both `pos` fields.
    +  Note that it may not consume the entire input, in which case `pos < size`,
    +  and it's up to the caller to present again remaining data.
    +  @return : a size hint, preferred nb of bytes to use as input for next
    +            function call or an error code, which can be tested using
    +            ZSTD_isError().
    +            Note 1 : it's just a hint, to help latency a little, any other
    +                     value will work fine.
    +            Note 2 : size hint is guaranteed to be <= ZSTD_CStreamInSize()
    +
    +  At any time, call ZSTD_seekable_endChunk() to end the current chunk and
    +  start a new one.
    +
    +  ZSTD_endStream() will end the current chunk, and then write the seek table
    +  so that decompressors can efficiently find compressed chunks.
    +  ZSTD_endStream() may return a number > 0 if it was unable to flush all the
    +  necessary data to `output`.  In this case, it should be called again until
    +  all remaining data is flushed out and 0 is returned.
    +
    + +

    Seekable compressor management

    ZSTD_seekable_CStream* ZSTD_seekable_createCStream(void);
    +size_t ZSTD_seekable_freeCStream(ZSTD_seekable_CStream* zcs);
    +

    +

    Seekable compression functions

    size_t ZSTD_seekable_initCStream(ZSTD_seekable_CStream* zcs, int compressionLevel, unsigned maxChunkSize);
    +size_t ZSTD_seekable_compressStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
    +size_t ZSTD_seekable_endChunk(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output);
    +size_t ZSTD_seekable_endStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output);
    +

    +

    Seekable decompression - HowTo

      A ZSTD_seekable_DStream object is required to tracking streaming operation.
    +  Use ZSTD_seekable_createDStream() and ZSTD_seekable_freeDStream() to create/
    +  release resources.
    +
    +  Streaming objects are reusable to avoid allocation and deallocation,
    +  to start a new compression operation call ZSTD_seekable_initDStream() on the
    +  compressor.
    +
    +  Use ZSTD_seekable_loadSeekTable() to load the seek table from a file.
    +  `src` should point to a block of data read from the end of the file,
    +  i.e. `src + srcSize` should always be the end of the file.
    +  @return : 0 if the table was loaded successfully, or if `srcSize` was too
    +            small, a size hint for how much data to provide.
    +            An error code may also be returned, checkable with ZSTD_isError()
    +
    +  Use ZSTD_initDStream to prepare for a new decompression operation using the
    +  seektable loaded with ZSTD_seekable_loadSeekTable().
    +  Data in the range [rangeStart, rangeEnd) will be decompressed.
    +
    +  Call ZSTD_seekable_decompressStream() repetitively to consume input stream.
    +  @return : There are a number of possible return codes for this function
    +           - 0, the decompression operation has completed.
    +           - An error code checkable with ZSTD_isError
    +             + If this error code is ZSTD_error_needSeek, the user should seek
    +               to the file position provided by ZSTD_seekable_getSeekOffset()
    +               and indicate this to the stream with
    +               ZSTD_seekable_updateOffset(), before resuming decompression
    +             + Otherwise, this is a regular decompression error and the input
    +               file is likely corrupted or the API was incorrectly used.
    +           - A size hint, the preferred nb of bytes to provide as input to the
    +             next function call to improve latency.
    +
    +  ZSTD_seekable_getSeekOffset() and ZSTD_seekable_updateOffset() are helper
    +  functions to indicate where the user should seek their file stream to, when
    +  a different position is required to continue decompression.
    +  Note that ZSTD_seekable_updateOffset will error if given an offset other
    +  than the one requested from ZSTD_seekable_getSeekOffset().
    +
    + +

    Seekable decompressor management

    ZSTD_seekable_DStream* ZSTD_seekable_createDStream(void);
    +size_t ZSTD_seekable_freeDStream(ZSTD_seekable_DStream* zds);
    +

    +

    Seekable decompression functions

    size_t ZSTD_seekable_loadSeekTable(ZSTD_seekable_DStream* zds, const void* src, size_t srcSize);
    +size_t ZSTD_seekable_initDStream(ZSTD_seekable_DStream* zds, unsigned long long rangeStart, unsigned long long rangeEnd);
    +size_t ZSTD_seekable_decompressStream(ZSTD_seekable_DStream* zds, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
    +unsigned long long ZSTD_seekable_getSeekOffset(ZSTD_seekable_DStream* zds);
    +size_t ZSTD_seekable_updateOffset(ZSTD_seekable_DStream* zds, unsigned long long offset);
    +

    diff --git a/examples/.gitignore b/examples/.gitignore index 0711813d3..c6b2a9f42 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -6,6 +6,8 @@ dictionary_decompression streaming_compression streaming_decompression multiple_streaming_compression +seekable_compression +seekable_decompression #test artefact tmp* diff --git a/examples/Makefile b/examples/Makefile index b84983f08..88cc10d24 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -18,7 +18,8 @@ default: all all: simple_compression simple_decompression \ dictionary_compression dictionary_decompression \ streaming_compression streaming_decompression \ - multiple_streaming_compression + multiple_streaming_compression \ + seekable_compression simple_compression : simple_compression.c $(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@ @@ -41,6 +42,12 @@ multiple_streaming_compression : multiple_streaming_compression.c streaming_decompression : streaming_decompression.c $(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@ +seekable_compression : seekable_compression.c + $(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@ + +seekable_decompression : seekable_decompression.c + $(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@ + clean: @rm -f core *.o tmp* result* *.zst \ simple_compression simple_decompression \ diff --git a/examples/seekable_compression.c b/examples/seekable_compression.c new file mode 100644 index 000000000..5ab36c3cb --- /dev/null +++ b/examples/seekable_compression.c @@ -0,0 +1,126 @@ +/** + * Copyright 2017-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the license found in the + * LICENSE-examples file in the root directory of this source tree. + */ + +#include // malloc, free, exit, atoi +#include // fprintf, perror, feof, fopen, etc. +#include // strlen, memset, strcat +#define ZSTD_STATIC_LINKING_ONLY +#include // presumes zstd library is installed + +static void* malloc_orDie(size_t size) +{ + void* const buff = malloc(size); + if (buff) return buff; + /* error */ + perror("malloc:"); + exit(1); +} + +static FILE* fopen_orDie(const char *filename, const char *instruction) +{ + FILE* const inFile = fopen(filename, instruction); + if (inFile) return inFile; + /* error */ + perror(filename); + exit(3); +} + +static size_t fread_orDie(void* buffer, size_t sizeToRead, FILE* file) +{ + size_t const readSize = fread(buffer, 1, sizeToRead, file); + if (readSize == sizeToRead) return readSize; /* good */ + if (feof(file)) return readSize; /* good, reached end of file */ + /* error */ + perror("fread"); + exit(4); +} + +static size_t fwrite_orDie(const void* buffer, size_t sizeToWrite, FILE* file) +{ + size_t const writtenSize = fwrite(buffer, 1, sizeToWrite, file); + if (writtenSize == sizeToWrite) return sizeToWrite; /* good */ + /* error */ + perror("fwrite"); + exit(5); +} + +static size_t fclose_orDie(FILE* file) +{ + if (!fclose(file)) return 0; + /* error */ + perror("fclose"); + exit(6); +} + +static void compressFile_orDie(const char* fname, const char* outName, int cLevel, unsigned chunkSize) +{ + FILE* const fin = fopen_orDie(fname, "rb"); + FILE* const fout = fopen_orDie(outName, "wb"); + size_t const buffInSize = ZSTD_CStreamInSize(); /* can always read one full block */ + void* const buffIn = malloc_orDie(buffInSize); + size_t const buffOutSize = ZSTD_CStreamOutSize(); /* can always flush a full block */ + void* const buffOut = malloc_orDie(buffOutSize); + + ZSTD_seekable_CStream* const cstream = ZSTD_seekable_createCStream(); + if (cstream==NULL) { fprintf(stderr, "ZSTD_seekable_createCStream() error \n"); exit(10); } + size_t const initResult = ZSTD_seekable_initCStream(cstream, cLevel, chunkSize); + if (ZSTD_isError(initResult)) { fprintf(stderr, "ZSTD_seekable_initCStream() error : %s \n", ZSTD_getErrorName(initResult)); exit(11); } + + size_t read, toRead = buffInSize; + while( (read = fread_orDie(buffIn, toRead, fin)) ) { + ZSTD_inBuffer input = { buffIn, read, 0 }; + while (input.pos < input.size) { + ZSTD_outBuffer output = { buffOut, buffOutSize, 0 }; + toRead = ZSTD_seekable_compressStream(cstream, &output , &input); /* toRead is guaranteed to be <= ZSTD_CStreamInSize() */ + if (ZSTD_isError(toRead)) { fprintf(stderr, "ZSTD_compressStream() error : %s \n", ZSTD_getErrorName(toRead)); exit(12); } + if (toRead > buffInSize) toRead = buffInSize; /* Safely handle case when `buffInSize` is manually changed to a value < ZSTD_CStreamInSize()*/ + fwrite_orDie(buffOut, output.pos, fout); + } + } + + ZSTD_outBuffer output = { buffOut, buffOutSize, 0 }; + while (1) { + size_t const remainingToFlush = ZSTD_seekable_endStream(cstream, &output); /* close stream */ + fwrite_orDie(buffOut, output.pos, fout); + if (!remainingToFlush) break; + } + + ZSTD_seekable_freeCStream(cstream); + fclose_orDie(fout); + fclose_orDie(fin); + free(buffIn); + free(buffOut); +} + +static const char* createOutFilename_orDie(const char* filename) +{ + size_t const inL = strlen(filename); + size_t const outL = inL + 5; + void* outSpace = malloc_orDie(outL); + memset(outSpace, 0, outL); + strcat(outSpace, filename); + strcat(outSpace, ".zst"); + return (const char*)outSpace; +} + +int main(int argc, const char** argv) { + const char* const exeName = argv[0]; + if (argc!=3) { + printf("wrong arguments\n"); + printf("usage:\n"); + printf("%s FILE CHUNK_SIZE\n", exeName); + return 1; + } + + { const char* const inFileName = argv[1]; + unsigned const chunkSize = (unsigned)atoi(argv[2]); + + const char* const outFileName = createOutFilename_orDie(inFileName); + compressFile_orDie(inFileName, outFileName, 5, chunkSize); + } +} diff --git a/examples/seekable_decompression.c b/examples/seekable_decompression.c new file mode 100644 index 000000000..97563c3de --- /dev/null +++ b/examples/seekable_decompression.c @@ -0,0 +1,175 @@ +/** + * Copyright 2016-present, Yann Collet, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the license found in the + * LICENSE-examples file in the root directory of this source tree. + */ + + +#include // malloc, exit +#include // fprintf, perror, feof +#include // strerror +#include // errno +#define ZSTD_STATIC_LINKING_ONLY +#include // presumes zstd library is installed +#include + + +static void* malloc_orDie(size_t size) +{ + void* const buff = malloc(size); + if (buff) return buff; + /* error */ + perror("malloc"); + exit(1); +} + +static void* realloc_orDie(void* ptr, size_t size) +{ + ptr = realloc(ptr, size); + if (ptr) return ptr; + /* error */ + perror("realloc"); + exit(1); +} + +static FILE* fopen_orDie(const char *filename, const char *instruction) +{ + FILE* const inFile = fopen(filename, instruction); + if (inFile) return inFile; + /* error */ + perror(filename); + exit(3); +} + +static size_t fread_orDie(void* buffer, size_t sizeToRead, FILE* file) +{ + size_t const readSize = fread(buffer, 1, sizeToRead, file); + if (readSize == sizeToRead) return readSize; /* good */ + if (feof(file)) return readSize; /* good, reached end of file */ + /* error */ + perror("fread"); + exit(4); +} + +static size_t fwrite_orDie(const void* buffer, size_t sizeToWrite, FILE* file) +{ + size_t const writtenSize = fwrite(buffer, 1, sizeToWrite, file); + if (writtenSize == sizeToWrite) return sizeToWrite; /* good */ + /* error */ + perror("fwrite"); + exit(5); +} + +static size_t fclose_orDie(FILE* file) +{ + if (!fclose(file)) return 0; + /* error */ + perror("fclose"); + exit(6); +} + +static void fseek_orDie(FILE* file, long int offset, int origin) { + if (!fseek(file, offset, origin)) { + if (!fflush(file)) return; + } + /* error */ + perror("fseek"); + exit(7); +} + + +static void decompressFile_orDie(const char* fname, unsigned startOffset, unsigned endOffset) +{ + FILE* const fin = fopen_orDie(fname, "rb"); + size_t const buffInSize = ZSTD_DStreamInSize(); + void* const buffIn = malloc_orDie(buffInSize); + FILE* const fout = stdout; + size_t const buffOutSize = ZSTD_DStreamOutSize(); /* Guarantee to successfully flush at least one complete compressed block in all circumstances. */ + void* const buffOut = malloc_orDie(buffOutSize); + + ZSTD_seekable_DStream* const dstream = ZSTD_seekable_createDStream(); + if (dstream==NULL) { fprintf(stderr, "ZSTD_seekable_createDStream() error \n"); exit(10); } + + { size_t sizeNeeded = 0; + void* buffSeekTable = NULL; + + do { + sizeNeeded = ZSTD_seekable_loadSeekTable(dstream, buffSeekTable, sizeNeeded); + if (!sizeNeeded) break; + + if (ZSTD_isError(sizeNeeded)) { + fprintf(stderr, "ZSTD_seekable_loadSeekTable() error : %s \n", + ZSTD_getErrorName(sizeNeeded)); + exit(11); + } + + fseek_orDie(fin, -(long) sizeNeeded, SEEK_END); + buffSeekTable = realloc_orDie(buffSeekTable, sizeNeeded); + fread_orDie(buffSeekTable, sizeNeeded, fin); + } while (sizeNeeded > 0); + + free(buffSeekTable); + } + + /* In more complex scenarios, a file may consist of multiple appended frames (ex : pzstd). + * The following example decompresses only the first frame. + * It is compatible with other provided streaming examples */ + size_t const initResult = ZSTD_seekable_initDStream(dstream, startOffset, endOffset); + if (ZSTD_isError(initResult)) { fprintf(stderr, "ZSTD_seekable_initDStream() error : %s \n", ZSTD_getErrorName(initResult)); exit(11); } + + size_t result, read, toRead = 0; + + do { + read = fread_orDie(buffIn, toRead, fin); + { ZSTD_inBuffer input = { buffIn, read, 0 }; + ZSTD_outBuffer output = { buffOut, buffOutSize, 0 }; + result = ZSTD_seekable_decompressStream(dstream, &output, &input); + + if (ZSTD_isError(result)) { + if (ZSTD_getErrorCode(result) == ZSTD_error_needSeek) { + unsigned long long const offset = ZSTD_seekable_getSeekOffset(dstream); + fseek_orDie(fin, offset, SEEK_SET); + ZSTD_seekable_updateOffset(dstream, offset); + toRead = 0; + } else { + fprintf(stderr, + "ZSTD_seekable_decompressStream() error : %s \n", + ZSTD_getErrorName(result)); + exit(12); + } + } else { + toRead = result; + } + fwrite_orDie(buffOut, output.pos, fout); + } + } while (result > 0); + + ZSTD_seekable_freeDStream(dstream); + fclose_orDie(fin); + fclose_orDie(fout); + free(buffIn); + free(buffOut); +} + + +int main(int argc, const char** argv) +{ + const char* const exeName = argv[0]; + + if (argc!=4) { + fprintf(stderr, "wrong arguments\n"); + fprintf(stderr, "usage:\n"); + fprintf(stderr, "%s FILE\n", exeName); + return 1; + } + + { + const char* const inFilename = argv[1]; + unsigned const startOffset = (unsigned) atoi(argv[2]); + unsigned const endOffset = (unsigned) atoi(argv[3]); + decompressFile_orDie(inFilename, startOffset, endOffset); + } + return 0; +} diff --git a/lib/common/error_private.c b/lib/common/error_private.c index 44ae20104..4084edca6 100644 --- a/lib/common/error_private.c +++ b/lib/common/error_private.c @@ -38,6 +38,8 @@ const char* ERR_getErrorString(ERR_enum code) case PREFIX(dictionary_corrupted): return "Dictionary is corrupted"; case PREFIX(dictionary_wrong): return "Dictionary mismatch"; case PREFIX(dictionaryCreation_failed): return "Cannot create Dictionary from provided samples"; + case PREFIX(chunkIndex_tooLarge): return "Chunk index is too large"; + case PREFIX(needSeek): return "Wrong file position, a seek is required to continue"; case PREFIX(maxCode): default: return notErrorCode; } diff --git a/lib/common/seekable.h b/lib/common/seekable.h new file mode 100644 index 000000000..ab11a4388 --- /dev/null +++ b/lib/common/seekable.h @@ -0,0 +1,23 @@ +#ifndef SEEKABLE_H +#define SEEKABLE_H + +#if defined (__cplusplus) +extern "C" { +#endif + +#include "zstd_internal.h" + +static const unsigned ZSTD_seekTableFooterSize = 9; + +#define ZSTD_SEEKABLE_MAGICNUMBER 0x8F92EAB1 + +#define ZSTD_SEEKABLE_MAXCHUNKS 0x8000000U + +/* 0xFE03F607 is the largest number x such that ZSTD_compressBound(x) fits in a 32-bit integer */ +#define ZSTD_SEEKABLE_MAX_CHUNK_DECOMPRESSED_SIZE 0xFE03F607 + +#if defined (__cplusplus) +} +#endif + +#endif diff --git a/lib/common/zstd_errors.h b/lib/common/zstd_errors.h index 3d579d969..13a5608e0 100644 --- a/lib/common/zstd_errors.h +++ b/lib/common/zstd_errors.h @@ -58,6 +58,8 @@ typedef enum { ZSTD_error_dictionary_corrupted, ZSTD_error_dictionary_wrong, ZSTD_error_dictionaryCreation_failed, + ZSTD_error_chunkIndex_tooLarge, + ZSTD_error_needSeek, ZSTD_error_maxCode } ZSTD_ErrorCode; diff --git a/lib/compress/zstdseek_compress.c b/lib/compress/zstdseek_compress.c new file mode 100644 index 000000000..284724e58 --- /dev/null +++ b/lib/compress/zstdseek_compress.c @@ -0,0 +1,265 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include /* malloc, free */ + +#define XXH_STATIC_LINKING_ONLY +#include "xxhash.h" + +#include "zstd_internal.h" /* includes zstd.h */ +#include "seekable.h" + +typedef struct { + U32 cSize; + U32 dSize; + U32 checksum; +} chunklogEntry_t; + +typedef struct { + chunklogEntry_t* entries; + U32 size; + U32 capacity; +} chunklog_t; + +struct ZSTD_seekable_CStream_s { + ZSTD_CStream* cstream; + chunklog_t chunklog; + + U32 chunkCSize; + U32 chunkDSize; + + XXH64_state_t xxhState; + + U32 maxChunkSize; + + int checksumFlag; +}; + +ZSTD_seekable_CStream* ZSTD_seekable_createCStream() +{ + ZSTD_seekable_CStream* zcs = malloc(sizeof(ZSTD_seekable_CStream)); + + if (zcs == NULL) return NULL; + + memset(zcs, 0, sizeof(*zcs)); + + zcs->cstream = ZSTD_createCStream(); + if (zcs->cstream == NULL) goto failed1; + + { size_t const CHUNKLOG_STARTING_CAPACITY = 16; + zcs->chunklog.entries = + malloc(sizeof(chunklogEntry_t) * CHUNKLOG_STARTING_CAPACITY); + if (zcs->chunklog.entries == NULL) goto failed2; + zcs->chunklog.capacity = CHUNKLOG_STARTING_CAPACITY; + } + + return zcs; + +failed2: + ZSTD_freeCStream(zcs->cstream); +failed1: + free(zcs); + return NULL; +} + +size_t ZSTD_seekable_freeCStream(ZSTD_seekable_CStream* zcs) +{ + if (zcs == NULL) return 0; /* support free on NULL */ + ZSTD_freeCStream(zcs->cstream); + free(zcs->chunklog.entries); + free(zcs); + + return 0; +} + +size_t ZSTD_seekable_initCStream(ZSTD_seekable_CStream* zcs, + int compressionLevel, + U32 maxChunkSize) +{ + zcs->chunklog.size = 0; + zcs->chunkCSize = 0; + zcs->chunkDSize = 0; + + if (maxChunkSize > ZSTD_SEEKABLE_MAX_CHUNK_DECOMPRESSED_SIZE) { + return ERROR(compressionParameter_unsupported); + } + + zcs->maxChunkSize = maxChunkSize + ? maxChunkSize + : ZSTD_SEEKABLE_MAX_CHUNK_DECOMPRESSED_SIZE; + + zcs->checksumFlag = 0; + if (zcs->checksumFlag) { + XXH64_reset(&zcs->xxhState, 0); + } + + return ZSTD_initCStream(zcs->cstream, compressionLevel); +} + +static size_t ZSTD_seekable_logChunk(ZSTD_seekable_CStream* zcs) +{ + if (zcs->chunklog.size == ZSTD_SEEKABLE_MAXCHUNKS) + return ERROR(chunkIndex_tooLarge); + + zcs->chunklog.entries[zcs->chunklog.size] = (chunklogEntry_t) + { + .cSize = zcs->chunkCSize, + .dSize = zcs->chunkDSize, + }; + if (zcs->checksumFlag) + zcs->chunklog.entries[zcs->chunklog.size].checksum = + XXH64_digest(&zcs->xxhState) & 0xFFFFFFFFU; + + zcs->chunklog.size++; + if (zcs->chunklog.size == zcs->chunklog.capacity) { + size_t const newCapacity = zcs->chunklog.capacity * 2; + chunklogEntry_t* const newEntries = realloc(zcs->chunklog.entries, + sizeof(chunklogEntry_t) * newCapacity); + + if (newEntries == NULL) return ERROR(memory_allocation); + + zcs->chunklog.entries = newEntries; + zcs->chunklog.capacity = newCapacity; + } + + return 0; +} + +size_t ZSTD_seekable_endChunk(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output) +{ + size_t const prevOutPos = output->pos; + size_t ret = ZSTD_endStream(zcs->cstream, output); + + zcs->chunkCSize += output->pos - prevOutPos; + + /* need to flush before doing the rest */ + if (ret) return ret; + + /* frame done */ + + /* store the chunk data for later */ + ret = ZSTD_seekable_logChunk(zcs); + if (ret) return ret; + + /* reset for the next chunk */ + zcs->chunkCSize = 0; + zcs->chunkDSize = 0; + + ZSTD_resetCStream(zcs->cstream, 0); + if (zcs->checksumFlag) + XXH64_reset(&zcs->xxhState, 0); + + return 0; +} + +size_t ZSTD_seekable_compressStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) +{ + const BYTE* const inBase = (const BYTE*) input->src + input->pos; + size_t inLen = input->size - input->pos; + + inLen = MIN(inLen, (size_t)(zcs->maxChunkSize - zcs->chunkDSize)); + + if (inLen > 0) { + ZSTD_inBuffer inTmp = { inBase, inLen, 0 }; + size_t const prevOutPos = output->pos; + + size_t const ret = ZSTD_compressStream(zcs->cstream, output, &inTmp); + + if (zcs->checksumFlag) { + XXH64_update(&zcs->xxhState, inBase, inTmp.pos); + } + + zcs->chunkCSize += output->pos - prevOutPos; + zcs->chunkDSize += inTmp.pos; + + input->pos += inTmp.pos; + + if (ZSTD_isError(ret)) return ret; + } + + if (zcs->maxChunkSize == zcs->chunkDSize) { + size_t const ret = ZSTD_seekable_endChunk(zcs, output); + if (ZSTD_isError(ret)) return ret; + } + + return (size_t)(zcs->maxChunkSize - zcs->chunkDSize); +} + +static size_t ZSTD_seekable_seekTableSize(ZSTD_seekable_CStream* zcs) +{ + size_t const sizePerChunk = 8 + (zcs->checksumFlag?4:0); + size_t const seekTableLen = ZSTD_skippableHeaderSize + + sizePerChunk * zcs->chunklog.size + + ZSTD_seekTableFooterSize; + + return seekTableLen; +} + +static size_t ZSTD_seekable_writeSeekTable(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output) +{ + BYTE* op = (BYTE*) output->dst; + + /* repurpose + * zcs->chunkDSize: the current index in the table and + * zcs->chunkCSize: the amount of the table written so far */ + + size_t const sizePerChunk = 8 + (zcs->checksumFlag?4:0); + size_t const seekTableLen = ZSD_seekable_seekTableSize(zcs); + + if (zcs->chunkCSize == 0) { + if (output->size - output->pos < 4) return seekTableLen - zcs->chunkCSize; + MEM_writeLE32(op + output->pos, ZSTD_MAGIC_SKIPPABLE_START); + output->pos += 4; + zcs->chunkCSize += 4; + } + if (zcs->chunkCSize == 4) { + if (output->size - output->pos < 4) return seekTableLen - zcs->chunkCSize; + MEM_writeLE32(op + output->pos, seekTableLen - ZSTD_skippableHeaderSize); + output->pos += 4; + zcs->chunkCSize += 4; + } + + while (zcs->chunkDSize < zcs->chunklog.size) { + if (output->size - output->pos < sizePerChunk) return seekTableLen - zcs->chunkCSize; + MEM_writeLE32(op + output->pos + 0, zcs->chunklog.entries[zcs->chunkDSize].cSize); + MEM_writeLE32(op + output->pos + 4, zcs->chunklog.entries[zcs->chunkDSize].dSize); + if (zcs->checksumFlag) { + MEM_writeLE32(op + output->pos + 8, zcs->chunklog.entries[zcs->chunkDSize].checksum); + } + output->pos += sizePerChunk; + zcs->chunkCSize += sizePerChunk; + zcs->chunkDSize++; + } + + if (output->size - output->pos < ZSTD_seekTableFooterSize) return seekTableLen - zcs->chunkCSize; + MEM_writeLE32(op + output->pos, zcs->chunklog.size); + { BYTE sfd = 0; + sfd |= (zcs->checksumFlag) << 7; + + op[output->pos + 4] = sfd; + } + MEM_writeLE32(op + output->pos + 5, ZSTD_SEEKABLE_MAGICNUMBER); + + output->pos += ZSTD_seekTableFooterSize; + zcs->chunkCSize += ZSTD_seekTableFooterSize; + + if (zcs->chunkCSize != seekTableLen) return ERROR(GENERIC); + return 0; +} + +size_t ZSTD_seekable_endStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output) +{ + if (zcs->chunkDSize) { + const size_t endChunk = ZSTD_seekable_endChunk(zcs, output); + /* return an accurate size hint */ + if (endChunk) return endChunk + ZSTD_seekable_seekTableLen(zcs); + } + + return ZSTD_seekable_writeSeekTable(zcs, output); +} diff --git a/lib/decompress/zstdseek_decompress.c b/lib/decompress/zstdseek_decompress.c new file mode 100644 index 000000000..e76b4c612 --- /dev/null +++ b/lib/decompress/zstdseek_decompress.c @@ -0,0 +1,347 @@ +/* + * Copyright (c) 2017-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include /* malloc, free */ + +#define XXH_STATIC_LINKING_ONLY +#include "xxhash.h" + +#include "zstd_internal.h" /* includes zstd.h */ +#include "seekable.h" + +typedef struct { + U64 cOffset; + U64 dOffset; + U32 checksum; +} seekEntry_t; + +typedef struct { + seekEntry_t* entries; + size_t tableLen; + + int checksumFlag; +} seekTable_t; + +static U32 ZSTD_seekable_offsetToChunk(const seekTable_t* table, U64 pos) +{ + U32 lo = 0; + U32 hi = table->tableLen; + + while (lo + 1 < hi) { + U32 mid = lo + ((hi - lo) >> 1); + if (table->entries[mid].dOffset <= pos) { + lo = mid; + } else { + hi = mid; + } + } + return lo; +} + +enum ZSTD_seekable_DStream_stage { + zsds_init, + zsds_seek, + zsds_decompress, + zsds_done, +}; + +struct ZSTD_seekable_DStream_s { + ZSTD_DStream* dstream; + seekTable_t seekTable; + + U32 curChunk; + U64 compressedOffset; + U64 decompressedOffset; + + U64 targetStart; + U64 targetEnd; + + U64 nextSeek; + + enum ZSTD_seekable_DStream_stage stage; + + XXH64_state_t xxhState; +}; + +ZSTD_seekable_DStream* ZSTD_seekable_createDStream(void) +{ + ZSTD_seekable_DStream* zds = malloc(sizeof(ZSTD_seekable_DStream)); + + if (zds == NULL) return NULL; + + memset(zds, 0, sizeof(*zds)); + + zds->dstream = ZSTD_createDStream(); + if (zds->dstream == NULL) { + free(zds); + return NULL; + } + + return zds; +} + +size_t ZSTD_seekable_freeDStream(ZSTD_seekable_DStream* zds) +{ + if (zds == NULL) return 0; + ZSTD_freeDStream(zds->dstream); + free(zds->seekTable.entries); + free(zds); + + return 0; +} + +size_t ZSTD_seekable_loadSeekTable(ZSTD_seekable_DStream* zds, const void* src, size_t srcSize) +{ + const BYTE* ip = (const BYTE*)src + srcSize; + + U32 numChunks; + int checksumFlag; + + U32 sizePerEntry; + + if (srcSize < ZSTD_seekTableFooterSize) + return ZSTD_seekTableFooterSize; + + if (MEM_readLE32(ip - 4) != ZSTD_SEEKABLE_MAGICNUMBER) { + return ERROR(prefix_unknown); + } + + { + BYTE const sfd = ip[-5]; + checksumFlag = sfd >> 7; + + numChunks = MEM_readLE32(ip-9); + + sizePerEntry = 8 + (checksumFlag?4:0); + } + + { U32 const tableSize = sizePerEntry * numChunks; + U32 const frameSize = tableSize + ZSTD_seekTableFooterSize + ZSTD_skippableHeaderSize; + + const BYTE* base = ip - frameSize; + + if (srcSize < frameSize) return frameSize; + + if ((MEM_readLE32(base) & 0xFFFFFFF0U) != ZSTD_MAGIC_SKIPPABLE_START) { + return ERROR(prefix_unknown); + } + if (MEM_readLE32(base+4) + ZSTD_skippableHeaderSize != frameSize) { + return ERROR(prefix_unknown); + } + + { /* Allocate an extra entry at the end so that we can do size + * computations on the last element without special case */ + seekEntry_t* entries = malloc(sizeof(seekEntry_t) * (numChunks + 1)); + const BYTE* tableBase = base + ZSTD_skippableHeaderSize; + + U32 idx; + size_t pos; + + U64 cOffset = 0; + U64 dOffset = 0; + + if (!entries) { + free(entries); + return ERROR(memory_allocation); + } + + for (idx = 0, pos = 0; idx < numChunks; idx++) { + entries[idx].cOffset = cOffset; + entries[idx].dOffset = dOffset; + + cOffset += MEM_readLE32(tableBase + pos); pos += 4; + dOffset += MEM_readLE32(tableBase + pos); pos += 4; + if (checksumFlag) { + entries[idx].checksum = MEM_readLE32(tableBase + pos); + pos += 4; + } + } + entries[numChunks].cOffset = cOffset; + entries[numChunks].dOffset = dOffset; + + zds->seekTable.entries = entries; + zds->seekTable.tableLen = numChunks; + zds->seekTable.checksumFlag = checksumFlag; + return 0; + } + } +} + +size_t ZSTD_seekable_initDStream(ZSTD_seekable_DStream* zds, U64 rangeStart, U64 rangeEnd) +{ + /* restrict range to the end of the file, and not before the range start */ + rangeEnd = MIN(rangeEnd, zds->seekTable.entries[zds->seekTable.tableLen].dOffset); + rangeEnd = MAX(rangeEnd, rangeStart); + + zds->targetStart = rangeStart; + zds->targetEnd = rangeEnd; + zds->stage = zsds_seek; + + /* force a seek first */ + zds->curChunk = (U32) -1; + zds->compressedOffset = (U64) -1; + zds->decompressedOffset = (U64) -1; + + if (zds->seekTable.checksumFlag) { + XXH64_reset(&zds->xxhState, 0); + } + + { const size_t ret = ZSTD_initDStream(zds->dstream); + if (ZSTD_isError(ret)) return ret; } + return 0; +} + +U64 ZSTD_seekable_getSeekOffset(ZSTD_seekable_DStream* zds) +{ + return zds->nextSeek; +} + +size_t ZSTD_seekable_updateOffset(ZSTD_seekable_DStream* zds, U64 offset) +{ + if (zds->stage != zsds_seek) { + return ERROR(stage_wrong); + } + if (offset != zds->nextSeek) { + return ERROR(needSeek); + } + + zds->stage = zsds_decompress; + zds->compressedOffset = offset; + return 0; +} + +size_t ZSTD_seekable_decompressStream(ZSTD_seekable_DStream* zds, ZSTD_outBuffer* output, ZSTD_inBuffer* input) +{ + const seekTable_t* const jt = &zds->seekTable; + while (1) { + switch (zds->stage) { + case zsds_init: + return ERROR(init_missing); + case zsds_decompress: { + BYTE* const outBase = (BYTE*)output->dst + output->pos; + size_t const outLen = output->size - output->pos; + while (zds->decompressedOffset < zds->targetStart) { + U64 const toDecompress = + zds->targetStart - zds->decompressedOffset; + size_t const prevInputPos = input->pos; + + ZSTD_outBuffer outTmp = { + .dst = outBase, + .size = (size_t)MIN((U64)outLen, toDecompress), + .pos = 0}; + + size_t const ret = + ZSTD_decompressStream(zds->dstream, &outTmp, input); + + if (ZSTD_isError(ret)) return ret; + if (ret == 0) { + /* should not happen at this stage */ + return ERROR(corruption_detected); + } + + zds->compressedOffset += input->pos - prevInputPos; + zds->decompressedOffset += outTmp.pos; + + if (zds->seekTable.checksumFlag) { + XXH64_update(&zds->xxhState, outTmp.dst, outTmp.pos); + } + + if (input->pos == input->size) { + /* need more input */ + return MIN( + ZSTD_DStreamInSize(), + (size_t)(zds->seekTable.entries[zds->curChunk + 1] + .cOffset - + zds->compressedOffset)); + } + } + + /* do actual decompression */ + { + U64 const toDecompress = + MIN(zds->targetEnd, + jt->entries[zds->curChunk + 1].dOffset) - + zds->decompressedOffset; + size_t const prevInputPos = input->pos; + + ZSTD_outBuffer outTmp = { + .dst = outBase, + .size = (size_t)MIN((U64)outLen, toDecompress), + .pos = 0}; + + size_t const ret = + ZSTD_decompressStream(zds->dstream, &outTmp, input); + + if (ZSTD_isError(ret)) return ret; + + zds->compressedOffset += input->pos - prevInputPos; + zds->decompressedOffset += outTmp.pos; + + output->pos += outTmp.pos; + + if (zds->seekTable.checksumFlag) { + XXH64_update(&zds->xxhState, outTmp.dst, outTmp.pos); + if (ret == 0) { + /* verify the checksum */ + U32 const digest = XXH64_digest(&zds->xxhState); + if (digest != jt->entries[zds->curChunk].checksum) { + return ERROR(checksum_wrong); + } + + XXH64_reset(&zds->xxhState, 0); + } + } + + if (zds->decompressedOffset == zds->targetEnd) { + /* done */ + zds->stage = zsds_done; + return 0; + } + + if (ret == 0) { + /* frame is done */ + ZSTD_resetDStream(zds->dstream); + zds->stage = zsds_seek; + break; + } + + /* need more input */ + return MIN( + ZSTD_DStreamInSize(), + (size_t)(zds->seekTable.entries[zds->curChunk + 1] + .cOffset - + zds->compressedOffset)); + } + } + case zsds_seek: { + U32 targetChunk; + if (zds->decompressedOffset < zds->targetStart || + zds->decompressedOffset >= zds->targetEnd) { + /* haven't started yet */ + targetChunk = ZSTD_seekable_offsetToChunk(jt, zds->targetStart); + } else { + targetChunk = ZSTD_seekable_offsetToChunk(jt, zds->decompressedOffset); + } + + zds->curChunk = targetChunk; + + if (zds->compressedOffset == jt->entries[targetChunk].cOffset) { + zds->stage = zsds_decompress; + break; + } + + zds->nextSeek = jt->entries[targetChunk].cOffset; + zds->decompressedOffset = jt->entries[targetChunk].dOffset; + return ERROR(needSeek); + } + case zsds_done: + return 0; + } + } +} diff --git a/lib/zstd.h b/lib/zstd.h index 6066db45e..5bbd1b742 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -776,6 +776,123 @@ ZSTDLIB_API size_t ZSTD_decompressBlock(ZSTD_DCtx* dctx, void* dst, size_t dstCa ZSTDLIB_API size_t ZSTD_insertBlock(ZSTD_DCtx* dctx, const void* blockStart, size_t blockSize); /**< insert block into `dctx` history. Useful for uncompressed blocks */ +/*-**************************************************************************** +* Seekable Format +* +* The seekable format splits the compressed data into a series of "chunks", +* each compressed individually so that decompression of a section in the +* middle of an archive only requires zstd to decompress at most a chunk's +* worth of extra data, instead of the entire archive. +******************************************************************************/ + +typedef struct ZSTD_seekable_CStream_s ZSTD_seekable_CStream; +typedef struct ZSTD_seekable_DStream_s ZSTD_seekable_DStream; + +/*-**************************************************************************** +* Seekable compression - HowTo +* A ZSTD_seekable_CStream object is required to tracking streaming operation. +* Use ZSTD_seekable_createCStream() and ZSTD_seekable_freeCStream() to create/ +* release resources. +* +* Streaming objects are reusable to avoid allocation and deallocation, +* to start a new compression operation call ZSTD_seekable_initCStream() on the +* compressor. +* +* Data streamed to the seekable compressor will automatically be split into +* chunks of size `maxChunkSize` (provided in ZSTD_seekable_initCStream()), +* or if none is provided, will be cut off whenver ZSTD_endChunk() is called +* or when the default maximum chunk size is reached (approximately 4GB). +* +* Use ZSTD_seekable_initCStream() to initialize a ZSTD_seekable_CStream object +* for a new compression operation. +* `maxChunkSize` indicates the size at which to automatically start a new +* seekable frame. `maxChunkSize == 0` implies the default maximum size. +* @return : a size hint for input to provide for compression, or an error code +* checkable with ZSTD_isError() +* +* Use ZSTD_seekable_compressStream() repetitively to consume input stream. +* The function will automatically update both `pos` fields. +* Note that it may not consume the entire input, in which case `pos < size`, +* and it's up to the caller to present again remaining data. +* @return : a size hint, preferred nb of bytes to use as input for next +* function call or an error code, which can be tested using +* ZSTD_isError(). +* Note 1 : it's just a hint, to help latency a little, any other +* value will work fine. +* Note 2 : size hint is guaranteed to be <= ZSTD_CStreamInSize() +* +* At any time, call ZSTD_seekable_endChunk() to end the current chunk and +* start a new one. +* +* ZSTD_endStream() will end the current chunk, and then write the seek table +* so that decompressors can efficiently find compressed chunks. +* ZSTD_endStream() may return a number > 0 if it was unable to flush all the +* necessary data to `output`. In this case, it should be called again until +* all remaining data is flushed out and 0 is returned. +******************************************************************************/ + +/*===== Seekable compressor management =====*/ +ZSTDLIB_API ZSTD_seekable_CStream* ZSTD_seekable_createCStream(void); +ZSTDLIB_API size_t ZSTD_seekable_freeCStream(ZSTD_seekable_CStream* zcs); + +/*===== Seekable compression functions =====*/ +ZSTDLIB_API size_t ZSTD_seekable_initCStream(ZSTD_seekable_CStream* zcs, int compressionLevel, unsigned maxChunkSize); +ZSTDLIB_API size_t ZSTD_seekable_compressStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input); +ZSTDLIB_API size_t ZSTD_seekable_endChunk(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output); +ZSTDLIB_API size_t ZSTD_seekable_endStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output); + +/*-**************************************************************************** +* Seekable decompression - HowTo +* A ZSTD_seekable_DStream object is required to tracking streaming operation. +* Use ZSTD_seekable_createDStream() and ZSTD_seekable_freeDStream() to create/ +* release resources. +* +* Streaming objects are reusable to avoid allocation and deallocation, +* to start a new compression operation call ZSTD_seekable_initDStream() on the +* compressor. +* +* Use ZSTD_seekable_loadSeekTable() to load the seek table from a file. +* `src` should point to a block of data read from the end of the file, +* i.e. `src + srcSize` should always be the end of the file. +* @return : 0 if the table was loaded successfully, or if `srcSize` was too +* small, a size hint for how much data to provide. +* An error code may also be returned, checkable with ZSTD_isError() +* +* Use ZSTD_initDStream to prepare for a new decompression operation using the +* seektable loaded with ZSTD_seekable_loadSeekTable(). +* Data in the range [rangeStart, rangeEnd) will be decompressed. +* +* Call ZSTD_seekable_decompressStream() repetitively to consume input stream. +* @return : There are a number of possible return codes for this function +* - 0, the decompression operation has completed. +* - An error code checkable with ZSTD_isError +* + If this error code is ZSTD_error_needSeek, the user should seek +* to the file position provided by ZSTD_seekable_getSeekOffset() +* and indicate this to the stream with +* ZSTD_seekable_updateOffset(), before resuming decompression +* + Otherwise, this is a regular decompression error and the input +* file is likely corrupted or the API was incorrectly used. +* - A size hint, the preferred nb of bytes to provide as input to the +* next function call to improve latency. +* +* ZSTD_seekable_getSeekOffset() and ZSTD_seekable_updateOffset() are helper +* functions to indicate where the user should seek their file stream to, when +* a different position is required to continue decompression. +* Note that ZSTD_seekable_updateOffset will error if given an offset other +* than the one requested from ZSTD_seekable_getSeekOffset(). +******************************************************************************/ + +/*===== Seekable decompressor management =====*/ +ZSTDLIB_API ZSTD_seekable_DStream* ZSTD_seekable_createDStream(void); +ZSTDLIB_API size_t ZSTD_seekable_freeDStream(ZSTD_seekable_DStream* zds); + +/*===== Seekable decompression functions =====*/ +ZSTDLIB_API size_t ZSTD_seekable_loadSeekTable(ZSTD_seekable_DStream* zds, const void* src, size_t srcSize); +ZSTDLIB_API size_t ZSTD_seekable_initDStream(ZSTD_seekable_DStream* zds, unsigned long long rangeStart, unsigned long long rangeEnd); +ZSTDLIB_API size_t ZSTD_seekable_decompressStream(ZSTD_seekable_DStream* zds, ZSTD_outBuffer* output, ZSTD_inBuffer* input); +ZSTDLIB_API unsigned long long ZSTD_seekable_getSeekOffset(ZSTD_seekable_DStream* zds); +ZSTDLIB_API size_t ZSTD_seekable_updateOffset(ZSTD_seekable_DStream* zds, unsigned long long offset); + #endif /* ZSTD_H_ZSTD_STATIC_LINKING_ONLY */ #if defined (__cplusplus)