]> git.ipfire.org Git - thirdparty/opentracker.git/commitdiff
Add support for zstd
authorDirk Engling <erdgeist@erdgeist.org>
Thu, 18 Apr 2024 12:54:34 +0000 (14:54 +0200)
committerDirk Engling <erdgeist@erdgeist.org>
Thu, 18 Apr 2024 12:54:34 +0000 (14:54 +0200)
Makefile
ot_fullscrape.c
ot_http.c
ot_http.h
ot_mutex.h
ot_stats.h

index a224845e22e8785f249a33b2c8a49eab54cac10a..e5ca6e4de9fb1fa9b3d53dde5105fdb9c271e2db 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -27,6 +27,11 @@ STRIP?=strip
 #FEATURES+=-DWANT_IP_FROM_QUERY_STRING
 FEATURES+=-DWANT_COMPRESSION_GZIP
 FEATURES+=-DWANT_COMPRESSION_GZIP_ALWAYS
+
+#FEATURES+=-DWANT_COMPRESSION_ZSTD
+#FEATURES+=-DWANT_COMPRESSION_ZSTD_ALWAYS
+#LDFLAGS+=-lzstd
+
 #FEATURES+=-DWANT_LOG_NETWORKS
 #FEATURES+=-DWANT_RESTRICT_STATS
 #FEATURES+=-DWANT_IP_FROM_PROXY
index aed2ad976c6e90ce319f4f3def73628130021a6f..6fd6d1cde30b19e1aa954c2cbb4ed57cf745926a 100644 (file)
 #ifdef WANT_COMPRESSION_GZIP
 #include <zlib.h>
 #endif
+#ifdef WANT_COMPRESSION_ZSTD
+#include <zstd.h>
+#endif
+
 
 /* Libowfat */
 #include "byte.h"
@@ -40,6 +44,9 @@ static void fullscrape_make(int taskid, ot_tasktype mode);
 #ifdef WANT_COMPRESSION_GZIP
 static void fullscrape_make_gzip(int taskid, ot_tasktype mode);
 #endif
+#ifdef WANT_COMPRESSION_ZSTD
+static void fullscrape_make_zstd(int taskid, ot_tasktype mode);
+#endif
 
 /* Converter function from memory to human readable hex strings
    XXX - Duplicated from ot_stats. Needs fix. */
