From: Yann Collet Date: Thu, 18 Jan 2018 22:39:51 +0000 (-0800) Subject: added POOL_tryAdd() X-Git-Tag: v1.3.4~1^2~67^2~28 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=997e4d0ccd4d4122949ab6726569542285f6b85b;p=thirdparty%2Fzstd.git added POOL_tryAdd() --- diff --git a/doc/zstd_manual.html b/doc/zstd_manual.html index 473d5be8b..dc3f8be1b 100644 --- a/doc/zstd_manual.html +++ b/doc/zstd_manual.html @@ -582,10 +582,16 @@ size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const ZSTD_CDict* If pledgedSrcSize is not known at reset time, use macro ZSTD_CONTENTSIZE_UNKNOWN. If pledgedSrcSize > 0, its value must be correct, as it will be written in header, and controlled at the end. For the time being, pledgedSrcSize==0 is interpreted as "srcSize unknown" for compatibility with older programs, - but it may change to mean "empty" in some future version, so prefer using macro ZSTD_CONTENTSIZE_UNKNOWN. + but it will change to mean "empty" in future version, so use macro ZSTD_CONTENTSIZE_UNKNOWN instead. @return : 0, or an error code (which can be tested using ZSTD_isError())


+
typedef struct {
+    unsigned long long ingested;
+    unsigned long long consumed;
+    unsigned long long produced;
+} ZSTD_frameProgression;
+

Advanced Streaming decompression functions

typedef enum { DStream_p_maxWindowSize } ZSTD_DStreamParameter_e;
 size_t ZSTD_setDStreamParameter(ZSTD_DStream* zds, ZSTD_DStreamParameter_e paramType, unsigned paramValue);   /* obsolete : this API will be removed in a future version */
 size_t ZSTD_initDStream_usingDict(ZSTD_DStream* zds, const void* dict, size_t dictSize); /**< note: no dictionary will be used if dict == NULL or dictSize < 8 */
diff --git a/lib/common/pool.c b/lib/common/pool.c
index 98b109e72..773488b07 100644
--- a/lib/common/pool.c
+++ b/lib/common/pool.c
@@ -12,6 +12,7 @@
 /* ======   Dependencies   ======= */
 #include   /* size_t */
 #include "pool.h"
+#include "zstd_internal.h"  /* ZSTD_malloc, ZSTD_free */
 
 /* ======   Compiler specifics   ====== */
 #if defined(_MSC_VER)
@@ -193,32 +194,54 @@ static int isQueueFull(POOL_ctx const* ctx) {
     }
 }
 
-void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
-    POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
-    if (!ctx) { return; }
 
+static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
+{
+    POOL_job const job = {function, opaque};
+    assert(ctx != NULL);
+    if (ctx->shutdown) return;
+
+    ctx->queueEmpty = 0;
+    ctx->queue[ctx->queueTail] = job;
+    ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
+    ZSTD_pthread_cond_signal(&ctx->queuePopCond);
+}
+
+void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
+{
+    assert(ctx != NULL);
     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
-    {   POOL_job const job = {function, opaque};
+    /* Wait until there is space in the queue for the new job */
+    while (isQueueFull(ctx) && (!ctx->shutdown)) {
+        ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
+    }
+    POOL_add_internal(ctx, function, opaque);
+    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+}
 
-        /* Wait until there is space in the queue for the new job */
-        while (isQueueFull(ctx) && !ctx->shutdown) {
-          ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
-        }
-        /* The queue is still going => there is space */
-        if (!ctx->shutdown) {
-            ctx->queueEmpty = 0;
-            ctx->queue[ctx->queueTail] = job;
-            ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
-        }
+
+int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
+{
+    assert(ctx != NULL);
+    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
+    if (isQueueFull(ctx)) {
+        ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+        return 0;
     }
+    POOL_add_internal(ctx, function, opaque);
     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
-    ZSTD_pthread_cond_signal(&ctx->queuePopCond);
+    return 1;
 }
 
+
 #else  /* ZSTD_MULTITHREAD  not defined */
+
+/* ========================== */
 /* No multi-threading support */
+/* ========================== */
 
-/* We don't need any data, but if it is empty malloc() might return NULL. */
+
+/* We don't need any data, but if it is empty, malloc() might return NULL. */
 struct POOL_ctx_s {
     int dummy;
 };
@@ -240,9 +263,15 @@ void POOL_free(POOL_ctx* ctx) {
     (void)ctx;
 }
 
-void POOL_add(void* ctx, POOL_function function, void* opaque) {
+void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
+    (void)ctx;
+    function(opaque);
+}
+
+int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
     (void)ctx;
     function(opaque);
+    return 1;
 }
 
 size_t POOL_sizeof(POOL_ctx* ctx) {
diff --git a/lib/common/pool.h b/lib/common/pool.h
index 08c63715a..a57e9b4fa 100644
--- a/lib/common/pool.h
+++ b/lib/common/pool.h
@@ -17,7 +17,8 @@ extern "C" {
 
 
 #include    /* size_t */
-#include "zstd_internal.h"   /* ZSTD_customMem */
+#define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_customMem */
+#include "zstd.h"
 
 typedef struct POOL_ctx_s POOL_ctx;
 
@@ -27,35 +28,43 @@ typedef struct POOL_ctx_s POOL_ctx;
  *  The maximum number of queued jobs before blocking is `queueSize`.
  * @return : POOL_ctx pointer on success, else NULL.
 */
-POOL_ctx *POOL_create(size_t numThreads, size_t queueSize);
+POOL_ctx* POOL_create(size_t numThreads, size_t queueSize);
 
-POOL_ctx *POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem);
+POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem);
 
 /*! POOL_free() :
     Free a thread pool returned by POOL_create().
 */
-void POOL_free(POOL_ctx *ctx);
+void POOL_free(POOL_ctx* ctx);
 
 /*! POOL_sizeof() :
     return memory usage of pool returned by POOL_create().
 */
-size_t POOL_sizeof(POOL_ctx *ctx);
+size_t POOL_sizeof(POOL_ctx* ctx);
 
 /*! POOL_function :
     The function type that can be added to a thread pool.
 */
-typedef void (*POOL_function)(void *);
+typedef void (*POOL_function)(void*);
 /*! POOL_add_function :
     The function type for a generic thread pool add function.
 */
-typedef void (*POOL_add_function)(void *, POOL_function, void *);
+typedef void (*POOL_add_function)(void*, POOL_function, void*);
 
 /*! POOL_add() :
-    Add the job `function(opaque)` to the thread pool.
+    Add the job `function(opaque)` to the thread pool. `ctx` must be valid.
     Possibly blocks until there is room in the queue.
     Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed.
 */
-void POOL_add(void *ctx, POOL_function function, void *opaque);
+void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque);
+
+
+/*! POOL_tryAdd() :
+    Add the job `function(opaque)` to the thread pool if a worker is available.
+    return immediately otherwise.
+   @return : 1 if successful, 0 if not.
+*/
+int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque);
 
 
 #if defined (__cplusplus)