]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
AsyncIO performance regression for small files fix (#3474)
authorYonatan Komornik <11005061+yoniko@users.noreply.github.com>
Thu, 2 Feb 2023 23:19:22 +0000 (15:19 -0800)
committerGitHub <noreply@github.com>
Thu, 2 Feb 2023 23:19:22 +0000 (15:19 -0800)
- Do not use threaded AsyncIO when handling small files.
- Some typo / doc fixes

programs/fileio.c
programs/fileio_asyncio.c
programs/fileio_asyncio.h

index fb71804d6a9384d199dbf73097c57eef525c6732..9a8300cdd834209d049cacf3debc69a766344c87 100644 (file)
@@ -1758,6 +1758,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
     int result;
     FILE* srcFile;
     stat_t srcFileStat;
+    U64 fileSize = UTIL_FILESIZE_UNKNOWN;
     DISPLAYLEVEL(6, "FIO_compressFilename_srcFile: %s \n", srcFileName);
 
     if (strcmp(srcFileName, stdinmark)) {
@@ -1790,6 +1791,17 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
     srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat);
     if (srcFile == NULL) return 1;   /* srcFile could not be opened */
 
+    /* Don't use AsyncIO for small files */
+    if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */
+        fileSize = UTIL_getFileSizeStat(&srcFileStat);
+    if(fileSize != UTIL_FILESIZE_UNKNOWN && fileSize < ZSTD_BLOCKSIZE_MAX * 3) {
+        AIO_ReadPool_setAsync(ress.readCtx, 0);
+        AIO_WritePool_setAsync(ress.writeCtx, 0);
+    } else {
+        AIO_ReadPool_setAsync(ress.readCtx, 1);
+        AIO_WritePool_setAsync(ress.writeCtx, 1);
+    }
+
     AIO_ReadPool_setFile(ress.readCtx, srcFile);
     result = FIO_compressFilename_dstFile(
             fCtx, prefs, ress,
@@ -2586,6 +2598,7 @@ static int FIO_decompressSrcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs
     FILE* srcFile;
     stat_t srcFileStat;
     int result;
+    U64 fileSize = UTIL_FILESIZE_UNKNOWN;
 
     if (UTIL_isDirectory(srcFileName)) {
         DISPLAYLEVEL(1, "zstd: %s is a directory -- ignored \n", srcFileName);
@@ -2594,6 +2607,18 @@ static int FIO_decompressSrcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs
 
     srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat);
     if (srcFile==NULL) return 1;
+
+    /* Don't use AsyncIO for small files */
+    if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */
+        fileSize = UTIL_getFileSizeStat(&srcFileStat);
+    if(fileSize != UTIL_FILESIZE_UNKNOWN && fileSize < ZSTD_BLOCKSIZE_MAX * 3) {
+        AIO_ReadPool_setAsync(ress.readCtx, 0);
+        AIO_WritePool_setAsync(ress.writeCtx, 0);
+    } else {
+        AIO_ReadPool_setAsync(ress.readCtx, 1);
+        AIO_WritePool_setAsync(ress.writeCtx, 1);
+    }
+
     AIO_ReadPool_setFile(ress.readCtx, srcFile);
 
     result = FIO_decompressDstFile(fCtx, prefs, ress, dstFileName, srcFileName, &srcFileStat);
index 8f12fe1f9f723afcf781abba1f7a03d43c83034c..fe9cca95d1f8af235bb926ae630d3d41d6962713 100644 (file)
@@ -140,7 +140,7 @@ int AIO_supported(void) {
 }
 
 /* ***********************************
- *  General IoPool implementation
+ *  Generic IoPool implementation
  *************************************/
 
 static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
@@ -163,20 +163,22 @@ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
  * Displays warning if asyncio is requested but MT isn't available. */
 static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
     ctx->threadPool = NULL;
+    ctx->threadPoolActive = 0;
     if(prefs->asyncIO) {
         if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
-            EXM_THROW(102,"Failed creating write availableJobs mutex");
+            EXM_THROW(102,"Failed creating ioJobsMutex mutex");
         /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
          * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
         assert(MAX_IO_JOBS >= 2);
         ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
+        ctx->threadPoolActive = 1;
         if (!ctx->threadPool)
-            EXM_THROW(104, "Failed creating writer thread pool");
+            EXM_THROW(104, "Failed creating I/O thread pool");
     }
 }
 
 /* AIO_IOPool_init:
- * Allocates and sets and a new write pool including its included availableJobs. */
+ * Allocates and sets and a new I/O thread pool including its included availableJobs. */
 static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
     int i;
     AIO_IOPool_createThreadPool(ctx, prefs);
@@ -192,27 +194,59 @@ static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_fun
 }
 
 
