]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
AsyncIO compression part 1 - refactor of existing asyncio code (#3021)
authorYonatan Komornik <11005061+yoniko@users.noreply.github.com>
Mon, 24 Jan 2022 22:43:02 +0000 (14:43 -0800)
committerGitHub <noreply@github.com>
Mon, 24 Jan 2022 22:43:02 +0000 (14:43 -0800)
* Refactored fileio.c:
- Extracted asyncio code to fileio_asyncio.c/.h
- Moved type definitions to fileio_types.h
- Moved common macro definitions needed by both fileio.c and fileio_asyncio.c to fileio_common.h

* Bugfix - rename fileio_asycio to fileio_asyncio

* Added copyrights & license to new files

* CR fixes

12 files changed:
build/VS2008/zstd/zstd.vcproj
build/VS2010/zstd/zstd.vcxproj
build/cmake/programs/CMakeLists.txt
build/meson/programs/meson.build
contrib/VS2005/zstd/zstd.vcproj
programs/Makefile
programs/fileio.c
programs/fileio.h
programs/fileio_asyncio.c [new file with mode: 0644]
programs/fileio_asyncio.h [new file with mode: 0644]
programs/fileio_common.h [new file with mode: 0644]
programs/fileio_types.h [new file with mode: 0644]

index c7eec577db38a6966e31e8a0c0edbe3ef0030dfa..91f2bda536c436a93945b3dbf1551d7ecff57540 100644 (file)
                                RelativePath="..\..\..\programs\fileio.c"
                                >
                        </File>
+                       <File
+                RelativePath="..\..\..\programs\fileio_asyncio.c"
+                >
+            </File>
                        <File
                                RelativePath="..\..\..\lib\compress\fse_compress.c"
                                >
index 46e22f42e9b643104c0056e5e1229bd61e5da7e7..8ab239dd814d78f9f5145a98d71f3696a38d8b81 100644 (file)
@@ -62,6 +62,7 @@
     <ClCompile Include="..\..\..\programs\datagen.c" />
     <ClCompile Include="..\..\..\programs\dibio.c" />
     <ClCompile Include="..\..\..\programs\fileio.c" />
+    <ClCompile Include="..\..\..\programs\fileio_asyncio.c" />
     <ClCompile Include="..\..\..\programs\zstdcli.c" />
     <ClCompile Include="..\..\..\programs\zstdcli_trace.c" />
   </ItemGroup>
index 490030783d35a35b2a65e1299120f4a47e5009f9..28b1e1d166baf77fe3eb3230e28c28c3dd905923 100644 (file)
@@ -32,7 +32,7 @@ if (MSVC)
     set(PlatformDependResources ${MSVC_RESOURCE_DIR}/zstd.rc)
 endif ()
 
-add_executable(zstd ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c ${PROGRAMS_DIR}/benchfn.c ${PROGRAMS_DIR}/benchzstd.c ${PROGRAMS_DIR}/datagen.c ${PROGRAMS_DIR}/dibio.c ${PROGRAMS_DIR}/zstdcli_trace.c ${PlatformDependResources})
+add_executable(zstd ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c ${PROGRAMS_DIR}/fileio_asyncio.c ${PROGRAMS_DIR}/benchfn.c ${PROGRAMS_DIR}/benchzstd.c ${PROGRAMS_DIR}/datagen.c ${PROGRAMS_DIR}/dibio.c ${PROGRAMS_DIR}/zstdcli_trace.c ${PlatformDependResources})
 target_link_libraries(zstd ${PROGRAMS_ZSTD_LINK_TARGET})
 if (CMAKE_SYSTEM_NAME MATCHES "(Solaris|SunOS)")
     target_link_libraries(zstd rt)
@@ -75,7 +75,7 @@ if (UNIX)
         ${CMAKE_CURRENT_BINARY_DIR}/zstdless.1
         DESTINATION "${MAN_INSTALL_DIR}")
 
-    add_executable(zstd-frugal ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c)
+    add_executable(zstd-frugal ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c ${PROGRAMS_DIR}/fileio_asyncio.c)
     target_link_libraries(zstd-frugal ${PROGRAMS_ZSTD_LINK_TARGET})
     set_property(TARGET zstd-frugal APPEND PROPERTY COMPILE_DEFINITIONS "ZSTD_NOBENCH;ZSTD_NODICT;ZSTD_NOTRACE")
 endif ()
index 0ae93fc107ca48ff77811125b8cd650ae87713d6..5ccd679a16752e6e4c7288a308a2452363ab34d4 100644 (file)
@@ -14,6 +14,7 @@ zstd_programs_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'),
   join_paths(zstd_rootdir, 'programs/util.c'),
   join_paths(zstd_rootdir, 'programs/timefn.c'),
   join_paths(zstd_rootdir, 'programs/fileio.c'),
+  join_paths(zstd_rootdir, 'programs/fileio_asyncio.c'),
   join_paths(zstd_rootdir, 'programs/benchfn.c'),
   join_paths(zstd_rootdir, 'programs/benchzstd.c'),
   join_paths(zstd_rootdir, 'programs/datagen.c'),
@@ -80,6 +81,7 @@ zstd_frugal_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'),
   join_paths(zstd_rootdir, 'programs/timefn.c'),
   join_paths(zstd_rootdir, 'programs/util.c'),
   join_paths(zstd_rootdir, 'programs/fileio.c'),
+  join_paths(zstd_rootdir, 'programs/fileio_asyncio.c'),
   join_paths(zstd_rootdir, 'lib/common/pool.c'),
   join_paths(zstd_rootdir, 'lib/common/zstd_common.c'),
   join_paths(zstd_rootdir, 'lib/common/error_private.c')]
index 78645d18a360ba910b3954b8679ddb3e3daae104..e37ebee391128b777a6469d7ca78927ab34ca50e 100644 (file)
                                RelativePath="..\..\..\programs\fileio.c"
                                >
                        </File>
+                       <File
+                RelativePath="..\..\..\programs\fileio_asyncio.c"
+                >
+            </File>
                        <File
                                RelativePath="..\..\..\lib\compress\fse_compress.c"
                                >
index f77e1b7f10f8ee4128801f81450ef850295cdf7d..16763e493656db13bb38fd999bc73bb327958765 100644 (file)
@@ -243,17 +243,17 @@ zstd-pgo :
 
 ## zstd-small: minimal target, supporting only zstd compression and decompression. no bench. no legacy. no other format.
 zstd-small: CFLAGS = -Os -s
