]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
added POOL_tryAdd()
authorYann Collet <cyan@fb.com>
Thu, 18 Jan 2018 22:39:51 +0000 (14:39 -0800)
committerYann Collet <cyan@fb.com>
Thu, 18 Jan 2018 22:39:51 +0000 (14:39 -0800)
doc/zstd_manual.html
lib/common/pool.c
lib/common/pool.h

index 473d5be8b2e24b03b689b3720335fba9f3bef41d..dc3f8be1bcb6a56e3ed07e426098604939c6eba6 100644 (file)
@@ -582,10 +582,16 @@ size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const ZSTD_CDict*
   If pledgedSrcSize is not known at reset time, use macro ZSTD_CONTENTSIZE_UNKNOWN.
   If pledgedSrcSize > 0, its value must be correct, as it will be written in header, and controlled at the end.
   For the time being, pledgedSrcSize==0 is interpreted as "srcSize unknown" for compatibility with older programs,
-  but it may change to mean "empty" in some future version, so prefer using macro ZSTD_CONTENTSIZE_UNKNOWN.
+  but it will change to mean "empty" in future version, so use macro ZSTD_CONTENTSIZE_UNKNOWN instead.
  @return : 0, or an error code (which can be tested using ZSTD_isError()) 
 </p></pre><BR>
 
+<pre><b>typedef struct {
+    unsigned long long ingested;
+    unsigned long long consumed;
+    unsigned long long produced;
+} ZSTD_frameProgression;
+</b></pre><BR>
 <h3>Advanced Streaming decompression functions</h3><pre></pre><b><pre>typedef enum { DStream_p_maxWindowSize } ZSTD_DStreamParameter_e;
 size_t ZSTD_setDStreamParameter(ZSTD_DStream* zds, ZSTD_DStreamParameter_e paramType, unsigned paramValue);   </b>/* obsolete : this API will be removed in a future version */<b>
 size_t ZSTD_initDStream_usingDict(ZSTD_DStream* zds, const void* dict, size_t dictSize); </b>/**< note: no dictionary will be used if dict == NULL or dictSize < 8 */<b>
index 98b109e72a8699ef17a58ef7adf9245af97d356c..773488b072558119ddfd172d4dbd499db8e0d77a 100644 (file)
@@ -12,6 +12,7 @@
 /* ======   Dependencies   ======= */
 #include <stddef.h>  /* size_t */
 #include "pool.h"
+#include "zstd_internal.h"  /* ZSTD_malloc, ZSTD_free */
 
 /* ======   Compiler specifics   ====== */
 #if defined(_MSC_VER)
@@ -193,32 +194,54 @@ static int isQueueFull(POOL_ctx const* ctx) {
     }
 }
 
-void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
-    POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
-    if (!ctx) { return; }
 
+static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
+{
+    POOL_job const job = {function, opaque};
+    assert(ctx != NULL);
+    if (ctx->shutdown) return;
+
+    ctx->queueEmpty = 0;
+    ctx->queue[ctx->queueTail] = job;
+    ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
+    ZSTD_pthread_cond_signal(&ctx->queuePopCond);
+}
+
+void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
+{
+    assert(ctx != NULL);
     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
-    {   POOL_job const job = {function, opaque};
+    /* Wait until there is space in the queue for the new job */
+    while (isQueueFull(ctx) && (!ctx->shutdown)) {
+        ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
+    }
+    POOL_add_internal(ctx, function, opaque);
+    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+}
 
-        /* Wait until there is space in the queue for the new job */
-        while (isQueueFull(ctx) && !ctx->shutdown) {
-          ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
-        }
-        /* The queue is still going => there is space */
-        if (!ctx->shutdown) {
-            ctx->queueEmpty = 0;
-            ctx->queue[ctx->queueTail] = job;
-            ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
-        }
+
+int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
+{
+    assert(ctx != NULL);
+    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
+    if (isQueueFull(ctx)) {
+        ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+        return 0;
     }
+    POOL_add_internal(ctx, function, opaque);
     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
-    ZSTD_pthread_cond_signal(&ctx->queuePopCond);
+    return 1;
 }
 
+
 #else  /* ZSTD_MULTITHREAD  not defined */
+
+/* ========================== */
 /* No multi-threading support */
+/* ========================== */
 
-/* We don't need any data, but if it is empty malloc() might return NULL. */
+
+/* We don't need any data, but if it is empty, malloc() might return NULL. */
 struct POOL_ctx_s {
     int dummy;
 };
@@ -240,9 +263,15 @@ void POOL_free(POOL_ctx* ctx) {
     (void)ctx;
 }
 
-void POOL_add(void* ctx, POOL_function function, void* opaque) {
+void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
+    (void)ctx;
+    function(opaque);
+}
+
+int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
     (void)ctx;
     function(opaque);
+    return 1;
 }
 
 size_t POOL_sizeof(POOL_ctx* ctx) {
index 08c63715aaa6833ed7e93787a72d7ca045d9ea3e..a57e9b4fabc2dc6d4bb818a1d3696372b9083680 100644 (file)
@@ -17,7 +17,8 @@ extern "C" {
 
 
 #include <stddef.h>   /* size_t */
-#include "zstd_internal.h"   /* ZSTD_customMem */
+#define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_customMem */
+#include "zstd.h"
 
 typedef struct POOL_ctx_s POOL_ctx;
 
@@ -27,35 +28,43 @@ typedef struct POOL_ctx_s POOL_ctx;
  *  The maximum number of queued jobs before blocking is `queueSize`.
  * @return : POOL_ctx pointer on success, else NULL.
 */
-POOL_ctx *POOL_create(size_t numThreads, size_t queueSize);
+POOL_ctxPOOL_create(size_t numThreads, size_t queueSize);
 
-POOL_ctx *POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem);
+POOL_ctxPOOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem);
 
 /*! POOL_free() :
     Free a thread pool returned by POOL_create().
 */
-void POOL_free(POOL_ctx *ctx);
+void POOL_free(POOL_ctxctx);
 
 /*! POOL_sizeof() :
     return memory usage of pool returned by POOL_create().
 */
-size_t POOL_sizeof(POOL_ctx *ctx);
+size_t POOL_sizeof(POOL_ctxctx);
 
 /*! POOL_function :
     The function type that can be added to a thread pool.
 */
-typedef void (*POOL_function)(void *);
+typedef void (*POOL_function)(void*);
 /*! POOL_add_function :
     The function type for a generic thread pool add function.
 */
-typedef void (*POOL_add_function)(void *, POOL_function, void *);
+typedef void (*POOL_add_function)(void*, POOL_function, void*);
 
 /*! POOL_add() :
-    Add the job `function(opaque)` to the thread pool.
+    Add the job `function(opaque)` to the thread pool. `ctx` must be valid.
     Possibly blocks until there is room in the queue.
     Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed.
 */
-void POOL_add(void *ctx, POOL_function function, void *opaque);
+void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque);
+
+
+/*! POOL_tryAdd() :
+    Add the job `function(opaque)` to the thread pool if a worker is available.
+    return immediately otherwise.
+   @return : 1 if successful, 0 if not.
+*/
+int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque);
 
 
 #if defined (__cplusplus)