+/* AIO_IOPool_threadPoolActive:
+ * Check if current operation uses thread pool.
+ * Note that in some cases we have a thread pool initialized but choose not to use it. */
+static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
+    return ctx->threadPool && ctx->threadPoolActive;
+}
+
+
+/* AIO_IOPool_lockJobsMutex:
+ * Locks the IO jobs mutex if threading is active */
+static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
+    if(AIO_IOPool_threadPoolActive(ctx))
+        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
+}
+
+/* AIO_IOPool_unlockJobsMutex:
+ * Unlocks the IO jobs mutex if threading is active */
+static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
+    if(AIO_IOPool_threadPoolActive(ctx))
+        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
+}
+
 /* AIO_IOPool_releaseIoJob:
  * Releases an acquired job back to the pool. Doesn't execute the job. */
 static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
     IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
-    if(ctx->threadPool)
-        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
+    AIO_IOPool_lockJobsMutex(ctx);
     assert(ctx->availableJobsCount < ctx->totalIoJobs);
     ctx->availableJobs[ctx->availableJobsCount++] = job;
-    if(ctx->threadPool)
-        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
+    AIO_IOPool_unlockJobsMutex(ctx);
 }
 
 /* AIO_IOPool_join:
  * Waits for all tasks in the pool to finish executing. */
 static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
-    if(ctx->threadPool)
+    if(AIO_IOPool_threadPoolActive(ctx))
         POOL_joinJobs(ctx->threadPool);
 }
 
+/* AIO_IOPool_setThreaded:
+ * Allows (de)activating threaded mode, to be used when the expected overhead
+ * of threading costs more than the expected gains. */
+static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
+    assert(threaded == 0 || threaded == 1);
+    assert(ctx != NULL);
+    if(ctx->threadPoolActive != threaded) {
+        AIO_IOPool_join(ctx);
+        ctx->threadPoolActive = threaded;
+    }
+}
+
 /* AIO_IOPool_free:
- * Release a previously allocated write thread pool. Makes sure all takss are done and released. */
+ * Release a previously allocated IO thread pool. Makes sure all tasks are done and released. */
 static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
     int i;
     if(ctx->threadPool) {
@@ -236,12 +270,10 @@ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
 static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
     IOJob_t *job;
     assert(ctx->file != NULL || ctx->prefs->testMode);
-    if(ctx->threadPool)
-        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
+    AIO_IOPool_lockJobsMutex(ctx);
     assert(ctx->availableJobsCount > 0);
     job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
-    if(ctx->threadPool)
-        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
+    AIO_IOPool_unlockJobsMutex(ctx);
     job->usedBufferSize = 0;
     job->file = ctx->file;
     job->offset = 0;
@@ -251,8 +283,7 @@ static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
 
 /* AIO_IOPool_setFile:
  * Sets the destination file for future files in the pool.
- * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
- * Also requires ending of sparse write if a previous file was used in sparse mode. */
+ * Requires completion of all queued jobs and release of all otherwise acquired jobs. */
 static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
     assert(ctx!=NULL);
     AIO_IOPool_join(ctx);
@@ -269,7 +300,7 @@ static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
  * The queued job shouldn't be used directly after queueing it. */
 static void AIO_IOPool_enqueueJob(IOJob_t* job) {
     IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
-    if(ctx->threadPool)
+    if(AIO_IOPool_threadPoolActive(ctx))
         POOL_add(ctx->threadPool, ctx->poolFunction, job);
     else
         ctx->poolFunction(job);
@@ -300,8 +331,7 @@ void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
  * Blocks on completion of all current write jobs before executing. */
 void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
     assert(ctx != NULL);
-    if(ctx->base.threadPool)
-        POOL_joinJobs(ctx->base.threadPool);
+    AIO_IOPool_join(&ctx->base);
     AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
     ctx->storedSkips = 0;
 }
@@ -368,6 +398,13 @@ void AIO_WritePool_free(WritePoolCtx_t* ctx) {
     free(ctx);
 }
 
+/* AIO_WritePool_setAsync:
+ * Allows (de)activating async mode, to be used when the expected overhead
+ * of asyncio costs more than the expected gains. */
+void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
+    AIO_IOPool_setThreaded(&ctx->base, async);
+}
+
 
 /* ***********************************
  *  ReadPool implementation
@@ -383,14 +420,13 @@ static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
 
 static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
     ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
-    if(ctx->base.threadPool)
-        ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
+    AIO_IOPool_lockJobsMutex(&ctx->base);
     assert(ctx->completedJobsCount < MAX_IO_JOBS);
     ctx->completedJobs[ctx->completedJobsCount++] = job;
-    if(ctx->base.threadPool) {
+    if(AIO_IOPool_threadPoolActive(&ctx->base)) {
         ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
-        ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
     }
+    AIO_IOPool_unlockJobsMutex(&ctx->base);
 }
 
 /* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
@@ -426,8 +462,7 @@ static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
  * Would block. */
 static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
     IOJob_t *job = NULL;