-zstd-frugal zstd-small: $(ZSTDLIB_CORE_SRC) zstdcli.c util.c timefn.c fileio.c
+zstd-frugal zstd-small: $(ZSTDLIB_CORE_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asyncio.c
        $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NOTRACE -UZSTD_LEGACY_SUPPORT -DZSTD_LEGACY_SUPPORT=0 $^ -o $@$(EXT)
 
-zstd-decompress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_DECOMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c
+zstd-decompress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_DECOMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asyncio.c
        $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NOCOMPRESS -DZSTD_NOTRACE -UZSTD_LEGACY_SUPPORT -DZSTD_LEGACY_SUPPORT=0 $^ -o $@$(EXT)
 
-zstd-compress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c
+zstd-compress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asyncio.c
        $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NODECOMPRESS -DZSTD_NOTRACE -UZSTD_LEGACY_SUPPORT -DZSTD_LEGACY_SUPPORT=0 $^ -o $@$(EXT)
 
 ## zstd-dictBuilder: executable supporting dictionary creation and compression (only)
-zstd-dictBuilder: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) $(ZDICT_SRC) zstdcli.c util.c timefn.c fileio.c dibio.c
+zstd-dictBuilder: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) $(ZDICT_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asyncio.c dibio.c
        $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODECOMPRESS -DZSTD_NOTRACE $^ -o $@$(EXT)
 
 zstdmt: zstd
index d40ebbc1e040a079d1641e6e5e68599c11004997..2066096d23318cbfcf36d65f65dd74c101d9b12d 100644 (file)
 #include <limits.h>     /* INT_MAX */
 #include <signal.h>
 #include "timefn.h"     /* UTIL_getTime, UTIL_clockSpanMicro */
-#include "../lib/common/pool.h"
-#include "../lib/common/threading.h"
 
 #if defined (_MSC_VER)
 #  include <sys/stat.h>
 #  include <io.h>
 #endif
 
-#include "../lib/common/mem.h"     /* U32, U64 */
 #include "fileio.h"
+#include "fileio_asyncio.h"
+#include "fileio_common.h"
+
+FIO_display_prefs_t g_display_prefs = {2, FIO_ps_auto};
+UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER;
 
 #define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */
 #include "../lib/zstd.h"
 #define DEFAULT_FILE_PERMISSIONS (0666)
 #endif
 
-/*-*************************************
-*  Macros
-***************************************/
-#define KB *(1 <<10)
-#define MB *(1 <<20)
-#define GB *(1U<<30)
-#undef MAX
-#define MAX(a,b) ((a)>(b) ? (a) : (b))
-
-struct FIO_display_prefs_s {
-    int displayLevel;   /* 0 : no display;  1: errors;  2: + result + interaction + warnings;  3: + progression;  4: + information */
-    FIO_progressSetting_e progressSetting;
-};
-
-static FIO_display_prefs_t g_display_prefs = {2, FIO_ps_auto};
-
-#define DISPLAY(...)         fprintf(stderr, __VA_ARGS__)
-#define DISPLAYOUT(...)      fprintf(stdout, __VA_ARGS__)
-#define DISPLAYLEVEL(l, ...) { if (g_display_prefs.displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
-
-static const U64 g_refreshRate = SEC_TO_MICRO / 6;
-static UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER;
-
-#define READY_FOR_UPDATE() ((g_display_prefs.progressSetting != FIO_ps_never) && UTIL_clockSpanMicro(g_displayClock) > g_refreshRate)
-#define DELAY_NEXT_UPDATE() { g_displayClock = UTIL_getTime(); }
-#define DISPLAYUPDATE(l, ...) {                              \
-        if (g_display_prefs.displayLevel>=l && (g_display_prefs.progressSetting != FIO_ps_never)) { \
-            if (READY_FOR_UPDATE() || (g_display_prefs.displayLevel>=4)) { \
-                DELAY_NEXT_UPDATE();                         \
-                DISPLAY(__VA_ARGS__);                        \
-                if (g_display_prefs.displayLevel>=4) fflush(stderr);       \
-    }   }   }
-
-#undef MIN  /* in case it would be already defined */
-#define MIN(a,b)    ((a) < (b) ? (a) : (b))
-
-
-#define EXM_THROW(error, ...)                                             \
-{                                                                         \
-    DISPLAYLEVEL(1, "zstd: ");                                            \
-    DISPLAYLEVEL(5, "Error defined at %s, line %i : \n", __FILE__, __LINE__); \
-    DISPLAYLEVEL(1, "error %i : ", error);                                \
-    DISPLAYLEVEL(1, __VA_ARGS__);                                         \
-    DISPLAYLEVEL(1, " \n");                                               \
-    exit(error);                                                          \
-}
-
-#define CHECK_V(v, f)                                \
-    v = f;                                           \
-    if (ZSTD_isError(v)) {                           \
-        DISPLAYLEVEL(5, "%s \n", #f);                \
-        EXM_THROW(11, "%s", ZSTD_getErrorName(v));   \
-    }
-#define CHECK(f) { size_t err; CHECK_V(err, f); }
-
-
 /*-************************************
 *  Signal (Ctrl-C trapping)
 **************************************/
@@ -250,95 +196,6 @@ void FIO_addAbortHandler()
 #endif
 }
 
-
-/*-************************************************************
-* Avoid fseek()'s 2GiB barrier with MSVC, macOS, *BSD, MinGW
-***************************************************************/
-#if defined(_MSC_VER) && _MSC_VER >= 1400
-#   define LONG_SEEK _fseeki64
-#   define LONG_TELL _ftelli64
-#elif !defined(__64BIT__) && (PLATFORM_POSIX_VERSION >= 200112L) /* No point defining Large file for 64 bit */
-#  define LONG_SEEK fseeko
-#  define LONG_TELL ftello
-#elif defined(__MINGW32__) && !defined(__STRICT_ANSI__) && !defined(__NO_MINGW_LFS) && defined(__MSVCRT__)
-#   define LONG_SEEK fseeko64
-#   define LONG_TELL ftello64
-#elif defined(_WIN32) && !defined(__DJGPP__)
-#   include <windows.h>
-    static int LONG_SEEK(FILE* file, __int64 offset, int origin) {
-        LARGE_INTEGER off;
-        DWORD method;
-        off.QuadPart = offset;
-        if (origin == SEEK_END)
-            method = FILE_END;
-        else if (origin == SEEK_CUR)
-            method = FILE_CURRENT;
-        else
-            method = FILE_BEGIN;
-
-        if (SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, NULL, method))
-            return 0;
-        else
-            return -1;
-    }
-    static __int64 LONG_TELL(FILE* file) {
-        LARGE_INTEGER off, newOff;
-        off.QuadPart = 0;
-        newOff.QuadPart = 0;
-        SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, &newOff, FILE_CURRENT);
-        return newOff.QuadPart;
-    }
-#else
-#   define LONG_SEEK fseek
-#   define LONG_TELL ftell
-#endif
-
-
-/*-*************************************
-*  Parameters: FIO_prefs_t
-***************************************/
-
-/* typedef'd to FIO_prefs_t within fileio.h */
-struct FIO_prefs_s {
-
-    /* Algorithm preferences */
-    FIO_compressionType_t compressionType;
-    U32 sparseFileSupport;   /* 0: no sparse allowed; 1: auto (file yes, stdout no); 2: force sparse */
-    int dictIDFlag;
-    int checksumFlag;
-    int blockSize;
-    int overlapLog;
-    U32 adaptiveMode;
-    U32 useRowMatchFinder;
-    int rsyncable;
-    int minAdaptLevel;
-    int maxAdaptLevel;
-    int ldmFlag;
-    int ldmHashLog;
-    int ldmMinMatch;
-    int ldmBucketSizeLog;
-    int ldmHashRateLog;
-    size_t streamSrcSize;
-    size_t targetCBlockSize;
-    int srcSizeHint;
-    int testMode;
-    ZSTD_paramSwitch_e literalCompressionMode;
-
-    /* IO preferences */
-    U32 removeSrcFile;
-    U32 overwrite;
-    U32 asyncIO;
-
-    /* Computation resources preferences */
-    unsigned memLimit;
-    int nbWorkers;
-
-    int excludeCompressedFiles;
-    int patchFromMode;
-    int contentSize;
-    int allowBlockDevices;
-};
-
 /*-*************************************
 *  Parameters: FIO_ctx_t
 ***************************************/
@@ -563,7 +420,13 @@ void FIO_setContentSize(FIO_prefs_t* const prefs, int value)
 }
 
 void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value) {
+#ifdef ZSTD_MULTITHREAD
     prefs->asyncIO = value;
+#else
+    (void) prefs;
+    (void) value;
+    DISPLAYLEVEL(2, "Note : asyncio is disabled (lack of multithreading support) \n");
+#endif
 }
 
 /* FIO_ctx_t functions */
@@ -2019,124 +1882,15 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
 /* **************************************************************************
  *  Decompression
  ***************************************************************************/
-#define DECOMPRESSION_MAX_WRITE_JOBS    (10)
-
-typedef struct {
-    /* These struct fields should be set only on creation and not changed afterwards */
-    POOL_ctx* writerPool;
-    int totalWriteJobs;
-    FIO_prefs_t* prefs;
-
-    /* Controls the file we currently write to, make changes only by using provided utility functions */
-    FILE* dstFile;
-    unsigned storedSkips;
-
-    /* The jobs and availableWriteJobs fields are access by both the main and writer threads and should
-     * only be mutated after locking the mutex */
-    ZSTD_pthread_mutex_t writeJobsMutex;
-    void* jobs[DECOMPRESSION_MAX_WRITE_JOBS];
-    int availableWriteJobs;
-} write_pool_ctx_t;
-
-typedef struct {
-    /* These fields are automaically set and shouldn't be changed by non WritePool code. */
-    write_pool_ctx_t *ctx;
-    FILE* dstFile;
-    void *buffer;
-    size_t bufferSize;
-
-    /* This field should be changed before a job is queued for execution and should contain the number
-     * of bytes to write from the buffer. */
-    size_t usedBufferSize;
-} write_job_t;
 
 typedef struct {
     void*  srcBuffer;
     size_t srcBufferSize;
     size_t srcBufferLoaded;
     ZSTD_DStream* dctx;
-    write_pool_ctx_t *writePoolCtx;
+    WritePoolCtx_t *writeCtx;
 } dRess_t;
 
