]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib-compression: Add ZSTD support
author Aki Tuomi <aki.tuomi@dovecot.fi>
Tue, 16 May 2017 07:16:23 +0000 (10:16 +0300)
committer aki.tuomi <aki.tuomi@open-xchange.com>
Fri, 6 Mar 2020 09:00:50 +0000 (09:00 +0000)
configure.ac
m4/want_zstd.m4 [new file with mode: 0644]
src/lib-compression/Makefile.am
src/lib-compression/compression.c
src/lib-compression/istream-zlib.h
src/lib-compression/istream-zstd.c [new file with mode: 0644]
src/lib-compression/ostream-zlib.h
src/lib-compression/ostream-zstd.c [new file with mode: 0644]

index 8a6d86c5826d93bd00df561bb468cdfbd42cce4c..5b0a3bebeb1dc82534556d6f272dedf89498f8af 100644 (file)
@@ -202,6 +202,11 @@ AS_HELP_STRING([--with-lz4], [Build with LZ4 compression support (auto)]),
   TEST_WITH(lz4, $withval),
   want_lz4=auto)
 
+AC_ARG_WITH(zstd,
+AS_HELP_STRING([--with-zstd], [Build with ZSTD compression support (auto)]),
+  TEST_WITH(zstd, $withval),
+  want_zstd=auto)
+
 AC_ARG_WITH(libcap,
 AS_HELP_STRING([--with-libcap], [Build with libcap support (Dropping capabilities) (auto)]),
   TEST_WITH(libcap, $withval),
@@ -734,6 +739,7 @@ DOVECOT_WANT_ZLIB
 DOVECOT_WANT_BZLIB
 DOVECOT_WANT_LZMA
 DOVECOT_WANT_LZ4
+DOVECOT_WANT_ZSTD
 
 AC_SUBST(COMPRESS_LIBS)
 
diff --git a/m4/want_zstd.m4 b/m4/want_zstd.m4
new file mode 100644 (file)
index 0000000..4ed262c
--- /dev/null
@@ -0,0 +1,18 @@
+dnl DOVECOT_WANT_ZSTD
+dnl Looks for libzstd via pkg-config according to $want_zstd:
+dnl   yes  - libzstd is required, configure fails if it is missing
+dnl   no   - libzstd is not looked for
+dnl   else - libzstd is used when found
+dnl When found, ZSTD_LIBS is appended to COMPRESS_LIBS and HAVE_ZSTD is defined.
+AC_DEFUN([DOVECOT_WANT_ZSTD], [
+  have_zstd=no
+
+  dnl Shell variables are quoted so that an empty value cannot break test(1).
+  AS_IF([test "$want_zstd" = "yes"], [
+    PKG_CHECK_MODULES([ZSTD], [libzstd], [have_zstd=yes], [AC_MSG_ERROR([libzstd not found])])
+  ], [AS_IF([test "$want_zstd" != "no"], [
+      PKG_CHECK_MODULES([ZSTD], [libzstd], [have_zstd=yes], [have_zstd=no])
+    ])
+  ])
+
+  AS_IF([test "$have_zstd" = "yes"], [
+    have_compress_lib=yes
+    COMPRESS_LIBS="$COMPRESS_LIBS $ZSTD_LIBS"
+    AC_DEFINE([HAVE_ZSTD], [], [Define if you have ZSTD library])
+  ])
+
+  AM_CONDITIONAL([BUILD_ZSTD], test "$have_zstd" = "yes")
+])
index 3c6c875a6f976d3a5eb9f86438dcae7089ddc27a..8bdf3a1f762713b370bd1e26199ec0edf1280bd8 100644 (file)
@@ -2,7 +2,8 @@ noinst_LTLIBRARIES = libcompression.la
 
 AM_CPPFLAGS = \
        -I$(top_srcdir)/src/lib \
-       -I$(top_srcdir)/src/lib-test
+       -I$(top_srcdir)/src/lib-test \
+       $(ZSTD_CFLAGS)
 
 libcompression_la_SOURCES = \
        compression.c \
@@ -10,10 +11,12 @@ libcompression_la_SOURCES = \
        istream-lz4.c \
        istream-zlib.c \
        istream-bzlib.c \
+       istream-zstd.c \
        ostream-lzma.c \
        ostream-lz4.c \
        ostream-zlib.c \
-       ostream-bzlib.c
+       ostream-bzlib.c \
+       ostream-zstd.c
 libcompression_la_LIBADD = \
        $(COMPRESS_LIBS)
 
index 2b00392fcb22217594d68bc57af4ed7c536f748e..3561ffd6cdb76d00e64f229a3d7b997d85d466aa 100644 (file)
 #  define i_stream_create_lz4 NULL
 #  define o_stream_create_lz4 NULL
 #endif
+#ifndef HAVE_ZSTD
+#  define i_stream_create_zstd NULL
+#  define o_stream_create_zstd NULL
+#endif
 
 static bool is_compressed_zlib(struct istream *input)
 {
@@ -79,6 +83,19 @@ static bool is_compressed_lz4(struct istream *input)
        return memcmp(data, IOSTREAM_LZ4_MAGIC, IOSTREAM_LZ4_MAGIC_LEN) == 0;
 }
 
+/* Value the first four bytes of the input are compared against, read as a
+   little-endian 32bit integer. */
+#define ZSTD_MAGICNUMBER            0xFD2FB528    /* valid since v0.8.0 */
+/* Returns TRUE if the input begins with ZSTD_MAGICNUMBER. Returns FALSE if
+   four bytes could not be read or if they don't match. */
+static bool is_compressed_zstd(struct istream *input)
+{
+       const unsigned char *hdr;
+       size_t avail = 0;
+       int ret;
+
+       ret = i_stream_read_bytes(input, &hdr, &avail, sizeof(uint32_t));
+       if (ret <= 0)
+               return FALSE;
+       i_assert(avail >= sizeof(uint32_t));
+
+       if (le32_to_cpu_unaligned(hdr) != ZSTD_MAGICNUMBER)
+               return FALSE;
+       return TRUE;
+}
+
 const struct compression_handler *compression_lookup_handler(const char *name)
 {
        unsigned int i;
@@ -132,5 +149,7 @@ const struct compression_handler compression_handlers[] = {
          i_stream_create_lzma, o_stream_create_lzma },
        { "lz4", ".lz4", is_compressed_lz4,
          i_stream_create_lz4, o_stream_create_lz4 },
+       { "zstd", ".zstd", is_compressed_zstd,
+         i_stream_create_zstd, o_stream_create_zstd },
        { NULL, NULL, NULL, NULL, NULL }
 };
index 831d4717e1929a79284859cd3425e4e02b4aafa2..3928761c2da1ff70f3fdb4ac902db3b8d6c1e9b3 100644 (file)
@@ -6,5 +6,6 @@ struct istream *i_stream_create_deflate(struct istream *input, bool log_errors);
 struct istream *i_stream_create_bz2(struct istream *input, bool log_errors);
 struct istream *i_stream_create_lzma(struct istream *input, bool log_errors);
 struct istream *i_stream_create_lz4(struct istream *input, bool log_errors);
+struct istream *i_stream_create_zstd(struct istream *input, bool log_errors);
 
 #endif
diff --git a/src/lib-compression/istream-zstd.c b/src/lib-compression/istream-zstd.c
new file mode 100644 (file)
index 0000000..d19594b
--- /dev/null
@@ -0,0 +1,242 @@
+/* Copyright (c) 2020 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+
+#ifdef HAVE_ZSTD
+
+#include "buffer.h"
+#include "istream-private.h"
+#include "istream-zlib.h"
+
+#include "zstd.h"
+#include "zstd_errors.h"
+
+/* State of an input stream that decompresses zstd data read from its
+   parent stream. */
+struct zstd_istream {
+       struct istream_private istream;
+
+       /* decompression context, created by i_stream_zstd_init() and
+          released by i_stream_zstd_deinit() */
+       ZSTD_DStream *dstream;
+       /* compressed input passed to ZSTD_decompressStream(); its src
+          points at frame_buffer's data */
+       ZSTD_inBuffer input;
+       /* decompressed output of ZSTD_decompressStream(); its dst points
+          into data_buffer */
+       ZSTD_outBuffer output;
+
+       /* parent's stat result remembered by sync, used to notice when the
+          parent has changed */
+       struct stat last_parent_statbuf;
+
+       /* ZSTD input size */
+       size_t input_size;
+
+       /* storage for frames */
+       buffer_t *frame_buffer;
+
+       /* storage for data */
+       buffer_t *data_buffer;
+
+       /* log stream errors with i_error() in addition to setting them */
+       bool log_errors:1;
+       /* set by seek when a backwards seek was done with mark=TRUE */
+       bool marked:1;
+       /* TRUE between i_stream_zstd_deinit() and the next
+          i_stream_zstd_init() */
+       bool zs_closed:1;
+       /* is there data remaining */
+       bool remain:1;
+};
+
+/* Creates a new decompression context and makes sure both work buffers
+   exist and are empty. Buffers that already exist are truncated and reused.
+   Dies with FATAL_OUTOFMEM if the context can't be created. */
+static void i_stream_zstd_init(struct zstd_istream *zstream)
+{
+       ZSTD_DStream *ds = ZSTD_createDStream();
+
+       zstream->dstream = ds;
+       if (ds == NULL)
+               i_fatal_status(FATAL_OUTOFMEM, "zstd: Out of memory");
+       ZSTD_initDStream(zstream->dstream);
+       zstream->input_size = ZSTD_DStreamInSize();
+
+       if (zstream->frame_buffer != NULL) {
+               buffer_set_used_size(zstream->frame_buffer, 0);
+       } else {
+               zstream->frame_buffer =
+                       buffer_create_dynamic(default_pool, ZSTD_DStreamInSize());
+       }
+
+       if (zstream->data_buffer != NULL) {
+               buffer_set_used_size(zstream->data_buffer, 0);
+       } else {
+               zstream->data_buffer =
+                       buffer_create_dynamic(default_pool, ZSTD_DStreamOutSize());
+       }
+
+       zstream->zs_closed = FALSE;
+}
+
+/* Releases the decompression context, clears the input window and marks
+   the stream state as closed. The work buffers are freed unless
+   reuse_buffers is TRUE. */
+static void i_stream_zstd_deinit(struct zstd_istream *zstream, bool reuse_buffers)
+{
+       (void)ZSTD_freeDStream(zstream->dstream);
+       zstream->dstream = NULL;
+
+       if (!reuse_buffers) {
+               buffer_free(&zstream->frame_buffer);
+               buffer_free(&zstream->data_buffer);
+       }
+
+       i_zero(&zstream->input);
+       zstream->zs_closed = TRUE;
+}
+
+/* iostream close callback: releases the zstd state unless that was already
+   done, and closes the parent stream when close_parent is TRUE. */
+static void i_stream_zstd_close(struct iostream_private *stream,
+                               bool close_parent)
+{
+       struct istream_private *priv =
+               container_of(stream, struct istream_private, iostream);
+       struct zstd_istream *zstream =
+               container_of(priv, struct zstd_istream, istream);
+
+       if (!zstream->zs_closed)
+               i_stream_zstd_deinit(zstream, FALSE);
+       buffer_free(&zstream->frame_buffer);
+
+       if (!close_parent)
+               return;
+       i_stream_close(zstream->istream.parent);
+}
+
+/* Stores an error message containing the stream name and absolute offset,
+   sets stream_errno to EIO and logs the message if log_errors is set. */
+static void i_stream_zstd_error(struct zstd_istream *zstream, const char *error)
+{
+       struct istream_private *priv = &zstream->istream;
+
+       io_stream_set_error(&priv->iostream,
+                           "zstd.read(%s): %s at %"PRIuUOFF_T,
+                           i_stream_get_name(&priv->istream), error,
+                           i_stream_get_absolute_offset(&priv->istream));
+       priv->istream.stream_errno = EIO;
+
+       if (!zstream->log_errors)
+               return;
+       i_error("%s", priv->iostream.error);
+}
+
+/* Handles a failed zstd call. err is the raw size_t result of the call.
+   Out of memory is fatal, anything else is recorded as a stream error. */
+static void i_stream_zstd_read_error(struct zstd_istream *zstream, size_t err)
+{
+       const char *error = ZSTD_getErrorName(err);
+
+       /* the raw result is not a ZSTD_ErrorCode, it has to be converted
+          before it can be compared against the enum */
+       if (ZSTD_getErrorCode(err) == ZSTD_error_memory_allocation)
+               i_fatal_status(FATAL_OUTOFMEM, "zstd.read(%s): Out of memory",
+                              i_stream_get_name(&zstream->istream.istream));
+
+       i_stream_zstd_error(zstream, error);
+}
+
+/* istream read callback.
+
+   Returns the number of decompressed bytes added to the stream buffer,
+   0 if the parent has no more data available right now, -2 if the stream
+   buffer is full, or -1 on EOF / error. If the parent ends while the
+   decompressor still expected more input, stream_errno is set to EPIPE. */
+static ssize_t i_stream_zstd_read(struct istream_private *stream)
+{
+       struct zstd_istream *zstream =
+               container_of(stream, struct zstd_istream, istream);
+       const unsigned char *data;
+       size_t size, zret;
+       ssize_t ret;
+
+       if (stream->istream.eof)
+               return -1;
+
+       for (;;) {
+               /* hand out already decompressed data first */
+               if (zstream->data_buffer->used > 0) {
+                       if (!i_stream_try_alloc(stream, stream->max_buffer_size, &size))
+                               return -2;
+                       size = I_MIN(zstream->data_buffer->used, size);
+                       memcpy(PTR_OFFSET(stream->w_buffer,stream->pos),
+                              zstream->data_buffer->data, size);
+                       stream->pos += size;
+                       buffer_delete(zstream->data_buffer, 0, size);
+                       return size;
+               }
+
+               /* see if we can get more */
+               if (zstream->input.pos == zstream->input.size) {
+                       buffer_set_used_size(zstream->frame_buffer, 0);
+                       /* need to read more */
+                       if ((ret = i_stream_read_more(stream->parent, &data, &size)) < 0) {
+                               stream->istream.stream_errno =
+                                       stream->parent->stream_errno;
+                               stream->istream.eof = stream->parent->eof;
+                               if (zstream->remain &&
+                                   stream->istream.stream_errno == 0)
+                                       /* truncated data */
+                                       stream->istream.stream_errno = EPIPE;
+                               return ret;
+                       }
+                       if (ret == 0) {
+                               /* parent has nothing for us right now.
+                                  Calling the decompressor with empty input
+                                  would only spin in this loop. */
+                               return 0;
+                       }
+                       buffer_append(zstream->frame_buffer, data, size);
+                       i_stream_skip(stream->parent, size);
+                       zstream->input.src = zstream->frame_buffer->data;
+                       zstream->input.size = zstream->frame_buffer->used;
+                       zstream->input.pos = 0;
+               }
+
+               i_assert(zstream->data_buffer->used == 0);
+               zstream->output.dst = buffer_append_space_unsafe(zstream->data_buffer,
+                                                                ZSTD_DStreamOutSize());
+               zstream->output.pos = 0;
+               zstream->output.size = ZSTD_DStreamOutSize();
+
+               /* zstd results are size_t, keep them unsigned */
+               zret = ZSTD_decompressStream(zstream->dstream, &zstream->output,
+                                            &zstream->input);
+
+               if (ZSTD_isError(zret) != 0) {
+                       i_stream_zstd_read_error(zstream, zret);
+                       return -1;
+               }
+
+               /* non-zero means the decompressor expects more input */
+               zstream->remain = zret > 0;
+               /* only output.pos bytes of the reserved space are valid */
+               buffer_set_used_size(zstream->data_buffer, zstream->output.pos);
+       }
+       i_unreached();
+}
+
+/* Rewinds the parent to its start offset, clears this stream's buffer
+   positions and virtual offset, and restarts decompression with a new
+   context while keeping the allocated work buffers. */
+static void i_stream_zstd_reset(struct zstd_istream *zstream)
+{
+       struct istream_private *priv = &zstream->istream;
+
+       i_stream_seek(priv->parent, priv->parent_start_offset);
+       priv->parent_expected_offset = priv->parent_start_offset;
+
+       priv->skip = 0;
+       priv->pos = 0;
+       priv->istream.v_offset = 0;
+       priv->high_pos = 0;
+
+       i_stream_zstd_deinit(zstream, TRUE);
+       i_stream_zstd_init(zstream);
+}
+
+/* istream seek callback. Tries the seek as-is first. If that fails, the
+   stream is reset to the beginning and the seek is retried, which is then
+   expected to succeed. */
+static void
+i_stream_zstd_seek(struct istream_private *stream, uoff_t v_offset, bool mark)
+{
+       struct zstd_istream *zstream =
+               container_of(stream, struct zstd_istream, istream);
+
+       if (!i_stream_nonseekable_try_seek(stream, v_offset)) {
+               /* have to seek backwards - reset state and retry */
+               i_stream_zstd_reset(zstream);
+               if (!i_stream_nonseekable_try_seek(stream, v_offset))
+                       i_unreached();
+
+               if (mark)
+                       zstream->marked = TRUE;
+       }
+}
+
+/* istream sync callback. Resets the decompression state unless the
+   parent's stat result is identical to the one remembered earlier. */
+static void i_stream_zstd_sync(struct istream_private *stream)
+{
+       struct zstd_istream *zstream =
+               container_of(stream, struct zstd_istream, istream);
+       const struct stat *st;
+       bool unchanged;
+
+       if (i_stream_stat(stream->parent, FALSE, &st) == 0) {
+               unchanged = memcmp(&zstream->last_parent_statbuf,
+                                  st, sizeof(*st)) == 0;
+               if (unchanged) {
+                       /* a compressed file doesn't change unexpectedly,
+                          don't clear our caches unnecessarily */
+                       return;
+               }
+               zstream->last_parent_statbuf = *st;
+       }
+       i_stream_zstd_reset(zstream);
+}
+
+/* Creates an input stream that decompresses zstd data read from input.
+   If log_errors is TRUE, stream errors are also logged with i_error(). */
+struct istream *
+i_stream_create_zstd(struct istream *input, bool log_errors)
+{
+       struct zstd_istream *zstream = i_new(struct zstd_istream, 1);
+       struct istream_private *priv = &zstream->istream;
+
+       zstream->log_errors = log_errors;
+       i_stream_zstd_init(zstream);
+
+       priv->max_buffer_size = input->real_stream->max_buffer_size;
+       priv->iostream.close = i_stream_zstd_close;
+       priv->read = i_stream_zstd_read;
+       priv->seek = i_stream_zstd_seek;
+       priv->sync = i_stream_zstd_sync;
+
+       priv->istream.seekable = input->seekable;
+       priv->istream.blocking = input->blocking;
+       priv->istream.readable_fd = FALSE;
+
+       return i_stream_create(priv, input, i_stream_get_fd(input), 0);
+}
+
+#endif
index 57cddf90706e04db6b168f02cbdc9dd08fdb51e6..59b61ab901c63b4a042b7bec387760943e541816 100644 (file)
@@ -6,5 +6,6 @@ struct ostream *o_stream_create_deflate(struct ostream *output, int level);
 struct ostream *o_stream_create_bz2(struct ostream *output, int level);
 struct ostream *o_stream_create_lzma(struct ostream *output, int level);
 struct ostream *o_stream_create_lz4(struct ostream *output, int level);
+struct ostream *o_stream_create_zstd(struct ostream *output, int level);
 
 #endif
diff --git a/src/lib-compression/ostream-zstd.c b/src/lib-compression/ostream-zstd.c
new file mode 100644 (file)
index 0000000..bb0c57e
--- /dev/null
@@ -0,0 +1,196 @@
+/* Copyright (c) 2020 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+
+#ifdef HAVE_ZSTD
+
+#include "ostream.h"
+#include "ostream-private.h"
+#include "ostream-zlib.h"
+
+#include "zstd.h"
+#include "zstd_errors.h"
+
+/* State of an output stream that zstd-compresses data before sending it
+   to its parent stream. */
+struct zstd_ostream {
+       struct ostream_private ostream;
+
+       /* compression context, created in o_stream_create_zstd() and
+          freed when the stream is closed */
+       ZSTD_CStream *cstream;
+       /* compressed output window; dst points at outbuf and pos is the
+          number of bytes not yet sent to the parent */
+       ZSTD_outBuffer output;
+
+       /* backing memory for output, ZSTD_CStreamOutSize() bytes */
+       unsigned char *outbuf;
+
+       /* the final flush has been fully sent to the parent */
+       bool flushed:1;
+       /* log stream errors with i_error() in addition to setting them */
+       bool log_errors:1;
+       bool closed:1;
+       /* ZSTD_endStream() has been called */
+       bool finished:1;
+};
+
+/* Stores an error message containing the stream name and offset, marks the
+   stream as failed with EIO and logs the message if log_errors is set. */
+static void o_stream_zstd_error(struct zstd_ostream *zstream, const char *error)
+{
+       io_stream_set_error(&zstream->ostream.iostream,
+                           "zstd.write(%s): %s at %"PRIuUOFF_T,
+                           o_stream_get_name(&zstream->ostream.ostream), error,
+                           zstream->ostream.ostream.offset);
+       /* callers return -1 after this, so the stream must actually be in a
+          failed state. This matches what the zstd istream does. */
+       zstream->ostream.ostream.stream_errno = EIO;
+       if (zstream->log_errors)
+               i_error("%s", zstream->ostream.iostream.error);
+}
+
+/* Handles a failed zstd call. err is the raw size_t result of the call.
+   Out of memory is fatal, anything else is recorded as a stream error. */
+static void o_stream_zstd_write_error(struct zstd_ostream *zstream, size_t err)
+{
+       const char *error = ZSTD_getErrorName(err);
+
+       /* the raw result is not a ZSTD_ErrorCode, it has to be converted
+          before it can be compared against the enum */
+       if (ZSTD_getErrorCode(err) == ZSTD_error_memory_allocation)
+               i_fatal_status(FATAL_OUTOFMEM, "zstd.write(%s): Out of memory",
+                              o_stream_get_name(&zstream->ostream.ostream));
+       o_stream_zstd_error(zstream, error);
+}
+
+/* Sends pending compressed bytes to the parent stream. Bytes the parent
+   didn't accept are moved to the beginning of outbuf.
+   Returns 1 if nothing is left pending, 0 if some bytes remain,
+   -1 if the parent failed (the error is copied from the parent). */
+static ssize_t o_stream_zstd_send_outbuf(struct zstd_ostream *zstream)
+{
+       ssize_t sent;
+
+       /* nothing to send */
+       if (zstream->output.pos == 0)
+               return 1;
+
+       sent = o_stream_send(zstream->ostream.parent, zstream->output.dst,
+                            zstream->output.pos);
+       if (sent < 0) {
+               o_stream_copy_error_from_parent(&zstream->ostream);
+               return -1;
+       }
+
+       memmove(zstream->outbuf, zstream->outbuf+sent, zstream->output.pos-sent);
+       zstream->output.pos -= sent;
+
+       return zstream->output.pos > 0 ? 0 : 1;
+}
+
+/* ostream sendv callback. Compresses the given iovecs and passes the
+   compressed bytes to the parent stream.
+
+   Returns the number of input bytes consumed, which may be less than the
+   total if the parent stops accepting data, or -1 on error. */
+static ssize_t
+o_stream_zstd_sendv(struct ostream_private *stream,
+                   const struct const_iovec *iov, unsigned int iov_count)
+{
+       struct zstd_ostream *zstream =
+               container_of(stream, struct zstd_ostream, ostream);
+       ssize_t total = 0;
+       size_t ret;
+       bool blocked = FALSE;
+
+       if (zstream->outbuf == NULL) {
+               /* compressor initialization failed when the stream was
+                  created, there is nowhere to compress into */
+               if (stream->ostream.stream_errno == 0)
+                       stream->ostream.stream_errno = EIO;
+               return -1;
+       }
+
+       for (unsigned int i = 0; i < iov_count && !blocked; i++) {
+               ZSTD_inBuffer input = {
+                       .src = iov[i].iov_base,
+                       .pos = 0,
+                       .size = iov[i].iov_len
+               };
+               bool flush_attempted = FALSE;
+
+               /* An iovec can be larger than the output buffer, so keep
+                  compressing and draining until all of it is consumed. */
+               while (input.pos < input.size) {
+                       size_t prev_pos = input.pos;
+
+                       ret = ZSTD_compressStream(zstream->cstream,
+                                                 &zstream->output, &input);
+                       if (ZSTD_isError(ret) != 0) {
+                               o_stream_zstd_write_error(zstream, ret);
+                               return -1;
+                       }
+                       total += input.pos - prev_pos;
+                       if (input.pos == input.size)
+                               break;
+                       if (input.pos == prev_pos && flush_attempted) {
+                               /* output is full and the parent didn't take
+                                  any of it - stop here */
+                               blocked = TRUE;
+                               break;
+                       }
+                       /* output buffer is full, try to drain it */
+                       if (o_stream_zstd_send_outbuf(zstream) < 0)
+                               return -1;
+                       flush_attempted = TRUE;
+               }
+       }
+       if (o_stream_zstd_send_outbuf(zstream) < 0)
+               return -1;
+       stream->ostream.offset += total;
+       return total;
+}
+
+/* Flushes compressed data to the parent stream. With final=TRUE the zstd
+   frame is also ended.
+
+   Returns 1 when everything requested has been sent, 0 if more flushing
+   is needed later, -1 on error. */
+static int o_stream_zstd_send_flush(struct zstd_ostream *zstream, bool final)
+{
+       size_t zret;
+       int ret;
+
+       if (zstream->flushed)
+               return 1;
+
+       if ((ret = o_stream_flush_parent_if_needed(&zstream->ostream)) <= 0)
+               return ret;
+
+       if (zstream->output.pos == 0) {
+               zret = ZSTD_flushStream(zstream->cstream, &zstream->output);
+               if (ZSTD_isError(zret) != 0) {
+                       o_stream_zstd_write_error(zstream, zret);
+                       return -1;
+               }
+       }
+
+       if ((ret = o_stream_zstd_send_outbuf(zstream)) <= 0)
+               return ret;
+
+       if (!final)
+               return 1;
+
+       while (!zstream->finished) {
+               bool produced;
+
+               /* output is empty here, the previous send drained it */
+               zret = ZSTD_endStream(zstream->cstream, &zstream->output);
+               if (ZSTD_isError(zret) != 0) {
+                       o_stream_zstd_write_error(zstream, zret);
+                       return -1;
+               }
+               /* the frame is complete only once nothing is left inside
+                  the compressor */
+               if (zret == 0)
+                       zstream->finished = TRUE;
+               produced = zstream->output.pos > 0;
+
+               if ((ret = o_stream_zstd_send_outbuf(zstream)) <= 0)
+                       return ret;
+               if (!zstream->finished && !produced) {
+                       /* no progress was made, retry on the next flush
+                          rather than looping here */
+                       return 0;
+               }
+       }
+
+       if ((ret = o_stream_zstd_send_outbuf(zstream)) <= 0)
+               return ret;
+
+       zstream->flushed = TRUE;
+       return 1;
+}
+
+/* ostream flush callback. Flushes the compressor, ending the frame if the
+   stream has been finished, and then flushes the parent stream.
+   Returns -1 on error, 0 if the compressor still has unsent data,
+   otherwise the result of flushing the parent. */
+static int o_stream_zstd_flush(struct ostream_private *stream)
+{
+       struct zstd_ostream *zstream =
+               container_of(stream, struct zstd_ostream, ostream);
+       int ret = o_stream_zstd_send_flush(zstream, stream->finished);
+
+       if (ret < 0)
+               return -1;
+       if (ret == 0)
+               return 0;
+       return o_stream_flush_parent(stream);
+}
+
+/* iostream close callback: frees the compression context and the output
+   buffer, and closes the parent stream when close_parent is TRUE.
+   The stream must have been finished, have failed, or have its error
+   handling disabled. */
+static void o_stream_zstd_close(struct iostream_private *stream,
+                               bool close_parent)
+{
+       struct ostream_private *priv =
+               container_of(stream, struct ostream_private, iostream);
+       struct zstd_ostream *zstream =
+               container_of(priv, struct zstd_ostream, ostream);
+
+       i_assert(zstream->ostream.finished ||
+                zstream->ostream.ostream.stream_errno != 0 ||
+                zstream->ostream.error_handling_disabled);
+
+       if (zstream->cstream != NULL) {
+               ZSTD_freeCStream(zstream->cstream);
+               zstream->cstream = NULL;
+       }
+
+       i_free(zstream->outbuf);
+       i_zero(&zstream->output);
+
+       if (!close_parent)
+               return;
+       o_stream_close(zstream->ostream.parent);
+}
+
+/* Creates an output stream that zstd-compresses everything written to it
+   and sends the result to output. level must be between 1 and
+   ZSTD_maxCLevel(). If the compressor can't be initialized, the error is
+   recorded on the returned stream and no output buffer is allocated. */
+struct ostream *
+o_stream_create_zstd(struct ostream *output, int level)
+{
+       struct zstd_ostream *zstream;
+       size_t init_ret;
+
+       i_assert(level >= 1 && level <= ZSTD_maxCLevel());
+
+       zstream = i_new(struct zstd_ostream, 1);
+       zstream->ostream.iostream.close = o_stream_zstd_close;
+       zstream->ostream.flush = o_stream_zstd_flush;
+       zstream->ostream.sendv = o_stream_zstd_sendv;
+
+       zstream->cstream = ZSTD_createCStream();
+       if (zstream->cstream == NULL)
+               i_fatal_status(FATAL_OUTOFMEM, "zstd: Out of memory");
+
+       init_ret = ZSTD_initCStream(zstream->cstream, level);
+       if (ZSTD_isError(init_ret) == 0) {
+               zstream->outbuf = i_malloc(ZSTD_CStreamOutSize());
+               zstream->output.dst = zstream->outbuf;
+               zstream->output.size = ZSTD_CStreamOutSize();
+       } else {
+               o_stream_zstd_write_error(zstream, init_ret);
+       }
+
+       return o_stream_create(&zstream->ostream, output,
+                              o_stream_get_fd(output));
+}
+
+#endif