]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
new streaming API (compression)
authorYann Collet <yann.collet.73@gmail.com>
Thu, 11 Aug 2016 23:20:36 +0000 (01:20 +0200)
committerYann Collet <yann.collet.73@gmail.com>
Tue, 16 Aug 2016 13:11:27 +0000 (15:11 +0200)
lib/compress/zstd_compress.c
lib/zstd.h

index e218b63743defab0504ce603f42b0f35fc83464c..d14a1cde878ea910de7f61bd2f17615c6946ca2b 100644 (file)
@@ -32,7 +32,7 @@
 */
 
 
-/* *******************************************************
+/*-*******************************************************
 *  Compiler specifics
 *********************************************************/
 #ifdef _MSC_VER    /* Visual Studio */
@@ -55,7 +55,7 @@
 #include "mem.h"
 #define XXH_STATIC_LINKING_ONLY   /* XXH64_state_t */
 #include "xxhash.h"         /* XXH_reset, update, digest */
-#define FSE_STATIC_LINKING_ONLY
+#define FSE_STATIC_LINKING_ONLY   /* FSE_encodeSymbol */
 #include "fse.h"
 #define HUF_STATIC_LINKING_ONLY
 #include "huf.h"
@@ -2758,6 +2758,274 @@ ZSTDLIB_API size_t ZSTD_compress_usingCDict(ZSTD_CCtx* cctx,
 
 
 
+/*-======  Streaming  ======-*/
+
+typedef enum { zcss_init, zcss_load, zcss_flush, zcss_final } ZSTD_cStreamStage;
+
+struct ZSTD_CStream_s {
+    ZSTD_CCtx* zc;
+    char*  inBuff;
+    size_t inBuffSize;
+    size_t inToCompress;
+    size_t inBuffPos;
+    size_t inBuffTarget;
+    size_t blockSize;
+    char*  outBuff;
+    size_t outBuffSize;
+    size_t outBuffContentSize;
+    size_t outBuffFlushedSize;
+    ZSTD_cStreamStage stage;
+    U32    checksum;
+    U32    frameEnded;
+    ZSTD_customMem customMem;
+};   /* typedef'd to ZSTD_CStream within "zstd.h" */
+
+ZSTD_CStream* ZSTD_createCStream(void)
+{
+    return ZSTD_createCStream_advanced(defaultCustomMem);
+}
+
+ZSTD_CStream* ZSTD_createCStream_advanced(ZSTD_customMem customMem)
+{
+    ZSTD_CStream* zcs;
+
+    if (!customMem.customAlloc && !customMem.customFree)
+        customMem = defaultCustomMem;
+
+    if (!customMem.customAlloc || !customMem.customFree)
+        return NULL;
+
+    zcs = (ZSTD_CStream*)customMem.customAlloc(customMem.opaque, sizeof(ZSTD_CStream));
+    if (zcs==NULL) return NULL;
+    memset(zcs, 0, sizeof(ZSTD_CStream));
+    memcpy(&zcs->customMem, &customMem, sizeof(ZSTD_customMem));
+    zcs->zc = ZSTD_createCCtx_advanced(customMem);
+    if (zcs->zc == NULL) { ZSTD_freeCStream(zcs); return NULL; }
+    return zcs;
+}
+
+size_t ZSTD_freeCStream(ZSTD_CStream* zcs)
+{
+    if (zcs==NULL) return 0;   /* support free on NULL */
+    ZSTD_freeCCtx(zcs->zc);
+    if (zcs->inBuff) zcs->customMem.customFree(zcs->customMem.opaque, zcs->inBuff);
+    if (zcs->outBuff) zcs->customMem.customFree(zcs->customMem.opaque, zcs->outBuff);
+    zcs->customMem.customFree(zcs->customMem.opaque, zcs);
+    return 0;
+}
+
+
+/*  Initialization  */
+
+size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs,
+                                 const void* dict, size_t dictSize,
+                                 ZSTD_parameters params, unsigned long long pledgedSrcSize)
+{
+    /* allocate buffers */
+    {   size_t const neededInBuffSize = (size_t)1 << params.cParams.windowLog;
+        if (zcs->inBuffSize < neededInBuffSize) {
+            zcs->inBuffSize = neededInBuffSize;
+            zcs->customMem.customFree(zcs->customMem.opaque, zcs->inBuff);   /* should not be necessary */
+            zcs->inBuff = (char*)zcs->customMem.customAlloc(zcs->customMem.opaque, neededInBuffSize);
+            if (zcs->inBuff == NULL) return ERROR(memory_allocation);
+        }
+        zcs->blockSize = MIN(ZSTD_BLOCKSIZE_ABSOLUTEMAX, neededInBuffSize);
+    }
+    if (zcs->outBuffSize < ZSTD_compressBound(zcs->blockSize)+1) {
+        zcs->outBuffSize = ZSTD_compressBound(zcs->blockSize)+1;
+        zcs->customMem.customFree(zcs->customMem.opaque, zcs->outBuff);   /* should not be necessary */
+        zcs->outBuff = (char*)zcs->customMem.customAlloc(zcs->customMem.opaque, zcs->outBuffSize);
+        if (zcs->outBuff == NULL) return ERROR(memory_allocation);
+    }
+
+    { size_t const errorCode = ZSTD_compressBegin_advanced(zcs->zc, dict, dictSize, params, pledgedSrcSize);
+      if (ZSTD_isError(errorCode)) return errorCode; }
+
+    zcs->inToCompress = 0;
+    zcs->inBuffPos = 0;
+    zcs->inBuffTarget = zcs->blockSize;
+    zcs->outBuffContentSize = zcs->outBuffFlushedSize = 0;
+    zcs->stage = zcss_load;
+    zcs->checksum = params.fParams.checksumFlag > 0;
+    zcs->frameEnded = 0;
+    return 0;   /* ready to go */
+}
+
+size_t ZSTD_initCStream_usingDict(ZSTD_CStream* zcs, const void* dict, size_t dictSize, int compressionLevel)
+{
+    ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, dictSize);
+    return ZSTD_initCStream_advanced(zcs, dict, dictSize, params, 0);
+}
+
+size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel)
+{
+    return ZSTD_initCStream_usingDict(zcs, NULL, 0, compressionLevel);
+}
+
+
+/*   Compression   */
+
+typedef enum { zsf_gather, zsf_flush, zsf_end } ZSTD_flush_e;
+
+MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity, const void* src, size_t srcSize)
+{
+    size_t const length = MIN(dstCapacity, srcSize);
+    memcpy(dst, src, length);
+    return length;
+}
+
+static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
+                              void* dst, size_t* dstCapacityPtr,
+                        const void* src, size_t* srcSizePtr,
+                              ZSTD_flush_e const flush)
+{
+    U32 someMoreWork = 1;
+    const char* const istart = (const char*)src;
+    const char* const iend = istart + *srcSizePtr;
+    const char* ip = istart;
+    char* const ostart = (char*)dst;
+    char* const oend = ostart + *dstCapacityPtr;
+    char* op = ostart;
+
+    while (someMoreWork) {
+        switch(zcs->stage)
+        {
+        case zcss_init: return ERROR(init_missing);   /* call ZBUFF_compressInit() first ! */
+
+        case zcss_load:
+            /* complete inBuffer */
+            {   size_t const toLoad = zcs->inBuffTarget - zcs->inBuffPos;
+                size_t const loaded = ZSTD_limitCopy(zcs->inBuff + zcs->inBuffPos, toLoad, ip, iend-ip);
+                zcs->inBuffPos += loaded;
+                ip += loaded;
+                if ( (zcs->inBuffPos==zcs->inToCompress) || (!flush && (toLoad != loaded)) ) {
+                    someMoreWork = 0; break;  /* not enough input to get a full block : stop there, wait for more */
+            }   }
+            /* compress current block (note : this stage cannot be stopped in the middle) */
+            {   void* cDst;
+                size_t cSize;
+                size_t const iSize = zcs->inBuffPos - zcs->inToCompress;
+                size_t oSize = oend-op;
+                if (oSize >= ZSTD_compressBound(iSize))
+                    cDst = op;   /* compress directly into output buffer (avoid flush stage) */
+                else
+                    cDst = zcs->outBuff, oSize = zcs->outBuffSize;
+                cSize = (flush == zsf_end) ?
+                        ZSTD_compressEnd(zcs->zc, cDst, oSize, zcs->inBuff + zcs->inToCompress, iSize) :
+                        ZSTD_compressContinue(zcs->zc, cDst, oSize, zcs->inBuff + zcs->inToCompress, iSize);
+                if (ZSTD_isError(cSize)) return cSize;
+                if (flush == zsf_end) zcs->frameEnded = 1;
+                /* prepare next block */
+                zcs->inBuffTarget = zcs->inBuffPos + zcs->blockSize;
+                if (zcs->inBuffTarget > zcs->inBuffSize)
+                    zcs->inBuffPos = 0, zcs->inBuffTarget = zcs->blockSize;   /* note : inBuffSize >= blockSize */
+                zcs->inToCompress = zcs->inBuffPos;
+                if (cDst == op) { op += cSize; break; }   /* no need to flush */
+                zcs->outBuffContentSize = cSize;
+                zcs->outBuffFlushedSize = 0;
+                zcs->stage = zcss_flush;   /* pass-through to flush stage */
+            }
+
+        case zcss_flush:
+            {   size_t const toFlush = zcs->outBuffContentSize - zcs->outBuffFlushedSize;
+                size_t const flushed = ZSTD_limitCopy(op, oend-op, zcs->outBuff + zcs->outBuffFlushedSize, toFlush);
+                op += flushed;
+                zcs->outBuffFlushedSize += flushed;
+                if (toFlush!=flushed) { someMoreWork = 0; break; }  /* dst too small to store flushed data : stop there */
+                zcs->outBuffContentSize = zcs->outBuffFlushedSize = 0;
+                zcs->stage = zcss_load;
+                break;
+            }
+
+        case zcss_final:
+            someMoreWork = 0;   /* do nothing */
+            break;
+
+        default:
+            return ERROR(GENERIC);   /* impossible */
+        }
+    }
+
+    *srcSizePtr = ip - istart;
+    *dstCapacityPtr = op - ostart;
+    if (zcs->frameEnded) return 0;
+    {   size_t hintInSize = zcs->inBuffTarget - zcs->inBuffPos;
+        if (hintInSize==0) hintInSize = zcs->blockSize;
+        return hintInSize;
+    }
+}
+
+size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_wCursor* output, ZSTD_rCursor* input)
+{
+    size_t sizeRead = input->size;
+    size_t sizeWritten = output->size;
+    size_t const result = ZSTD_compressStream_generic(zcs, output->ptr, &sizeWritten, input->ptr, &sizeRead, zsf_gather);
+    input->ptr = (const char*)(input->ptr) + sizeRead;
+    input->size -= sizeRead;
+    output->ptr = (char*)(output->ptr) + sizeWritten;
+    output->size -= sizeWritten;
+    output->nbBytesWritten += sizeWritten;
+    return result;
+}
+
+
+/*    Finalize    */
+
+/*! ZSTD_flushStream() :
+*   @return : amount of data remaining to flush */
+size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_wCursor* output)
+{
+    size_t srcSize = 0;
+    size_t sizeWritten = output->size;
+    size_t const result = ZSTD_compressStream_generic(zcs, output->ptr, &sizeWritten, &srcSize, &srcSize, zsf_flush);  /* use a valid src address instead of NULL */
+    output->ptr = (char*)(output->ptr) + sizeWritten;
+    output->size -= sizeWritten;
+    output->nbBytesWritten += sizeWritten;
+    if (ZSTD_isError(result)) return result;
+    return zcs->outBuffContentSize - zcs->outBuffFlushedSize;   /* remaining to flush */
+}
+
+
+size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_wCursor* output)
+{
+    BYTE* const ostart = (BYTE*)(output->ptr);
+    BYTE* const oend = ostart + output->size;
+    BYTE* op = ostart;
+
+    if (zcs->stage != zcss_final) {
+        /* flush whatever remains */
+        size_t srcSize = 0;
+        size_t sizeWritten = output->size;
+        size_t const notEnded = ZSTD_compressStream_generic(zcs, ostart, &sizeWritten, &srcSize, &srcSize, zsf_end);  /* use a valid src address instead of NULL */
+        size_t const remainingToFlush = zcs->outBuffContentSize - zcs->outBuffFlushedSize;
+        op += sizeWritten;
+        if (remainingToFlush) {
+            output->ptr = op;
+            output->size -= sizeWritten;
+            output->nbBytesWritten += sizeWritten;
+            return remainingToFlush + ZSTD_BLOCKHEADERSIZE /* final empty block */ + (zcs->checksum * 4);
+        }
+        /* create epilogue */
+        zcs->stage = zcss_final;
+        zcs->outBuffContentSize = !notEnded ? 0 :
+            ZSTD_compressEnd(zcs->zc, zcs->outBuff, zcs->outBuffSize, NULL, 0);  /* write epilogue, including final empty block, into outBuff */
+    }
+
+    /* flush epilogue */
+    {   size_t const toFlush = zcs->outBuffContentSize - zcs->outBuffFlushedSize;
+        size_t const flushed = ZSTD_limitCopy(op, oend-op, zcs->outBuff + zcs->outBuffFlushedSize, toFlush);
+        op += flushed;
+        zcs->outBuffFlushedSize += flushed;
+        output->ptr = op;
+        output->size = oend-op;
+        output->nbBytesWritten += op-ostart;
+        if (toFlush==flushed) zcs->stage = zcss_init;  /* end reached */
+        return toFlush - flushed;
+    }
+}
+
+
+
 /*-=====  Pre-defined compression levels  =====-*/
 
 #define ZSTD_DEFAULT_CLEVEL 1