-static write_job_t *FIO_createWriteJob(write_pool_ctx_t *ctx) {
-    void *buffer;
-    write_job_t *job;
-    job = (write_job_t*) malloc(sizeof(write_job_t));
-    buffer = malloc(ZSTD_DStreamOutSize());
-    if(!job || !buffer)
-        EXM_THROW(101, "Allocation error : not enough memory");
-    job->buffer = buffer;
-    job->bufferSize = ZSTD_DStreamOutSize();
-    job->usedBufferSize = 0;
-    job->dstFile = NULL;
-    job->ctx = ctx;
-    return job;
-}
-
-/* WritePool_createThreadPool:
- * Creates a thread pool and a mutex for threaded write pool.
- * Displays warning if asyncio is requested but MT isn't available. */
-static void WritePool_createThreadPool(write_pool_ctx_t *ctx, const FIO_prefs_t *prefs) {
-    ctx->writerPool = NULL;
-    if(prefs->asyncIO) {
-#ifdef ZSTD_MULTITHREAD
-        if (ZSTD_pthread_mutex_init(&ctx->writeJobsMutex, NULL))
-            EXM_THROW(102, "Failed creating write jobs mutex");
-        /* We want DECOMPRESSION_MAX_WRITE_JOBS-2 queue items because we need to always have 1 free buffer to
-         * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
-        assert(DECOMPRESSION_MAX_WRITE_JOBS >= 2);
-        ctx->writerPool = POOL_create(1, DECOMPRESSION_MAX_WRITE_JOBS - 2);
-        if (!ctx->writerPool)
-            EXM_THROW(103, "Failed creating writer thread pool");
-#else
-        DISPLAYLEVEL(2, "Note : asyncio decompression is disabled (lack of multithreading support) \n");
-#endif
-    }
-}
-
-/* WritePool_create:
- * Allocates and sets and a new write pool including its included jobs. */
-static write_pool_ctx_t* WritePool_create(FIO_prefs_t* const prefs) {
-    write_pool_ctx_t *ctx;
-    int i;
-    ctx = (write_pool_ctx_t*) malloc(sizeof(write_pool_ctx_t));
-    if(!ctx)
-        EXM_THROW(100, "Allocation error : not enough memory");
-    WritePool_createThreadPool(ctx, prefs);
-    ctx->prefs = prefs;
-    ctx->totalWriteJobs = ctx->writerPool ? DECOMPRESSION_MAX_WRITE_JOBS : 1;
-    ctx->availableWriteJobs = ctx->totalWriteJobs;
-    for(i=0; i < ctx->availableWriteJobs; i++) {
-        ctx->jobs[i] = FIO_createWriteJob(ctx);
-    }
-    ctx->storedSkips = 0;
-    ctx->dstFile = NULL;
-    return ctx;
-}
-
-/* WritePool_free:
- * Release a previously allocated write thread pool. Makes sure all takss are done and released. */
-static void WritePool_free(write_pool_ctx_t* ctx) {
-    int i=0;
-    if(ctx->writerPool) {
-        /* Make sure we finish all tasks and then free the resources */
-        POOL_joinJobs(ctx->writerPool);
-        /* Make sure we are not leaking jobs */
-        assert(ctx->availableWriteJobs==ctx->totalWriteJobs);
-        POOL_free(ctx->writerPool);
-        ZSTD_pthread_mutex_destroy(&ctx->writeJobsMutex);
-    }
-    assert(ctx->dstFile==NULL);
-    assert(ctx->storedSkips==0);
-    for(i=0; i<ctx->availableWriteJobs; i++) {
-        write_job_t* job = (write_job_t*) ctx->jobs[i];
-        free(job->buffer);
-        free(job);
-    }
-    free(ctx);
-}
-
-
 static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
 {
     dRess_t ress;
@@ -2164,7 +1918,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
         free(dictBuffer);
     }
 
-    ress.writePoolCtx = WritePool_create(prefs);
+    ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_DStreamOutSize());
 
     return ress;
 }