@@ -64,6 +71,11 @@ static void *fullscrape_worker(void *args) {
   while (g_opentracker_running) {
     ot_tasktype tasktype = TASK_FULLSCRAPE;
     ot_taskid   taskid   = mutex_workqueue_poptask(&tasktype);
+#ifdef WANT_COMPRESSION_ZSTD
+    if (tasktype & TASK_FLAG_ZSTD)
+      fullscrape_make_zstd(taskid, tasktype);
+    else
+#endif
 #ifdef WANT_COMPRESSION_GZIP
     if (tasktype & TASK_FLAG_GZIP)
       fullscrape_make_gzip(taskid, tasktype);
@@ -205,7 +217,6 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
   struct iovec iovector = {NULL, 0};
   int          zres;
   z_stream     strm;
-  fprintf(stderr, "GZIP path\n");
   /* Setup return vector... */
   iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
   if (!iovector.iov_base)
@@ -267,8 +278,10 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
     mutex_bucket_unlock(bucket, 0);
 
     /* Parent thread died? */
-    if (!g_opentracker_running)
+    if (!g_opentracker_running) {
+      deflateEnd(&strm);
       return;
+    }
   }
 
   if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
@@ -282,7 +295,8 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
   iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
   if (mutex_workqueue_pushchunked(taskid, &iovector)) {
     free(iovector.iov_base);
-    return mutex_bucket_unlock(bucket, 0);
+    deflateEnd(&strm);
+    return;
   }
 
   /* Check if there's a last batch of data in the zlib buffer */
@@ -293,7 +307,7 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
       if (!iovector.iov_base) {
         fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
         deflateEnd(&strm);
-        return mutex_bucket_unlock(bucket, 0);
+        return;
       }
       strm.next_out  = iovector.iov_base;
       strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
@@ -311,5 +325,133 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
 /* WANT_COMPRESSION_GZIP */
 #endif
 
+#ifdef WANT_COMPRESSION_ZSTD
+
+/* Compress a scrape of all torrents with zstd and hand the result to the
+   work queue in buffers of OT_SCRAPE_CHUNK_SIZE bytes.
+
+   taskid  id of the work queue task the output buffers are pushed to
+   mode    task type; when (mode & TASK_TASK_MASK) is TASK_FULLSCRAPE the
+           entries are wrapped in "d5:filesd" ... "ee"
+
+   A buffer is only freed here when pushing it failed, so a successful
+   mutex_workqueue_pushchunked() presumably takes over ownership - confirm
+   against ot_mutex.c. The buffer currently being filled and the zstd
+   context are released on every exit path. */
+static void fullscrape_make_zstd(int taskid, ot_tasktype mode) {
+  int            bucket;
+  char          *r;
+  struct iovec   iovector = {NULL, 0};
+  ZSTD_CCtx     *zstream  = ZSTD_createCCtx();
+  ZSTD_inBuffer  inbuf    = {NULL, 0, 0};
+  ZSTD_outBuffer outbuf;
+  size_t         more_bytes;
+
+  if (!zstream)
+    return;
+
+  /* Setup return vector... */
+  iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
+  if (!iovector.iov_base) {
+    ZSTD_freeCCtx(zstream);
+    return;
+  }
+
+  /* Working with a compression level 6 is half as fast as level 3, but
+     seems to be the last reasonable bump that's worth extra cpu */
+  ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6);
+
+  outbuf.dst  = iovector.iov_base;
+  outbuf.size = OT_SCRAPE_CHUNK_SIZE;
+  outbuf.pos  = 0;
+
+  if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
+    inbuf.src  = (const void *)"d5:filesd";
+    inbuf.size = strlen("d5:filesd");
+    inbuf.pos  = 0;
+    ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
+  }
+
+  /* For each bucket... */
+  for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
+    /* Get exclusive access to that bucket */
+    ot_vector  *torrents_list = mutex_bucket_lock(bucket);
+    ot_torrent *torrents      = (ot_torrent *)(torrents_list->data);
+    size_t      i;
+
+    /* For each torrent in this bucket.. */
+    for (i = 0; i < torrents_list->size; ++i) {
+      char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
+      r             = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
+      inbuf.src     = compress_buffer;
+      inbuf.size    = r - compress_buffer;
+      inbuf.pos     = 0;
+      ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
+
+      /* Check if there still is enough buffer left */
+      while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) {
+        /* Only the first outbuf.pos bytes were written by zstd; the tail of
+           the buffer is uninitialised and must not be sent out */
+        iovector.iov_len = outbuf.pos;
+
+        if (mutex_workqueue_pushchunked(taskid, &iovector)) {
+          free(iovector.iov_base);
+          ZSTD_freeCCtx(zstream);
+          return mutex_bucket_unlock(bucket, 0);
+        }
+        /* Allocate a fresh output buffer */
+        iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
+        if (!iovector.iov_base) {
+          fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
+          ZSTD_freeCCtx(zstream);
+          return mutex_bucket_unlock(bucket, 0);
+        }
+
+        outbuf.dst  = iovector.iov_base;
+        outbuf.size = OT_SCRAPE_CHUNK_SIZE;
+        outbuf.pos  = 0;
+
+        /* Hand over whatever part of the entry did not fit the old buffer */
+        ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
+      }
+    }
+
+    /* All torrents done: release lock on current bucket */
+    mutex_bucket_unlock(bucket, 0);
+
+    /* Parent thread died? Drop the unsent buffer and the context, too */
+    if (!g_opentracker_running) {
+      free(iovector.iov_base);
+      ZSTD_freeCCtx(zstream);
+      return;
+    }
+  }
+
+  if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
+    inbuf.src  = (const void *)"ee";
+    inbuf.size = strlen("ee");
+    inbuf.pos  = 0;
+  } else {
+    /* No trailer to add: finish the frame with an empty input instead of
+       re-using inbuf, which is either unset or refers to the per-torrent
+       compress_buffer that is out of scope here */
+    inbuf.src  = NULL;
+    inbuf.size = 0;
+    inbuf.pos  = 0;
+  }
+
+  more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
+
+  iovector.iov_len = outbuf.pos;
+  if (mutex_workqueue_pushchunked(taskid, &iovector)) {
+    free(iovector.iov_base);
+    ZSTD_freeCCtx(zstream);
+    return;
+  }
+
+  /* Check if there's a last batch of data in the zstd buffer */
+  if (more_bytes) {
+      /* Allocate a fresh output buffer */
+      iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
+
+      if (!iovector.iov_base) {
+        fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
+        ZSTD_freeCCtx(zstream);
+        return;
+      }
+
+      outbuf.dst  = iovector.iov_base;
+      outbuf.size = OT_SCRAPE_CHUNK_SIZE;
+      outbuf.pos  = 0;
+
+      ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
+
+      /* Only pass the new buffer if there actually was some data left in the buffer */
+      iovector.iov_len = outbuf.pos;
+      if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector))
+        free(iovector.iov_base);
+  }
+
+  ZSTD_freeCCtx(zstream);
+}
+/* WANT_COMPRESSION_ZSTD */
+#endif
+
 /* WANT_FULLSCRAPE */
 #endif