index 7bd8a05c4a7b8d172663e4f5f2d0c742d6cbb691..3c563e38f80b68bad83d588e620cf18f3f95ecf6 100644 (file)
@@ -323,7 +323,52 @@ ZSTDLIB_API size_t ZSTD_sizeofDCtx(const ZSTD_DCtx* dctx);
 
 
 /* ******************************************************************
-*  Buffer-less streaming functions (synchronous mode)
+*  Streaming
+********************************************************************/
+
+typedef struct ZSTD_readCursor_s {
+  const void* ptr;            /* position of cursor - update to new position */
+  size_t size;                /* remaining buffer size to read - update preserves end of buffer */
+} ZSTD_rCursor;
+
+typedef struct ZSTD_writeCursor_s {
+  void* ptr;                  /* position of cursor - update to new position */
+  size_t size;                /* remaining buffer size to write - update preserves end of buffer */
+  size_t nbBytesWritten;      /* already written bytes - update adds bytes newly written (accumulator) */
+} ZSTD_wCursor;
+
+
+/*======   compression   ======*/
+
+typedef struct ZSTD_CStream_s ZSTD_CStream;
+ZSTD_CStream* ZSTD_createCStream(void);
+size_t ZSTD_freeCStream(ZSTD_CStream* zcs);
+
+size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel);
+size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_wCursor* output, ZSTD_rCursor* input);
+size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_wCursor* output);
+size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_wCursor* output);
+
+/* advanced */
+ZSTD_CStream* ZSTD_createCStream_advanced(ZSTD_customMem customMem);
+size_t ZSTD_initCStream_usingDict(ZSTD_CStream* zcs, const void* dict, size_t dictSize, int compressionLevel);
+size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, const void* dict, size_t dictSize,
+                                 ZSTD_parameters params, unsigned long long pledgedSrcSize);
+
+
+/*======   decompression   ======*/
+
+typedef struct ZSTD_DStream_s ZSTD_DStream;
+ZSTD_DStream* ZSTD_createDStream(void);
+size_t ZSTD_freeDStream(ZSTD_DStream* zds);
+
+size_t ZSTD_initDStream(ZSTD_DStream* zds);
+size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_wCursor* output, ZSTD_rCursor* input);
+
+
+
+/* ******************************************************************
+*  Buffer-less and synchronous inner streaming functions
 ********************************************************************/
 /* This is an advanced API, giving full control over buffer management, for users which need direct control over memory.
 *  But it's also a complex one, with a lot of restrictions (documented below).