@@ -2173,7 +1927,7 @@ static void FIO_freeDResources(dRess_t ress)
 {
     CHECK( ZSTD_freeDStream(ress.dctx) );
     free(ress.srcBuffer);
-    WritePool_free(ress.writePoolCtx);
+    AIO_WritePool_free(ress.writeCtx);
 }
 
 /* FIO_consumeDSrcBuffer:
@@ -2184,205 +1938,6 @@ static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) {
     memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded);
 }
 
-/** FIO_fwriteSparse() :
-*  @return : storedSkips,
-*            argument for next call to FIO_fwriteSparse() or FIO_fwriteSparseEnd() */
-static unsigned
-FIO_fwriteSparse(FILE* file,
-                 const void* buffer, size_t bufferSize,
-                 const FIO_prefs_t* const prefs,
-                 unsigned storedSkips)
-{
-    const size_t* const bufferT = (const size_t*)buffer;   /* Buffer is supposed malloc'ed, hence aligned on size_t */
-    size_t bufferSizeT = bufferSize / sizeof(size_t);
-    const size_t* const bufferTEnd = bufferT + bufferSizeT;
-    const size_t* ptrT = bufferT;
-    static const size_t segmentSizeT = (32 KB) / sizeof(size_t);   /* check every 32 KB */
-
-    if (prefs->testMode) return 0;  /* do not output anything in test mode */
-
-    if (!prefs->sparseFileSupport) {  /* normal write */
-        size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
-        if (sizeCheck != bufferSize)
-            EXM_THROW(70, "Write error : cannot write decoded block : %s",
-                            strerror(errno));
-        return 0;
-    }
-
-    /* avoid int overflow */
-    if (storedSkips > 1 GB) {
-        if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
-            EXM_THROW(91, "1 GB skip error (sparse file support)");
-        storedSkips -= 1 GB;
-    }
-
-    while (ptrT < bufferTEnd) {
-        size_t nb0T;
-
-        /* adjust last segment if < 32 KB */
-        size_t seg0SizeT = segmentSizeT;
-        if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
-        bufferSizeT -= seg0SizeT;
-
-        /* count leading zeroes */
-        for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
-        storedSkips += (unsigned)(nb0T * sizeof(size_t));
-
-        if (nb0T != seg0SizeT) {   /* not all 0s */
-            size_t const nbNon0ST = seg0SizeT - nb0T;
-            /* skip leading zeros */
-            if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
-                EXM_THROW(92, "Sparse skip error ; try --no-sparse");
-            storedSkips = 0;
-            /* write the rest */
-            if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
-                EXM_THROW(93, "Write error : cannot write decoded block : %s",
-                            strerror(errno));
-        }
-        ptrT += seg0SizeT;
-    }
-
-    {   static size_t const maskT = sizeof(size_t)-1;
-        if (bufferSize & maskT) {
-            /* size not multiple of sizeof(size_t) : implies end of block */
-            const char* const restStart = (const char*)bufferTEnd;
-            const char* restPtr = restStart;
-            const char* const restEnd = (const char*)buffer + bufferSize;
-            assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
-            for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
-            storedSkips += (unsigned) (restPtr - restStart);
-            if (restPtr != restEnd) {
-                /* not all remaining bytes are 0 */
-                size_t const restSize = (size_t)(restEnd - restPtr);
-                if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
-                    EXM_THROW(92, "Sparse skip error ; try --no-sparse");
-                if (fwrite(restPtr, 1, restSize, file) != restSize)
-                    EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
-                        strerror(errno));
-                storedSkips = 0;
-    }   }   }
-
-    return storedSkips;
-}
-
-static void
-FIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
-{
-    if (prefs->testMode) assert(storedSkips == 0);
-    if (storedSkips>0) {
-        assert(prefs->sparseFileSupport > 0);  /* storedSkips>0 implies sparse support is enabled */
-        (void)prefs;   /* assert can be disabled, in which case prefs becomes unused */
-        if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
-            EXM_THROW(69, "Final skip error (sparse file support)");
-        /* last zero must be explicitly written,
-         * so that skipped ones get implicitly translated as zero by FS */
-        {   const char lastZeroByte[1] = { 0 };
-            if (fwrite(lastZeroByte, 1, 1, file) != 1)
-                EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
-    }   }
-}
-
-/* WritePool_releaseWriteJob:
- * Releases an acquired job back to the pool. Doesn't execute the job. */
-static void WritePool_releaseWriteJob(write_job_t *job) {
-    write_pool_ctx_t *ctx = job->ctx;
-    if(ctx->writerPool) {
-        ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex);
-        assert(ctx->availableWriteJobs < DECOMPRESSION_MAX_WRITE_JOBS);
-        ctx->jobs[ctx->availableWriteJobs++] = job;
-        ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex);
-    } else {
-        ctx->availableWriteJobs++;
-    }
-}
-
-/* WritePool_acquireWriteJob:
- * Returns an available write job to be used for a future write. */
-static write_job_t* WritePool_acquireWriteJob(write_pool_ctx_t *ctx) {
-    write_job_t *job;
-    assert(ctx->dstFile!=NULL || ctx->prefs->testMode);
-    if(ctx->writerPool) {
-        ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex);
-        assert(ctx->availableWriteJobs > 0);
-        job = (write_job_t*) ctx->jobs[--ctx->availableWriteJobs];
-        ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex);
-    } else {
-        assert(ctx->availableWriteJobs==1);
-        ctx->availableWriteJobs--;
-        job = (write_job_t*)ctx->jobs[0];
-    }
-    job->usedBufferSize = 0;
-    job->dstFile = ctx->dstFile;
-    return job;
-}
-
-/* WritePool_executeWriteJob:
- * Executes a write job synchronously. Can be used as a function for a thread pool. */
-static void WritePool_executeWriteJob(void* opaque){
-    write_job_t* job = (write_job_t*) opaque;
-    write_pool_ctx_t* ctx = job->ctx;
-    ctx->storedSkips = FIO_fwriteSparse(job->dstFile, job->buffer, job->usedBufferSize, ctx->prefs, ctx->storedSkips);
-    WritePool_releaseWriteJob(job);
-}
-
-/* WritePool_queueWriteJob:
- * Queues a write job for execution.
- * Make sure to set `usedBufferSize` to the wanted length before call.
- * The queued job shouldn't be used directly after queueing it. */
-static void WritePool_queueWriteJob(write_job_t *job) {
-    write_pool_ctx_t* ctx = job->ctx;
-    if(ctx->writerPool)
-        POOL_add(ctx->writerPool, WritePool_executeWriteJob, job);
-    else
-        WritePool_executeWriteJob(job);
-}
-
-/* WritePool_queueAndReacquireWriteJob:
- * Queues a write job for execution and acquires a new one.
- * After execution `job`'s pointed value would change to the newly acquired job.
- * Make sure to set `usedBufferSize` to the wanted length before call.
- * The queued job shouldn't be used directly after queueing it. */
-static void WritePool_queueAndReacquireWriteJob(write_job_t **job) {
-    WritePool_queueWriteJob(*job);
-    *job = WritePool_acquireWriteJob((*job)->ctx);
-}
-
-/* WritePool_sparseWriteEnd:
- * Ends sparse writes to the current dstFile.
- * Blocks on completion of all current write jobs before executing. */
-static void WritePool_sparseWriteEnd(write_pool_ctx_t* ctx) {
-    assert(ctx != NULL);
-    if(ctx->writerPool)
-        POOL_joinJobs(ctx->writerPool);
-    FIO_fwriteSparseEnd(ctx->prefs, ctx->dstFile, ctx->storedSkips);
-    ctx->storedSkips = 0;
-}
-
-/* WritePool_setDstFile:
- * Sets the destination file for future files in the pool.
- * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
- * Also requires ending of sparse write if a previous file was used in sparse mode. */
-static void WritePool_setDstFile(write_pool_ctx_t *ctx, FILE* dstFile) {
-    assert(ctx!=NULL);
-    /* We can change the dst file only if we have finished writing */
-    if(ctx->writerPool)
-        POOL_joinJobs(ctx->writerPool);
-    assert(ctx->storedSkips == 0);
-    assert(ctx->availableWriteJobs == ctx->totalWriteJobs);
-    ctx->dstFile = dstFile;
-}
-
-/* WritePool_closeDstFile:
- * Ends sparse write and closes the writePool's current dstFile and sets the dstFile to NULL.
- * Requires completion of all queues write jobs and release of all otherwise acquired jobs.  */
-static int WritePool_closeDstFile(write_pool_ctx_t *ctx) {
-    FILE *dstFile = ctx->dstFile;
-    assert(dstFile!=NULL || ctx->prefs->testMode!=0);
-    WritePool_sparseWriteEnd(ctx);
-    WritePool_setDstFile(ctx, NULL);
-    return fclose(dstFile);
-}
-
 /** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode
     @return : 0 (no error) */
 static int FIO_passThrough(const FIO_prefs_t* const prefs,
@@ -2403,7 +1958,7 @@ static int FIO_passThrough(const FIO_prefs_t* const prefs,
 
     do {
         readFromInput = fread(buffer, 1, blockSize, finput);
-        storedSkips = FIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips);
+        storedSkips = AIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips);
     } while (readFromInput == blockSize);
     if (ferror(finput)) {
         DISPLAYLEVEL(1, "Pass-through read error : %s\n", strerror(errno));
@@ -2411,7 +1966,7 @@ static int FIO_passThrough(const FIO_prefs_t* const prefs,
     }
     assert(feof(finput));
 
-    FIO_fwriteSparseEnd(prefs, foutput, storedSkips);
+    AIO_fwriteSparseEnd(prefs, foutput, storedSkips);
     return 0;
 }
 
@@ -2458,7 +2013,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
                         U64 alreadyDecoded)  /* for multi-frames streams */
 {
     U64 frameSize = 0;
-    write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
+    IOJob_t *writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
 
     /* display last 20 characters only */
     {   size_t const srcFileLength = strlen(srcFileName);
@@ -2486,12 +2041,13 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
             DISPLAYLEVEL(1, "%s : Decoding error (36) : %s \n",
                             srcFileName, ZSTD_getErrorName(readSizeHint));
             FIO_zstdErrorHelp(prefs, ress, readSizeHint, srcFileName);
+            AIO_WritePool_releaseIoJob(writeJob);
             return FIO_ERROR_FRAME_DECODING;
         }
 
         /* Write block */
         writeJob->usedBufferSize = outBuff.pos;
-        WritePool_queueAndReacquireWriteJob(&writeJob);
+        AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
         frameSize += outBuff.pos;
         if (fCtx->nbFilesTotal > 1) {
             size_t srcFileNameSize = strlen(srcFileName);
@@ -2526,8 +2082,8 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
                 ress->srcBufferLoaded += readSize;
     }   }   }
 
-    WritePool_releaseWriteJob(writeJob);
-    WritePool_sparseWriteEnd(ress->writePoolCtx);
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ress->writeCtx);
 
     return frameSize;
 }
@@ -2541,7 +2097,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
     z_stream strm;
     int flush = Z_NO_FLUSH;
     int decodingError = 0;
-    write_job_t *writeJob = NULL;
+    IOJob_t *writeJob = NULL;
 
     strm.zalloc = Z_NULL;
     strm.zfree = Z_NULL;
@@ -2552,7 +2108,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
     if (inflateInit2(&strm, 15 /* maxWindowLogSize */ + 16 /* gzip only */) != Z_OK)
         return FIO_ERROR_FRAME_DECODING;
 
