#ifdef WANT_COMPRESSION_GZIP
#include <zlib.h>
#endif
+#ifdef WANT_COMPRESSION_ZSTD
+#include <zstd.h>
+#endif
+
/* Libowfat */
#include "byte.h"
#ifdef WANT_COMPRESSION_GZIP
static void fullscrape_make_gzip(int taskid, ot_tasktype mode);
#endif
+#ifdef WANT_COMPRESSION_ZSTD
+static void fullscrape_make_zstd(int taskid, ot_tasktype mode);
+#endif
/* Converter function from memory to human readable hex strings
XXX - Duplicated from ot_stats. Needs fix. */
while (g_opentracker_running) {
ot_tasktype tasktype = TASK_FULLSCRAPE;
ot_taskid taskid = mutex_workqueue_poptask(&tasktype);
+#ifdef WANT_COMPRESSION_ZSTD
+ if (tasktype & TASK_FLAG_ZSTD)
+ fullscrape_make_zstd(taskid, tasktype);
+ else
+#endif
#ifdef WANT_COMPRESSION_GZIP
if (tasktype & TASK_FLAG_GZIP)
fullscrape_make_gzip(taskid, tasktype);
struct iovec iovector = {NULL, 0};
int zres;
z_stream strm;
- fprintf(stderr, "GZIP path\n");
/* Setup return vector... */
iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
if (!iovector.iov_base)
mutex_bucket_unlock(bucket, 0);
/* Parent thread died? */
- if (!g_opentracker_running)
+ if (!g_opentracker_running) {
+ deflateEnd(&strm);
return;
+ }
}
if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
if (mutex_workqueue_pushchunked(taskid, &iovector)) {
free(iovector.iov_base);
- return mutex_bucket_unlock(bucket, 0);
+ deflateEnd(&strm);
+ return;
}
/* Check if there's a last batch of data in the zlib buffer */
if (!iovector.iov_base) {
fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
deflateEnd(&strm);
- return mutex_bucket_unlock(bucket, 0);
+ return;
}
strm.next_out = iovector.iov_base;
strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
/* WANT_COMPRESSION_GZIP */
#endif
+#ifdef WANT_COMPRESSION_ZSTD
+
+static void fullscrape_make_zstd(int taskid, ot_tasktype mode) {
+ int bucket;
+ char *r;
+ struct iovec iovector = {NULL, 0};
+ ZSTD_CCtx *zstream = ZSTD_createCCtx();
+ ZSTD_inBuffer inbuf;
+ ZSTD_outBuffer outbuf;
+ size_t more_bytes;
+
+ if (!zstream)
+ return;
+
+ /* Setup return vector... */
+ iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
+ if (!iovector.iov_base) {
+ ZSTD_freeCCtx(zstream);
+ return;
+ }
+
+ /* Working with a compression level 6 is half as fast as level 3, but
+ seems to be the last reasonable bump that's worth extra cpu */
+ ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6);
+
+ outbuf.dst = iovector.iov_base;
+ outbuf.size = OT_SCRAPE_CHUNK_SIZE;
+ outbuf.pos = 0;
+
+ if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
+ inbuf.src = (const void *)"d5:filesd";
+ inbuf.size = strlen("d5:filesd");
+ inbuf.pos = 0;
+ ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
+ }
+
+ /* For each bucket... */
+ for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
+ /* Get exclusive access to that bucket */
+ ot_vector *torrents_list = mutex_bucket_lock(bucket);
+ ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
+ size_t i;
+
+ /* For each torrent in this bucket.. */
+ for (i = 0; i < torrents_list->size; ++i) {
+ char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
+ r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
+ inbuf.src = compress_buffer;
+ inbuf.size = r - compress_buffer;
+ inbuf.pos = 0;
+ ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
+
+ /* Check if there still is enough buffer left */
+ while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) {
+ iovector.iov_len = outbuf.size;
+
+ if (mutex_workqueue_pushchunked(taskid, &iovector)) {
+ free(iovector.iov_base);
+ ZSTD_freeCCtx(zstream);
+ return mutex_bucket_unlock(bucket, 0);
+ }
+ /* Allocate a fresh output buffer */
+ iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
+ if (!iovector.iov_base) {
+ fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
+ ZSTD_freeCCtx(zstream);
+ return mutex_bucket_unlock(bucket, 0);
+ }
+
+ outbuf.dst = iovector.iov_base;
+ outbuf.size = OT_SCRAPE_CHUNK_SIZE;
+ outbuf.pos = 0;
+
+ ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
+ }
+ }
+
+ /* All torrents done: release lock on current bucket */
+ mutex_bucket_unlock(bucket, 0);
+
+ /* Parent thread died? */
+ if (!g_opentracker_running)
+ return;
+ }
+
+ if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
+ inbuf.src = (const void *)"ee";
+ inbuf.size = strlen("ee");
+ inbuf.pos = 0;
+ }
+
+ more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
+
+ iovector.iov_len = outbuf.pos;
+ if (mutex_workqueue_pushchunked(taskid, &iovector)) {
+ free(iovector.iov_base);
+ ZSTD_freeCCtx(zstream);
+ return;
+ }
+
+ /* Check if there's a last batch of data in the zlib buffer */
+ if (more_bytes) {
+ /* Allocate a fresh output buffer */
+ iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
+
+ if (!iovector.iov_base) {
+ fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
+ ZSTD_freeCCtx(zstream);
+ return;
+ }
+
+ outbuf.dst = iovector.iov_base;
+ outbuf.size = OT_SCRAPE_CHUNK_SIZE;
+ outbuf.pos = 0;
+
+ ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
+
+ /* Only pass the new buffer if there actually was some data left in the buffer */
+ iovector.iov_len = outbuf.pos;
+ if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector))
+ free(iovector.iov_base);
+ }
+
+ ZSTD_freeCCtx(zstream);
+}
+/* WANT_COMPRESSION_ZSTD */
+#endif
+
/* WANT_FULLSCRAPE */
#endif
if (iovec_entries) {
- if (cookie->flag & STRUCT_HTTP_FLAG_GZIP)
+ if (cookie->flag & STRUCT_HTTP_FLAG_ZSTD)
+ encoding = "Content-Encoding: zstd\r\n";
+ else if (cookie->flag & STRUCT_HTTP_FLAG_GZIP)
encoding = "Content-Encoding: gzip\r\n";
else if (cookie->flag & STRUCT_HTTP_FLAG_BZIP2)
encoding = "Content-Encoding: bzip2\r\n";
}
#endif
-#ifdef WANT_COMPRESSION_GZIP
+
+#if defined(WANT_COMPRESSION_GZIP) || defined(WANT_COMPRESSION_ZSTD)
ws->request[ws->request_size - 1] = 0;
-#ifndef WANT_COMPRESSION_GZIP_ALWAYS
+#ifdef WANT_COMPRESSION_GZIP
if (strstr(ws->request, "gzip")) {
-#endif
cookie->flag |= STRUCT_HTTP_FLAG_GZIP;
- format = TASK_FLAG_GZIP;
- stats_issue_event(EVENT_FULLSCRAPE_REQUEST_GZIP, 0, (uintptr_t)cookie->ip);
-#ifndef WANT_COMPRESSION_GZIP_ALWAYS
- } else
+ format |= TASK_FLAG_GZIP;
+ }
#endif
+#ifdef WANT_COMPRESSION_ZSTD
+ if (strstr(ws->request, "zstd")) {
+ cookie->flag |= STRUCT_HTTP_FLAG_ZSTD;
+ format |= TASK_FLAG_ZSTD;
+ }
+#endif
+
+#if defined(WANT_COMPRESSION_ZSTD) && defined(WANT_COMPRESSION_ZSTD_ALWAYS)
+ cookie->flag |= STRUCT_HTTP_FLAG_ZSTD;
+ format |= TASK_FLAG_ZSTD;
#endif
- stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip);
+
+#if defined(WANT_COMPRESSION_GZIP) && defined(WANT_COMPRESSION_GZIP_ALWAYS)
+ cookie->flag |= STRUCT_HTTP_FLAG_GZIP;
+ format |= TASK_FLAG_GZIP;
+#endif
+#endif
+
+ stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip);
#ifdef _DEBUG_HTTPERROR
fprintf(stderr, "%s", ws->debugbuf);