int result;
FILE* srcFile;
stat_t srcFileStat;
+ U64 fileSize = UTIL_FILESIZE_UNKNOWN;
DISPLAYLEVEL(6, "FIO_compressFilename_srcFile: %s \n", srcFileName);
if (strcmp(srcFileName, stdinmark)) {
srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat);
if (srcFile == NULL) return 1; /* srcFile could not be opened */
+ /* Don't use AsyncIO for small files */
+ if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */
+ fileSize = UTIL_getFileSizeStat(&srcFileStat);
+ if(fileSize != UTIL_FILESIZE_UNKNOWN && fileSize < ZSTD_BLOCKSIZE_MAX * 3) {
+ AIO_ReadPool_setAsync(ress.readCtx, 0);
+ AIO_WritePool_setAsync(ress.writeCtx, 0);
+ } else {
+ AIO_ReadPool_setAsync(ress.readCtx, 1);
+ AIO_WritePool_setAsync(ress.writeCtx, 1);
+ }
+
AIO_ReadPool_setFile(ress.readCtx, srcFile);
result = FIO_compressFilename_dstFile(
fCtx, prefs, ress,
FILE* srcFile;
stat_t srcFileStat;
int result;
+ U64 fileSize = UTIL_FILESIZE_UNKNOWN;
if (UTIL_isDirectory(srcFileName)) {
DISPLAYLEVEL(1, "zstd: %s is a directory -- ignored \n", srcFileName);
srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat);
if (srcFile==NULL) return 1;
+
+ /* Don't use AsyncIO for small files */
+ if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */
+ fileSize = UTIL_getFileSizeStat(&srcFileStat);
+ if(fileSize != UTIL_FILESIZE_UNKNOWN && fileSize < ZSTD_BLOCKSIZE_MAX * 3) {
+ AIO_ReadPool_setAsync(ress.readCtx, 0);
+ AIO_WritePool_setAsync(ress.writeCtx, 0);
+ } else {
+ AIO_ReadPool_setAsync(ress.readCtx, 1);
+ AIO_WritePool_setAsync(ress.writeCtx, 1);
+ }
+
AIO_ReadPool_setFile(ress.readCtx, srcFile);
result = FIO_decompressDstFile(fCtx, prefs, ress, dstFileName, srcFileName, &srcFileStat);
}
/* ***********************************
- * General IoPool implementation
+ * Generic IoPool implementation
*************************************/
static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
* Displays warning if asyncio is requested but MT isn't available. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
ctx->threadPool = NULL;
+ ctx->threadPoolActive = 0;
if(prefs->asyncIO) {
if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
- EXM_THROW(102,"Failed creating write availableJobs mutex");
+ EXM_THROW(102,"Failed creating ioJobsMutex mutex");
/* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
assert(MAX_IO_JOBS >= 2);
ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
+ /* Start in threaded mode; AIO_IOPool_setThreaded() may disable it later
+ * for workloads where the threading overhead isn't worth it. */
+ ctx->threadPoolActive = 1;
if (!ctx->threadPool)
- EXM_THROW(104, "Failed creating writer thread pool");
+ EXM_THROW(104, "Failed creating I/O thread pool");
}
}
/* AIO_IOPool_init:
- * Allocates and sets and a new write pool including its included availableJobs. */
+ * Allocates and initializes a new I/O thread pool, including its availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
int i;
AIO_IOPool_createThreadPool(ctx, prefs);
}
+/* AIO_IOPool_threadPoolActive:
+ * Returns 1 if the current operation uses the thread pool, 0 otherwise.
+ * Note that in some cases we have a thread pool initialized but choose not to use it
+ * (threadPoolActive toggled off via AIO_IOPool_setThreaded). */
+static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
+ return ctx->threadPool && ctx->threadPoolActive;
+}
+
+
+/* AIO_IOPool_lockJobsMutex:
+ * Locks the IO jobs mutex if threading is active.
+ * No-op in single-threaded mode, where no synchronization is needed. */
+static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
+ if(AIO_IOPool_threadPoolActive(ctx))
+ ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
+}
+
+/* AIO_IOPool_unlockJobsMutex:
+ * Unlocks the IO jobs mutex if threading is active.
+ * No-op in single-threaded mode, mirroring AIO_IOPool_lockJobsMutex. */
+static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
+ if(AIO_IOPool_threadPoolActive(ctx))
+ ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
+}
+
/* AIO_IOPool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
- if(ctx->threadPool)
- ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
+ /* The lock/unlock helpers are no-ops when threading is inactive */
+ AIO_IOPool_lockJobsMutex(ctx);
assert(ctx->availableJobsCount < ctx->totalIoJobs);
ctx->availableJobs[ctx->availableJobsCount++] = job;
- if(ctx->threadPool)
- ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
+ AIO_IOPool_unlockJobsMutex(ctx);
}
/* AIO_IOPool_join:
* Waits for all tasks in the pool to finish executing. */
static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
- if(ctx->threadPool)
+ /* Nothing to wait for when threading is inactive: jobs ran synchronously */
+ if(AIO_IOPool_threadPoolActive(ctx))
POOL_joinJobs(ctx->threadPool);
}
+/* AIO_IOPool_setThreaded:
+ * Allows (de)activating threaded mode, to be used when the expected overhead
+ * of threading costs more than the expected gains.
+ * threaded must be 0 or 1. */
+static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
+ assert(threaded == 0 || threaded == 1);
+ assert(ctx != NULL);
+ if(ctx->threadPoolActive != threaded) {
+ /* Drain any in-flight jobs before switching modes, so no job straddles the change */
+ AIO_IOPool_join(ctx);
+ ctx->threadPoolActive = threaded;
+ }
+}
+
/* AIO_IOPool_free:
- * Release a previously allocated write thread pool. Makes sure all takss are done and released. */
+ * Release a previously allocated IO thread pool. Makes sure all tasks are done and released. */
static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
int i;
if(ctx->threadPool) {
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
IOJob_t *job;
assert(ctx->file != NULL || ctx->prefs->testMode);
- if(ctx->threadPool)
- ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
+ AIO_IOPool_lockJobsMutex(ctx);
assert(ctx->availableJobsCount > 0);
job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
- if(ctx->threadPool)
- ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
+ AIO_IOPool_unlockJobsMutex(ctx);
job->usedBufferSize = 0;
job->file = ctx->file;
job->offset = 0;
/* AIO_IOPool_setFile:
* Sets the destination file for future files in the pool.
- * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
- * Also requires ending of sparse write if a previous file was used in sparse mode. */
+ * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
assert(ctx!=NULL);
AIO_IOPool_join(ctx);
* The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t* job) {
IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
- if(ctx->threadPool)
+ if(AIO_IOPool_threadPoolActive(ctx))
POOL_add(ctx->threadPool, ctx->poolFunction, job);
else
ctx->poolFunction(job);
* Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
assert(ctx != NULL);
- if(ctx->base.threadPool)
- POOL_joinJobs(ctx->base.threadPool);
+ /* Join handles the inactive-threading case internally (no-op) */
+ AIO_IOPool_join(&ctx->base);
AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
ctx->storedSkips = 0;
}
free(ctx);
}
+/* AIO_WritePool_setAsync:
+ * Allows (de)activating async mode, to be used when the expected overhead
+ * of asyncio costs more than the expected gains.
+ * async must be 0 or 1. */
+void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
+ AIO_IOPool_setThreaded(&ctx->base, async);
+}
+
/* ***********************************
* ReadPool implementation
static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
- if(ctx->base.threadPool)
- ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
+ AIO_IOPool_lockJobsMutex(&ctx->base);
assert(ctx->completedJobsCount < MAX_IO_JOBS);
ctx->completedJobs[ctx->completedJobsCount++] = job;
- if(ctx->base.threadPool) {
+ /* Only signal when threading is active: otherwise no consumer can be waiting */
+ if(AIO_IOPool_threadPoolActive(&ctx->base)) {
ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
- ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
}
+ AIO_IOPool_unlockJobsMutex(&ctx->base);
}
/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
* Would block. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
IOJob_t *job = NULL;
- if (ctx->base.threadPool)
- ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
+ AIO_IOPool_lockJobsMutex(&ctx->base);
job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
ctx->waitingOnOffset += job->usedBufferSize;
}
- if (ctx->base.threadPool)
- ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
+ AIO_IOPool_unlockJobsMutex(&ctx->base);
return job;
}
if(ctx->base.threadPool)
if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
- EXM_THROW(103,"Failed creating write jobCompletedCond mutex");
+ EXM_THROW(103,"Failed creating jobCompletedCond cond");
return ctx;
}
AIO_ReadPool_setFile(ctx, NULL);
return fclose(file);
}
+
+/* AIO_ReadPool_setAsync:
+ * Allows (de)activating async mode, to be used when the expected overhead
+ * of asyncio costs more than the expected gains.
+ * async must be 0 or 1. */
+void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
+ AIO_IOPool_setThreaded(&ctx->base, async);
+}
* You may select, at your option, one of the above-listed licenses.
*/
+ /*
+ * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously.
+ * Current implementation relies on having one thread that reads and one that
+ * writes.
+ * Each IO pool supports up to `MAX_IO_JOBS` that can be enqueued for work, but
+ * are performed serially by the appropriate worker thread.
+ * Most systems expose better primitives to perform asynchronous IO, such as
+ * io_uring on newer Linux systems. The API is built in such a way that in the
+ * future we could replace the threads with better solutions when available.
+ */
+
#ifndef ZSTD_FILEIO_ASYNCIO_H
#define ZSTD_FILEIO_ASYNCIO_H
typedef struct {
/* These struct fields should be set only on creation and not changed afterwards */
POOL_ctx* threadPool;
+ int threadPoolActive;
int totalIoJobs;
const FIO_prefs_t* prefs;
POOL_function poolFunction;
* Frees and releases a writePool and its resources. Closes destination file. */
void AIO_WritePool_free(WritePoolCtx_t* ctx);
+/* AIO_WritePool_setAsync:
+ * Allows (de)activating async mode, to be used when the expected overhead
+ * of asyncio costs more than the expected gains. */
+void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async);
+
/* AIO_ReadPool_create:
* Allocates and sets and a new readPool including its included jobs.
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
* Frees and releases a readPool and its resources. Closes source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx);
+/* AIO_ReadPool_setAsync:
+ * Allows (de)activating async mode, to be used when the expected overhead
+ * of asyncio costs more than the expected gains. */
+void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async);
+
/* AIO_ReadPool_consumeBytes:
* Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);