/*! ZSTD_resetCStream() :
* start a new compression job, using same parameters from previous job.
- * This is typically useful to skip dictionary loading stage, since it will re-use it in-place..
+ * This is typically useful to skip dictionary loading stage, since it will re-use it in-place.
* Note that zcs must be init at least once before using ZSTD_resetCStream().
* If pledgedSrcSize is not known at reset time, use macro ZSTD_CONTENTSIZE_UNKNOWN.
* If pledgedSrcSize > 0, its value must be correct, as it will be written in header, and controlled at the end.
* For the time being, pledgedSrcSize==0 is interpreted as "srcSize unknown" for compatibility with older programs,
* but it will change to mean "empty" in future version, so use macro ZSTD_CONTENTSIZE_UNKNOWN instead.
- * @return : 0, or an error code (which can be tested using ZSTD_isError()) */
+ * @return : 0, or an error code (which can be tested using ZSTD_isError())
+ */
ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize);
unsigned nbActiveWorkers; /* MT only : nb of workers actively compressing at probe time */
} ZSTD_frameProgression;
-/* ZSTD_getFrameProgression():
+/* ZSTD_getFrameProgression() :
* tells how much data has been ingested (read from input)
* consumed (input actually compressed) and produced (output) for current frame.
- * Therefore, (ingested - consumed) is amount of input data buffered internally, not yet compressed.
- * Can report progression inside worker threads (multi-threading and non-blocking mode).
+ * Note : (ingested - consumed) is amount of input data buffered internally, not yet compressed.
+ * Aggregates progression inside active worker threads.
*/
ZSTDLIB_API ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx);
-/*! ZSTD_toFlushNow()
+/*! ZSTD_toFlushNow() :
* Tell how many bytes are ready to be flushed immediately.
* Useful for multithreading scenarios (nbWorkers >= 1).
- * Probe the oldest active job (not yet entirely flushed) and check its output buffer.
- * If return 0, it means there is no active job, or
- * it means oldest job is still active, but everything produced has been flushed so far,
- * therefore flushing is limited by speed of oldest job. */
+ * Probe the oldest active job, defined as oldest job not yet entirely flushed,
+ * and check its output buffer.
+ * @return : amount of data stored in oldest job and ready to be flushed immediately.
+ * if @return == 0, it means either :
+ * + there is no active job (could be checked with ZSTD_frameProgression()), or
+ * + oldest job is still actively compressing data,
+ * but everything it has produced has also been flushed so far,
+ * therefore flushing speed is currently limited by production speed of oldest job
+ * irrespective of the speed of concurrent newer jobs.
+ */
ZSTDLIB_API size_t ZSTD_toFlushNow(ZSTD_CCtx* cctx);
g_overlapLog = overlapLog;
}
static U32 g_adaptiveMode = 0;
-void FIO_setAdaptiveMode(unsigned adapt) { g_adaptiveMode = adapt; }
+void FIO_setAdaptiveMode(unsigned adapt) {
+ if ((adapt>0) && (g_nbWorkers==0))
+ EXM_THROW(1, "Adaptive mode is not compatible with single thread mode \n");
+ g_adaptiveMode = adapt;
+}
static U32 g_ldmFlag = 0;
void FIO_setLdmFlag(unsigned ldmFlag) {
g_ldmFlag = (ldmFlag>0);
#ifdef ZSTD_GZCOMPRESS
-static unsigned long long FIO_compressGzFrame(cRess_t* ress,
+static unsigned long long
+FIO_compressGzFrame(cRess_t* ress,
const char* srcFileName, U64 const srcFileSize,
int compressionLevel, U64* readsize)
{
#ifdef ZSTD_LZMACOMPRESS
-static unsigned long long FIO_compressLzmaFrame(cRess_t* ress,
- const char* srcFileName, U64 const srcFileSize,
- int compressionLevel, U64* readsize, int plain_lzma)
+static unsigned long long
+FIO_compressLzmaFrame(cRess_t* ress,
+ const char* srcFileName, U64 const srcFileSize,
+ int compressionLevel, U64* readsize, int plain_lzma)
{
unsigned long long inFileSize = 0, outFileSize = 0;
lzma_stream strm = LZMA_STREAM_INIT;
#define LZ4F_max64KB max64KB
#endif
static int FIO_LZ4_GetBlockSize_FromBlockId (int id) { return (1 << (8 + (2 * id))); }
-static unsigned long long FIO_compressLz4Frame(cRess_t* ress,
- const char* srcFileName, U64 const srcFileSize,
- int compressionLevel, U64* readsize)
+static unsigned long long
+FIO_compressLz4Frame(cRess_t* ress,
+ const char* srcFileName, U64 const srcFileSize,
+ int compressionLevel, U64* readsize)
{
const size_t blockSize = FIO_LZ4_GetBlockSize_FromBlockId(LZ4F_max64KB);
unsigned long long inFileSize = 0, outFileSize = 0;
/* count stats */
inputPresented++;
- if (oldIPos == inBuff.pos) inputBlocked++;
+ if (oldIPos == inBuff.pos) inputBlocked++; /* input buffer is full and can't take any more : input speed is faster than consumption rate */
if (!toFlushNow) flushWaiting = 1;
/* Write compressed stream */
(U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos);
if (outBuff.pos) {
size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
- if (sizeCheck!=outBuff.pos)
+ if (sizeCheck != outBuff.pos)
EXM_THROW(25, "Write error : cannot write compressed block");
compressedfilesize += outBuff.pos;
}
double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
/* check output speed */
- if (zfp.currentJobID > 1) {
- static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 };
+ if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */
+ static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 }; /* note : requires fileio to run main thread */
unsigned long long newlyProduced = zfp.produced - cpszfp.produced;
unsigned long long newlyFlushed = zfp.flushed - cpszfp.flushed;
assert(zfp.produced >= cpszfp.produced);
-
- cpszfp = zfp;
+ assert(g_nbWorkers >= 1);
if ( (zfp.ingested == cpszfp.ingested) /* no data read : input buffer full */
&& (zfp.consumed == cpszfp.consumed) /* no data compressed : no more buffer to compress OR compression is really slow */
&& (zfp.nbActiveWorkers == 0) /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
- && (zfp.currentJobID > 0) /* first job started : only remaining reason is no more available buffer to start compression */
) {
DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
speedChange = slower;
}
+ cpszfp = zfp;
+
if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
&& (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
) {