-    writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
+    writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
     strm.next_out = (Bytef*)writeJob->buffer;
     strm.avail_out = (uInt)writeJob->bufferSize;
     strm.avail_in = (uInt)ress->srcBufferLoaded;
@@ -2578,7 +2134,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
         {   size_t const decompBytes = writeJob->bufferSize - strm.avail_out;
             if (decompBytes) {
                 writeJob->usedBufferSize = decompBytes;
-                WritePool_queueAndReacquireWriteJob(&writeJob);
+                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                 outFileSize += decompBytes;
                 strm.next_out = (Bytef*)writeJob->buffer;
                 strm.avail_out = (uInt)writeJob->bufferSize;
@@ -2594,8 +2150,8 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
         DISPLAYLEVEL(1, "zstd: %s: inflateEnd error \n", srcFileName);
         decodingError = 1;
     }
-    WritePool_releaseWriteJob(writeJob);
-    WritePool_sparseWriteEnd(ress->writePoolCtx);
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ress->writeCtx);
     return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize;
 }
 #endif
@@ -2610,7 +2166,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
     lzma_action action = LZMA_RUN;
     lzma_ret initRet;
     int decodingError = 0;
-    write_job_t *writeJob = NULL;
+    IOJob_t *writeJob = NULL;
 
     strm.next_in = 0;
     strm.avail_in = 0;
@@ -2627,7 +2183,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
         return FIO_ERROR_FRAME_DECODING;
     }
 
-    writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
+    writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
     strm.next_out = (Bytef*)writeJob->buffer;
     strm.avail_out = (uInt)writeJob->bufferSize;
     strm.next_in = (BYTE const*)ress->srcBuffer;
@@ -2655,7 +2211,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
         {   size_t const decompBytes = writeJob->bufferSize - strm.avail_out;
             if (decompBytes) {
                 writeJob->usedBufferSize = decompBytes;
-                WritePool_queueAndReacquireWriteJob(&writeJob);
+                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                 outFileSize += decompBytes;
                 strm.next_out = (Bytef*)writeJob->buffer;
                 strm.avail_out = writeJob->bufferSize;
@@ -2665,8 +2221,8 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
 
     FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
     lzma_end(&strm);
-    WritePool_releaseWriteJob(writeJob);
-    WritePool_sparseWriteEnd(ress->writePoolCtx);
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ress->writeCtx);
     return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize;
 }
 #endif
@@ -2681,13 +2237,15 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
     LZ4F_decompressionContext_t dCtx;
     LZ4F_errorCode_t const errorCode = LZ4F_createDecompressionContext(&dCtx, LZ4F_VERSION);
     int decodingError = 0;
-    write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
+    IOJob_t *writeJob = NULL;
 
     if (LZ4F_isError(errorCode)) {
         DISPLAYLEVEL(1, "zstd: failed to create lz4 decompression context \n");
         return FIO_ERROR_FRAME_DECODING;
     }
 
+    writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
+
     /* Main Loop */
     for (;nextToLoad;) {
         size_t readSize;
@@ -2724,7 +2282,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
             if (decodedBytes) {
                 UTIL_HumanReadableSize_t hrs;
                 writeJob->usedBufferSize = decodedBytes;
-                WritePool_queueAndReacquireWriteJob(&writeJob);
+                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                 filesize += decodedBytes;
                 hrs = UTIL_makeHumanReadableSize(filesize);
                 DISPLAYUPDATE(2, "\rDecompressed : %.*f%s  ", hrs.precision, hrs.value, hrs.suffix);
@@ -2740,8 +2298,8 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
     }
 
     LZ4F_freeDecompressionContext(dCtx);
-    WritePool_releaseWriteJob(writeJob);
-    WritePool_sparseWriteEnd(ress->writePoolCtx);
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ress->writeCtx);
 
     return decodingError ? FIO_ERROR_FRAME_DECODING : filesize;
 }