index cd2dfc144ed8cd4cf4017929be0f0526545d84ad..af3f210ce3def4c71cd4a4f56a33c01a8929677f 100644 (file)
--- a/ot_http.c
+++ b/ot_http.c
@@ -159,7 +159,9 @@ ssize_t http_sendiovecdata(const int64 sock, struct ot_workstruct *ws, int iovec
 
   if (iovec_entries) {
 
-    if (cookie->flag & STRUCT_HTTP_FLAG_GZIP)
+    if (cookie->flag & STRUCT_HTTP_FLAG_ZSTD)
+      encoding = "Content-Encoding: zstd\r\n";
+    else if (cookie->flag & STRUCT_HTTP_FLAG_GZIP)
       encoding = "Content-Encoding: gzip\r\n";
     else if (cookie->flag & STRUCT_HTTP_FLAG_BZIP2)
       encoding = "Content-Encoding: bzip2\r\n";
@@ -369,19 +371,34 @@ static ssize_t http_handle_fullscrape(const int64 sock, struct ot_workstruct *ws
   }
 #endif
 
-#ifdef WANT_COMPRESSION_GZIP
+
+#if defined(WANT_COMPRESSION_GZIP) || defined(WANT_COMPRESSION_ZSTD)
   ws->request[ws->request_size - 1] = 0;
-#ifndef WANT_COMPRESSION_GZIP_ALWAYS
+#ifdef WANT_COMPRESSION_GZIP
   if (strstr(ws->request, "gzip")) {
-#endif
     cookie->flag |= STRUCT_HTTP_FLAG_GZIP;
-    format        = TASK_FLAG_GZIP;
-    stats_issue_event(EVENT_FULLSCRAPE_REQUEST_GZIP, 0, (uintptr_t)cookie->ip);
-#ifndef WANT_COMPRESSION_GZIP_ALWAYS
-  } else
+    format       |= TASK_FLAG_GZIP;
+  }
 #endif
+#ifdef WANT_COMPRESSION_ZSTD
+  if (strstr(ws->request, "zstd")) {
+    cookie->flag |= STRUCT_HTTP_FLAG_ZSTD;
+    format       |= TASK_FLAG_ZSTD;
+  }
+#endif
+
+#if defined(WANT_COMPRESSION_ZSTD) && defined(WANT_COMPRESSION_ZSTD_ALWAYS)
+  cookie->flag |= STRUCT_HTTP_FLAG_ZSTD;
+  format       |= TASK_FLAG_ZSTD;
 #endif
-    stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip);
+
+#if defined(WANT_COMPRESSION_GZIP) && defined(WANT_COMPRESSION_GZIP_ALWAYS)
+  cookie->flag |= STRUCT_HTTP_FLAG_GZIP;
+  format       |= TASK_FLAG_GZIP;
+#endif
+#endif
+
+  stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip);
 
 #ifdef _DEBUG_HTTPERROR
   fprintf(stderr, "%s", ws->debugbuf);
index fecb4eb1d74ca862d518e85593079a7163194d76..b5ae9ff78be58f8fb9118de0abbc3c119df1bab9 100644 (file)
--- a/ot_http.h
+++ b/ot_http.h
@@ -10,8 +10,9 @@ typedef enum {
   STRUCT_HTTP_FLAG_WAITINGFORTASK      = 1,
   STRUCT_HTTP_FLAG_GZIP                = 2,
   STRUCT_HTTP_FLAG_BZIP2               = 4,
-  STRUCT_HTTP_FLAG_CHUNKED             = 8,
-  STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 16
+  STRUCT_HTTP_FLAG_ZSTD                = 8,
+  STRUCT_HTTP_FLAG_CHUNKED             = 16,
+  STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 32
 } STRUCT_HTTP_FLAG;
 
 struct http_data {
index 66b627f7f30f3571ff0ce84215fd34734fa9c2be..cdfabc9bed0c191ec0f079f198aca63217c905c8 100644 (file)
@@ -59,7 +59,8 @@ typedef enum {
 
   TASK_FLAG_GZIP                 = 0x1000,
   TASK_FLAG_BZIP2                = 0x2000,
-  TASK_FLAG_CHUNKED              = 0x4000,
+  TASK_FLAG_ZSTD                 = 0x4000,
+  TASK_FLAG_CHUNKED              = 0x8000,
 
   TASK_TASK_MASK                 = 0x0fff,
   TASK_CLASS_MASK                = 0x0f00,
index 4f75049d998f13cb4508f52b8c4cd6b76ac21c4a..8ed2b1e9a02b26e1e673fb4ca6165b14f4a2a4e2 100644 (file)
@@ -19,6 +19,7 @@ typedef enum {
   EVENT_SCRAPE,
   EVENT_FULLSCRAPE_REQUEST,
   EVENT_FULLSCRAPE_REQUEST_GZIP,
+  EVENT_FULLSCRAPE_REQUEST_ZSTD,
   EVENT_FULLSCRAPE, /* TCP only */
   EVENT_FAILED,
   EVENT_BUCKET_LOCKED,