-    if (ctx->base.threadPool)
-        ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
+    AIO_IOPool_lockJobsMutex(&ctx->base);
 
     job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
 
@@ -443,8 +478,7 @@ static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
         ctx->waitingOnOffset += job->usedBufferSize;
     }
 
-    if (ctx->base.threadPool)
-        ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
+    AIO_IOPool_unlockJobsMutex(&ctx->base);
     return job;
 }
 
@@ -524,7 +558,7 @@ ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize)
 
     if(ctx->base.threadPool)
         if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
-            EXM_THROW(103,"Failed creating write jobCompletedCond mutex");
+            EXM_THROW(103,"Failed creating jobCompletedCond cond");
 
     return ctx;
 }
@@ -620,3 +654,10 @@ int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
     AIO_ReadPool_setFile(ctx, NULL);
     return fclose(file);
 }
+
+/* AIO_ReadPool_setAsync:
+ * Allows (de)activating async mode, to be used when the expected overhead
+ * of asyncio costs more than the expected gains. */
+void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
+    AIO_IOPool_setThreaded(&ctx->base, async);
+}
index 34dad6f4da807dfb8521bd347b67d55f704f5aa0..feb25a3f9e92c5de5d971f3cdda0909b9b1a896c 100644 (file)
@@ -8,6 +8,17 @@
  * You may select, at your option, one of the above-listed licenses.
  */
 
+ /*
+  * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously.
+  * Current implementation relies on having one thread that reads and one that
+  * writes.
+  * Each IO pool supports up to `MAX_IO_JOBS` that can be enqueued for work, but
+  * are performed serially by the appropriate worker thread.
+  * Most systems exposes better primitives to perform asynchronous IO, such as
+  * io_uring on newer linux systems. The API is built in such a way that in the
+  * future we could replace the threads with better solutions when available.
+  */
+
 #ifndef ZSTD_FILEIO_ASYNCIO_H
 #define ZSTD_FILEIO_ASYNCIO_H
 
@@ -27,6 +38,7 @@ extern "C" {
 typedef struct {
     /* These struct fields should be set only on creation and not changed afterwards */
     POOL_ctx* threadPool;
+    int threadPoolActive;
     int totalIoJobs;
     const FIO_prefs_t* prefs;
     POOL_function poolFunction;
@@ -136,6 +148,11 @@ WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize
  * Frees and releases a writePool and its resources. Closes destination file. */
 void AIO_WritePool_free(WritePoolCtx_t* ctx);
 
+/* AIO_WritePool_setAsync:
+ * Allows (de)activating async mode, to be used when the expected overhead
+ * of asyncio costs more than the expected gains. */
+void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async);
+
 /* AIO_ReadPool_create:
  * Allocates and sets and a new readPool including its included jobs.
  * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
@@ -146,6 +163,11 @@ ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);
  * Frees and releases a readPool and its resources. Closes source file. */
 void AIO_ReadPool_free(ReadPoolCtx_t* ctx);
 
+/* AIO_ReadPool_setAsync:
+ * Allows (de)activating async mode, to be used when the expected overhead
+ * of asyncio costs more than the expected gains. */
+void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async);
+
 /* AIO_ReadPool_consumeBytes:
  * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
 void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);