FIO_display_prefs_t g_display_prefs = {2, FIO_ps_auto};
UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER;
+/* *************************************
+* Synchronous compression IO helpers
+***************************************/
+typedef struct {
+ const FIO_prefs_t* prefs;
+ FILE* srcFile;
+ FILE* dstFile;
+ unsigned storedSkips;
+ U8* inBuffer;
+ size_t inCapacity;
+ U8* srcBuffer;
+ size_t srcBufferLoaded;
+ U8* outBuffer;
+ size_t outCapacity;
+} FIO_SyncCompressIO;
+
+static void FIO_SyncCompressIO_init(FIO_SyncCompressIO* io,
+ const FIO_prefs_t* prefs,
+ size_t inCapacity,
+ size_t outCapacity);
+static void FIO_SyncCompressIO_free(FIO_SyncCompressIO* io);
+static void FIO_SyncCompressIO_setSrc(FIO_SyncCompressIO* io, FILE* file);
+static void FIO_SyncCompressIO_clearSrc(FIO_SyncCompressIO* io);
+static void FIO_SyncCompressIO_setDst(FIO_SyncCompressIO* io, FILE* file);
+static int FIO_SyncCompressIO_closeDst(FIO_SyncCompressIO* io);
+static size_t FIO_SyncCompressIO_fillBuffer(FIO_SyncCompressIO* io, size_t minToHave);
+static void FIO_SyncCompressIO_consumeBytes(FIO_SyncCompressIO* io, size_t n);
+static void FIO_SyncCompressIO_commitOut(FIO_SyncCompressIO* io, const void* buffer, size_t size);
+static void FIO_SyncCompressIO_finish(FIO_SyncCompressIO* io);
+
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */
#include "../lib/zstd.h"
#include "../lib/zstd_errors.h" /* ZSTD_error_frameParameter_windowTooLarge */
#define TEMPORARY_FILE_PERMISSIONS (0600)
#endif
+static unsigned FIO_sparseWrite(FILE* file,
+ const void* buffer, size_t bufferSize,
+ const FIO_prefs_t* const prefs,
+ unsigned storedSkips)
+{
+ const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */
+ size_t bufferSizeT = bufferSize / sizeof(size_t);
+ const size_t* const bufferTEnd = bufferT + bufferSizeT;
+ const size_t* ptrT = bufferT;
+ static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */
+
+ if (prefs->testMode) return 0; /* do not output anything in test mode */
+
+ if (!prefs->sparseFileSupport) { /* normal write */
+ size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
+ if (sizeCheck != bufferSize)
+ EXM_THROW(70, "Write error : cannot write block : %s",
+ strerror(errno));
+ return 0;
+ }
+
+ /* avoid int overflow */
+ if (storedSkips > 1 GB) {
+ if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
+ EXM_THROW(91, "1 GB skip error (sparse file support)");
+ storedSkips -= 1 GB;
+ }
+
+ while (ptrT < bufferTEnd) {
+ size_t nb0T;
+
+ /* adjust last segment if < 32 KB */
+ size_t seg0SizeT = segmentSizeT;
+ if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
+ bufferSizeT -= seg0SizeT;
+
+ /* count leading zeroes */
+ for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
+ storedSkips += (unsigned)(nb0T * sizeof(size_t));
+
+ if (nb0T != seg0SizeT) { /* not all 0s */
+ size_t const nbNon0ST = seg0SizeT - nb0T;
+ /* skip leading zeros */
+ if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
+ EXM_THROW(92, "Sparse skip error ; try --no-sparse");
+ storedSkips = 0;
+ /* write the rest */
+ if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
+ EXM_THROW(93, "Write error : cannot write block : %s",
+ strerror(errno));
+ }
+ ptrT += seg0SizeT;
+ }
+
+ { static size_t const maskT = sizeof(size_t)-1;
+ if (bufferSize & maskT) {
+ /* size not multiple of sizeof(size_t) : implies end of block */
+ const char* const restStart = (const char*)bufferTEnd;
+ const char* restPtr = restStart;
+ const char* const restEnd = (const char*)buffer + bufferSize;
+ assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
+ for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
+ storedSkips += (unsigned) (restPtr - restStart);
+ if (restPtr != restEnd) {
+ /* not all remaining bytes are 0 */
+ size_t const restSize = (size_t)(restEnd - restPtr);
+ if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
+ EXM_THROW(92, "Sparse skip error ; try --no-sparse");
+ if (fwrite(restPtr, 1, restSize, file) != restSize)
+ EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
+ strerror(errno));
+ storedSkips = 0;
+ } } }
+
+ return storedSkips;
+}
+
+static void FIO_sparseWriteEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
+{
+ if (file == NULL) return;
+ if (prefs->testMode) {
+ assert(storedSkips == 0);
+ return;
+ }
+ if (storedSkips>0) {
+ assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */
+ if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
+ EXM_THROW(69, "Final skip error (sparse file support)");
+ /* last zero must be explicitly written,
+ * so that skipped ones get implicitly translated as zero by FS */
+ { const char lastZeroByte[1] = { 0 };
+ if (fwrite(lastZeroByte, 1, 1, file) != 1)
+ EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
+ }
+ }
+}
+
+static void FIO_SyncCompressIO_init(FIO_SyncCompressIO* io,
+ const FIO_prefs_t* prefs,
+ size_t inCapacity,
+ size_t outCapacity)
+{
+ memset(io, 0, sizeof(*io));
+ io->prefs = prefs;
+ io->inCapacity = inCapacity;
+ io->outCapacity = outCapacity;
+ io->inBuffer = (U8*)malloc(inCapacity);
+ if (!io->inBuffer)
+ EXM_THROW(101, "Allocation error : not enough memory");
+ io->outBuffer = (U8*)malloc(outCapacity);
+ if (!io->outBuffer) {
+ free(io->inBuffer);
+ io->inBuffer = NULL;
+ EXM_THROW(101, "Allocation error : not enough memory");
+ }
+ io->srcBuffer = io->inBuffer;
+ io->srcBufferLoaded = 0;
+}
+
+static void FIO_SyncCompressIO_free(FIO_SyncCompressIO* io)
+{
+ if (!io) return;
+ free(io->inBuffer);
+ free(io->outBuffer);
+ io->inBuffer = NULL;
+ io->outBuffer = NULL;
+ io->srcBuffer = NULL;
+ io->srcBufferLoaded = 0;
+ io->srcFile = NULL;
+ io->dstFile = NULL;
+ io->storedSkips = 0;
+}
+
+static void FIO_SyncCompressIO_setSrc(FIO_SyncCompressIO* io, FILE* file)
+{
+ io->srcFile = file;
+ io->srcBuffer = io->inBuffer;
+ io->srcBufferLoaded = 0;
+}
+
+static void FIO_SyncCompressIO_clearSrc(FIO_SyncCompressIO* io)
+{
+ io->srcFile = NULL;
+ io->srcBuffer = io->inBuffer;
+ io->srcBufferLoaded = 0;
+}
+
+static void FIO_SyncCompressIO_setDst(FIO_SyncCompressIO* io, FILE* file)
+{
+ io->dstFile = file;
+ io->storedSkips = 0;
+}
+
+static int FIO_SyncCompressIO_closeDst(FIO_SyncCompressIO* io)
+{
+ int result = 0;
+ if (io->dstFile != NULL) {
+ FIO_SyncCompressIO_finish(io);
+ result = fclose(io->dstFile);
+ io->dstFile = NULL;
+ }
+ return result;
+}
+
+static size_t FIO_SyncCompressIO_fillBuffer(FIO_SyncCompressIO* io, size_t minToHave)
+{
+ size_t added = 0;
+ if (io->srcFile == NULL)
+ return 0;
+
+ if (minToHave > io->inCapacity)
+ minToHave = io->inCapacity;
+
+ if (io->srcBufferLoaded >= minToHave)
+ return 0;
+
+ if (io->srcBuffer != io->inBuffer) {
+ if (io->srcBufferLoaded > 0)
+ memmove(io->inBuffer, io->srcBuffer, io->srcBufferLoaded);
+ io->srcBuffer = io->inBuffer;
+ }
+
+ while (io->srcBufferLoaded < minToHave) {
+ size_t const toRead = io->inCapacity - io->srcBufferLoaded;
+ size_t const readBytes = fread(io->inBuffer + io->srcBufferLoaded, 1, toRead, io->srcFile);
+ if (readBytes == 0) {
+ if (ferror(io->srcFile))
+ EXM_THROW(37, "Read error");
+ break; /* EOF */
+ }
+ io->srcBufferLoaded += readBytes;
+ added += readBytes;
+ if (readBytes < toRead)
+ break;
+ }
+
+ return added;
+}
+
+static void FIO_SyncCompressIO_consumeBytes(FIO_SyncCompressIO* io, size_t n)
+{
+ assert(n <= io->srcBufferLoaded);
+ io->srcBuffer += n;
+ io->srcBufferLoaded -= n;
+ if (io->srcBufferLoaded == 0)
+ io->srcBuffer = io->inBuffer;
+}
+
+static void FIO_SyncCompressIO_commitOut(FIO_SyncCompressIO* io, const void* buffer, size_t size)
+{
+ if (size == 0)
+ return;
+ if (io->dstFile == NULL) {
+ assert(io->prefs->testMode);
+ return;
+ }
+ io->storedSkips = FIO_sparseWrite(io->dstFile, buffer, size, io->prefs, io->storedSkips);
+}
+
+static void FIO_SyncCompressIO_finish(FIO_SyncCompressIO* io)
+{
+ if (io->dstFile == NULL)
+ return;
+ FIO_sparseWriteEnd(io->prefs, io->dstFile, io->storedSkips);
+ io->storedSkips = 0;
+}
+
/*-************************************
* Signal (Ctrl-C trapping)
**************************************/
const char* dictFileName;
stat_t dictFileStat;
ZSTD_CStream* cctx;
- WritePoolCtx_t *writeCtx;
- ReadPoolCtx_t *readCtx;
+ FIO_SyncCompressIO io;
} cRess_t;
/** ZSTD_cycleLog() :
dictBufferType = (useMMap && !forceNoUseMMap) ? FIO_mmapDict : FIO_mallocDict;
FIO_initDict(&ress.dict, dictFileName, prefs, &ress.dictFileStat, dictBufferType); /* works with dictFileName==NULL */
- {
- /* Compression paths stay synchronous for now: lower overhead and easier upkeep. */
- int const savedAsyncIO = prefs->asyncIO;
- prefs->asyncIO = 0;
- ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize());
- ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize());
- prefs->asyncIO = savedAsyncIO;
- }
+ FIO_SyncCompressIO_init(&ress.io, prefs, ZSTD_CStreamInSize(), ZSTD_CStreamOutSize());
/* Advanced parameters, including dictionary */
if (dictFileName && (ress.dict.dictBuffer==NULL))
static void FIO_freeCResources(cRess_t* const ress)
{
FIO_freeDict(&(ress->dict));
- AIO_WritePool_free(ress->writeCtx);
- AIO_ReadPool_free(ress->readCtx);
+ FIO_SyncCompressIO_free(&ress->io);
ZSTD_freeCStream(ress->cctx); /* never fails */
}
#ifdef ZSTD_GZCOMPRESS
static unsigned long long
-FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but not changed */
+FIO_compressGzFrame(cRess_t* ress,
const char* srcFileName, U64 const srcFileSize,
int compressionLevel, U64* readsize)
{
+ FIO_SyncCompressIO* const io = &ress->io;
unsigned long long inFileSize = 0, outFileSize = 0;
z_stream strm;
- IOJob_t *writeJob = NULL;
if (compressionLevel > Z_BEST_COMPRESSION)
compressionLevel = Z_BEST_COMPRESSION;
EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret);
} }
- writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
strm.next_in = 0;
strm.avail_in = 0;
- strm.next_out = (Bytef*)writeJob->buffer;
- strm.avail_out = (uInt)writeJob->bufferSize;
+ strm.next_out = (Bytef*)io->outBuffer;
+ strm.avail_out = (uInt)io->outCapacity;
while (1) {
int ret;
if (strm.avail_in == 0) {
- AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
- if (ress->readCtx->srcBufferLoaded == 0) break;
- inFileSize += ress->readCtx->srcBufferLoaded;
- strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
- strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
+ size_t const added = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize());
+ if (io->srcBufferLoaded == 0) break;
+ inFileSize += added;
+ *readsize += added;
+ strm.next_in = (z_const unsigned char*)io->srcBuffer;
+ strm.avail_in = (uInt)io->srcBufferLoaded;
}
{
size_t const availBefore = strm.avail_in;
ret = deflate(&strm, Z_NO_FLUSH);
- AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
+ FIO_SyncCompressIO_consumeBytes(io, availBefore - strm.avail_in);
}
if (ret != Z_OK)
EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret);
- { size_t const cSize = writeJob->bufferSize - strm.avail_out;
+ { size_t const cSize = (size_t)((uInt)io->outCapacity - strm.avail_out);
if (cSize) {
- writeJob->usedBufferSize = cSize;
- AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
+ FIO_SyncCompressIO_commitOut(io, io->outBuffer, cSize);
outFileSize += cSize;
- strm.next_out = (Bytef*)writeJob->buffer;
- strm.avail_out = (uInt)writeJob->bufferSize;
+ strm.next_out = (Bytef*)io->outBuffer;
+ strm.avail_out = (uInt)io->outCapacity;
} }
if (srcFileSize == UTIL_FILESIZE_UNKNOWN) {
DISPLAYUPDATE_PROGRESS(
while (1) {
int const ret = deflate(&strm, Z_FINISH);
- { size_t const cSize = writeJob->bufferSize - strm.avail_out;
+ { size_t const cSize = (size_t)((uInt)io->outCapacity - strm.avail_out);
if (cSize) {
- writeJob->usedBufferSize = cSize;
- AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
+ FIO_SyncCompressIO_commitOut(io, io->outBuffer, cSize);
outFileSize += cSize;
- strm.next_out = (Bytef*)writeJob->buffer;
- strm.avail_out = (uInt)writeJob->bufferSize;
+ strm.next_out = (Bytef*)io->outBuffer;
+ strm.avail_out = (uInt)io->outCapacity;
} }
if (ret == Z_STREAM_END) break;
if (ret != Z_BUF_ERROR)
EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret);
} }
*readsize = inFileSize;
- AIO_WritePool_releaseIoJob(writeJob);
- AIO_WritePool_sparseWriteEnd(ress->writeCtx);
+ FIO_SyncCompressIO_finish(io);
return outFileSize;
}
#endif
const char* srcFileName, U64 const srcFileSize,
int compressionLevel, U64* readsize, int plain_lzma)
{
+ FIO_SyncCompressIO* const io = &ress->io;
unsigned long long inFileSize = 0, outFileSize = 0;
lzma_stream strm = LZMA_STREAM_INIT;
lzma_action action = LZMA_RUN;
lzma_ret ret;
- IOJob_t *writeJob = NULL;
if (compressionLevel < 0) compressionLevel = 0;
if (compressionLevel > 9) compressionLevel = 9;
EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret);
}
- writeJob =AIO_WritePool_acquireJob(ress->writeCtx);
- strm.next_out = (BYTE*)writeJob->buffer;
- strm.avail_out = writeJob->bufferSize;
+ strm.next_out = (BYTE*)io->outBuffer;
+ strm.avail_out = io->outCapacity;
strm.next_in = 0;
strm.avail_in = 0;
while (1) {
if (strm.avail_in == 0) {
- size_t const inSize = AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
- if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
- inFileSize += inSize;
- strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
- strm.avail_in = ress->readCtx->srcBufferLoaded;
+ size_t const added = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize());
+ if (io->srcBufferLoaded == 0) action = LZMA_FINISH;
+ inFileSize += added;
+ *readsize += added;
+ strm.next_in = (BYTE const*)io->srcBuffer;
+ strm.avail_in = io->srcBufferLoaded;
}
{
size_t const availBefore = strm.avail_in;
ret = lzma_code(&strm, action);
- AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
+ FIO_SyncCompressIO_consumeBytes(io, availBefore - strm.avail_in);
}
-
if (ret != LZMA_OK && ret != LZMA_STREAM_END)
EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret);
- { size_t const compBytes = writeJob->bufferSize - strm.avail_out;
+ { size_t const compBytes = io->outCapacity - strm.avail_out;
if (compBytes) {
- writeJob->usedBufferSize = compBytes;
- AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
+ FIO_SyncCompressIO_commitOut(io, io->outBuffer, compBytes);
outFileSize += compBytes;
- strm.next_out = (BYTE*)writeJob->buffer;
- strm.avail_out = writeJob->bufferSize;
+ strm.next_out = (BYTE*)io->outBuffer;
+ strm.avail_out = io->outCapacity;
} }
if (srcFileSize == UTIL_FILESIZE_UNKNOWN)
DISPLAYUPDATE_PROGRESS("\rRead : %u MB ==> %.2f%%",
lzma_end(&strm);
*readsize = inFileSize;
- AIO_WritePool_releaseIoJob(writeJob);
- AIO_WritePool_sparseWriteEnd(ress->writeCtx);
+ FIO_SyncCompressIO_finish(io);
return outFileSize;
}
int compressionLevel, int checksumFlag,
U64* readsize)
{
+ FIO_SyncCompressIO* const io = &ress->io;
const size_t blockSize = FIO_LZ4_GetBlockSize_FromBlockId(LZ4F_max64KB);
unsigned long long inFileSize = 0, outFileSize = 0;
LZ4F_preferences_t prefs;
LZ4F_compressionContext_t ctx;
- IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
-
LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
if (LZ4F_isError(errorCode))
EXM_THROW(31, "zstd: failed to create lz4 compression context");
memset(&prefs, 0, sizeof(prefs));
- assert(blockSize <= ress->readCtx->base.jobBufferSize);
+ assert(blockSize <= io->inCapacity);
/* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */
prefs.autoFlush = 0;
#if LZ4_VERSION_NUMBER >= 10600
prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize;
#endif
- assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize);
+ assert(LZ4F_compressBound(blockSize, &prefs) <= io->outCapacity);
{
- size_t headerSize = LZ4F_compressBegin(ctx, writeJob->buffer, writeJob->bufferSize, &prefs);
+ size_t headerSize = LZ4F_compressBegin(ctx, io->outBuffer, io->outCapacity, &prefs);
if (LZ4F_isError(headerSize))
EXM_THROW(33, "File header generation failed : %s",
LZ4F_getErrorName(headerSize));
- writeJob->usedBufferSize = headerSize;
- AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
+ FIO_SyncCompressIO_commitOut(io, io->outBuffer, headerSize);
outFileSize += headerSize;
- /* Read first block */
- inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
+ {
+ size_t const added = FIO_SyncCompressIO_fillBuffer(io, blockSize);
+ inFileSize += added;
+ *readsize += added;
+ }
- /* Main Loop */
- while (ress->readCtx->srcBufferLoaded) {
- size_t inSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
- size_t const outSize = LZ4F_compressUpdate(ctx, writeJob->buffer, writeJob->bufferSize,
- ress->readCtx->srcBuffer, inSize, NULL);
+ while (io->srcBufferLoaded) {
+ size_t const inSize = MIN(blockSize, io->srcBufferLoaded);
+ size_t const outSize = LZ4F_compressUpdate(ctx, io->outBuffer, io->outCapacity,
+ io->srcBuffer, inSize, NULL);
if (LZ4F_isError(outSize))
EXM_THROW(35, "zstd: %s: lz4 compression failed : %s",
srcFileName, LZ4F_getErrorName(outSize));
(double)outFileSize/(double)inFileSize*100);
}
- /* Write Block */
- writeJob->usedBufferSize = outSize;
- AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
+ FIO_SyncCompressIO_commitOut(io, io->outBuffer, outSize);
- /* Read next block */
- AIO_ReadPool_consumeBytes(ress->readCtx, inSize);
- inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
+ FIO_SyncCompressIO_consumeBytes(io, inSize);
+ {
+ size_t const added = FIO_SyncCompressIO_fillBuffer(io, blockSize);
+ inFileSize += added;
+ *readsize += added;
+ }
}
- /* End of Stream mark */
- headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL);
+ headerSize = LZ4F_compressEnd(ctx, io->outBuffer, io->outCapacity, NULL);
if (LZ4F_isError(headerSize))
EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s",
srcFileName, LZ4F_getErrorName(headerSize));
- writeJob->usedBufferSize = headerSize;
- AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
+ FIO_SyncCompressIO_commitOut(io, io->outBuffer, headerSize);
outFileSize += headerSize;
}
- *readsize = inFileSize;
LZ4F_freeCompressionContext(ctx);
- AIO_WritePool_releaseIoJob(writeJob);
- AIO_WritePool_sparseWriteEnd(ress->writeCtx);
+ FIO_SyncCompressIO_finish(io);
return outFileSize;
}
static unsigned long long
FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
FIO_prefs_t* const prefs,
- const cRess_t* ressPtr,
+ cRess_t* ress,
const char* srcFileName, U64 fileSize,
int compressionLevel, U64* readsize)
{
- cRess_t const ress = *ressPtr;
- IOJob_t* writeJob = AIO_WritePool_acquireJob(ressPtr->writeCtx);
+ FIO_SyncCompressIO* const io = &ress->io;
U64 compressedfilesize = 0;
ZSTD_EndDirective directive = ZSTD_e_continue;
/* init */
if (fileSize != UTIL_FILESIZE_UNKNOWN) {
pledgedSrcSize = fileSize;
- CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize));
+ CHECK(ZSTD_CCtx_setPledgedSrcSize(ress->cctx, fileSize));
} else if (prefs->streamSrcSize > 0) {
/* unknown source size; use the declared stream size */
pledgedSrcSize = prefs->streamSrcSize;
- CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, prefs->streamSrcSize) );
+ CHECK( ZSTD_CCtx_setPledgedSrcSize(ress->cctx, prefs->streamSrcSize) );
}
{ int windowLog;
UTIL_HumanReadableSize_t windowSize;
- CHECK(ZSTD_CCtx_getParameter(ress.cctx, ZSTD_c_windowLog, &windowLog));
+ CHECK(ZSTD_CCtx_getParameter(ress->cctx, ZSTD_c_windowLog, &windowLog));
if (windowLog == 0) {
if (prefs->ldmFlag) {
/* If long mode is set without a window size libzstd will set this size internally */
do {
size_t stillToFlush;
/* Fill input Buffer */
- size_t const inSize = AIO_ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize());
- ZSTD_inBuffer inBuff = setInBuffer( ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 );
+ size_t const inSize = FIO_SyncCompressIO_fillBuffer(io, ZSTD_CStreamInSize());
+ ZSTD_inBuffer inBuff = setInBuffer( io->srcBuffer, io->srcBufferLoaded, 0 );
DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize);
*readsize += inSize;
- if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize))
+ if ((io->srcBufferLoaded == 0) || (*readsize == fileSize))
directive = ZSTD_e_end;
stillToFlush = 1;
|| (directive == ZSTD_e_end && stillToFlush != 0) ) {
size_t const oldIPos = inBuff.pos;
- ZSTD_outBuffer outBuff = setOutBuffer( writeJob->buffer, writeJob->bufferSize, 0 );
- size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
- CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
- AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos);
+ ZSTD_outBuffer outBuff = setOutBuffer( io->outBuffer, io->outCapacity, 0 );
+ size_t const toFlushNow = ZSTD_toFlushNow(ress->cctx);
+ CHECK_V(stillToFlush, ZSTD_compressStream2(ress->cctx, &outBuff, &inBuff, directive));
+ FIO_SyncCompressIO_consumeBytes(io, inBuff.pos - oldIPos);
/* count stats */
inputPresented++;
DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
(unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
if (outBuff.pos) {
- writeJob->usedBufferSize = outBuff.pos;
- AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
+ FIO_SyncCompressIO_commitOut(io, io->outBuffer, outBuff.pos);
compressedfilesize += outBuff.pos;
}
/* adaptive mode : statistics measurement and speed correction */
if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) {
- ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
+ ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress->cctx);
lastAdaptTime = UTIL_getTime();
if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
compressionLevel += (compressionLevel == 0); /* skip 0 */
- ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
+ ZSTD_CCtx_setParameter(ress->cctx, ZSTD_c_compressionLevel, compressionLevel);
}
if (speedChange == faster) {
DISPLAYLEVEL(6, "faster speed , lighter compression \n")
compressionLevel --;
if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
compressionLevel -= (compressionLevel == 0); /* skip 0 */
- ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
+ ZSTD_CCtx_setParameter(ress->cctx, ZSTD_c_compressionLevel, compressionLevel);
}
speedChange = noChange;
/* display notification */
if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) {
- ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
+ ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress->cctx);
double const cShare = (double)zfp.produced / (double)(zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
UTIL_HumanReadableSize_t const buffered_hrs = UTIL_makeHumanReadableSize(zfp.ingested - zfp.consumed);
UTIL_HumanReadableSize_t const consumed_hrs = UTIL_makeHumanReadableSize(zfp.consumed);
(unsigned long long)*readsize, (unsigned long long)fileSize);
}
- AIO_WritePool_releaseIoJob(writeJob);
- AIO_WritePool_sparseWriteEnd(ressPtr->writeCtx);
+ FIO_SyncCompressIO_finish(io);
return compressedfilesize;
}
static int
FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
FIO_prefs_t* const prefs,
- cRess_t ress,
+ cRess_t* ress,
const char* dstFileName, const char* srcFileName,
int compressionLevel)
{
switch (prefs->compressionType) {
default:
case FIO_zstdCompression:
- compressedfilesize = FIO_compressZstdFrame(fCtx, prefs, &ress, srcFileName, fileSize, compressionLevel, &readsize);
+ compressedfilesize = FIO_compressZstdFrame(fCtx, prefs, ress, srcFileName, fileSize, compressionLevel, &readsize);
break;
case FIO_gzipCompression:
#ifdef ZSTD_GZCOMPRESS
- compressedfilesize = FIO_compressGzFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize);
+ compressedfilesize = FIO_compressGzFrame(ress, srcFileName, fileSize, compressionLevel, &readsize);
#else
(void)compressionLevel;
EXM_THROW(20, "zstd: %s: file cannot be compressed as gzip (zstd compiled without ZSTD_GZCOMPRESS) -- ignored \n",
case FIO_xzCompression:
case FIO_lzmaCompression:
#ifdef ZSTD_LZMACOMPRESS
- compressedfilesize = FIO_compressLzmaFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize, prefs->compressionType==FIO_lzmaCompression);
+ compressedfilesize = FIO_compressLzmaFrame(ress, srcFileName, fileSize, compressionLevel, &readsize, prefs->compressionType==FIO_lzmaCompression);
#else
(void)compressionLevel;
EXM_THROW(20, "zstd: %s: file cannot be compressed as xz/lzma (zstd compiled without ZSTD_LZMACOMPRESS) -- ignored \n",
case FIO_lz4Compression:
#ifdef ZSTD_LZ4COMPRESS
- compressedfilesize = FIO_compressLz4Frame(&ress, srcFileName, fileSize, compressionLevel, prefs->checksumFlag, &readsize);
+ compressedfilesize = FIO_compressLz4Frame(ress, srcFileName, fileSize, compressionLevel, prefs->checksumFlag, &readsize);
#else
(void)compressionLevel;
EXM_THROW(20, "zstd: %s: file cannot be compressed as lz4 (zstd compiled without ZSTD_LZ4COMPRESS) -- ignored \n",
*/
static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
FIO_prefs_t* const prefs,
- cRess_t ress,
+ cRess_t* ress,
const char* dstFileName,
const char* srcFileName,
const stat_t* srcFileStat,
int transferStat = 0;
int dstFd = -1;
- assert(AIO_ReadPool_getFile(ress.readCtx) != NULL);
- if (AIO_WritePool_getFile(ress.writeCtx) == NULL) {
+ if (ress->io.dstFile == NULL) {
int dstFileInitialPermissions = DEFAULT_FILE_PERMISSIONS;
if ( strcmp (srcFileName, stdinmark)
&& strcmp (dstFileName, stdoutmark)
closeDstFile = 1;
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName);
- { FILE *dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFileInitialPermissions);
+ {
+ FILE *dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFileInitialPermissions);
if (dstFile==NULL) return 1; /* could not open dstFileName */
dstFd = fileno(dstFile);
- AIO_WritePool_setFile(ress.writeCtx, dstFile);
+ FIO_SyncCompressIO_setDst(&ress->io, dstFile);
}
- /* Must only be added after FIO_openDstFile() succeeds.
- * Otherwise we may delete the destination file if it already exists,
- * and the user presses Ctrl-C when asked if they wish to overwrite.
- */
+ /* Must only be added after FIO_openDstFile() succeeds. */
addHandler(dstFileName);
}
}
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName);
- if (AIO_WritePool_closeFile(ress.writeCtx)) { /* error closing file */
+ if (FIO_SyncCompressIO_closeDst(&ress->io)) { /* error closing file */
DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
result=1;
}
UTIL_utime(dstFileName, srcFileStat);
}
- if ( (result != 0) /* operation failure */
- && strcmp(dstFileName, stdoutmark) /* special case : don't remove() stdout */
- ) {
- FIO_removeFile(dstFileName); /* remove compression artefact; note don't do anything special if remove() fails */
+ if ( (result != 0)
+ && strcmp(dstFileName, stdoutmark) ) {
+ FIO_removeFile(dstFileName);
}
}
static int
FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
FIO_prefs_t* const prefs,
- cRess_t ress,
+ cRess_t* ress,
const char* dstFileName,
const char* srcFileName,
int compressionLevel)
}
/* ensure src is not the same as dict (if present) */
- if (ress.dictFileName != NULL && UTIL_isSameFileStat(srcFileName, ress.dictFileName, &srcFileStat, &ress.dictFileStat)) {
+ if (ress->dictFileName != NULL && UTIL_isSameFileStat(srcFileName, ress->dictFileName, &srcFileStat, &ress->dictFileStat)) {
DISPLAYLEVEL(1, "zstd: cannot use %s as an input file and dictionary \n", srcFileName);
return 1;
}
srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat);
if (srcFile == NULL) return 1; /* srcFile could not be opened */
- /* AsyncIO is disabled for compression to favor predictable performance and simpler upkeep. */
if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */
fileSize = UTIL_getFileSizeStat(&srcFileStat);
(void)fileSize;
- AIO_ReadPool_setAsync(ress.readCtx, 0);
- AIO_WritePool_setAsync(ress.writeCtx, 0);
- AIO_ReadPool_setFile(ress.readCtx, srcFile);
+ FIO_SyncCompressIO_setSrc(&ress->io, srcFile);
result = FIO_compressFilename_dstFile(
fCtx, prefs, ress,
dstFileName, srcFileName,
&srcFileStat, compressionLevel);
- AIO_ReadPool_closeFile(ress.readCtx);
+ FIO_SyncCompressIO_clearSrc(&ress->io);
+
+ if (srcFile != NULL && fclose(srcFile)) {
+ DISPLAYLEVEL(1, "zstd: %s: %s \n", srcFileName, strerror(errno));
+ return 1;
+ }
if ( prefs->removeSrcFile /* --rm */
&& result == 0 /* success */
int compressionLevel, ZSTD_compressionParameters comprParams)
{
cRess_t ress = FIO_createCResources(prefs, dictFileName, UTIL_getFileSize(srcFileName), compressionLevel, comprParams);
- int const result = FIO_compressFilename_srcFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
+ int const result = FIO_compressFilename_srcFile(fCtx, prefs, &ress, dstFileName, srcFileName, compressionLevel);
#define DISPLAY_LEVEL_DEFAULT 2
if (dstFile == NULL) { /* could not open outFileName */
error = 1;
} else {
- AIO_WritePool_setFile(ress.writeCtx, dstFile);
+ FIO_SyncCompressIO_setDst(&ress.io, dstFile);
for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) {
- status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel);
+ status = FIO_compressFilename_srcFile(fCtx, prefs, &ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel);
if (!status) fCtx->nbFilesProcessed++;
error |= status;
}
- if (AIO_WritePool_closeFile(ress.writeCtx))
+ if (FIO_SyncCompressIO_closeDst(&ress.io))
EXM_THROW(29, "Write error (%s) : cannot properly close %s",
strerror(errno), outFileName);
}
} else {
dstFileName = FIO_determineCompressedName(srcFileName, outDirName, suffix); /* cannot fail */
}
- status = FIO_compressFilename_srcFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
+ status = FIO_compressFilename_srcFile(fCtx, prefs, &ress, dstFileName, srcFileName, compressionLevel);
if (!status) fCtx->nbFilesProcessed++;
error |= status;
}