@@ -2818,7 +2376,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
 #endif
         } else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) {  /* pass-through mode */
             return FIO_passThrough(prefs,
-                                   ress.writePoolCtx->dstFile, srcFile,
+                                   AIO_WritePool_getFile(ress.writeCtx), srcFile,
                                    ress.srcBuffer, ress.srcBufferSize,
                                    ress.srcBufferLoaded);
         } else {
@@ -2856,7 +2414,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
     int releaseDstFile = 0;
     int transferMTime = 0;
 
-    if ((ress.writePoolCtx->dstFile == NULL) && (prefs->testMode==0)) {
+    if ((AIO_WritePool_getFile(ress.writeCtx) == NULL) && (prefs->testMode == 0)) {
         FILE *dstFile;
         int dstFilePermissions = DEFAULT_FILE_PERMISSIONS;
         if ( strcmp(srcFileName, stdinmark)   /* special case : don't transfer permissions from stdin */
@@ -2871,7 +2429,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
 
         dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
         if (dstFile==NULL) return 1;
-        WritePool_setDstFile(ress.writePoolCtx, dstFile);
+        AIO_WritePool_setFile(ress.writeCtx, dstFile);
 
         /* Must only be added after FIO_openDstFile() succeeds.
          * Otherwise we may delete the destination file if it already exists,
@@ -2884,7 +2442,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
 
     if (releaseDstFile) {
         clearHandler();
-        if (WritePool_closeDstFile(ress.writePoolCtx)) {
+        if (AIO_WritePool_closeFile(ress.writeCtx)) {
             DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
             result = 1;
         }
@@ -3100,14 +2658,14 @@ FIO_decompressMultipleFilenames(FIO_ctx_t* const fCtx,
         if (!prefs->testMode) {
             FILE* dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
             if (dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName);
-            WritePool_setDstFile(ress.writePoolCtx, dstFile);
+            AIO_WritePool_setFile(ress.writeCtx, dstFile);
         }
         for (; fCtx->currFileIdx < fCtx->nbFilesTotal; fCtx->currFileIdx++) {
             status = FIO_decompressSrcFile(fCtx, prefs, ress, outFileName, srcNamesTable[fCtx->currFileIdx]);
             if (!status) fCtx->nbFilesProcessed++;
             error |= status;
         }
-        if ((!prefs->testMode) && (WritePool_closeDstFile(ress.writePoolCtx)))
+        if ((!prefs->testMode) && (AIO_WritePool_closeFile(ress.writeCtx)))
             EXM_THROW(72, "Write error : %s : cannot properly close output file",
                         strerror(errno));
     } else {
index 398937a64e825a691db0f972927602f859115f1b..9d6ebb1accb88af6beae92c3179b3ee4a280ef5a 100644 (file)
@@ -13,6 +13,7 @@
 #define FILEIO_H_23981798732
 
 #define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_compressionParameters */
+#include "fileio_types.h"
 #include "../lib/zstd.h"           /* ZSTD_* */
 
 #if defined (__cplusplus)
@@ -53,10 +54,6 @@ extern "C" {
 /*-*************************************
 *  Types
 ***************************************/
-typedef enum { FIO_zstdCompression, FIO_gzipCompression, FIO_xzCompression, FIO_lzmaCompression, FIO_lz4Compression } FIO_compressionType_t;
-
-typedef struct FIO_prefs_s FIO_prefs_t;
-
 FIO_prefs_t* FIO_createPreferences(void);
 void FIO_freePreferences(FIO_prefs_t* const prefs);
 
@@ -66,9 +63,6 @@ typedef struct FIO_ctx_s FIO_ctx_t;
 FIO_ctx_t* FIO_createContext(void);
 void FIO_freeContext(FIO_ctx_t* const fCtx);
 
-typedef struct FIO_display_prefs_s FIO_display_prefs_t;
-
-typedef enum { FIO_ps_auto, FIO_ps_never, FIO_ps_always } FIO_progressSetting_e;
 
 /*-*************************************
 *  Parameters
diff --git a/programs/fileio_asyncio.c b/programs/fileio_asyncio.c
new file mode 100644 (file)
index 0000000..868720a
--- /dev/null
@@ -0,0 +1,365 @@
+/*
+ * Copyright (c) Yann Collet, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ * You may select, at your option, one of the above-listed licenses.
+ */
+
+#include "platform.h"
+#include <stdio.h>      /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
+#include <stdlib.h>     /* malloc, free */
+#include <assert.h>
+#include <errno.h>      /* errno */
+
+#if defined (_MSC_VER)
+#  include <sys/stat.h>
+#  include <io.h>
+#endif
+
+#include "fileio_asyncio.h"
+#include "fileio_common.h"
+
+/* **********************************************************************
+ *  Sparse write
+ ************************************************************************/
+
+/** AIO_fwriteSparse() :
+*  @return : storedSkips,
+*            argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
+unsigned AIO_fwriteSparse(FILE* file,
+                 const void* buffer, size_t bufferSize,
+                 const FIO_prefs_t* const prefs,
+                 unsigned storedSkips)
+{
+    const size_t* const bufferT = (const size_t*)buffer;   /* Buffer is supposed malloc'ed, hence aligned on size_t */
+    size_t bufferSizeT = bufferSize / sizeof(size_t);
+    const size_t* const bufferTEnd = bufferT + bufferSizeT;
+    const size_t* ptrT = bufferT;
+    static const size_t segmentSizeT = (32 KB) / sizeof(size_t);   /* check every 32 KB */
+
+    if (prefs->testMode) return 0;  /* do not output anything in test mode */
+
+    if (!prefs->sparseFileSupport) {  /* normal write */
+        size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
+        if (sizeCheck != bufferSize)
+            EXM_THROW(70, "Write error : cannot write decoded block : %s",
+                      strerror(errno));
+        return 0;
+    }
+
+    /* avoid int overflow */
+    if (storedSkips > 1 GB) {
+        if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
+        EXM_THROW(91, "1 GB skip error (sparse file support)");
+        storedSkips -= 1 GB;
+    }
+
+    while (ptrT < bufferTEnd) {
+        size_t nb0T;
+
+        /* adjust last segment if < 32 KB */
+        size_t seg0SizeT = segmentSizeT;
+        if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
+        bufferSizeT -= seg0SizeT;
+
+        /* count leading zeroes */
+        for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
+        storedSkips += (unsigned)(nb0T * sizeof(size_t));
+
+        if (nb0T != seg0SizeT) {   /* not all 0s */
+            size_t const nbNon0ST = seg0SizeT - nb0T;
+            /* skip leading zeros */
+            if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
+                EXM_THROW(92, "Sparse skip error ; try --no-sparse");
+            storedSkips = 0;
+            /* write the rest */
+            if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
+                EXM_THROW(93, "Write error : cannot write decoded block : %s",
+                          strerror(errno));
+        }
+        ptrT += seg0SizeT;
+    }
+
+    {   static size_t const maskT = sizeof(size_t)-1;
+        if (bufferSize & maskT) {
+            /* size not multiple of sizeof(size_t) : implies end of block */
+            const char* const restStart = (const char*)bufferTEnd;
+            const char* restPtr = restStart;
+            const char* const restEnd = (const char*)buffer + bufferSize;
+            assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
+            for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
+            storedSkips += (unsigned) (restPtr - restStart);
+            if (restPtr != restEnd) {
+                /* not all remaining bytes are 0 */
+                size_t const restSize = (size_t)(restEnd - restPtr);
+                if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
+                    EXM_THROW(92, "Sparse skip error ; try --no-sparse");
+                if (fwrite(restPtr, 1, restSize, file) != restSize)
+                    EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
+                              strerror(errno));
+                storedSkips = 0;
+            }   }   }
+
+    return storedSkips;
+}
+
+void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
+{
+    if (prefs->testMode) assert(storedSkips == 0);
+    if (storedSkips>0) {
+        assert(prefs->sparseFileSupport > 0);  /* storedSkips>0 implies sparse support is enabled */
+        (void)prefs;   /* assert can be disabled, in which case prefs becomes unused */
+        if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
+            EXM_THROW(69, "Final skip error (sparse file support)");
+        /* last zero must be explicitly written,
+         * so that skipped ones get implicitly translated as zero by FS */
+        {   const char lastZeroByte[1] = { 0 };
+            if (fwrite(lastZeroByte, 1, 1, file) != 1)
+                EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
+        }   }
+}
+
+
+/* **********************************************************************
+ *  AsyncIO functionality
+ ************************************************************************/
+
+/* ***********************************
+ *  General IoPool implementation
+ *************************************/
+
+static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
+    void *buffer;
+    IOJob_t *job;
+    job = (IOJob_t*) malloc(sizeof(IOJob_t));
+    buffer = malloc(bufferSize);
+    if(!job || !buffer)
+    EXM_THROW(101, "Allocation error : not enough memory");
+    job->buffer = buffer;
+    job->bufferSize = bufferSize;
+    job->usedBufferSize = 0;
+    job->file = NULL;
+    job->ctx = ctx;
+    job->offset = 0;
+    return job;
+}
+
+
+/* AIO_IOPool_createThreadPool:
+ * Creates a thread pool and a mutex for threaded IO pool.
+ * Displays warning if asyncio is requested but MT isn't available. */
+static void AIO_IOPool_createThreadPool(IOPoolCtx_t *ctx, const FIO_prefs_t *prefs) {
+    ctx->threadPool = NULL;
+    if(prefs->asyncIO) {
+        if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
+        EXM_THROW(102,"Failed creating write availableJobs mutex");
+        /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
+         * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
+        assert(MAX_IO_JOBS >= 2);
+        ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
+        if (!ctx->threadPool)
+        EXM_THROW(104, "Failed creating writer thread pool");
+    }
+}
+
+/* AIO_IOPool_init:
+ * Allocates and sets and a new write pool including its included availableJobs. */
+static void AIO_IOPool_init(IOPoolCtx_t *ctx, FIO_prefs_t* const prefs, POOL_function poolFunction, size_t bufferSize) {
+    int i;
+    AIO_IOPool_createThreadPool(ctx, prefs);
+    ctx->prefs = prefs;
+    ctx->poolFunction = poolFunction;
+    ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 1;
+    ctx->availableJobsCount = ctx->totalIoJobs;
+    for(i=0; i < ctx->availableJobsCount; i++) {
+        ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
+    }
+    ctx->file = NULL;
+}
+
+
+/* AIO_IOPool_releaseIoJob:
+ * Releases an acquired job back to the pool. Doesn't execute the job. */
+static void AIO_IOPool_releaseIoJob(IOJob_t *job) {
+    IOPoolCtx_t *ctx = (IOPoolCtx_t *) job->ctx;
+    if(ctx->threadPool) {
+        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
+        assert(ctx->availableJobsCount < MAX_IO_JOBS);
+        ctx->availableJobs[ctx->availableJobsCount++] = job;
+        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
+    } else {
+        assert(ctx->availableJobsCount == 0);
+        ctx->availableJobsCount++;
+    }
+}
+
+/* AIO_IOPool_join:
+ * Waits for all tasks in the pool to finish executing. */
+static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
+    if(ctx->threadPool)
+        POOL_joinJobs(ctx->threadPool);
+}
+
+/* AIO_IOPool_free:
+ * Release a previously allocated write thread pool. Makes sure all takss are done and released. */
+static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
+    int i;
+    if(ctx->threadPool) {
+        /* Make sure we finish all tasks and then free the resources */
+        AIO_IOPool_join(ctx);
+        /* Make sure we are not leaking availableJobs */
+        assert(ctx->availableJobsCount == ctx->totalIoJobs);
+        POOL_free(ctx->threadPool);
+        ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
+    }
+    assert(ctx->file == NULL);
+    for(i=0; i<ctx->availableJobsCount; i++) {
+        IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
+        free(job->buffer);
+        free(job);
+    }
+}
+
+/* AIO_IOPool_acquireJob:
+ * Returns an available io job to be used for a future io. */
+static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
+    IOJob_t *job;
+    assert(ctx->file != NULL || ctx->prefs->testMode);
+    if(ctx->threadPool) {
+        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
+        assert(ctx->availableJobsCount > 0);
+        job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
+        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
+    } else {
+        assert(ctx->availableJobsCount == 1);
+        ctx->availableJobsCount--;
+        job = (IOJob_t*)ctx->availableJobs[0];
+    }
+    job->usedBufferSize = 0;
+    job->file = ctx->file;
+    job->offset = 0;
+    return job;
+}
+
+
+/* AIO_IOPool_setFile:
+ * Sets the destination file for future files in the pool.
+ * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
+ * Also requires ending of sparse write if a previous file was used in sparse mode. */
+static void AIO_IOPool_setFile(IOPoolCtx_t *ctx, FILE* file) {
+    assert(ctx!=NULL);
+    AIO_IOPool_join(ctx);
+    assert(ctx->availableJobsCount == ctx->totalIoJobs);
+    ctx->file = file;
+}
+
+static FILE* AIO_IOPool_getFile(IOPoolCtx_t *ctx) {
+    return ctx->file;
+}
+
+/* AIO_IOPool_enqueueJob:
+ * Enqueues an io job for execution.
+ * The queued job shouldn't be used directly after queueing it. */
+static void AIO_IOPool_enqueueJob(IOJob_t *job) {
+    IOPoolCtx_t* ctx = (IOPoolCtx_t *)job->ctx;
+    if(ctx->threadPool)
+        POOL_add(ctx->threadPool, ctx->poolFunction, job);
+    else
+        ctx->poolFunction(job);
+}
+
+/* ***********************************
+ *  WritePool implementation
+ *************************************/
+
+/* AIO_WritePool_acquireJob:
+ * Returns an available write job to be used for a future write. */
+IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx) {
+    return AIO_IOPool_acquireJob(&ctx->base);
+}
+
+/* AIO_WritePool_enqueueAndReacquireWriteJob:
+ * Queues a write job for execution and acquires a new one.
+ * After execution `job`'s pointed value would change to the newly acquired job.
+ * Make sure to set `usedBufferSize` to the wanted length before call.
+ * The queued job shouldn't be used directly after queueing it. */
+void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
+    AIO_IOPool_enqueueJob(*job);
+    *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);
+}
+
+/* AIO_WritePool_sparseWriteEnd:
+ * Ends sparse writes to the current file.
+ * Blocks on completion of all current write jobs before executing. */
+void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
+    assert(ctx != NULL);
+    if(ctx->base.threadPool)
+        POOL_joinJobs(ctx->base.threadPool);
+    AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
+    ctx->storedSkips = 0;
+}
+
+/* AIO_WritePool_setFile:
+ * Sets the destination file for future writes in the pool.
+ * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
+ * Also requires ending of sparse write if a previous file was used in sparse mode. */
+void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file) {
+    AIO_IOPool_setFile(&ctx->base, file);
+    assert(ctx->storedSkips == 0);
+}
+
+/* AIO_WritePool_getFile:
+ * Returns the file the writePool is currently set to write to. */
+FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx) {
+    return AIO_IOPool_getFile(&ctx->base);
+}
+
+/* AIO_WritePool_releaseIoJob:
+ * Releases an acquired job back to the pool. Doesn't execute the job. */
+void AIO_WritePool_releaseIoJob(IOJob_t *job) {
+    AIO_IOPool_releaseIoJob(job);
+}
+
+/* AIO_WritePool_closeFile:
+ * Ends sparse write and closes the writePool's current file and sets the file to NULL.
+ * Requires completion of all queues write jobs and release of all otherwise acquired jobs.  */
+int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
+    FILE *dstFile = ctx->base.file;
+    assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
+    AIO_WritePool_sparseWriteEnd(ctx);
+    AIO_IOPool_setFile(&ctx->base, NULL);
+    return fclose(dstFile);
+}
+
+/* AIO_WritePool_executeWriteJob:
+ * Executes a write job synchronously. Can be used as a function for a thread pool. */
+static void AIO_WritePool_executeWriteJob(void* opaque){
+    IOJob_t* job = (IOJob_t*) opaque;
+    WritePoolCtx_t* ctx = (WritePoolCtx_t*) job->ctx;
+    ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
+    AIO_IOPool_releaseIoJob(job);
+}
+
+/* AIO_WritePool_create:
+ * Allocates and sets and a new write pool including its included jobs. */
+WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize) {
+    WritePoolCtx_t* ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
+    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
+    AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
+    ctx->storedSkips = 0;
+    return ctx;
+}
+
+/* AIO_WritePool_free:
+ * Frees and releases a writePool and its resources. Closes destination file if needs to. */
+void AIO_WritePool_free(WritePoolCtx_t* ctx) {
+    /* Make sure we finish all tasks and then free the resources */
+    if(AIO_WritePool_getFile(ctx))
+        AIO_WritePool_closeFile(ctx);
+    AIO_IOPool_destroy(&ctx->base);
+    assert(ctx->storedSkips==0);
+    free(ctx);
+}
diff --git a/programs/fileio_asyncio.h b/programs/fileio_asyncio.h
new file mode 100644 (file)
index 0000000..3e91164
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) Yann Collet, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ * You may select, at your option, one of the above-listed licenses.
+ */
+
+#ifndef ZSTD_FILEIO_ASYNCIO_H
+#define ZSTD_FILEIO_ASYNCIO_H
+
+#if defined (__cplusplus)
+extern "C" {
+#endif
+
+#include "../lib/common/mem.h"     /* U32, U64 */
+#include "fileio_types.h"
+#include "platform.h"
+#include "util.h"
+#include "../lib/common/pool.h"
+#include "../lib/common/threading.h"
+
+#define MAX_IO_JOBS          (10)
+
+typedef struct {
+    /* These struct fields should be set only on creation and not changed afterwards */
+    POOL_ctx* threadPool;
+    int totalIoJobs;
+    FIO_prefs_t* prefs;
+    POOL_function poolFunction;
+
+    /* Controls the file we currently write to, make changes only by using provided utility functions */
+    FILE* file;
+
+    /* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should
+     * only be mutated after locking the mutex */
+    ZSTD_pthread_mutex_t ioJobsMutex;
+    void* availableJobs[MAX_IO_JOBS];
+    int availableJobsCount;
+} IOPoolCtx_t;
+
+typedef struct {
+    IOPoolCtx_t base;
+    unsigned storedSkips;
+} WritePoolCtx_t;
+
+typedef struct {
+    /* These fields are automatically set and shouldn't be changed by non WritePool code. */
+    void *ctx;
+    FILE* file;
+    void *buffer;
+    size_t bufferSize;
+
+    /* This field should be changed before a job is queued for execution and should contain the number
+     * of bytes to write from the buffer. */
+    size_t usedBufferSize;
+    U64 offset;
+} IOJob_t;
+
+/** AIO_fwriteSparse() :
+*  @return : storedSkips,
+*            argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
+unsigned AIO_fwriteSparse(FILE* file,
+                 const void* buffer, size_t bufferSize,
+                 const FIO_prefs_t* const prefs,
+                 unsigned storedSkips);
+
+void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips);
+
+/* AIO_WritePool_releaseIoJob:
+ * Releases an acquired job back to the pool. Doesn't execute the job. */
+void AIO_WritePool_releaseIoJob(IOJob_t *job);
+
+/* AIO_WritePool_acquireJob:
+ * Returns an available write job to be used for a future write. */
+IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx);
+
+/* AIO_WritePool_enqueueAndReacquireWriteJob:
+ * Enqueues a write job for execution and acquires a new one.
+ * After execution `job`'s pointed value would change to the newly acquired job.
+ * Make sure to set `usedBufferSize` to the wanted length before call.
+ * The queued job shouldn't be used directly after queueing it. */
+void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job);
+
+/* AIO_WritePool_sparseWriteEnd:
+ * Ends sparse writes to the current file.
+ * Blocks on completion of all current write jobs before executing. */
+void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx);
+
+/* AIO_WritePool_setFile:
+ * Sets the destination file for future writes in the pool.
+ * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
+ * Also requires ending of sparse write if a previous file was used in sparse mode. */
+void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);
+
+/* AIO_WritePool_getFile:
+ * Returns the file the writePool is currently set to write to. */
+FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx);
+
+/* AIO_WritePool_closeFile:
+ * Ends sparse write and closes the writePool's current file and sets the file to NULL.
+ * Requires completion of all queues write jobs and release of all otherwise acquired jobs.  */
+int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
+
+/* AIO_WritePool_create:
+ * Allocates and sets and a new write pool including its included jobs.
+ * bufferSize should be set to the maximal buffer we want to write to at a time. */
+WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize);
+
+/* AIO_WritePool_free:
+ * Frees and releases a writePool and its resources. Closes destination file. */
+void AIO_WritePool_free(WritePoolCtx_t* ctx);
+
+#if defined (__cplusplus)
+}
+#endif
+
+#endif /* ZSTD_FILEIO_ASYNCIO_H */
diff --git a/programs/fileio_common.h b/programs/fileio_common.h
new file mode 100644 (file)
index 0000000..d33c19d
--- /dev/null
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) Yann Collet, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ * You may select, at your option, one of the above-listed licenses.
+ */
+
+#ifndef ZSTD_FILEIO_COMMON_H
+#define ZSTD_FILEIO_COMMON_H
+
+#if defined (__cplusplus)
+extern "C" {
+#endif
+
+#include "../lib/common/mem.h"     /* U32, U64 */
+#include "fileio_types.h"
+#include "platform.h"
+#include "timefn.h"     /* UTIL_getTime, UTIL_clockSpanMicro */
+
+/*-*************************************
+*  Macros
+***************************************/
+#define KB *(1 <<10)
+#define MB *(1 <<20)
+#define GB *(1U<<30)
+#undef MAX
+#define MAX(a,b) ((a)>(b) ? (a) : (b))
+
+extern FIO_display_prefs_t g_display_prefs;
+
+#define DISPLAY(...)         fprintf(stderr, __VA_ARGS__)
+#define DISPLAYOUT(...)      fprintf(stdout, __VA_ARGS__)
+#define DISPLAYLEVEL(l, ...) { if (g_display_prefs.displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
+
+extern UTIL_time_t g_displayClock;
+
+#define REFRESH_RATE  ((U64)(SEC_TO_MICRO / 6))
+#define READY_FOR_UPDATE() ((g_display_prefs.progressSetting != FIO_ps_never) && UTIL_clockSpanMicro(g_displayClock) > REFRESH_RATE)
+#define DELAY_NEXT_UPDATE() { g_displayClock = UTIL_getTime(); }
+#define DISPLAYUPDATE(l, ...) {                              \
+        if (g_display_prefs.displayLevel>=l && (g_display_prefs.progressSetting != FIO_ps_never)) { \
+            if (READY_FOR_UPDATE() || (g_display_prefs.displayLevel>=4)) { \
+                DELAY_NEXT_UPDATE();                         \
+                DISPLAY(__VA_ARGS__);                        \
+                if (g_display_prefs.displayLevel>=4) fflush(stderr);       \
+    }   }   }
+
+#undef MIN  /* in case it would be already defined */
+#define MIN(a,b)    ((a) < (b) ? (a) : (b))
+
+
+#define EXM_THROW(error, ...)                                             \
+{                                                                         \
+    DISPLAYLEVEL(1, "zstd: ");                                            \
+    DISPLAYLEVEL(5, "Error defined at %s, line %i : \n", __FILE__, __LINE__); \
+    DISPLAYLEVEL(1, "error %i : ", error);                                \
+    DISPLAYLEVEL(1, __VA_ARGS__);                                         \
+    DISPLAYLEVEL(1, " \n");                                               \
+    exit(error);                                                          \
+}
+
+#define CHECK_V(v, f)                                \
+    v = f;                                           \
+    if (ZSTD_isError(v)) {                           \
+        DISPLAYLEVEL(5, "%s \n", #f);                \
+        EXM_THROW(11, "%s", ZSTD_getErrorName(v));   \
+    }
+#define CHECK(f) { size_t err; CHECK_V(err, f); }
+
+
+/* Avoid fseek()'s 2GiB barrier with MSVC, macOS, *BSD, MinGW */
+#if defined(_MSC_VER) && _MSC_VER >= 1400
+#   define LONG_SEEK _fseeki64
+#   define LONG_TELL _ftelli64
+#elif !defined(__64BIT__) && (PLATFORM_POSIX_VERSION >= 200112L) /* No point defining Large file for 64 bit */
+#  define LONG_SEEK fseeko
+#  define LONG_TELL ftello
+#elif defined(__MINGW32__) && !defined(__STRICT_ANSI__) && !defined(__NO_MINGW_LFS) && defined(__MSVCRT__)
+#   define LONG_SEEK fseeko64
+#   define LONG_TELL ftello64
+#elif defined(_WIN32) && !defined(__DJGPP__)
+#   include <windows.h>
+    static int LONG_SEEK(FILE* file, __int64 offset, int origin) {
+        LARGE_INTEGER off;
+        DWORD method;
+        off.QuadPart = offset;
+        if (origin == SEEK_END)
+            method = FILE_END;
+        else if (origin == SEEK_CUR)
+            method = FILE_CURRENT;
+        else
+            method = FILE_BEGIN;
+
+        if (SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, NULL, method))
+            return 0;
+        else
+            return -1;
+    }
+    static __int64 LONG_TELL(FILE* file) {
+        LARGE_INTEGER off, newOff;
+        off.QuadPart = 0;
+        newOff.QuadPart = 0;
+        SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, &newOff, FILE_CURRENT);
+        return newOff.QuadPart;
+    }
+#else
+#   define LONG_SEEK fseek
+#   define LONG_TELL ftell
+#endif
+
+#if defined (__cplusplus)
+}
+#endif
+#endif //ZSTD_FILEIO_COMMON_H
diff --git a/programs/fileio_types.h b/programs/fileio_types.h
new file mode 100644 (file)
index 0000000..1909ab1
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) Yann Collet, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ * You may select, at your option, one of the above-listed licenses.
+ */
+
+#ifndef FILEIO_TYPES_HEADER
+#define FILEIO_TYPES_HEADER
+
+#define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_compressionParameters */
+#include "../lib/zstd.h"           /* ZSTD_* */
+
+/*-*************************************
+*  Parameters: FIO_prefs_t
+***************************************/
+
+typedef struct FIO_display_prefs_s FIO_display_prefs_t;
+
+typedef enum { FIO_ps_auto, FIO_ps_never, FIO_ps_always } FIO_progressSetting_e;
+
+struct FIO_display_prefs_s {
+    int displayLevel;   /* 0 : no display;  1: errors;  2: + result + interaction + warnings;  3: + progression;  4: + information */
+    FIO_progressSetting_e progressSetting;
+};
+
+
+typedef enum { FIO_zstdCompression, FIO_gzipCompression, FIO_xzCompression, FIO_lzmaCompression, FIO_lz4Compression } FIO_compressionType_t;
+
+typedef struct FIO_prefs_s {
+
+    /* Algorithm preferences */
+    FIO_compressionType_t compressionType;
+    U32 sparseFileSupport;   /* 0: no sparse allowed; 1: auto (file yes, stdout no); 2: force sparse */
+    int dictIDFlag;
+    int checksumFlag;
+    int blockSize;
+    int overlapLog;
+    U32 adaptiveMode;
+    U32 useRowMatchFinder;
+    int rsyncable;
+    int minAdaptLevel;
+    int maxAdaptLevel;
+    int ldmFlag;
+    int ldmHashLog;
+    int ldmMinMatch;
+    int ldmBucketSizeLog;
+    int ldmHashRateLog;
+    size_t streamSrcSize;
+    size_t targetCBlockSize;
+    int srcSizeHint;
+    int testMode;
+    ZSTD_paramSwitch_e literalCompressionMode;
+
+    /* IO preferences */
+    U32 removeSrcFile;
+    U32 overwrite;
+    U32 asyncIO;
+
+    /* Computation resources preferences */
+    unsigned memLimit;
+    int nbWorkers;
+
+    int excludeCompressedFiles;
+    int patchFromMode;
+    int contentSize;
+    int allowBlockDevices;
+} FIO_prefs_t;
+
+#endif /* FILEIO_TYPES_HEADER */
\ No newline at end of file