ret->literalCompressionMode = ZSTD_ps_auto;
ret->excludeCompressedFiles = 0;
ret->allowBlockDevices = 0;
- ret->asyncIO = 0;
+ ret->asyncIO = AIO_supported();
return ret;
}
* Compression
************************************************************************/
typedef struct {
- FILE* srcFile;
- FILE* dstFile;
- void* srcBuffer;
- size_t srcBufferSize;
- void* dstBuffer;
- size_t dstBufferSize;
void* dictBuffer;
size_t dictBufferSize;
const char* dictFileName;
ZSTD_CStream* cctx;
+ WritePoolCtx_t *writeCtx;
+ ReadPoolCtx_t *readCtx;
} cRess_t;
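With srcFile/dstFile and the raw buffers removed from cRess_t, every compression format below drives its IO through the same write-job lifecycle. A minimal sketch of that shared pattern, with a hypothetical codec_step() standing in for the real deflate/lzma_code/LZ4F/ZSTD_compressStream2 calls:

    IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx);     /* borrow an output buffer */
    for (;;) {
        size_t produced;
        AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize()); /* top up the input window */
        if (ress->readCtx->srcBufferLoaded == 0) break;               /* end of input */
        produced = codec_step(ress->readCtx->srcBuffer,               /* hypothetical codec call */
                              ress->readCtx->srcBufferLoaded,
                              writeJob->buffer, writeJob->bufferSize);
        AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded);
        writeJob->usedBufferSize = produced;
        AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);         /* async write; hands back a fresh job */
    }
    AIO_WritePool_releaseIoJob(writeJob);                             /* return the last, unused buffer */
    AIO_WritePool_sparseWriteEnd(ress->writeCtx);                     /* flush pending sparse-write state */

The enqueue-and-reacquire call is what makes the write asynchronous: the filled buffer is handed to the pool's worker thread (or written inline when asyncIO is off) and the caller immediately gets another buffer to fill.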
/** ZSTD_cycleLog() :
if (ress.cctx == NULL)
EXM_THROW(30, "allocation error (%s): can't create ZSTD_CCtx",
strerror(errno));
- ress.srcBufferSize = ZSTD_CStreamInSize();
- ress.srcBuffer = malloc(ress.srcBufferSize);
- ress.dstBufferSize = ZSTD_CStreamOutSize();
/* need to update memLimit before calling createDictBuffer
* because of memLimit check inside it */
unsigned long long const ssSize = (unsigned long long)prefs->streamSrcSize;
FIO_adjustParamsForPatchFromMode(prefs, &comprParams, UTIL_getFileSize(dictFileName), ssSize > 0 ? ssSize : maxSrcFileSize, cLevel);
}
- ress.dstBuffer = malloc(ress.dstBufferSize);
ress.dictBufferSize = FIO_createDictBuffer(&ress.dictBuffer, dictFileName, prefs); /* works with dictFileName==NULL */
- if (!ress.srcBuffer || !ress.dstBuffer)
- EXM_THROW(31, "allocation error : not enough memory");
+
+ ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize());
+ ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize());
/* Advanced parameters, including dictionary */
if (dictFileName && (ress.dictBuffer==NULL))
static void FIO_freeCResources(const cRess_t* const ress)
{
- free(ress->srcBuffer);
- free(ress->dstBuffer);
free(ress->dictBuffer);
+ AIO_WritePool_free(ress->writeCtx);
+ AIO_ReadPool_free(ress->readCtx);
ZSTD_freeCStream(ress->cctx); /* never fails */
}
{
unsigned long long inFileSize = 0, outFileSize = 0;
z_stream strm;
+ IOJob_t *writeJob = NULL;
if (compressionLevel > Z_BEST_COMPRESSION)
compressionLevel = Z_BEST_COMPRESSION;
EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret);
} }
+ writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
strm.next_in = 0;
strm.avail_in = 0;
- strm.next_out = (Bytef*)ress->dstBuffer;
- strm.avail_out = (uInt)ress->dstBufferSize;
+ strm.next_out = (Bytef*)writeJob->buffer;
+ strm.avail_out = (uInt)writeJob->bufferSize;
while (1) {
int ret;
if (strm.avail_in == 0) {
- size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile);
- if (inSize == 0) break;
- inFileSize += inSize;
- strm.next_in = (z_const unsigned char*)ress->srcBuffer;
- strm.avail_in = (uInt)inSize;
+ AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
+ if (ress->readCtx->srcBufferLoaded == 0) break;
+ inFileSize += ress->readCtx->srcBufferLoaded;
+ strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
+ strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
+ }
+
+ {
+ size_t const availBefore = strm.avail_in;
+ ret = deflate(&strm, Z_NO_FLUSH);
+ AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
}
- ret = deflate(&strm, Z_NO_FLUSH);
+
if (ret != Z_OK)
EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret);
- { size_t const cSize = ress->dstBufferSize - strm.avail_out;
+ { size_t const cSize = writeJob->bufferSize - strm.avail_out;
if (cSize) {
- if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize)
- EXM_THROW(73, "Write error : cannot write to output file : %s ", strerror(errno));
+ writeJob->usedBufferSize = cSize;
+ AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
outFileSize += cSize;
- strm.next_out = (Bytef*)ress->dstBuffer;
- strm.avail_out = (uInt)ress->dstBufferSize;
- } }
+ strm.next_out = (Bytef*)writeJob->buffer;
+ strm.avail_out = (uInt)writeJob->bufferSize;
+ } }
if (srcFileSize == UTIL_FILESIZE_UNKNOWN) {
DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ",
- (unsigned)(inFileSize>>20),
- (double)outFileSize/inFileSize*100)
+ (unsigned)(inFileSize>>20),
+ (double)outFileSize/inFileSize*100)
} else {
DISPLAYUPDATE(2, "\rRead : %u / %u MB ==> %.2f%% ",
- (unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20),
- (double)outFileSize/inFileSize*100);
- } }
+ (unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20),
+ (double)outFileSize/inFileSize*100);
+ } }
while (1) {
int const ret = deflate(&strm, Z_FINISH);
- { size_t const cSize = ress->dstBufferSize - strm.avail_out;
+ { size_t const cSize = writeJob->bufferSize - strm.avail_out;
if (cSize) {
- if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize)
- EXM_THROW(75, "Write error : %s ", strerror(errno));
+ writeJob->usedBufferSize = cSize;
+ AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
outFileSize += cSize;
- strm.next_out = (Bytef*)ress->dstBuffer;
- strm.avail_out = (uInt)ress->dstBufferSize;
- } }
+ strm.next_out = (Bytef*)writeJob->buffer;
+ strm.avail_out = (uInt)writeJob->bufferSize;
+ } }
if (ret == Z_STREAM_END) break;
if (ret != Z_BUF_ERROR)
EXM_THROW(77, "zstd: %s: deflate error %d \n", srcFileName, ret);
EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret);
} }
*readsize = inFileSize;
+ AIO_WritePool_releaseIoJob(writeJob);
+ AIO_WritePool_sparseWriteEnd(ress->writeCtx);
return outFileSize;
}
#endif
lzma_stream strm = LZMA_STREAM_INIT;
lzma_action action = LZMA_RUN;
lzma_ret ret;
+ IOJob_t *writeJob = NULL;
if (compressionLevel < 0) compressionLevel = 0;
if (compressionLevel > 9) compressionLevel = 9;
EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret);
}
+ writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
+ strm.next_out = (Bytef*)writeJob->buffer;
+ strm.avail_out = (uInt)writeJob->bufferSize;
strm.next_in = 0;
strm.avail_in = 0;
- strm.next_out = (BYTE*)ress->dstBuffer;
- strm.avail_out = ress->dstBufferSize;
while (1) {
if (strm.avail_in == 0) {
- size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile);
- if (inSize == 0) action = LZMA_FINISH;
+ size_t const inSize = AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
+ if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
inFileSize += inSize;
- strm.next_in = (BYTE const*)ress->srcBuffer;
- strm.avail_in = inSize;
+ strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
+ strm.avail_in = ress->readCtx->srcBufferLoaded;
+ }
+
+ {
+ size_t const availBefore = strm.avail_in;
+ ret = lzma_code(&strm, action);
+ AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
}
- ret = lzma_code(&strm, action);
if (ret != LZMA_OK && ret != LZMA_STREAM_END)
EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret);
- { size_t const compBytes = ress->dstBufferSize - strm.avail_out;
+ { size_t const compBytes = writeJob->bufferSize - strm.avail_out;
if (compBytes) {
- if (fwrite(ress->dstBuffer, 1, compBytes, ress->dstFile) != compBytes)
- EXM_THROW(85, "Write error : %s", strerror(errno));
+ writeJob->usedBufferSize = compBytes;
+ AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
outFileSize += compBytes;
- strm.next_out = (BYTE*)ress->dstBuffer;
- strm.avail_out = ress->dstBufferSize;
+ strm.next_out = (Bytef*)writeJob->buffer;
+ strm.avail_out = writeJob->bufferSize;
} }
if (srcFileSize == UTIL_FILESIZE_UNKNOWN)
DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%%",
lzma_end(&strm);
*readsize = inFileSize;
+ AIO_WritePool_releaseIoJob(writeJob);
+ AIO_WritePool_sparseWriteEnd(ress->writeCtx);
+
return outFileSize;
}
#endif
LZ4F_preferences_t prefs;
LZ4F_compressionContext_t ctx;
+ IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
+
LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
if (LZ4F_isError(errorCode))
EXM_THROW(31, "zstd: failed to create lz4 compression context");
memset(&prefs, 0, sizeof(prefs));
- assert(blockSize <= ress->srcBufferSize);
+ assert(blockSize <= ress->readCtx->base.jobBufferSize);
- prefs.autoFlush = 1;
+ /* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */
+ prefs.autoFlush = 0;
prefs.compressionLevel = compressionLevel;
prefs.frameInfo.blockMode = LZ4F_blockLinked;
prefs.frameInfo.blockSizeID = LZ4F_max64KB;
#if LZ4_VERSION_NUMBER >= 10600
prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize;
#endif
- assert(LZ4F_compressBound(blockSize, &prefs) <= ress->dstBufferSize);
+ assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize);
{
- size_t readSize;
- size_t headerSize = LZ4F_compressBegin(ctx, ress->dstBuffer, ress->dstBufferSize, &prefs);
+ size_t headerSize = LZ4F_compressBegin(ctx, writeJob->buffer, writeJob->bufferSize, &prefs);
if (LZ4F_isError(headerSize))
EXM_THROW(33, "File header generation failed : %s",
LZ4F_getErrorName(headerSize));
- if (fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile) != headerSize)
- EXM_THROW(34, "Write error : %s (cannot write header)", strerror(errno));
+ writeJob->usedBufferSize = headerSize;
+ AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
outFileSize += headerSize;
/* Read first block */
- readSize = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile);
- inFileSize += readSize;
+ inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
/* Main Loop */
- while (readSize>0) {
- size_t const outSize = LZ4F_compressUpdate(ctx,
- ress->dstBuffer, ress->dstBufferSize,
- ress->srcBuffer, readSize, NULL);
+ while (ress->readCtx->srcBufferLoaded) {
+ size_t inSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
+ size_t const outSize = LZ4F_compressUpdate(ctx, writeJob->buffer, writeJob->bufferSize,
+ ress->readCtx->srcBuffer, inSize, NULL);
if (LZ4F_isError(outSize))
EXM_THROW(35, "zstd: %s: lz4 compression failed : %s",
srcFileName, LZ4F_getErrorName(outSize));
}
/* Write Block */
- { size_t const sizeCheck = fwrite(ress->dstBuffer, 1, outSize, ress->dstFile);
- if (sizeCheck != outSize)
- EXM_THROW(36, "Write error : %s", strerror(errno));
- }
+ writeJob->usedBufferSize = outSize;
+ AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
/* Read next block */
- readSize = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile);
- inFileSize += readSize;
+ AIO_ReadPool_consumeBytes(ress->readCtx, inSize);
+ inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
}
- if (ferror(ress->srcFile)) EXM_THROW(37, "Error reading %s ", srcFileName);
/* End of Stream mark */
- headerSize = LZ4F_compressEnd(ctx, ress->dstBuffer, ress->dstBufferSize, NULL);
+ headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL);
if (LZ4F_isError(headerSize))
EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s",
srcFileName, LZ4F_getErrorName(headerSize));
- { size_t const sizeCheck = fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile);
- if (sizeCheck != headerSize)
- EXM_THROW(39, "Write error : %s (cannot write end of stream)",
- strerror(errno));
- }
+ writeJob->usedBufferSize = headerSize;
+ AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
outFileSize += headerSize;
}
*readsize = inFileSize;
LZ4F_freeCompressionContext(ctx);
+ AIO_WritePool_releaseIoJob(writeJob);
+ AIO_WritePool_sparseWriteEnd(ress->writeCtx);
return outFileSize;
}
int compressionLevel, U64* readsize)
{
cRess_t const ress = *ressPtr;
- FILE* const srcFile = ress.srcFile;
- FILE* const dstFile = ress.dstFile;
+ IOJob_t *writeJob = AIO_WritePool_acquireJob(ressPtr->writeCtx);
+
U64 compressedfilesize = 0;
ZSTD_EndDirective directive = ZSTD_e_continue;
U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
do {
size_t stillToFlush;
/* Fill input Buffer */
- size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
- ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
+ size_t const inSize = AIO_ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize());
+ ZSTD_inBuffer inBuff = { ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 };
DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize);
*readsize += inSize;
- if ((inSize == 0) || (*readsize == fileSize))
+ if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize))
directive = ZSTD_e_end;
stillToFlush = 1;
|| (directive == ZSTD_e_end && stillToFlush != 0) ) {
size_t const oldIPos = inBuff.pos;
- ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
+ ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
+ AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos);
/* count stats */
inputPresented++;
/* Write compressed stream */
DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
- (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
+ (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
if (outBuff.pos) {
- size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
- if (sizeCheck != outBuff.pos)
- EXM_THROW(25, "Write error : %s (cannot write compressed block)",
- strerror(errno));
+ writeJob->usedBufferSize = outBuff.pos;
+ AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
compressedfilesize += outBuff.pos;
}
} /* while ((inBuff.pos != inBuff.size) */
} while (directive != ZSTD_e_end);
- if (ferror(srcFile)) {
- EXM_THROW(26, "Read error : I/O error");
- }
if (fileSize != UTIL_FILESIZE_UNKNOWN && *readsize != fileSize) {
EXM_THROW(27, "Read error : Incomplete read : %llu / %llu B",
(unsigned long long)*readsize, (unsigned long long)fileSize);
}
+ AIO_WritePool_releaseIoJob(writeJob);
+ AIO_WritePool_sparseWriteEnd(ressPtr->writeCtx);
+
return compressedfilesize;
}
/*! FIO_compressFilename_dstFile() :
- * open dstFileName, or pass-through if ress.dstFile != NULL,
+ * open dstFileName, or pass-through if the writeCtx already has a file set,
* then start compression with FIO_compressFilename_internal().
* Manages source removal (--rm) and file permissions transfer.
* note : ress.srcFile must be != NULL,
int result;
stat_t statbuf;
int transferMTime = 0;
- assert(ress.srcFile != NULL);
- if (ress.dstFile == NULL) {
+ FILE *dstFile;
+ assert(AIO_ReadPool_getFile(ress.readCtx) != NULL);
+ if (AIO_WritePool_getFile(ress.writeCtx) == NULL) {
int dstFilePermissions = DEFAULT_FILE_PERMISSIONS;
if ( strcmp (srcFileName, stdinmark)
&& strcmp (dstFileName, stdoutmark)
closeDstFile = 1;
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName);
- ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
- if (ress.dstFile==NULL) return 1; /* could not open dstFileName */
+ dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
+ if (dstFile==NULL) return 1; /* could not open dstFileName */
+ AIO_WritePool_setFile(ress.writeCtx, dstFile);
/* Must only be added after FIO_openDstFile() succeeds.
* Otherwise we may delete the destination file if it already exists,
* and the user presses Ctrl-C when asked if they wish to overwrite.
result = FIO_compressFilename_internal(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
if (closeDstFile) {
- FILE* const dstFile = ress.dstFile;
- ress.dstFile = NULL;
-
clearHandler();
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName);
- if (fclose(dstFile)) { /* error closing dstFile */
+ if (AIO_WritePool_closeFile(ress.writeCtx)) { /* error closing file */
DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
result=1;
}
int compressionLevel)
{
int result;
+ FILE* srcFile;
DISPLAYLEVEL(6, "FIO_compressFilename_srcFile: %s \n", srcFileName);
/* ensure src is not a directory */
return 0;
}
- ress.srcFile = FIO_openSrcFile(prefs, srcFileName);
- if (ress.srcFile == NULL) return 1; /* srcFile could not be opened */
+ srcFile = FIO_openSrcFile(prefs, srcFileName);
+ if (srcFile == NULL) return 1; /* srcFile could not be opened */
+ AIO_ReadPool_setFile(ress.readCtx, srcFile);
result = FIO_compressFilename_dstFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
+ AIO_ReadPool_closeFile(ress.readCtx);
- fclose(ress.srcFile);
- ress.srcFile = NULL;
if ( prefs->removeSrcFile /* --rm */
&& result == 0 /* success */
&& strcmp(srcFileName, stdinmark) /* exception : don't erase stdin */
/* init */
assert(outFileName != NULL || suffix != NULL);
if (outFileName != NULL) { /* output into a single destination (stdout typically) */
+ FILE *dstFile;
if (FIO_removeMultiFilesWarning(fCtx, prefs, outFileName, 1 /* displayLevelCutoff */)) {
FIO_freeCResources(&ress);
return 1;
}
- ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
- if (ress.dstFile == NULL) { /* could not open outFileName */
+ dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
+ if (dstFile == NULL) { /* could not open outFileName */
error = 1;
} else {
+ AIO_WritePool_setFile(ress.writeCtx, dstFile);
for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) {
status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel);
if (!status) fCtx->nbFilesProcessed++;
error |= status;
}
- if (fclose(ress.dstFile))
+ if (AIO_WritePool_closeFile(ress.writeCtx))
EXM_THROW(29, "Write error (%s) : cannot properly close %s",
strerror(errno), outFileName);
- ress.dstFile = NULL;
}
} else {
if (outMirroredRootDirName)
/* **************************************************************************
* Decompression
***************************************************************************/
-
typedef struct {
- void* srcBuffer;
- size_t srcBufferSize;
- size_t srcBufferLoaded;
ZSTD_DStream* dctx;
WritePoolCtx_t *writeCtx;
+ ReadPoolCtx_t *readCtx;
} dRess_t;
static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
CHECK( ZSTD_DCtx_setMaxWindowSize(ress.dctx, prefs->memLimit) );
CHECK( ZSTD_DCtx_setParameter(ress.dctx, ZSTD_d_forceIgnoreChecksum, !prefs->checksumFlag));
- ress.srcBufferSize = ZSTD_DStreamInSize();
- ress.srcBuffer = malloc(ress.srcBufferSize);
- if (!ress.srcBuffer)
- EXM_THROW(61, "Allocation error : not enough memory");
-
/* dictionary */
{ void* dictBuffer;
size_t const dictBufferSize = FIO_createDictBuffer(&dictBuffer, dictFileName, prefs);
}
ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_DStreamOutSize());
+ ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_DStreamInSize());
return ress;
}
static void FIO_freeDResources(dRess_t ress)
{
CHECK( ZSTD_freeDStream(ress.dctx) );
- free(ress.srcBuffer);
AIO_WritePool_free(ress.writeCtx);
-}
-
-/* FIO_consumeDSrcBuffer:
- * Consumes len bytes from srcBuffer's start and moves the remaining data and srcBufferLoaded accordingly. */
-static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) {
- assert(ress->srcBufferLoaded >= len);
- ress->srcBufferLoaded -= len;
- memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded);
+ AIO_ReadPool_free(ress.readCtx);
}
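Decompression mirrors this wiring: the caller still opens the FILE handles, but then hands them to the pools, and the frame decoders only touch readCtx/writeCtx. A condensed sketch of that hand-off, assuming srcFile/dstFile were opened successfully (the actual control flow is split across FIO_decompressSrcFile/FIO_decompressDstFile further down):

    dRess_t ress = FIO_createDResources(prefs, dictFileName);
    AIO_ReadPool_setFile(ress.readCtx, srcFile);    /* starts queueing read jobs right away */
    AIO_WritePool_setFile(ress.writeCtx, dstFile);
    /* ... frame decoders pull from ress.readCtx->srcBuffer and push filled IOJob_t buffers ... */
    AIO_WritePool_closeFile(ress.writeCtx);         /* joins pending writes, ends sparse mode, fcloses dstFile */
    AIO_ReadPool_closeFile(ress.readCtx);           /* joins pending reads, fcloses srcFile */
    FIO_freeDResources(ress);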
/** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode
@return : 0 (no error) */
-static int FIO_passThrough(const FIO_prefs_t* const prefs,
- FILE* foutput, FILE* finput,
- void* buffer, size_t bufferSize,
- size_t alreadyLoaded)
+static int FIO_passThrough(dRess_t *ress)
{
- size_t const blockSize = MIN(64 KB, bufferSize);
- size_t readFromInput;
- unsigned storedSkips = 0;
-
- /* assumption : ress->srcBufferLoaded bytes already loaded and stored within buffer */
- { size_t const sizeCheck = fwrite(buffer, 1, alreadyLoaded, foutput);
- if (sizeCheck != alreadyLoaded) {
- DISPLAYLEVEL(1, "Pass-through write error : %s\n", strerror(errno));
- return 1;
- } }
-
- do {
- readFromInput = fread(buffer, 1, blockSize, finput);
- storedSkips = AIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips);
- } while (readFromInput == blockSize);
- if (ferror(finput)) {
- DISPLAYLEVEL(1, "Pass-through read error : %s\n", strerror(errno));
- return 1;
+ size_t const blockSize = MIN(MIN(64 KB, ZSTD_DStreamInSize()), ZSTD_DStreamOutSize());
+ IOJob_t *writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
+ AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
+
+ while(ress->readCtx->srcBufferLoaded) {
+ size_t writeSize;
+ writeSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
+ assert(writeSize <= writeJob->bufferSize);
+ memcpy(writeJob->buffer, ress->readCtx->srcBuffer, writeSize);
+ writeJob->usedBufferSize = writeSize;
+ AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
+ AIO_ReadPool_consumeBytes(ress->readCtx, writeSize);
+ AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
}
- assert(feof(finput));
-
- AIO_fwriteSparseEnd(prefs, foutput, storedSkips);
+ assert(ress->readCtx->reachedEof);
+ AIO_WritePool_releaseIoJob(writeJob);
+ AIO_WritePool_sparseWriteEnd(ress->writeCtx);
return 0;
}
return;
/* Try to decode the frame header */
- err = ZSTD_getFrameHeader(&header, ress->srcBuffer, ress->srcBufferLoaded);
+ err = ZSTD_getFrameHeader(&header, ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded);
if (err == 0) {
unsigned long long const windowSize = header.windowSize;
unsigned const windowLog = FIO_highbit64(windowSize) + ((windowSize & (windowSize - 1)) != 0);
*/
#define FIO_ERROR_FRAME_DECODING ((unsigned long long)(-2))
static unsigned long long
-FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
+FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress,
const FIO_prefs_t* const prefs,
const char* srcFileName,
U64 alreadyDecoded) /* for multi-frames streams */
ZSTD_DCtx_reset(ress->dctx, ZSTD_reset_session_only);
/* Header loading : ensures ZSTD_getFrameHeader() will succeed */
- { size_t const toDecode = ZSTD_FRAMEHEADERSIZE_MAX;
- if (ress->srcBufferLoaded < toDecode) {
- size_t const toRead = toDecode - ress->srcBufferLoaded;
- void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded;
- ress->srcBufferLoaded += fread(startPosition, 1, toRead, finput);
- } }
+ AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_FRAMEHEADERSIZE_MAX);
/* Main decompression Loop */
while (1) {
- ZSTD_inBuffer inBuff = { ress->srcBuffer, ress->srcBufferLoaded, 0 };
+ ZSTD_inBuffer inBuff = { ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded, 0 };
ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff);
const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 1 : 2;
if (srcFileNameSize > 18) {
const char* truncatedSrcFileName = srcFileName + srcFileNameSize - 15;
DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: ...%s : %.*f%s... ",
- fCtx->currFileIdx+1, fCtx->nbFilesTotal, truncatedSrcFileName, hrs.precision, hrs.value, hrs.suffix);
+ fCtx->currFileIdx+1, fCtx->nbFilesTotal, truncatedSrcFileName, hrs.precision, hrs.value, hrs.suffix);
} else {
DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: %s : %.*f%s... ",
fCtx->currFileIdx+1, fCtx->nbFilesTotal, srcFileName, hrs.precision, hrs.value, hrs.suffix);
srcFileName, hrs.precision, hrs.value, hrs.suffix);
}
- FIO_consumeDSrcBuffer(ress, inBuff.pos);
+ AIO_ReadPool_consumeBytes(ress->readCtx, inBuff.pos);
if (readSizeHint == 0) break; /* end of frame */
/* Fill input buffer */
- { size_t const toDecode = MIN(readSizeHint, ress->srcBufferSize); /* support large skippable frames */
- if (ress->srcBufferLoaded < toDecode) {
- size_t const toRead = toDecode - ress->srcBufferLoaded; /* > 0 */
- void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded;
- size_t const readSize = fread(startPosition, 1, toRead, finput);
+ { size_t const toDecode = MIN(readSizeHint, ZSTD_DStreamInSize()); /* support large skippable frames */
+ if (ress->readCtx->srcBufferLoaded < toDecode) {
+ size_t const readSize = AIO_ReadPool_fillBuffer(ress->readCtx, toDecode);
if (readSize==0) {
DISPLAYLEVEL(1, "%s : Read error (39) : premature end \n",
- srcFileName);
+ srcFileName);
+ AIO_WritePool_releaseIoJob(writeJob);
return FIO_ERROR_FRAME_DECODING;
}
- ress->srcBufferLoaded += readSize;
- } } }
+ } } }
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
#ifdef ZSTD_GZDECOMPRESS
static unsigned long long
-FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
+FIO_decompressGzFrame(dRess_t* ress, const char* srcFileName)
{
unsigned long long outFileSize = 0;
z_stream strm;
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
- strm.avail_in = (uInt)ress->srcBufferLoaded;
- strm.next_in = (z_const unsigned char*)ress->srcBuffer;
+ strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
+ strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
for ( ; ; ) {
int ret;
if (strm.avail_in == 0) {
- ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile);
- if (ress->srcBufferLoaded == 0) flush = Z_FINISH;
- strm.next_in = (z_const unsigned char*)ress->srcBuffer;
- strm.avail_in = (uInt)ress->srcBufferLoaded;
+ AIO_ReadPool_consumeAndRefill(ress->readCtx);
+ if (ress->readCtx->srcBufferLoaded == 0) flush = Z_FINISH;
+ strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
+ strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
}
ret = inflate(&strm, flush);
if (ret == Z_BUF_ERROR) {
if (ret == Z_STREAM_END) break;
}
- FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
+ AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in);
if ( (inflateEnd(&strm) != Z_OK) /* release resources ; error detected */
&& (decodingError==0) ) {
#ifdef ZSTD_LZMADECOMPRESS
static unsigned long long
-FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
+FIO_decompressLzmaFrame(dRess_t* ress,
const char* srcFileName, int plain_lzma)
{
unsigned long long outFileSize = 0;
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
- strm.next_in = (BYTE const*)ress->srcBuffer;
- strm.avail_in = ress->srcBufferLoaded;
+ strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
+ strm.avail_in = ress->readCtx->srcBufferLoaded;
for ( ; ; ) {
lzma_ret ret;
if (strm.avail_in == 0) {
- ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile);
- if (ress->srcBufferLoaded == 0) action = LZMA_FINISH;
- strm.next_in = (BYTE const*)ress->srcBuffer;
- strm.avail_in = ress->srcBufferLoaded;
+ AIO_ReadPool_consumeAndRefill(ress->readCtx);
+ if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
+ strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
+ strm.avail_in = ress->readCtx->srcBufferLoaded;
}
ret = lzma_code(&strm, action);
if (ret == LZMA_STREAM_END) break;
}
- FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
+ AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in);
lzma_end(&strm);
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
#ifdef ZSTD_LZ4DECOMPRESS
static unsigned long long
-FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
- const char* srcFileName)
+FIO_decompressLz4Frame(dRess_t* ress, const char* srcFileName)
{
unsigned long long filesize = 0;
LZ4F_errorCode_t nextToLoad = 4;
/* Main Loop */
for (;nextToLoad;) {
- size_t readSize;
size_t pos = 0;
size_t decodedBytes = writeJob->bufferSize;
int fullBufferDecoded = 0;
/* Read input */
- nextToLoad = MIN(nextToLoad, ress->srcBufferSize-ress->srcBufferLoaded);
- readSize = fread((char *)ress->srcBuffer + ress->srcBufferLoaded, 1, nextToLoad, srcFile);
- if(!readSize && ferror(srcFile)) {
- DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName);
- decodingError=1;
- break;
- }
- if(!readSize && !ress->srcBufferLoaded) break; /* reached end of file */
- ress->srcBufferLoaded += readSize;
+ AIO_ReadPool_fillBuffer(ress->readCtx, nextToLoad);
+ if(!ress->readCtx->srcBufferLoaded) break; /* reached end of file */
- while ((pos < ress->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */
+ while ((pos < ress->readCtx->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */
/* Decode Input (at least partially) */
- size_t remaining = ress->srcBufferLoaded - pos;
+ size_t remaining = ress->readCtx->srcBufferLoaded - pos;
decodedBytes = writeJob->bufferSize;
- nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL);
+ nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->readCtx->srcBuffer)+pos,
+ &remaining, NULL);
if (LZ4F_isError(nextToLoad)) {
DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n",
srcFileName, LZ4F_getErrorName(nextToLoad));
decodingError = 1; nextToLoad = 0; break;
}
pos += remaining;
- assert(pos <= ress->srcBufferLoaded);
+ assert(pos <= ress->readCtx->srcBufferLoaded);
fullBufferDecoded = decodedBytes == writeJob->bufferSize;
/* Write Block */
if (!nextToLoad) break;
}
- FIO_consumeDSrcBuffer(ress, pos);
+ AIO_ReadPool_consumeBytes(ress->readCtx, pos);
}
if (nextToLoad!=0) {
DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName);
* 1 : error
*/
static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
- dRess_t ress, FILE* srcFile,
- const FIO_prefs_t* const prefs,
- const char* dstFileName, const char* srcFileName)
+ dRess_t ress, const FIO_prefs_t* const prefs,
+ const char* dstFileName, const char* srcFileName)
{
unsigned readSomething = 0;
unsigned long long filesize = 0;
- assert(srcFile != NULL);
/* for each frame */
for ( ; ; ) {
/* check magic number -> version */
size_t const toRead = 4;
- const BYTE* const buf = (const BYTE*)ress.srcBuffer;
- if (ress.srcBufferLoaded < toRead) /* load up to 4 bytes for header */
- ress.srcBufferLoaded += fread((char*)ress.srcBuffer + ress.srcBufferLoaded,
- (size_t)1, toRead - ress.srcBufferLoaded, srcFile);
- if (ress.srcBufferLoaded==0) {
+ const BYTE* buf;
+ AIO_ReadPool_fillBuffer(ress.readCtx, toRead);
+ buf = (const BYTE*)ress.readCtx->srcBuffer;
+ if (ress.readCtx->srcBufferLoaded==0) {
if (readSomething==0) { /* srcFile is empty (which is invalid) */
DISPLAYLEVEL(1, "zstd: %s: unexpected end of file \n", srcFileName);
return 1;
break; /* no more input */
}
readSomething = 1; /* there is at least 1 byte in srcFile */
- if (ress.srcBufferLoaded < toRead) {
+ if (ress.readCtx->srcBufferLoaded < toRead) {
DISPLAYLEVEL(1, "zstd: %s: unknown header \n", srcFileName);
return 1;
}
- if (ZSTD_isFrame(buf, ress.srcBufferLoaded)) {
- unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, srcFile, prefs, srcFileName, filesize);
+ if (ZSTD_isFrame(buf, ress.readCtx->srcBufferLoaded)) {
+ unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, prefs, srcFileName, filesize);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
} else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */
#ifdef ZSTD_GZDECOMPRESS
- unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, srcFileName);
+ unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFileName);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
#else
} else if ((buf[0] == 0xFD && buf[1] == 0x37) /* xz magic number */
|| (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */
#ifdef ZSTD_LZMADECOMPRESS
- unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, srcFileName, buf[0] != 0xFD);
+ unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFileName, buf[0] != 0xFD);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
#else
#endif
} else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) {
#ifdef ZSTD_LZ4DECOMPRESS
- unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, srcFileName);
+ unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFileName);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
#else
return 1;
#endif
} else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) { /* pass-through mode */
- return FIO_passThrough(prefs,
- AIO_WritePool_getFile(ress.writeCtx), srcFile,
- ress.srcBuffer, ress.srcBufferSize,
- ress.srcBufferLoaded);
+ return FIO_passThrough(&ress);
} else {
DISPLAYLEVEL(1, "zstd: %s: unsupported format \n", srcFileName);
return 1;
}
/** FIO_decompressDstFile() :
- open `dstFileName`,
- or path-through if ress.dstFile is already != 0,
+ open `dstFileName`, or pass-through if writeCtx's file is already != 0,
then start decompression process (FIO_decompressFrames()).
@return : 0 : OK
1 : operation aborted
*/
static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
FIO_prefs_t* const prefs,
- dRess_t ress, FILE* srcFile,
+ dRess_t ress,
const char* dstFileName, const char* srcFileName)
{
int result;
addHandler(dstFileName);
}
- result = FIO_decompressFrames(fCtx, ress, srcFile, prefs, dstFileName, srcFileName);
+ result = FIO_decompressFrames(fCtx, ress, prefs, dstFileName, srcFileName);
if (releaseDstFile) {
clearHandler();
srcFile = FIO_openSrcFile(prefs, srcFileName);
if (srcFile==NULL) return 1;
- ress.srcBufferLoaded = 0;
+ AIO_ReadPool_setFile(ress.readCtx, srcFile);
+
+ result = FIO_decompressDstFile(fCtx, prefs, ress, dstFileName, srcFileName);
- result = FIO_decompressDstFile(fCtx, prefs, ress, srcFile, dstFileName, srcFileName);
+ AIO_ReadPool_setFile(ress.readCtx, NULL);
/* Close file */
if (fclose(srcFile)) {
/*
- * Copyright (c) Yann Collet, Facebook, Inc.
+ * Copyright (c) Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
/** AIO_fwriteSparse() :
* @return : storedSkips,
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
-unsigned AIO_fwriteSparse(FILE* file,
+static unsigned
+AIO_fwriteSparse(FILE* file,
const void* buffer, size_t bufferSize,
const FIO_prefs_t* const prefs,
unsigned storedSkips)
if (!prefs->sparseFileSupport) { /* normal write */
size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
if (sizeCheck != bufferSize)
- EXM_THROW(70, "Write error : cannot write decoded block : %s",
+ EXM_THROW(70, "Write error : cannot write block : %s",
strerror(errno));
return 0;
}
storedSkips = 0;
/* write the rest */
if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
- EXM_THROW(93, "Write error : cannot write decoded block : %s",
+ EXM_THROW(93, "Write error : cannot write block : %s",
strerror(errno));
}
ptrT += seg0SizeT;
return storedSkips;
}
-void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
+static void
+AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
if (prefs->testMode) assert(storedSkips == 0);
if (storedSkips>0) {
* AsyncIO functionality
************************************************************************/
+/* AIO_supported:
+ * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
+int AIO_supported(void) {
+#ifdef ZSTD_MULTITHREAD
+ return 1;
+#else
+ return 0;
+#endif
+}
+
/* ***********************************
* General IoPool implementation
*************************************/
static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
- void *buffer;
- IOJob_t *job;
- job = (IOJob_t*) malloc(sizeof(IOJob_t));
- buffer = malloc(bufferSize);
+ IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
+ void* const buffer = malloc(bufferSize);
if(!job || !buffer)
- EXM_THROW(101, "Allocation error : not enough memory");
+ EXM_THROW(101, "Allocation error : not enough memory");
job->buffer = buffer;
job->bufferSize = bufferSize;
job->usedBufferSize = 0;
/* AIO_IOPool_createThreadPool:
* Creates a thread pool and a mutex for threaded IO pool.
* Displays warning if asyncio is requested but MT isn't available. */
-static void AIO_IOPool_createThreadPool(IOPoolCtx_t *ctx, const FIO_prefs_t *prefs) {
+static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
ctx->threadPool = NULL;
if(prefs->asyncIO) {
if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
- EXM_THROW(102,"Failed creating write availableJobs mutex");
+ EXM_THROW(102,"Failed creating write availableJobs mutex");
/* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
assert(MAX_IO_JOBS >= 2);
ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
if (!ctx->threadPool)
- EXM_THROW(104, "Failed creating writer thread pool");
+ EXM_THROW(104, "Failed creating writer thread pool");
}
}
/* AIO_IOPool_init:
* Allocates and sets and a new write pool including its included availableJobs. */
-static void AIO_IOPool_init(IOPoolCtx_t *ctx, FIO_prefs_t* const prefs, POOL_function poolFunction, size_t bufferSize) {
+static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
int i;
AIO_IOPool_createThreadPool(ctx, prefs);
ctx->prefs = prefs;
ctx->poolFunction = poolFunction;
- ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 1;
+ ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
ctx->availableJobsCount = ctx->totalIoJobs;
for(i=0; i < ctx->availableJobsCount; i++) {
ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
}
+ ctx->jobBufferSize = bufferSize;
ctx->file = NULL;
}
/* AIO_IOPool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
-static void AIO_IOPool_releaseIoJob(IOJob_t *job) {
- IOPoolCtx_t *ctx = (IOPoolCtx_t *) job->ctx;
- if(ctx->threadPool) {
+static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
+ IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
+ if(ctx->threadPool)
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
- assert(ctx->availableJobsCount < MAX_IO_JOBS);
- ctx->availableJobs[ctx->availableJobsCount++] = job;
+ assert(ctx->availableJobsCount < ctx->totalIoJobs);
+ ctx->availableJobs[ctx->availableJobsCount++] = job;
+ if(ctx->threadPool)
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
- } else {
- assert(ctx->availableJobsCount == 0);
- ctx->availableJobsCount++;
- }
}
/* AIO_IOPool_join:
/* AIO_IOPool_acquireJob:
* Returns an available io job to be used for a future io. */
-static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
+static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
IOJob_t *job;
assert(ctx->file != NULL || ctx->prefs->testMode);
- if(ctx->threadPool) {
+ if(ctx->threadPool)
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
- assert(ctx->availableJobsCount > 0);
- job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
+ assert(ctx->availableJobsCount > 0);
+ job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
+ if(ctx->threadPool)
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
- } else {
- assert(ctx->availableJobsCount == 1);
- ctx->availableJobsCount--;
- job = (IOJob_t*)ctx->availableJobs[0];
- }
job->usedBufferSize = 0;
job->file = ctx->file;
job->offset = 0;
* Sets the destination file for future files in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
-static void AIO_IOPool_setFile(IOPoolCtx_t *ctx, FILE* file) {
+static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
assert(ctx!=NULL);
AIO_IOPool_join(ctx);
assert(ctx->availableJobsCount == ctx->totalIoJobs);
ctx->file = file;
}
-static FILE* AIO_IOPool_getFile(IOPoolCtx_t *ctx) {
+static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
return ctx->file;
}
/* AIO_IOPool_enqueueJob:
* Enqueues an io job for execution.
* The queued job shouldn't be used directly after queueing it. */
-static void AIO_IOPool_enqueueJob(IOJob_t *job) {
- IOPoolCtx_t* ctx = (IOPoolCtx_t *)job->ctx;
+static void AIO_IOPool_enqueueJob(IOJob_t* job) {
+ IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
if(ctx->threadPool)
POOL_add(ctx->threadPool, ctx->poolFunction, job);
else
/* AIO_WritePool_acquireJob:
* Returns an available write job to be used for a future write. */
-IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx) {
+IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
return AIO_IOPool_acquireJob(&ctx->base);
}
/* AIO_WritePool_sparseWriteEnd:
* Ends sparse writes to the current file.
* Blocks on completion of all current write jobs before executing. */
-void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
+void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
assert(ctx != NULL);
if(ctx->base.threadPool)
POOL_joinJobs(ctx->base.threadPool);
* Sets the destination file for future writes in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
-void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file) {
+void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
AIO_IOPool_setFile(&ctx->base, file);
assert(ctx->storedSkips == 0);
}
/* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */
-FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx) {
+FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
return AIO_IOPool_getFile(&ctx->base);
}
/* AIO_WritePool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
-void AIO_WritePool_releaseIoJob(IOJob_t *job) {
+void AIO_WritePool_releaseIoJob(IOJob_t* job) {
AIO_IOPool_releaseIoJob(job);
}
/* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
-int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
- FILE *dstFile = ctx->base.file;
+int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
+ FILE* const dstFile = ctx->base.file;
assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
AIO_WritePool_sparseWriteEnd(ctx);
AIO_IOPool_setFile(&ctx->base, NULL);
/* AIO_WritePool_executeWriteJob:
* Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
- IOJob_t* job = (IOJob_t*) opaque;
- WritePoolCtx_t* ctx = (WritePoolCtx_t*) job->ctx;
+ IOJob_t* const job = (IOJob_t*) opaque;
+ WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
AIO_IOPool_releaseIoJob(job);
}
/* AIO_WritePool_create:
* Allocates and sets and a new write pool including its included jobs. */
-WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize) {
- WritePoolCtx_t* ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
+WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
+ WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
ctx->storedSkips = 0;
assert(ctx->storedSkips==0);
free(ctx);
}
+
+
+/* ***********************************
+ * ReadPool implementation
+ *************************************/
+static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
+ int i;
+ for(i=0; i<ctx->completedJobsCount; i++) {
+ IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
+ AIO_IOPool_releaseIoJob(job);
+ }
+ ctx->completedJobsCount = 0;
+}
+
+static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
+ ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
+ if(ctx->base.threadPool)
+ ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
+ assert(ctx->completedJobsCount < MAX_IO_JOBS);
+ ctx->completedJobs[ctx->completedJobsCount++] = job;
+ if(ctx->base.threadPool) {
+ ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
+ ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
+ }
+}
+
+/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
+ * Looks through the completed jobs for a job matching the waitingOnOffset and returns it,
+ * if job wasn't found returns NULL.
+ * IMPORTANT: assumes ioJobsMutex is locked. */
+static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
+ IOJob_t *job = NULL;
+ int i;
+ /* This implementation goes through all completed jobs and looks for the one matching the next offset.
+ * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
+ * reads to be completed in order) this implementation was chosen as it better fits other asyncio
+ * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
+ for (i=0; i<ctx->completedJobsCount; i++) {
+ job = (IOJob_t *) ctx->completedJobs[i];
+ if (job->offset == ctx->waitingOnOffset) {
+ ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
+ return job;
+ }
+ }
+ return NULL;
+}
+
+/* AIO_ReadPool_numReadsInFlight:
+ * Returns the number of IO read jobs currently in flight. */
+static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
+ const size_t jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
+ return ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld);
+}
+
+/* AIO_ReadPool_getNextCompletedJob:
+ * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
+ * Blocks until such a job is available (or no reads remain in flight). */
+static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
+ IOJob_t *job = NULL;
+ if (ctx->base.threadPool)
+ ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
+
+ job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
+
+ /* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */
+ while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
+ assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
+ ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
+ job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
+ }
+
+ if(job) {
+ assert(job->offset == ctx->waitingOnOffset);
+ ctx->waitingOnOffset += job->usedBufferSize;
+ }
+
+ if (ctx->base.threadPool)
+ ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
+ return job;
+}
+
+
+/* AIO_ReadPool_executeReadJob:
+ * Executes a read job synchronously. Can be used as a function for a thread pool. */
+static void AIO_ReadPool_executeReadJob(void* opaque){
+ IOJob_t* const job = (IOJob_t*) opaque;
+ ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
+ if(ctx->reachedEof) {
+ job->usedBufferSize = 0;
+ AIO_ReadPool_addJobToCompleted(job);
+ return;
+ }
+ job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
+ if(job->usedBufferSize < job->bufferSize) {
+ if(ferror(job->file)) {
+ EXM_THROW(37, "Read error");
+ } else if(feof(job->file)) {
+ ctx->reachedEof = 1;
+ } else {
+ EXM_THROW(37, "Unexpected short read");
+ }
+ }
+ AIO_ReadPool_addJobToCompleted(job);
+}
+
+static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
+ IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
+ job->offset = ctx->nextReadOffset;
+ ctx->nextReadOffset += job->bufferSize;
+ AIO_IOPool_enqueueJob(job);
+}
+
+static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
+ int i;
+ for (i = 0; i < ctx->base.availableJobsCount; i++) {
+ AIO_ReadPool_enqueueRead(ctx);
+ }
+}
+
+/* AIO_ReadPool_setFile:
+ * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
+ * Waits for all current enqueued tasks to complete if a previous file was set. */
+void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
+ assert(ctx!=NULL);
+ AIO_IOPool_join(&ctx->base);
+ AIO_ReadPool_releaseAllCompletedJobs(ctx);
+ if (ctx->currentJobHeld) {
+ AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
+ ctx->currentJobHeld = NULL;
+ }
+ AIO_IOPool_setFile(&ctx->base, file);
+ ctx->nextReadOffset = 0;
+ ctx->waitingOnOffset = 0;
+ ctx->srcBuffer = ctx->coalesceBuffer;
+ ctx->srcBufferLoaded = 0;
+ ctx->reachedEof = 0;
+ if(file != NULL)
+ AIO_ReadPool_startReading(ctx);
+}
+
+/* AIO_ReadPool_create:
+ * Allocates and initializes a new readPool, including its jobs.
+ * bufferSize should be set to the maximal buffer we want to read at a time; it will also be used
+ * as our basic read size. */
+ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
+ ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
+ if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
+ AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);
+
+ ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
+ ctx->srcBuffer = ctx->coalesceBuffer;
+ ctx->srcBufferLoaded = 0;
+ ctx->completedJobsCount = 0;
+ ctx->currentJobHeld = NULL;
+
+ if(ctx->base.threadPool)
+ if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
+ EXM_THROW(103,"Failed creating jobCompletedCond condition variable");
+
+ return ctx;
+}
+
+/* AIO_ReadPool_free:
+ * Frees and releases a readPool and its resources. Closes source file. */
+void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
+ if(AIO_ReadPool_getFile(ctx))
+ AIO_ReadPool_closeFile(ctx);
+ if(ctx->base.threadPool)
+ ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
+ AIO_IOPool_destroy(&ctx->base);
+ free(ctx->coalesceBuffer);
+ free(ctx);
+}
+
+/* AIO_ReadPool_consumeBytes:
+ * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
+void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
+ assert(n <= ctx->srcBufferLoaded);
+ ctx->srcBufferLoaded -= n;
+ ctx->srcBuffer += n;
+}
+
+/* AIO_ReadPool_releaseCurrentHeldAndGetNext:
+ * Releases the currently held job and gets the next one; returns NULL if no next job is available. */
+static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
+ if (ctx->currentJobHeld) {
+ AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
+ ctx->currentJobHeld = NULL;
+ AIO_ReadPool_enqueueRead(ctx);
+ }
+ ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
+ return (IOJob_t*) ctx->currentJobHeld;
+}
+
+/* AIO_ReadPool_fillBuffer:
+ * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
+ * Returns once srcBuffer has at least the expected number of bytes loaded, or once we've reached the end of the file.
+ * Return value is the number of bytes added to the buffer.
+ * Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
+size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
+ IOJob_t *job;
+ int useCoalesce = 0;
+ if(n > ctx->base.jobBufferSize)
+ n = ctx->base.jobBufferSize;
+
+ /* We are good, don't read anything */
+ if (ctx->srcBufferLoaded >= n)
+ return 0;
+
+ /* We still have bytes loaded, but not enough to satisfy the caller. We need to get the next job
+ * and coalesce the remaining bytes with the next job's buffer */
+ if (ctx->srcBufferLoaded > 0) {
+ useCoalesce = 1;
+ memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
+ ctx->srcBuffer = ctx->coalesceBuffer;
+ }
+
+ /* Read the next chunk */
+ job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
+ if(!job)
+ return 0;
+ if(useCoalesce) {
+ assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
+ memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
+ ctx->srcBufferLoaded += job->usedBufferSize;
+ }
+ else {
+ ctx->srcBuffer = (U8 *) job->buffer;
+ ctx->srcBufferLoaded = job->usedBufferSize;
+ }
+ return job->usedBufferSize;
+}
+
+/* AIO_ReadPool_consumeAndRefill:
+ * Consumes the current buffer and refills it with up to jobBufferSize bytes. */
+size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
+ AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
+ return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
+}
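Taken together, these calls define the contract the decoders above rely on: fillBuffer() tops up srcBuffer (coalescing any leftover bytes into coalesceBuffer), the caller reads from srcBuffer/srcBufferLoaded, and consumeBytes() advances past what was actually used. A minimal consumer loop, with decode_step() as a hypothetical stand-in for inflate/lzma_code/LZ4F_decompress/ZSTD_decompressStream:

    AIO_ReadPool_setFile(ctx, srcFile);
    for (;;) {
        AIO_ReadPool_fillBuffer(ctx, 64 KB);              /* request is capped at jobBufferSize */
        if (ctx->srcBufferLoaded == 0) break;             /* end of file */
        {   size_t const used = decode_step(ctx->srcBuffer, ctx->srcBufferLoaded); /* hypothetical */
            AIO_ReadPool_consumeBytes(ctx, used);         /* partial records stay for the next fill */
        }
    }
    AIO_ReadPool_closeFile(ctx);

This is the same shape as FIO_passThrough() above; each external-format decoder is this loop with its codec-specific bookkeeping layered on top.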
+
+/* AIO_ReadPool_getFile:
+ * Returns the current file set for the read pool. */
+FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
+ return AIO_IOPool_getFile(&ctx->base);
+}
+
+/* AIO_ReadPool_closeFile:
+ * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
+int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
+ FILE* const file = AIO_ReadPool_getFile(ctx);
+ AIO_ReadPool_setFile(ctx, NULL);
+ return fclose(file);
+}