From: Aki Tuomi
Date: Tue, 16 May 2017 07:16:23 +0000 (+0300)
Subject: lib-compression: Add ZSTD support
X-Git-Tag: 2.3.11.2~569
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3c35ad415b057e640852dc6b276530b5c898807c;p=thirdparty%2Fdovecot%2Fcore.git

lib-compression: Add ZSTD support
---

diff --git a/configure.ac b/configure.ac
index 8a6d86c582..5b0a3bebeb 100644
--- a/configure.ac
+++ b/configure.ac
@@ -202,6 +202,11 @@ AS_HELP_STRING([--with-lz4], [Build with LZ4 compression support (auto)]),
   TEST_WITH(lz4, $withval),
   want_lz4=auto)
 
+AC_ARG_WITH(zstd,
+AS_HELP_STRING([--with-zstd], [Build with ZSTD compression support (auto)]),
+  TEST_WITH(zstd, $withval),
+  want_zstd=auto)
+
 AC_ARG_WITH(libcap,
 AS_HELP_STRING([--with-libcap], [Build with libcap support (Dropping capabilities) (auto)]),
   TEST_WITH(libcap, $withval),
@@ -734,6 +739,7 @@ DOVECOT_WANT_ZLIB
 DOVECOT_WANT_BZLIB
 DOVECOT_WANT_LZMA
 DOVECOT_WANT_LZ4
+DOVECOT_WANT_ZSTD
 
 AC_SUBST(COMPRESS_LIBS)
 
diff --git a/m4/want_zstd.m4 b/m4/want_zstd.m4
new file mode 100644
index 0000000000..4ed262cb6b
--- /dev/null
+++ b/m4/want_zstd.m4
@@ -0,0 +1,20 @@
+dnl Detect libzstd through pkg-config. Honors $want_zstd (yes/no/auto) and
+dnl sets have_zstd, COMPRESS_LIBS, HAVE_ZSTD and the BUILD_ZSTD conditional.
+AC_DEFUN([DOVECOT_WANT_ZSTD], [
+  have_zstd=no
+
+  AS_IF([test "$want_zstd" = yes], [
+    PKG_CHECK_MODULES([ZSTD], [libzstd], [have_zstd=yes], [AC_MSG_ERROR([libzstd not found])])
+  ], [AS_IF([test "$want_zstd" != no], [
+      PKG_CHECK_MODULES([ZSTD], [libzstd], [have_zstd=yes], [have_zstd=no])
+    ])
+  ])
+
+  AS_IF([test "$have_zstd" = yes], [
+    have_compress_lib=yes
+    COMPRESS_LIBS="$COMPRESS_LIBS $ZSTD_LIBS"
+    AC_DEFINE([HAVE_ZSTD], [], [Define if you have ZSTD library])
+  ])
+
+  AM_CONDITIONAL([BUILD_ZSTD], test "$have_zstd" = "yes")
+])
diff --git a/src/lib-compression/Makefile.am b/src/lib-compression/Makefile.am
index 3c6c875a6f..8bdf3a1f76 100644
--- a/src/lib-compression/Makefile.am
+++ b/src/lib-compression/Makefile.am
@@ -2,7 +2,8 @@ noinst_LTLIBRARIES = libcompression.la
 
 AM_CPPFLAGS = \
 	-I$(top_srcdir)/src/lib \
-	-I$(top_srcdir)/src/lib-test
+	-I$(top_srcdir)/src/lib-test \
+	$(ZSTD_CFLAGS)
 
 libcompression_la_SOURCES = \
 	compression.c \
@@ -10,10 +11,12 @@ libcompression_la_SOURCES = \
 	istream-lz4.c \
 	istream-zlib.c \
 	istream-bzlib.c \
+	istream-zstd.c \
 	ostream-lzma.c \
 	ostream-lz4.c \
 	ostream-zlib.c \
-	ostream-bzlib.c
+	ostream-bzlib.c \
+	ostream-zstd.c
 
 libcompression_la_LIBADD = \
 	$(COMPRESS_LIBS)
diff --git a/src/lib-compression/compression.c b/src/lib-compression/compression.c
index 2b00392fcb..3561ffd6cd 100644
--- a/src/lib-compression/compression.c
+++ b/src/lib-compression/compression.c
@@ -25,6 +25,10 @@
 # define i_stream_create_lz4 NULL
 # define o_stream_create_lz4 NULL
 #endif
+#ifndef HAVE_ZSTD
+# define i_stream_create_zstd NULL
+# define o_stream_create_zstd NULL
+#endif
 
 static bool is_compressed_zlib(struct istream *input)
 {
@@ -79,6 +83,20 @@ static bool is_compressed_lz4(struct istream *input)
 	return memcmp(data, IOSTREAM_LZ4_MAGIC, IOSTREAM_LZ4_MAGIC_LEN) == 0;
 }
 
+#define ZSTD_MAGICNUMBER 0xFD2FB528 /* valid since v0.8.0 */
+/* Returns TRUE if the stream begins with the ZSTD frame magic number. */
+static bool is_compressed_zstd(struct istream *input)
+{
+	const unsigned char *data;
+	size_t size = 0;
+
+	if (i_stream_read_bytes(input, &data, &size, sizeof(uint32_t)) <= 0)
+		return FALSE;
+	i_assert(size >= sizeof(uint32_t));
+
+	return le32_to_cpu_unaligned(data) == ZSTD_MAGICNUMBER;
+}
+
 const struct compression_handler *compression_lookup_handler(const char *name)
 {
 	unsigned int i;
@@ -132,5 +150,7 @@ const struct compression_handler compression_handlers[] = {
 	  i_stream_create_lzma, o_stream_create_lzma },
 	{ "lz4", ".lz4", is_compressed_lz4,
 	  i_stream_create_lz4, o_stream_create_lz4 },
+	{ "zstd", ".zstd", is_compressed_zstd,
+	  i_stream_create_zstd, o_stream_create_zstd },
 	{ NULL, NULL, NULL, NULL, NULL }
 };
diff --git a/src/lib-compression/istream-zlib.h b/src/lib-compression/istream-zlib.h
index 831d4717e1..3928761c2d 100644
--- a/src/lib-compression/istream-zlib.h
+++ b/src/lib-compression/istream-zlib.h
@@ -6,5 +6,6 @@ struct istream *i_stream_create_deflate(struct istream *input, bool log_errors);
 struct istream *i_stream_create_bz2(struct istream *input, bool log_errors);
 struct istream *i_stream_create_lzma(struct istream *input, bool log_errors);
 struct istream *i_stream_create_lz4(struct istream *input, bool log_errors);
+struct istream *i_stream_create_zstd(struct istream *input, bool log_errors);
 
 #endif
diff --git a/src/lib-compression/istream-zstd.c b/src/lib-compression/istream-zstd.c
new file mode 100644
index 0000000000..d19594b19a
--- /dev/null
+++ b/src/lib-compression/istream-zstd.c
@@ -0,0 +1,268 @@
+/* Copyright (c) 2020 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+
+#ifdef HAVE_ZSTD
+
+#include "buffer.h"
+#include "istream-private.h"
+#include "istream-zlib.h"
+
+#include "zstd.h"
+#include "zstd_errors.h"
+
+/* Decompressing istream wrapper around a parent stream of ZSTD data. */
+struct zstd_istream {
+	struct istream_private istream;
+
+	ZSTD_DStream *dstream;
+	ZSTD_inBuffer input;
+	ZSTD_outBuffer output;
+
+	struct stat last_parent_statbuf;
+
+	/* ZSTD input size */
+	size_t input_size;
+
+	/* storage for frames */
+	buffer_t *frame_buffer;
+
+	/* storage for data */
+	buffer_t *data_buffer;
+
+	bool log_errors:1;
+	bool marked:1;
+	bool zs_closed:1;
+	/* is there data remaining */
+	bool remain:1;
+};
+
+/* Create a fresh decompression context and (re)initialize the frame and
+   data buffers; existing buffers are kept and just emptied. */
+static void i_stream_zstd_init(struct zstd_istream *zstream)
+{
+	zstream->dstream = ZSTD_createDStream();
+	if (zstream->dstream == NULL)
+		i_fatal_status(FATAL_OUTOFMEM, "zstd: Out of memory");
+	ZSTD_initDStream(zstream->dstream);
+	zstream->input_size = ZSTD_DStreamInSize();
+	if (zstream->frame_buffer == NULL)
+		zstream->frame_buffer = buffer_create_dynamic(default_pool, ZSTD_DStreamInSize());
+	else
+		buffer_set_used_size(zstream->frame_buffer, 0);
+	if (zstream->data_buffer == NULL)
+		zstream->data_buffer = buffer_create_dynamic(default_pool, ZSTD_DStreamOutSize());
+	else
+		buffer_set_used_size(zstream->data_buffer, 0);
+	zstream->zs_closed = FALSE;
+}
+
+/* Free the decompression context. The buffers are freed as well unless
+   reuse_buffers is TRUE. */
+static void i_stream_zstd_deinit(struct zstd_istream *zstream, bool reuse_buffers)
+{
+	(void)ZSTD_freeDStream(zstream->dstream);
+	zstream->dstream = NULL;
+	if (!reuse_buffers) {
+		buffer_free(&zstream->frame_buffer);
+		buffer_free(&zstream->data_buffer);
+	}
+	zstream->zs_closed = TRUE;
+	i_zero(&zstream->input);
+}
+
+/* iostream close handler: release ZSTD state and optionally close parent. */
+static void i_stream_zstd_close(struct iostream_private *stream,
+				bool close_parent)
+{
+	struct istream_private *_istream =
+		container_of(stream, struct istream_private, iostream);
+	struct zstd_istream *zstream =
+		container_of(_istream, struct zstd_istream, istream);
+	if (!zstream->zs_closed)
+		i_stream_zstd_deinit(zstream, FALSE);
+	/* make sure neither buffer is left behind if deinit was skipped */
+	buffer_free(&zstream->frame_buffer);
+	buffer_free(&zstream->data_buffer);
+	if (close_parent)
+		i_stream_close(zstream->istream.parent);
+}
+
+/* Record a read error on the stream (EIO) and log it if requested. */
+static void i_stream_zstd_error(struct zstd_istream *zstream, const char *error)
+{
+	io_stream_set_error(&zstream->istream.iostream,
+			    "zstd.read(%s): %s at %"PRIuUOFF_T,
+			    i_stream_get_name(&zstream->istream.istream), error,
+			    i_stream_get_absolute_offset(&zstream->istream.istream));
+	zstream->istream.istream.stream_errno = EIO;
+	if (zstream->log_errors)
+		i_error("%s", zstream->istream.iostream.error);
+}
+
+/* Handle an error result returned by a ZSTD decompression call. */
+static void i_stream_zstd_read_error(struct zstd_istream *zstream, size_t err)
+{
+	const char *error = ZSTD_getErrorName(err);
+	/* err is a raw function result; it has to be converted to a
+	   ZSTD_ErrorCode before comparing against the error enum */
+	if (ZSTD_getErrorCode(err) == ZSTD_error_memory_allocation)
+		i_fatal_status(FATAL_OUTOFMEM, "zstd.read(%s): Out of memory",
+			       i_stream_get_name(&zstream->istream.istream));
+
+	i_stream_zstd_error(zstream, error);
+}
+
+/* istream read handler. Returns the number of bytes added to the stream
+   buffer, 0 if the parent has no data available right now, -2 if no buffer
+   space could be allocated, or another negative value on EOF/error. */
+static ssize_t i_stream_zstd_read(struct istream_private *stream)
+{
+	struct zstd_istream *zstream =
+		container_of(stream, struct zstd_istream, istream);
+	const unsigned char *data;
+	size_t size;
+	ssize_t ret;
+
+	if (stream->istream.eof)
+		return -1;
+
+	for (;;) {
+		if (zstream->data_buffer->used > 0) {
+			if (!i_stream_try_alloc(stream, stream->max_buffer_size, &size))
+				return -2;
+			size = I_MIN(zstream->data_buffer->used, size);
+			memcpy(PTR_OFFSET(stream->w_buffer,stream->pos),
+			       zstream->data_buffer->data, size);
+			stream->pos += size;
+			buffer_delete(zstream->data_buffer, 0, size);
+			return size;
+		}
+
+		/* see if we can get more */
+		if (zstream->input.pos == zstream->input.size) {
+			buffer_set_used_size(zstream->frame_buffer, 0);
+			/* need to read more */
+			if ((ret = i_stream_read_more(stream->parent, &data, &size)) < 0) {
+				stream->istream.stream_errno =
+					stream->parent->stream_errno;
+				stream->istream.eof = stream->parent->eof;
+				if (zstream->remain &&
+				    stream->istream.stream_errno == 0)
+					/* truncated data */
+					stream->istream.stream_errno = EPIPE;
+				return ret;
+			}
+			/* nothing available right now; return instead of
+			   spinning on a non-blocking parent */
+			if (ret == 0)
+				return 0;
+			buffer_append(zstream->frame_buffer, data, size);
+			i_stream_skip(stream->parent, size);
+			zstream->input.src = zstream->frame_buffer->data;
+			zstream->input.size = zstream->frame_buffer->used;
+			zstream->input.pos = 0;
+		}
+
+		i_assert(zstream->data_buffer->used == 0);
+		zstream->output.dst = buffer_append_space_unsafe(zstream->data_buffer,
+								 ZSTD_DStreamOutSize());
+		zstream->output.pos = 0;
+		zstream->output.size = ZSTD_DStreamOutSize();
+
+		ret = ZSTD_decompressStream(zstream->dstream, &zstream->output,
+					    &zstream->input);
+
+		if (ZSTD_isError(ret) != 0) {
+			i_stream_zstd_read_error(zstream, ret);
+			return -1;
+		}
+
+		zstream->remain = ret > 0;
+		buffer_set_used_size(zstream->data_buffer, zstream->output.pos);
+	}
+	i_unreached();
+}
+
+/* Rewind the parent to its start offset and restart decompression. */
+static void i_stream_zstd_reset(struct zstd_istream *zstream)
+{
+	struct istream_private *stream = &zstream->istream;
+
+	i_stream_seek(stream->parent, stream->parent_start_offset);
+	stream->parent_expected_offset = stream->parent_start_offset;
+	stream->skip = stream->pos = 0;
+	stream->istream.v_offset = 0;
+	stream->high_pos = 0;
+
+	i_stream_zstd_deinit(zstream, TRUE);
+	i_stream_zstd_init(zstream);
+}
+
+/* istream seek handler: try seeking from the current state first, otherwise
+   restart from the beginning and seek from there. */
+static void
+i_stream_zstd_seek(struct istream_private *stream, uoff_t v_offset, bool mark)
+{
+	struct zstd_istream *zstream =
+		container_of(stream, struct zstd_istream, istream);
+
+	if (i_stream_nonseekable_try_seek(stream, v_offset))
+		return;
+
+	/* have to seek backwards - reset state and retry */
+	i_stream_zstd_reset(zstream);
+	if (!i_stream_nonseekable_try_seek(stream, v_offset))
+		i_unreached();
+
+	if (mark)
+		zstream->marked = TRUE;
+}
+
+/* istream sync handler: reset the stream unless the parent could be stat()ed
+   and its stat data is unchanged since the previous sync. */
+static void i_stream_zstd_sync(struct istream_private *stream)
+{
+	struct zstd_istream *zstream =
+		container_of(stream, struct zstd_istream, istream);
+	const struct stat *st;
+
+	if (i_stream_stat(stream->parent, FALSE, &st) == 0) {
+		if (memcmp(&zstream->last_parent_statbuf,
+			   st, sizeof(*st)) == 0) {
+			/* a compressed file doesn't change unexpectedly,
+			   don't clear our caches unnecessarily */
+			return;
+		}
+		zstream->last_parent_statbuf = *st;
+	}
+	i_stream_zstd_reset(zstream);
+}
+
+/* Create an istream that decompresses ZSTD data read from input. When
+   log_errors is TRUE, decompression errors are also logged. */
+struct istream *
+i_stream_create_zstd(struct istream *input, bool log_errors)
+{
+	struct zstd_istream *zstream;
+
+	zstream = i_new(struct zstd_istream, 1);
+	zstream->log_errors = log_errors;
+
+	i_stream_zstd_init(zstream);
+
+	zstream->istream.iostream.close = i_stream_zstd_close;
+	zstream->istream.max_buffer_size = input->real_stream->max_buffer_size;
+	zstream->istream.read = i_stream_zstd_read;
+	zstream->istream.seek = i_stream_zstd_seek;
+	zstream->istream.sync = i_stream_zstd_sync;
+
+	zstream->istream.istream.readable_fd = FALSE;
+	zstream->istream.istream.blocking = input->blocking;
+	zstream->istream.istream.seekable = input->seekable;
+
+	return i_stream_create(&zstream->istream, input,
+			       i_stream_get_fd(input), 0);
+}
+
+#endif
diff --git a/src/lib-compression/ostream-zlib.h b/src/lib-compression/ostream-zlib.h
index 57cddf9070..59b61ab901 100644
--- a/src/lib-compression/ostream-zlib.h
+++ b/src/lib-compression/ostream-zlib.h
@@ -6,5 +6,6 @@ struct ostream *o_stream_create_deflate(struct ostream *output, int level);
 struct ostream *o_stream_create_bz2(struct ostream *output, int level);
 struct ostream *o_stream_create_lzma(struct ostream *output, int level);
 struct ostream *o_stream_create_lz4(struct ostream *output, int level);
+struct ostream *o_stream_create_zstd(struct ostream *output, int level);
 
 #endif
diff --git a/src/lib-compression/ostream-zstd.c b/src/lib-compression/ostream-zstd.c
new file mode 100644
index 0000000000..bb0c57ee14
--- /dev/null
+++ b/src/lib-compression/ostream-zstd.c
@@ -0,0 +1,221 @@
+/* Copyright (c) 2020 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+
+#ifdef HAVE_ZSTD
+
+#include "ostream.h"
+#include "ostream-private.h"
+#include "ostream-zlib.h"
+
+#include "zstd.h"
+#include "zstd_errors.h"
+
+/* Compressing ostream wrapper that writes ZSTD data to a parent stream. */
+struct zstd_ostream {
+	struct ostream_private ostream;
+
+	ZSTD_CStream *cstream;
+	ZSTD_outBuffer output;
+
+	unsigned char *outbuf;
+
+	bool flushed:1;
+	bool log_errors:1;
+	bool closed:1;
+	bool finished:1;
+};
+
+/* Record a write error on the stream (EIO) and log it if requested. */
+static void o_stream_zstd_error(struct zstd_ostream *zstream, const char *error)
+{
+	io_stream_set_error(&zstream->ostream.iostream,
+			    "zstd.write(%s): %s at %"PRIuUOFF_T,
+			    o_stream_get_name(&zstream->ostream.ostream), error,
+			    zstream->ostream.ostream.offset);
+	/* mark the stream as failed, like the istream side does; close()
+	   asserts on stream_errno for streams that were never finished */
+	zstream->ostream.ostream.stream_errno = EIO;
+	if (zstream->log_errors)
+		i_error("%s", zstream->ostream.iostream.error);
+}
+
+/* Handle an error result returned by a ZSTD compression call. */
+static void o_stream_zstd_write_error(struct zstd_ostream *zstream, size_t err)
+{
+	const char *error = ZSTD_getErrorName(err);
+	/* err is a raw function result; it has to be converted to a
+	   ZSTD_ErrorCode before comparing against the error enum */
+	if (ZSTD_getErrorCode(err) == ZSTD_error_memory_allocation)
+		i_fatal_status(FATAL_OUTOFMEM, "zstd.write(%s): Out of memory",
+			       o_stream_get_name(&zstream->ostream.ostream));
+	o_stream_zstd_error(zstream, error);
+}
+
+/* Send buffered compressed data to the parent stream. Returns 1 if the
+   buffer is now empty, 0 if some data is still pending, -1 on error. */
+static ssize_t o_stream_zstd_send_outbuf(struct zstd_ostream *zstream)
+{
+	ssize_t ret;
+	/* nothing to send */
+	if (zstream->output.pos == 0)
+		return 1;
+	ret = o_stream_send(zstream->ostream.parent, zstream->output.dst,
+			    zstream->output.pos);
+	if (ret < 0) {
+		o_stream_copy_error_from_parent(&zstream->ostream);
+		return -1;
+	} else {
+		memmove(zstream->outbuf, zstream->outbuf+ret, zstream->output.pos-ret);
+		zstream->output.pos -= ret;
+	}
+	if (zstream->output.pos > 0)
+		return 0;
+	return 1;
+}
+
+/* ostream sendv handler: compress the given iovecs into the output buffer
+   and pass it on to the parent. Returns the number of input bytes
+   consumed, or -1 on error. */
+static ssize_t
+o_stream_zstd_sendv(struct ostream_private *stream,
+		    const struct const_iovec *iov, unsigned int iov_count)
+{
+	struct zstd_ostream *zstream =
+		container_of(stream, struct zstd_ostream, ostream);
+	ssize_t total = 0;
+	size_t ret;
+
+	for (unsigned int i = 0; i < iov_count; i++) {
+		/* does it actually fit there */
+		if (zstream->output.pos + iov[i].iov_len >= zstream->output.size)
+			break;
+		ZSTD_inBuffer input = {
+			.src = iov[i].iov_base,
+			.pos = 0,
+			.size = iov[i].iov_len
+		};
+		ret = ZSTD_compressStream(zstream->cstream, &zstream->output,
+					  &input);
+		if (ZSTD_isError(ret) != 0) {
+			o_stream_zstd_write_error(zstream, ret);
+			return -1;
+		}
+		total += input.pos;
+	}
+	if (o_stream_zstd_send_outbuf(zstream) < 0)
+		return -1;
+	stream->ostream.offset += total;
+	return total;
+}
+
+/* Flush compressed data to the parent; when final is TRUE the ZSTD frame
+   is also ended. Returns 1 when done, 0 if data is still pending, -1 on
+   error. */
+static int o_stream_zstd_send_flush(struct zstd_ostream *zstream, bool final)
+{
+	size_t zret;
+	int ret;
+
+	if (zstream->flushed)
+		return 1;
+
+	if ((ret = o_stream_flush_parent_if_needed(&zstream->ostream)) <= 0)
+		return ret;
+
+	if (zstream->output.pos == 0) {
+		zret = ZSTD_flushStream(zstream->cstream, &zstream->output);
+		if (ZSTD_isError(zret) != 0) {
+			o_stream_zstd_write_error(zstream, zret);
+			return -1;
+		}
+	}
+
+	if ((ret = o_stream_zstd_send_outbuf(zstream)) <= 0)
+		return ret;
+
+	if (!final)
+		return 1;
+
+	if (!zstream->finished) {
+		zret = ZSTD_endStream(zstream->cstream, &zstream->output);
+		if (ZSTD_isError(zret) != 0) {
+			o_stream_zstd_write_error(zstream, zret);
+			return -1;
+		}
+		zstream->finished = TRUE;
+	}
+
+	if ((ret = o_stream_zstd_send_outbuf(zstream)) <= 0)
+		return ret;
+
+	zstream->flushed = TRUE;
+	return 1;
+}
+
+/* ostream flush handler. */
+static int o_stream_zstd_flush(struct ostream_private *stream)
+{
+	struct zstd_ostream *zstream =
+		container_of(stream, struct zstd_ostream, ostream);
+
+	int ret;
+	if ((ret = o_stream_zstd_send_flush(zstream, stream->finished)) < 0)
+		return -1;
+	else if (ret > 0)
+		return o_stream_flush_parent(stream);
+	return ret;
+}
+
+/* iostream close handler: release ZSTD state and optionally close parent. */
+static void o_stream_zstd_close(struct iostream_private *stream,
+				bool close_parent)
+{
+	struct ostream_private *_ostream =
+		container_of(stream, struct ostream_private, iostream);
+	struct zstd_ostream *zstream =
+		container_of(_ostream, struct zstd_ostream, ostream);
+
+	i_assert(zstream->ostream.finished ||
+		 zstream->ostream.ostream.stream_errno != 0 ||
+		 zstream->ostream.error_handling_disabled);
+	if (zstream->cstream != NULL) {
+		ZSTD_freeCStream(zstream->cstream);
+		zstream->cstream = NULL;
+	}
+	i_free(zstream->outbuf);
+	i_zero(&zstream->output);
+	if (close_parent)
+		o_stream_close(zstream->ostream.parent);
+}
+
+/* Create an ostream that writes ZSTD-compressed data to output. level
+   must be between 1 and ZSTD_maxCLevel(). */
+struct ostream *
+o_stream_create_zstd(struct ostream *output, int level)
+{
+	struct zstd_ostream *zstream;
+	size_t ret;
+
+	i_assert(level >= 1 && level <= ZSTD_maxCLevel());
+
+	zstream = i_new(struct zstd_ostream, 1);
+	zstream->ostream.sendv = o_stream_zstd_sendv;
+	zstream->ostream.flush = o_stream_zstd_flush;
+	zstream->ostream.iostream.close = o_stream_zstd_close;
+	zstream->cstream = ZSTD_createCStream();
+	if (zstream->cstream == NULL)
+		i_fatal_status(FATAL_OUTOFMEM, "zstd: Out of memory");
+	ret = ZSTD_initCStream(zstream->cstream, level);
+	if (ZSTD_isError(ret) != 0)
+		o_stream_zstd_write_error(zstream, ret);
+	else {
+		zstream->outbuf = i_malloc(ZSTD_CStreamOutSize());
+		zstream->output.dst = zstream->outbuf;
+		zstream->output.size = ZSTD_CStreamOutSize();
+	}
+	return o_stream_create(&zstream->ostream, output,
+			       o_stream_get_fd(output));
+}
+
+#endif