From: Jan Hák Date: Fri, 17 Jan 2025 07:28:15 +0000 (+0100) Subject: redis: initial implementation of a Knot module for Redis X-Git-Tag: v3.5.0~11^2~7 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=de35e927076a248d929e534df545f5d84d250a8c;p=thirdparty%2Fknot-dns.git redis: initial implementation of a Knot module for Redis --- diff --git a/Knot.files b/Knot.files index 487f305295..247ef7dc8d 100644 --- a/Knot.files +++ b/Knot.files @@ -575,6 +575,14 @@ src/libzscanner/functions.h src/libzscanner/scanner.h src/libzscanner/scanner.rl src/libzscanner/scanner_body.rl +src/redis/arg.h +src/redis/error.h +src/redis/internal.h +src/redis/knot.c +src/redis/knot.h +src/redis/libs.h +src/redis/type_diff.h +src/redis/type_rrset.h src/utils/common/exec.c src/utils/common/exec.h src/utils/common/hex.c diff --git a/src/redis/Makefile.am b/src/redis/Makefile.am index 3141f34891..6722d0c394 100644 --- a/src/redis/Makefile.am +++ b/src/redis/Makefile.am @@ -5,8 +5,16 @@ pkglibdir = ${libdir}/knot/redis if ENABLE_REDIS_MODULE pkglib_LTLIBRARIES = knot.la -knot_la_SOURCES = knot.c +knot_la_SOURCES = \ + arg.h \ + error.h \ + internal.h \ + knot.c \ + knot.h \ + libs.h \ + type_diff.h \ + type_rrset.h -knot_la_CPPFLAGS = $(AM_CPPFLAGS) +knot_la_CPPFLAGS = $(AM_CPPFLAGS) $(CFLAG_VISIBILITY) $(gnutls_CFLAGS) knot_la_LDFLAGS = $(AM_LDFLAGS) -module -shared -avoid-version endif diff --git a/src/redis/arg.h b/src/redis/arg.h new file mode 100644 index 0000000000..bc94fa4a83 --- /dev/null +++ b/src/redis/arg.h @@ -0,0 +1,213 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. and contributors + * SPDX-License-Identifier: GPL-2.0-or-later + * For more information, see + */ + +#pragma once + +#define INSTANCE_MIN 1 +#define INSTANCE_MAX 8 +#define TXN_MIN 0 +#define TXN_MAX 8 +#define TXN_MAX_COUNT (TXN_MAX - TXN_MIN + 1) +#define TXN_ID_ACTIVE UINT8_MAX + +static uint32_t rdb_default_ttl = 600; +static uint32_t rdb_event_age = 1200; +static uint32_t rdb_upd_history_len = 20; + +typedef enum { + DUMP_BIN, + DUMP_TXT, + DUMP_COMPACT +} dump_mode_t; + +typedef struct { + const uint8_t *data; + uint8_t len; + // Next items are used if the dname was parsed from TXT. + const char *txt; + knot_dname_storage_t buff; +} arg_dname_t; + +#define ARG_OPT_TXT(out, name, dflt, value) { \ + out = dflt; \ + if (argc > 1) { \ + size_t len; \ + if (strcmp(RedisModule_StringPtrLen(argv[1], &len), "--" name) == 0) { \ + out = value; \ + argc--; \ + argv++; \ + } else if (strncmp(RedisModule_StringPtrLen(argv[1], &len), "--", 2) == 0) { \ + return RedisModule_ReplyWithError(ctx, RDB_E("invalid option")); \ + } \ + } \ +} + +#define ARG_INST(arg, out) { \ + size_t len; \ + const uint8_t *ptr = (const uint8_t *)RedisModule_StringPtrLen(arg, &len); \ + if (len != 1 || ptr[0] < INSTANCE_MIN || ptr[0] > INSTANCE_MAX) { \ + return RedisModule_ReplyWithError(ctx, RDB_EINST); \ + } \ + out.instance = ptr[0]; \ + out.id = TXN_ID_ACTIVE; \ +} + +#define ARG_INST_TXT(arg, out) { \ + size_t len; \ + const uint8_t *ptr = (const uint8_t *)RedisModule_StringPtrLen(arg, &len); \ + if (len != 1 || ptr[0] < '0' + INSTANCE_MIN || ptr[0] > '0' + INSTANCE_MAX) { \ + return RedisModule_ReplyWithError(ctx, RDB_EINST); \ + } \ + out.instance = ptr[0] - '0'; \ + out.id = TXN_ID_ACTIVE; \ +} + +#define ARG_TXN(arg, out) { \ + size_t len; \ + const uint8_t *ptr = (const uint8_t *)RedisModule_StringPtrLen(arg, &len); \ + if (len != 2 || ptr[0] < INSTANCE_MIN || ptr[0] > INSTANCE_MAX \ + || ptr[1] < TXN_MIN || ptr[1] > TXN_MAX) { \ + return RedisModule_ReplyWithError(ctx, RDB_ETXN); \ + } \ + memcpy(&out, ptr, len); \ +} + +#define ARG_TXN_TXT(arg, out) { \ + size_t len; \ + const char *ptr = RedisModule_StringPtrLen(arg, &len); \ + if (len != 2 || ptr[0] < '0' + INSTANCE_MIN || ptr[0] > '0' + INSTANCE_MAX \ + || ptr[1] < '0' + TXN_MIN || ptr[1] > '0' + TXN_MAX) { \ + return RedisModule_ReplyWithError((ctx), RDB_ETXN); \ + } \ + out.instance = ptr[0] - '0'; \ + out.id = ptr[1] - '0'; \ +} + +#define ARG_INST_TXN(arg, out) { \ + out.id = TXN_ID_ACTIVE; \ + size_t len; \ + const char *ptr = RedisModule_StringPtrLen(arg, &len); \ + switch (len) { \ + case 2: \ + if (ptr[1] < TXN_MIN || ptr[1] > TXN_MAX) { \ + return RedisModule_ReplyWithError((ctx), RDB_ETXN); \ + } \ + out.id = ptr[1]; \ + case 1: /* FALLTHROUGH */ \ + if (ptr[0] < INSTANCE_MIN || ptr[0] > INSTANCE_MAX) { \ + return RedisModule_ReplyWithError(ctx, RDB_EINST); \ + } \ + out.instance = ptr[0]; \ + break; \ + default: \ + return RedisModule_ReplyWithError(ctx, RDB_ETXN); \ + } \ +} + +#define ARG_INST_TXN_TXT(arg, out) { \ + out.id = TXN_ID_ACTIVE; \ + size_t len; \ + const char *ptr = RedisModule_StringPtrLen(arg, &len); \ + switch (len) { \ + case 2: \ + if (ptr[1] < '0' - TXN_MIN || ptr[1] > '0' + TXN_MAX) { \ + return RedisModule_ReplyWithError((ctx), RDB_ETXN); \ + } \ + out.id = ptr[1] - '0'; \ + case 1: /* FALLTHROUGH */ \ + if (ptr[0] < '0' + INSTANCE_MIN || ptr[0] > '0' + INSTANCE_MAX) { \ + return RedisModule_ReplyWithError(ctx, RDB_EINST); \ + } \ + out.instance = ptr[0] - '0'; \ + break; \ + default: \ + return RedisModule_ReplyWithError(ctx, RDB_ETXN); \ + } \ +} + +#define ARG_NUM(arg, out, name) { \ + long long val; \ + long long max = (1ULL << (sizeof(out) * 8)) - 1; \ + if (RedisModule_StringToLongLong(arg, &val) != REDISMODULE_OK || val > max) { \ + return RedisModule_ReplyWithError(ctx, RDB_E("invalid " name)); \ + } \ + out = val; \ +} + +#define ARG_DATA(arg, out_len, out, name) { \ + if ((out = (uint8_t *)RedisModule_StringPtrLen(arg, &out_len)) == NULL) { \ + return RedisModule_ReplyWithError(ctx, RDB_E("invalid " name)); \ + } \ +} + +#define ARG_DNAME(arg, out, name) { \ + size_t len; \ + const uint8_t *ptr = (const uint8_t *)RedisModule_StringPtrLen(arg, &len); \ + int ret = knot_dname_wire_check(ptr, ptr + len, NULL); \ + if (ret < 1 || ret != len) { \ + return RedisModule_ReplyWithError(ctx, RDB_E("invalid " name)); \ + } \ + /* We assume the dname is properly lowercased! */ \ + out.data = ptr; \ + out.len = ret; \ +} + +#define ARG_DNAME_TXT(arg, out, origin, name) { \ + size_t len; \ + out.txt = RedisModule_StringPtrLen(arg, &len); \ + if (dname_from_str(out.txt, len, out.buff, origin) == NULL) { \ + return RedisModule_ReplyWithError(ctx, RDB_E("invalid " name)); \ + } \ + out.data = out.buff; \ + out.len = knot_dname_size(out.data); \ +} + +#define ARG_RTYPE_TXT(arg, out) { \ + size_t len; \ + const char *ptr = RedisModule_StringPtrLen(arg, &len); \ + if (knot_rrtype_from_string(ptr, &out) != 0) { \ + return RedisModule_ReplyWithError(ctx, RDB_E("invalid record type")); \ + } \ +} + +static knot_dname_t *dname_from_str(const char *ptr, size_t len, uint8_t *out, arg_dname_t *origin) +{ + assert(ptr != NULL && out != NULL); + + if (knot_dname_from_str(out, ptr, KNOT_DNAME_MAXLEN) == NULL) { + return NULL; + } + knot_dname_to_lower(out); + + if (origin != NULL) { + bool fqdn = false; + size_t prefix_len = 0; + + if (len > 0 && (len != 1 || ptr[0] != '@')) { + // Check if the owner is FQDN. + if (ptr[len - 1] == '.') { + fqdn = true; + } + + prefix_len = knot_dname_size(out); + if (prefix_len == 0) { + return NULL; + } + + // Ignore trailing dot. + prefix_len--; + } + + // Append the origin. + if (!fqdn) { + if (origin->len == 0 || origin->len > KNOT_DNAME_MAXLEN - prefix_len) { + return NULL; + } + memcpy(out + prefix_len, origin->data, origin->len); + } + } + + return out; +} diff --git a/src/redis/error.h b/src/redis/error.h new file mode 100644 index 0000000000..2ff243b032 --- /dev/null +++ b/src/redis/error.h @@ -0,0 +1,24 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. and contributors + * SPDX-License-Identifier: GPL-2.0-or-later + * For more information, see + */ + +#pragma once + +#define RDB_E(str) ("" str) + +#define RDB_EALLOC RDB_E("failed to allocate memory") +#define RDB_ECOMPAT RDB_E("incompatible module version") +#define RDB_ECORRUPTED RDB_E("corrupted metadata") +#define RDB_EEVENT RDB_E("failed to emit event") +#define RDB_EEXIST RDB_E("already set") +#define RDB_EHISTORY RDB_E("failed to adjust updates history") +#define RDB_EINST RDB_E("invalid instance") +#define RDB_EMALF RDB_E("malformed data") +#define RDB_ENOSOA RDB_E("missing SOA") +#define RDB_EPARSE RDB_E("failed to parse") +#define RDB_ESEMCHECK RDB_E("semantic check failed") +#define RDB_ESTORE RDB_E("failed to store data") +#define RDB_ETXN RDB_E("invalid transaction") +#define RDB_ETXN_MANY RDB_E("too many transactions") +#define RDB_EZONE RDB_E("zone not available") diff --git a/src/redis/internal.h b/src/redis/internal.h new file mode 100644 index 0000000000..9e9a205ef9 --- /dev/null +++ b/src/redis/internal.h @@ -0,0 +1,2236 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. and contributors + * SPDX-License-Identifier: LGPL-2.1-or-later + * For more information, see + */ + +#pragma once + +#define SCORE_SOA 0. +#define SCORE_DEFAULT 1. + +#define TXN_ID_ACTIVE UINT8_MAX +#define ZONE_META_INACTIVE UINT8_MAX +#define TTL_EMPTY UINT32_MAX +#define TTL_EMPTY_STR "NONE" + +#define foreach_in_zset_subset(key, min, max) \ + for (RedisModule_ZsetFirstInScoreRange(key, min, max, 0, 0); \ + RedisModule_ZsetRangeEndReached(key) == 0; \ + RedisModule_ZsetRangeNext(key)) +#define foreach_in_zset(key) foreach_in_zset_subset(key, REDISMODULE_NEGATIVE_INFINITE, REDISMODULE_POSITIVE_INFINITE) + +#define zone_meta_keyname_construct(...) meta_keyname_construct(ZONE_META, __VA_ARGS__) +#define upd_meta_keyname_construct(...) meta_keyname_construct(UPD_META, __VA_ARGS__) + +#define delete_zone_index(...) delete_index(ZONE, __VA_ARGS__) +#define delete_upd_index(...) delete_index(UPD_TMP, __VA_ARGS__) + +#define throw(_ret, _msg) return (exception_t){ .ret = _ret, .what = _msg } +#define raise(e) return e +#define return_ok throw(KNOT_EOK, NULL) + +typedef enum { + EVENT = 1, // Keep synchronized with RDB_EVENT_KEY! + ZONES = 2, + ZONE_META = 3, + ZONE = 4, + RRSET = 5, + UPD_META = 6, + UPD_TMP = 7, + UPD = 8, + DIFF = 9, +} rdb_type_t; + +typedef struct { + const char *what; + int ret; +} exception_t; + +typedef struct { + uint8_t active; + uint8_t lock[TXN_MAX_COUNT]; +} zone_meta_storage_t; + +typedef struct { + uint16_t counter; + uint16_t lock[TXN_MAX_COUNT]; + uint16_t depth; +} upd_meta_storage_t; + +typedef struct { + RedisModuleCtx *ctx; + const rdb_txn_t *txn; + uint32_t dflt_ttl; + enum { + STORE, + ADD, + REM, + } mode; + bool replied; +} scanner_ctx_t; + +typedef int (*upd_callback)(diff_v *diff, const void *data, uint32_t ttl); + +typedef RedisModuleKey *rrset_k; +typedef RedisModuleKey *diff_k; +typedef RedisModuleKey *upd_meta_k; +typedef RedisModuleKey *zone_meta_k; +typedef RedisModuleKey *index_k; + +static void *redismodule_alloc(void *ptr, size_t bytes) +{ + return RedisModule_Alloc(bytes); +} + +static void redismodule_free(void *ptr) +{ + RedisModule_Free(ptr); +} + +static knot_mm_t mm = { + .alloc = redismodule_alloc, + .ctx = NULL, + .free = redismodule_free +}; + +static double evaluate_score(uint16_t rtype) +{ + return (rtype == KNOT_RRTYPE_SOA) ? SCORE_SOA : SCORE_DEFAULT; +} + +static RedisModuleString *meta_keyname_construct(const uint8_t prefix, RedisModuleCtx *ctx, + const arg_dname_t *origin, uint8_t instance) +{ + char buf[RDB_PREFIX_LEN + 1 + 1 + KNOT_DNAME_MAXLEN + 1]; + + wire_ctx_t w = wire_ctx_init((uint8_t *)buf, sizeof(buf)); + wire_ctx_write(&w, RDB_PREFIX, RDB_PREFIX_LEN); + wire_ctx_write_u8(&w, prefix); + wire_ctx_write_u8(&w, origin->len); + wire_ctx_write(&w, origin->data, origin->len); + wire_ctx_write_u8(&w, instance); + RedisModule_Assert(w.error == KNOT_EOK); + + return RedisModule_CreateString(ctx, buf, wire_ctx_offset(&w)); +} + +static RedisModuleString *rrset_keyname_construct(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const arg_dname_t *owner, + uint16_t rtype) +{ + static const uint8_t prefix = RRSET; + + char buf[RDB_PREFIX_LEN + 1 + 1 + KNOT_DNAME_MAXLEN + 1 + KNOT_DNAME_MAXLEN + 2 + 2]; + + wire_ctx_t w = wire_ctx_init((uint8_t *)buf, sizeof(buf)); + wire_ctx_write(&w, RDB_PREFIX, RDB_PREFIX_LEN); + wire_ctx_write_u8(&w, prefix); + wire_ctx_write_u8(&w, origin->len); + wire_ctx_write(&w, origin->data, origin->len); + wire_ctx_write_u8(&w, owner->len); + wire_ctx_write(&w, owner->data, owner->len); + wire_ctx_write_u16(&w, rtype); + wire_ctx_write(&w, txn, sizeof(*txn)); + RedisModule_Assert(w.error == KNOT_EOK); + + return RedisModule_CreateString(ctx, buf, wire_ctx_offset(&w)); +} + +static RedisModuleString *diff_keyname_construct(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const arg_dname_t *owner, + uint16_t rtype, uint16_t id) +{ + static const uint8_t prefix = DIFF; + + char buf[RDB_PREFIX_LEN + 1 + 1 + KNOT_DNAME_MAXLEN + 1 + KNOT_DNAME_MAXLEN + 2 + 2 + 2]; + + wire_ctx_t w = wire_ctx_init((uint8_t *)buf, sizeof(buf)); + wire_ctx_write(&w, RDB_PREFIX, RDB_PREFIX_LEN); + wire_ctx_write_u8(&w, prefix); + wire_ctx_write_u8(&w, origin->len); + wire_ctx_write(&w, origin->data, origin->len); + wire_ctx_write_u8(&w, owner->len); + wire_ctx_write(&w, owner->data, owner->len); + wire_ctx_write_u16(&w, rtype); + wire_ctx_write(&w, txn, sizeof(*txn)); + wire_ctx_write_u16(&w, id); + RedisModule_Assert(w.error == KNOT_EOK); + + return RedisModule_CreateString(ctx, buf, wire_ctx_offset(&w)); +} + +static zone_meta_k zone_meta_get_when_open(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, int rights) +{ + RedisModuleString *txn_k = zone_meta_keyname_construct(ctx, origin, txn->instance); + zone_meta_k key = RedisModule_OpenKey(ctx, txn_k, rights); + RedisModule_FreeString(ctx, txn_k); + if (key == NULL || RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_STRING) { + RedisModule_CloseKey(key); + return NULL; + } + + size_t len = 0; + const zone_meta_storage_t *meta = (const zone_meta_storage_t *)RedisModule_StringDMA(key, &len, REDISMODULE_READ); + if (txn->id != meta->active && meta->lock[txn->id] != 0) { + return key; + } + RedisModule_CloseKey(key); + return NULL; +} + +static bool zone_txn_is_open(RedisModuleCtx *ctx, const arg_dname_t *origin, const rdb_txn_t *txn) +{ + zone_meta_k key = zone_meta_get_when_open(ctx, origin, txn, REDISMODULE_READ); + bool out = (key != NULL); + RedisModule_CloseKey(key); + return out; +} + +static void commit_event(RedisModuleCtx *ctx, rdb_event_t type, const arg_dname_t *origin, + uint8_t instance, uint32_t serial) +{ + RedisModuleString *keyname = RedisModule_CreateString(ctx, RDB_EVENT_KEY, strlen(RDB_EVENT_KEY)); + RedisModuleKey *stream_key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModule_FreeString(ctx, keyname); + + int zone_stream_type = RedisModule_KeyType(stream_key); + if (zone_stream_type != REDISMODULE_KEYTYPE_EMPTY && zone_stream_type != REDISMODULE_KEYTYPE_STREAM) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, RDB_EEVENT); + RedisModule_CloseKey(stream_key); + return; + } + + RedisModuleString *events[] = { + RedisModule_CreateString(ctx, RDB_EVENT_ARG_EVENT, strlen(RDB_EVENT_ARG_EVENT)), + RedisModule_CreateStringFromLongLong(ctx, type), + RedisModule_CreateString(ctx, RDB_EVENT_ARG_ORIGIN, strlen(RDB_EVENT_ARG_ORIGIN)), + RedisModule_CreateString(ctx, (const char *)origin->data, origin->len), + RedisModule_CreateString(ctx, RDB_EVENT_ARG_INSTANCE, strlen(RDB_EVENT_ARG_INSTANCE)), + RedisModule_CreateStringFromLongLong(ctx, instance), + RedisModule_CreateString(ctx, RDB_EVENT_ARG_SERIAL, strlen(RDB_EVENT_ARG_SERIAL)), + RedisModule_CreateStringFromLongLong(ctx, serial), + NULL, + }; + + RedisModuleStreamID ts; + int ret = RedisModule_StreamAdd(stream_key, REDISMODULE_STREAM_ADD_AUTOID, &ts, events, 4); + + for (RedisModuleString **event = events; *event != NULL; event++) { + RedisModule_FreeString(ctx, *event); + } + + if (ret != REDISMODULE_OK) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, RDB_EEVENT); + RedisModule_CloseKey(stream_key); + return; + } + + if (rdb_event_age == 0) { + RedisModule_CloseKey(stream_key); + return; + } + + ts.ms -= 1000LLU * rdb_event_age; + ts.seq = 0; + + // NOTE Trimming with REDISMODULE_STREAM_TRIM_APPROX improves preformance + long long removed_cnt = RedisModule_StreamTrimByID(stream_key, REDISMODULE_STREAM_TRIM_APPROX, &ts); + if (removed_cnt) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "stream cleanup %lld old events", removed_cnt); + } + RedisModule_CloseKey(stream_key); +} + +static RedisModuleKey *get_zones_index(RedisModuleCtx *ctx, int rights) +{ + static const uint8_t prefix = ZONES; + + char buf[RDB_PREFIX_LEN + 1 + 1]; + + wire_ctx_t w = wire_ctx_init((uint8_t *)buf, sizeof(buf)); + wire_ctx_write(&w, RDB_PREFIX, RDB_PREFIX_LEN); + wire_ctx_write_u8(&w, prefix); + RedisModule_Assert(w.error == KNOT_EOK); + + RedisModuleString *keyname = RedisModule_CreateString(ctx, buf, wire_ctx_offset(&w)); + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, rights); + RedisModule_FreeString(ctx, keyname); + + return key; +} + +static index_k get_zone_index(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, int rights) +{ + static const uint8_t prefix = ZONE; + + char buf[RDB_PREFIX_LEN + 1 + 1 + KNOT_DNAME_MAXLEN + 2]; + + wire_ctx_t w = wire_ctx_init((uint8_t *)buf, sizeof(buf)); + wire_ctx_write(&w, RDB_PREFIX, RDB_PREFIX_LEN); + wire_ctx_write_u8(&w, prefix); + wire_ctx_write_u8(&w, origin->len); + wire_ctx_write(&w, origin->data, origin->len); + wire_ctx_write(&w, txn, sizeof(*txn)); + RedisModule_Assert(w.error == KNOT_EOK); + + RedisModuleString *keyname = RedisModule_CreateString(ctx, buf, wire_ctx_offset(&w)); + index_k key = RedisModule_OpenKey(ctx, keyname, rights); + RedisModule_FreeString(ctx, keyname); + + return key; +} + +static index_k get_upd_index(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, uint16_t id, int rights) +{ + static const uint8_t prefix = UPD_TMP; + + char buf[RDB_PREFIX_LEN + 1 + 1 + KNOT_DNAME_MAXLEN + 2 + 2]; + + wire_ctx_t w = wire_ctx_init((uint8_t *)buf, sizeof(buf)); + wire_ctx_write(&w, RDB_PREFIX, RDB_PREFIX_LEN); + wire_ctx_write_u8(&w, prefix); + wire_ctx_write_u8(&w, origin->len); + wire_ctx_write(&w, origin->data, origin->len); + wire_ctx_write(&w, txn, sizeof(*txn)); + wire_ctx_write_u16(&w, id); + RedisModule_Assert(w.error == KNOT_EOK); + + RedisModuleString *keyname = RedisModule_CreateString(ctx, buf, wire_ctx_offset(&w)); + index_k key = RedisModule_OpenKey(ctx, keyname, rights); + RedisModule_FreeString(ctx, keyname); + + return key; +} + +static index_k get_commited_upd_index(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const uint32_t serial, int rights) +{ + static const uint8_t prefix = UPD; + + char buf[RDB_PREFIX_LEN + 1 + 1 + KNOT_DNAME_MAXLEN + 1 + 4]; + + wire_ctx_t w = wire_ctx_init((uint8_t *)buf, sizeof(buf)); + wire_ctx_write(&w, RDB_PREFIX, RDB_PREFIX_LEN); + wire_ctx_write_u8(&w, prefix); + wire_ctx_write_u8(&w, origin->len); + wire_ctx_write(&w, origin->data, origin->len); + wire_ctx_write_u8(&w, txn->instance); + wire_ctx_write_u32(&w, serial); + RedisModule_Assert(w.error == KNOT_EOK); + + RedisModuleString *keyname = RedisModule_CreateString(ctx, buf, wire_ctx_offset(&w)); + index_k key = RedisModule_OpenKey(ctx, keyname, rights); + RedisModule_FreeString(ctx, keyname); + + return key; +} + +static int rrset_key_set(RedisModuleCtx *ctx, rrset_k key, RedisModuleString *keyname, + const arg_dname_t *origin, const rdb_txn_t *txn, uint16_t rtype, rrset_v *val) +{ + RedisModule_Assert(RedisModule_ModuleTypeSetValue(key, rdb_rrset_t, val) == REDISMODULE_OK); + + index_k zone_index_key = get_zone_index(ctx, origin, txn, REDISMODULE_READ | REDISMODULE_WRITE); + int zone_keytype = RedisModule_KeyType(zone_index_key); + if (zone_keytype != REDISMODULE_KEYTYPE_EMPTY && + zone_keytype != REDISMODULE_KEYTYPE_ZSET) { + RedisModule_CloseKey(zone_index_key); + return KNOT_EMALF; + } + int ret = RedisModule_ZsetAdd(zone_index_key, evaluate_score(rtype), keyname, NULL); + if (ret != REDISMODULE_OK) { + RedisModule_CloseKey(zone_index_key); + return KNOT_ENOMEM; + } + RedisModule_CloseKey(zone_index_key); + + return KNOT_EOK; +} + +static exception_t rdata_add(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const arg_dname_t *owner, + uint32_t ttl, uint16_t rtype, const knot_rdata_t *rdata) +{ + RedisModuleString *rrset_keystr = rrset_keyname_construct(ctx, origin, txn, owner, rtype); + rrset_k rrset_key = RedisModule_OpenKey(ctx, rrset_keystr, REDISMODULE_READ | REDISMODULE_WRITE); + rrset_v *rrset = RedisModule_ModuleTypeGetValue(rrset_key); + if (rrset == NULL) { + rrset = RedisModule_Calloc(1, sizeof(*rrset)); + if (rrset == NULL) { + RedisModule_CloseKey(rrset_key); + RedisModule_FreeString(ctx, rrset_keystr); + throw(KNOT_ENOMEM, RDB_EALLOC); + } + int ret = rrset_key_set(ctx, rrset_key, rrset_keystr, origin, txn, rtype, rrset); + if (ret != KNOT_EOK) { + RedisModule_CloseKey(rrset_key); + RedisModule_FreeString(ctx, rrset_keystr); + throw(ret, RDB_ESTORE); + } + rrset->ttl = (ttl == TTL_EMPTY) ? rdb_default_ttl : ttl; + } else { + RedisModule_Assert(RedisModule_ModuleTypeGetType(rrset_key) == rdb_rrset_t); + if (ttl != TTL_EMPTY) { + rrset->ttl = ttl; + } + } + RedisModule_FreeString(ctx, rrset_keystr); + + int ret = knot_rdataset_add(&rrset->rrs, rdata, &mm); + RedisModule_CloseKey(rrset_key); + if (ret != KNOT_EOK) { + throw(ret, RDB_ESTORE); + } + return_ok; +} + +static int rdata_add_format(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const arg_dname_t *owner, + uint32_t ttl, uint16_t rtype, const knot_rdata_t *rdata) +{ + exception_t e = rdata_add(ctx, origin, txn, owner, ttl, rtype, rdata); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + } + return e.ret; +} + +static exception_t rdata_remove(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const arg_dname_t *owner, + uint32_t *ttl, uint16_t rtype, const knot_rdata_t *rdata) +{ + // Existence of the rrset is ensured by the previous check. + RedisModuleString *rrset_keystr = rrset_keyname_construct(ctx, origin, txn, owner, rtype); + rrset_k rrset_key = RedisModule_OpenKey(ctx, rrset_keystr, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModule_Assert(RedisModule_ModuleTypeGetType(rrset_key) == rdb_rrset_t); + rrset_v *rrset = RedisModule_ModuleTypeGetValue(rrset_key); + RedisModule_Assert(rrset != NULL); + + if (*ttl == TTL_EMPTY) { + *ttl = rrset->ttl; + } + + int ret = knot_rdataset_remove(&rrset->rrs, rdata, &mm); + if (ret != KNOT_EOK) { + RedisModule_FreeString(ctx, rrset_keystr); + RedisModule_CloseKey(rrset_key); + throw(ret, RDB_ESTORE); + } + + if (rrset->rrs.count == 0 && rtype != KNOT_RRTYPE_SOA) { + RedisModule_DeleteKey(rrset_key); + + index_k zone_index = get_zone_index(ctx, origin, txn, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModule_Assert(RedisModule_KeyType(zone_index) == REDISMODULE_KEYTYPE_ZSET); + RedisModule_ZsetRem(zone_index, rrset_keystr, NULL); + RedisModule_CloseKey(zone_index); + } + + RedisModule_FreeString(ctx, rrset_keystr); + RedisModule_CloseKey(rrset_key); + return_ok; +} + +static void zone_meta_storage_init(zone_meta_storage_t *meta) +{ + meta->active = ZONE_META_INACTIVE; + memset(meta->lock, 0, sizeof(meta->lock)); +} + +static zone_meta_k zone_meta_key_get(RedisModuleCtx *ctx, const arg_dname_t *origin, + rdb_txn_t *txn, int rights) +{ + RedisModule_Assert(txn->instance > 0); + + RedisModuleString *txn_k = zone_meta_keyname_construct(ctx, origin, txn->instance); + zone_meta_k key = RedisModule_OpenKey(ctx, txn_k, rights); + RedisModule_FreeString(ctx, txn_k); + + int keytype = RedisModule_KeyType(key); + if (keytype == REDISMODULE_KEYTYPE_EMPTY) { + zone_meta_storage_t meta; + zone_meta_storage_init(&meta); + RedisModuleString *meta_str = RedisModule_CreateString(ctx, (const char *)&meta, sizeof(meta)); + RedisModule_StringSet(key, meta_str); + RedisModule_FreeString(ctx, meta_str); + } else if (keytype != REDISMODULE_KEYTYPE_STRING) { + RedisModule_CloseKey(key); + return NULL; + } + + return key; +} + +static upd_meta_k upd_meta_key_get(RedisModuleCtx *ctx, const arg_dname_t *origin, + rdb_txn_t *txn, int rights) +{ + RedisModule_Assert(txn->instance > 0); + + RedisModuleString *txn_k = upd_meta_keyname_construct(ctx, origin, txn->instance); + upd_meta_k key = RedisModule_OpenKey(ctx, txn_k, rights); + RedisModule_FreeString(ctx, txn_k); + + int keytype = RedisModule_KeyType(key); + if (keytype == REDISMODULE_KEYTYPE_EMPTY) { + upd_meta_storage_t meta = { 0 }; + RedisModuleString *meta_str = RedisModule_CreateString(ctx, (const char *)&meta, sizeof(meta)); + RedisModule_StringSet(key, meta_str); + RedisModule_FreeString(ctx, meta_str); + } else if (keytype != REDISMODULE_KEYTYPE_STRING) { + RedisModule_CloseKey(key); + RedisModule_ReplyWithError(ctx, RDB_EMALF); + return NULL; + } + + return key; +} + +static int zone_txn_lock(zone_meta_k key, rdb_txn_t *txn) +{ + RedisModule_Assert(txn->instance > 0); + + size_t len = 0; + zone_meta_storage_t *meta = (zone_meta_storage_t *)RedisModule_StringDMA(key, &len, REDISMODULE_WRITE); + if (meta == NULL || len != sizeof(zone_meta_storage_t)) { + return KNOT_EINVAL; + } + + for (txn->id = TXN_MIN; txn->id <= TXN_MAX; ++txn->id) { + if (meta->lock[txn->id] == 0) { + meta->lock[txn->id] = 1; + break; + } + } + if (txn->id > TXN_MAX) { + return KNOT_EBUSY; + } + + return KNOT_EOK; +} + +static exception_t upd_txn_lock(upd_meta_k key, rdb_txn_t *txn) +{ + RedisModule_Assert(txn->instance > 0); + + size_t len; + upd_meta_storage_t *meta = (upd_meta_storage_t *)RedisModule_StringDMA(key, &len, REDISMODULE_WRITE); + if (meta == NULL || len != sizeof(upd_meta_storage_t)) { + throw(KNOT_EINVAL, RDB_EMALF); + } + + for (txn->id = TXN_MIN; txn->id <= TXN_MAX; ++txn->id) { + if (meta->lock[txn->id] == 0) { + meta->counter = MAX(meta->counter + 1, 1); + meta->lock[txn->id] = meta->counter; + break; + } + } + if (txn->id > TXN_MAX) { + throw(KNOT_EBUSY, RDB_ETXN_MANY); + } + + return_ok; +} + +static int serialize_transaction(const rdb_txn_t *txn) +{ + return 10 * txn->instance + txn->id; +} + +static int set_active_transaction(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + RedisModule_Assert(txn->instance > 0); + + RedisModuleString *txn_k = zone_meta_keyname_construct(ctx, origin, txn->instance); + zone_meta_k key = RedisModule_OpenKey(ctx, txn_k, REDISMODULE_READ); + RedisModule_FreeString(ctx, txn_k); + if (key == NULL) { + return KNOT_EEXIST; + } else if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_STRING) { + RedisModule_CloseKey(key); + return KNOT_EMALF; + } + size_t len = 0; + const zone_meta_storage_t *meta = (const zone_meta_storage_t *)RedisModule_StringDMA(key, &len, REDISMODULE_READ); + if (meta->active != ZONE_META_INACTIVE) { + txn->id = meta->active; + RedisModule_CloseKey(key); + return KNOT_EOK; + } + RedisModule_CloseKey(key); + return KNOT_EEXIST; +} + +static int get_id(RedisModuleCtx *ctx, const arg_dname_t *origin, const rdb_txn_t *txn) +{ + RedisModuleString *txn_k = upd_meta_keyname_construct(ctx, origin, txn->instance); + upd_meta_k meta_key = RedisModule_OpenKey(ctx, txn_k, REDISMODULE_READ); + RedisModule_FreeString(ctx, txn_k); + + int keytype = RedisModule_KeyType(meta_key); + if (keytype != REDISMODULE_KEYTYPE_STRING) { + RedisModule_CloseKey(meta_key); + return KNOT_EINVAL; + } + + size_t meta_len = 0; + upd_meta_storage_t *meta = (upd_meta_storage_t *)RedisModule_StringDMA(meta_key, &meta_len, REDISMODULE_READ); + if (meta_len != sizeof(upd_meta_storage_t)) { + RedisModule_CloseKey(meta_key); + return KNOT_EINVAL; + } + + uint16_t id = meta->lock[txn->id]; + if (id == 0) { + RedisModule_CloseKey(meta_key); + return KNOT_EEXIST; + } + + RedisModule_CloseKey(meta_key); + return id; +} + +static int delete_index(const uint8_t prefix, RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn) +{ + index_k index_key = NULL; + switch (prefix) { + case ZONE: + index_key = get_zone_index(ctx, origin, txn, REDISMODULE_READ | REDISMODULE_WRITE); + break; + case UPD_TMP:; + int ret = get_id(ctx, origin, txn); + if (ret < 0 || ret > UINT16_MAX) { + return KNOT_EEXIST; + } + uint16_t id = ret; + index_key = get_upd_index(ctx, origin, txn, id, REDISMODULE_READ | REDISMODULE_WRITE); + break; + default: + return KNOT_ENOTSUP; + } + + if (RedisModule_KeyType(index_key) == REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_CloseKey(index_key); + return KNOT_EOK; + } else if (RedisModule_KeyType(index_key) != REDISMODULE_KEYTYPE_ZSET) { + RedisModule_CloseKey(index_key); + return KNOT_EINVAL; + } + foreach_in_zset(index_key) { + RedisModuleString *el = RedisModule_ZsetRangeCurrentElement(index_key, NULL); + RedisModuleKey *el_key = RedisModule_OpenKey(ctx, el, REDISMODULE_READ | REDISMODULE_WRITE); + if (el_key != NULL) { + RedisModule_DeleteKey(el_key); + RedisModule_CloseKey(el_key); + } + RedisModule_FreeString(ctx, el); + } + + RedisModule_DeleteKey(index_key); + RedisModule_CloseKey(index_key); + + return KNOT_EOK; +} + +static exception_t zone_begin(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + zone_meta_k key = zone_meta_key_get(ctx, origin, txn, REDISMODULE_WRITE); + if (key == NULL) { + throw(KNOT_EMALF, RDB_ECORRUPTED); + } + + int ret = zone_txn_lock(key, txn); + RedisModule_CloseKey(key); + if (ret == KNOT_EBUSY) { + throw(KNOT_EBUSY, RDB_ETXN_MANY); + } else if (ret != KNOT_EOK) { + throw(ret, RDB_ECORRUPTED); + } + + ret = delete_upd_index(ctx, origin, txn); + if (ret != KNOT_EOK && ret != KNOT_EEXIST) { + throw(ret, RDB_ECORRUPTED); + } + + return_ok; +} + +static void zone_begin_txt_format(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + exception_t e = zone_begin(ctx, origin, txn); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + } else { + RedisModule_ReplyWithLongLong(ctx, serialize_transaction(txn)); + } +} + +static void zone_begin_bin_format(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + exception_t e = zone_begin(ctx, origin, txn); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + } else { + RedisModule_ReplyWithStringBuffer(ctx, (const char *)txn, sizeof(*txn)); + } +} + +static exception_t upd_begin(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + rdb_txn_t zone_txn = { + .instance = txn->instance, + .id = TXN_ID_ACTIVE + }; + if (set_active_transaction(ctx, origin, &zone_txn) == KNOT_EEXIST) { + throw(KNOT_EINVAL, RDB_EZONE); + } + + upd_meta_k key = upd_meta_key_get(ctx, origin, txn, REDISMODULE_WRITE); + if (key == NULL) { + throw(KNOT_EMALF, RDB_ECORRUPTED); + } + + exception_t e = upd_txn_lock(key, txn); + RedisModule_CloseKey(key); + if (e.ret != KNOT_EOK) { + raise(e); + } + + int ret = delete_upd_index(ctx, origin, txn); + if (ret != KNOT_EOK && ret != KNOT_EEXIST) { + throw(ret, RDB_ECORRUPTED); + } + + return_ok; +} + +static void upd_begin_txt_format(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + exception_t e = upd_begin(ctx, origin, txn); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + return; + } + + RedisModule_ReplyWithLongLong(ctx, serialize_transaction(txn)); +} + +static void upd_begin_bin_format(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + exception_t e = upd_begin(ctx, origin, txn); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + return; + } + + RedisModule_ReplyWithStringBuffer(ctx, (const char *)txn, sizeof(*txn)); +} + +static upd_meta_k upd_meta_get_when_open(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, int rights) +{ + RedisModuleString *txn_k = upd_meta_keyname_construct(ctx, origin, txn->instance); + upd_meta_k key = RedisModule_OpenKey(ctx, txn_k, rights); + RedisModule_FreeString(ctx, txn_k); + if (key == NULL) { + return NULL; + } else if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_STRING) { + RedisModule_CloseKey(key); + return NULL; + } + size_t len = 0; + const upd_meta_storage_t *transaction = (const upd_meta_storage_t *)RedisModule_StringDMA(key, &len, REDISMODULE_WRITE); + if (transaction->lock[txn->id] == 0) { + RedisModule_CloseKey(key); + return NULL; + } + return key; +} + +static int upd_add_txt_cb(diff_v *diff, const void *data, uint32_t ttl) +{ + const knot_rdata_t *rdata = data; + + if (knot_rdataset_member(&diff->rem_rrs, rdata)) { + return knot_rdataset_remove(&diff->rem_rrs, rdata, &mm); + } else { + diff->add_ttl = (ttl == TTL_EMPTY) ? rdb_default_ttl : ttl; + return knot_rdataset_add(&diff->add_rrs, rdata, &mm); + } +} + +static int upd_remove_txt_cb(diff_v *diff, const void *data, uint32_t ttl) +{ + const knot_rdata_t *rdata = data; + + if (knot_rdataset_member(&diff->add_rrs, rdata)) { + return knot_rdataset_remove(&diff->add_rrs, rdata, &mm); + } else { + diff->rem_ttl = ttl; + return knot_rdataset_add(&diff->rem_rrs, rdata, &mm); + } +} + +static int upd_add_bin_cb(diff_v *diff, const void *data, uint32_t ttl) +{ + const knot_rdataset_t *rdataset = data; + + if (diff->add_rrs.count > 0) { + return KNOT_EEXIST; + } + int ret = knot_rdataset_copy(&diff->add_rrs, rdataset, &mm); + diff->add_ttl = ttl; + + return ret; +} + +static int upd_remove_bin_cb(diff_v *diff, const void *data, uint32_t ttl) +{ + const knot_rdataset_t *rdataset = data; + + if (diff->rem_rrs.count > 0) { + return KNOT_EEXIST; + } + int ret = knot_rdataset_copy(&diff->rem_rrs, rdataset, &mm); + diff->rem_ttl = ttl; + + return ret; +} + +static exception_t upd_add_rem(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const arg_dname_t *owner, + const uint32_t ttl, const uint16_t rtype, + void *data, upd_callback cb) +{ + int ret = get_id(ctx, origin, txn); + if (ret < 0 || ret > UINT16_MAX) { + throw(ret, RDB_ETXN); + } + uint16_t id = ret; + + RedisModuleString *diff_keystr = diff_keyname_construct(ctx, origin, txn, owner, rtype, id); + diff_k diff_key = RedisModule_OpenKey(ctx, diff_keystr, REDISMODULE_READ | REDISMODULE_WRITE); + diff_v *diff = RedisModule_ModuleTypeGetValue(diff_key); + if (diff == NULL) { + diff = RedisModule_Calloc(1, sizeof(diff_v)); + if (diff == NULL) { + RedisModule_CloseKey(diff_key); + RedisModule_FreeString(ctx, diff_keystr); + throw(KNOT_ENOMEM, RDB_EALLOC); + } + RedisModule_Assert(RedisModule_ModuleTypeSetValue(diff_key, rdb_diff_t, diff) == REDISMODULE_OK); + + index_k diff_index_key = get_upd_index(ctx, origin, txn, id, REDISMODULE_READ | REDISMODULE_WRITE); + int diff_keytype = RedisModule_KeyType(diff_index_key); + if (diff_keytype != REDISMODULE_KEYTYPE_EMPTY && + diff_keytype != REDISMODULE_KEYTYPE_ZSET) { + RedisModule_FreeString(ctx, diff_keystr); + RedisModule_CloseKey(diff_key); + RedisModule_CloseKey(diff_index_key); + throw(KNOT_EMALF, RDB_EMALF); + } + ret = RedisModule_ZsetAdd(diff_index_key, evaluate_score(rtype), diff_keystr, NULL); + RedisModule_CloseKey(diff_index_key); + if (ret != REDISMODULE_OK) { + RedisModule_FreeString(ctx, diff_keystr); + RedisModule_CloseKey(diff_key); + throw(KNOT_ENOMEM, RDB_ESTORE); + } + } else { + RedisModule_Assert(RedisModule_ModuleTypeGetType(diff_key) == rdb_diff_t); + } + RedisModule_FreeString(ctx, diff_keystr); + + ret = cb(diff, data, ttl); + if (ret == KNOT_EEXIST) { + RedisModule_CloseKey(diff_key); + throw(KNOT_EEXIST, RDB_EEXIST); + } + + RedisModule_CloseKey(diff_key); + return_ok; +} + +static int upd_add_txt_format(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const arg_dname_t *owner, + const uint32_t ttl, const uint16_t rtype, + const knot_rdata_t *data) +{ + exception_t e = upd_add_rem(ctx, origin, txn, owner, ttl, rtype, (void *)data, upd_add_txt_cb); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + } + return e.ret; +} + +static int upd_remove_txt_format(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const arg_dname_t *owner, + const uint32_t ttl, const uint16_t rtype, + const knot_rdata_t *data) +{ + exception_t e = upd_add_rem(ctx, origin, txn, owner, ttl, rtype, (void *)data, upd_remove_txt_cb); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + } + return e.ret; +} + +static void upd_add_bin_format(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const arg_dname_t *owner, + const uint32_t ttl, const uint16_t rtype, + const knot_rdataset_t *data) +{ + exception_t e = upd_add_rem(ctx, origin, txn, owner, ttl, rtype, (void *)data, upd_add_bin_cb); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + } else { + RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK); + } +} + +static void upd_remove_bin_format(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const arg_dname_t *owner, + const uint32_t ttl, const uint16_t rtype, + const knot_rdataset_t *data) +{ + exception_t e = upd_add_rem(ctx, origin, txn, owner, ttl, rtype, (void *)data, upd_remove_bin_cb); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + } else { + RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK); + } +} + +static void scanner_data(zs_scanner_t *s) +{ + scanner_ctx_t *s_ctx = s->process.data; + + arg_dname_t origin = { + .data = s->zone_origin, + .len = s->zone_origin_length + }; + arg_dname_t owner = { + .data = s->r_owner, + .len = s->r_owner_length + }; + + uint8_t buf[knot_rdata_size(s->r_data_length)]; + knot_rdata_t *rdata = (knot_rdata_t *)buf; + knot_rdata_init(rdata, s->r_data_length, s->r_data); + if (knot_rdata_to_canonical(rdata, s->r_type) != KNOT_EOK) { + RedisModule_ReplyWithError(s_ctx->ctx, RDB_EMALF); + s_ctx->replied = true; + s->error.fatal = true; + s->state = ZS_STATE_STOP; + return; + } + + int ret = KNOT_EOK; + switch (s_ctx->mode) { + case STORE: + ret = rdata_add_format(s_ctx->ctx, &origin, s_ctx->txn, &owner, + s->r_ttl, s->r_type, rdata); + break; + case ADD: + ret = upd_add_txt_format(s_ctx->ctx, &origin, s_ctx->txn, &owner, + s->r_ttl, s->r_type, rdata); + break; + case REM: + ret = upd_remove_txt_format(s_ctx->ctx, &origin, s_ctx->txn, &owner, + s->r_ttl, s->r_type, rdata); + break; + default: + RedisModule_Assert(0); + } + if (ret != KNOT_EOK) { + s_ctx->replied = true; + s->error.fatal = true; + s->state = ZS_STATE_STOP; + } +} + +static void scanner_error(zs_scanner_t *s) +{ + scanner_ctx_t *s_ctx = s->process.data; + + char msg[128]; + (void)snprintf(msg, sizeof(msg), RDB_E("parser failed (%s), line %"PRIu64), + zs_strerror(s->error.code), s->line_counter); + RedisModule_ReplyWithError(s_ctx->ctx, msg); + + s_ctx->replied = true; + s->state = ZS_STATE_STOP; +} + +static void zone_store_bin_format(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const arg_dname_t *owner, + uint16_t rtype, uint32_t ttl, uint16_t rcount, + const uint8_t *zone_data, const size_t zone_data_len) +{ + if (zone_txn_is_open(ctx, origin, txn) == false) { + RedisModule_ReplyWithError(ctx, RDB_ETXN); + return; + } + + RedisModuleString *rrset_keyname = rrset_keyname_construct(ctx, origin, txn, owner, rtype); + rrset_k rrset_key = RedisModule_OpenKey(ctx, rrset_keyname, REDISMODULE_READ | REDISMODULE_WRITE); + if (RedisModule_KeyType(rrset_key) != REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_FreeString(ctx, rrset_keyname); + RedisModule_CloseKey(rrset_key); + RedisModule_ReplyWithError(ctx, RDB_EEXIST); + return; + } + + rrset_v *rrset = RedisModule_Calloc(1, sizeof(*rrset)); + if (rrset == NULL) { + RedisModule_FreeString(ctx, rrset_keyname); + RedisModule_CloseKey(rrset_key); + RedisModule_ReplyWithError(ctx, RDB_EALLOC); + return; + } + + rrset->ttl = ttl; + rrset->rrs.count = rcount; + if (zone_data_len != 0) { + rrset->rrs.rdata = RedisModule_Alloc(zone_data_len); + if (rrset->rrs.rdata == NULL) { + RedisModule_Free(rrset); + RedisModule_FreeString(ctx, rrset_keyname); + RedisModule_CloseKey(rrset_key); + RedisModule_ReplyWithError(ctx, RDB_EALLOC); + return; + } + rrset->rrs.size = zone_data_len; + memcpy(rrset->rrs.rdata, zone_data, zone_data_len); + } else { + rrset->rrs.rdata = NULL; + rrset->rrs.size = 0; + } + + int ret = rrset_key_set(ctx, rrset_key, rrset_keyname, origin, txn, rtype, rrset); + RedisModule_FreeString(ctx, rrset_keyname); + RedisModule_CloseKey(rrset_key); + if (ret != KNOT_EOK) { + RedisModule_Free(rrset); + RedisModule_ReplyWithError(ctx, RDB_ESTORE); + return; + } + + RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK); +} + +static void run_scanner(scanner_ctx_t *s_ctx, const arg_dname_t *origin, + const char *data, size_t data_len) +{ + zs_scanner_t s; + if (zs_init(&s, origin->txt, KNOT_CLASS_IN, s_ctx->dflt_ttl) != 0 || + zs_set_input_string(&s, data, data_len) != 0 || + zs_set_processing(&s, scanner_data, scanner_error, s_ctx) != 0 || + zs_parse_all(&s) != 0 || s.error.fatal) { + if (!s_ctx->replied) { + RedisModule_ReplyWithError(s_ctx->ctx, RDB_EPARSE); + } + zs_deinit(&s); + return; + } + zs_deinit(&s); + + RedisModule_ReplyWithSimpleString(s_ctx->ctx, RDB_RETURN_OK); +} + +static void zone_store_txt_format(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const char *zone_data, + const size_t zone_data_len) +{ + if (zone_txn_is_open(ctx, origin, txn) == false) { + RedisModule_ReplyWithError(ctx, RDB_ETXN); + return; + } + + scanner_ctx_t s_ctx = { + .ctx = ctx, + .txn = txn, + .dflt_ttl = rdb_default_ttl, + .mode = STORE + }; + + run_scanner(&s_ctx, origin, zone_data, zone_data_len); +} + +static int zone_meta_release(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + RedisModuleString *txn_k = zone_meta_keyname_construct(ctx, origin, txn->instance); + zone_meta_k key = RedisModule_OpenKey(ctx, txn_k, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModule_FreeString(ctx, txn_k); + if (key == NULL || RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_STRING) { + RedisModule_CloseKey(key); + return KNOT_EEXIST; + } + + size_t len = 0; + zone_meta_storage_t *meta = (zone_meta_storage_t *)RedisModule_StringDMA(key, &len, REDISMODULE_READ | REDISMODULE_WRITE); + if (len != sizeof(*meta)) { + RedisModule_CloseKey(key); + RedisModule_ReplyWithError(ctx, RDB_ECORRUPTED); + return KNOT_EINVAL; + } + if (meta->active == txn->id) { + meta->active = ZONE_META_INACTIVE; + } + meta->lock[txn->id] = 0; + RedisModule_CloseKey(key); + + return KNOT_EOK; +} + +static RedisModuleString *index_soa_keyname(RedisModuleCtx *ctx, index_k index) +{ + size_t key_strlen = 0; + const RedisModuleString *index_keyname = RedisModule_GetKeyNameFromModuleKey(index); + uint8_t *key_str = (uint8_t *)RedisModule_StringPtrLen(index_keyname, &key_strlen); + + wire_ctx_t index_w = wire_ctx_init(key_str, key_strlen); + wire_ctx_skip(&index_w, RDB_PREFIX_LEN + 1); + uint8_t origin_len = wire_ctx_read_u8(&index_w); + + foreach_in_zset_subset(index, SCORE_SOA, SCORE_SOA) { + RedisModuleString *soa_keyname = RedisModule_ZsetRangeCurrentElement(index, NULL); + key_str = (uint8_t *)RedisModule_StringPtrLen(soa_keyname, &key_strlen); + + wire_ctx_t soa_w = wire_ctx_init((uint8_t *)key_str, key_strlen); + wire_ctx_skip(&soa_w, RDB_PREFIX_LEN + 2 + origin_len); + uint8_t owner_len = wire_ctx_read_u8(&soa_w); + + if (origin_len == owner_len && memcmp(index_w.position, soa_w.position, origin_len) == 0) { + RedisModule_ZsetRangeStop(index); + return soa_keyname; + } + RedisModule_FreeString(ctx, soa_keyname); + } + RedisModule_ZsetRangeStop(index); + return NULL; +} + +static exception_t index_soa_serial(RedisModuleCtx *ctx, index_k index, bool incremental, uint32_t *out_serial) +{ + RedisModuleString *keyname = index_soa_keyname(ctx, index); + if (keyname == NULL) { + throw(KNOT_EINVAL, RDB_ENOSOA); + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ); + RedisModule_FreeString(ctx, keyname); + if (key == NULL) { + throw(KNOT_EINVAL, RDB_ECORRUPTED); + } + + void *val = RedisModule_ModuleTypeGetValue(key); + if (val == NULL) { + RedisModule_CloseKey(key); + throw(KNOT_EINVAL, RDB_ECORRUPTED); + } + + const knot_rdataset_t *rd = incremental ? &((diff_v *)val)->rem_rrs : &((rrset_v *)val)->rrs; + if (rd->count != 1) { + RedisModule_CloseKey(key); + throw(KNOT_EINVAL, RDB_ECORRUPTED); + } + + *out_serial = knot_soa_serial(rd->rdata); + RedisModule_CloseKey(key); + + return_ok; +} + +static exception_t upd_deep_delete(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const uint32_t serial, + size_t *counter) +{ + index_k upd_index_key = get_commited_upd_index(ctx, origin, txn, serial, REDISMODULE_READ | REDISMODULE_WRITE); + int upd_index_type = RedisModule_KeyType(upd_index_key); + if (upd_index_type == REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_CloseKey(upd_index_key); + return_ok; + } else if (upd_index_type != REDISMODULE_KEYTYPE_ZSET) { + RedisModule_CloseKey(upd_index_key); + throw(KNOT_EINVAL, RDB_ECORRUPTED); + } + + uint32_t serial_prev = 0; + exception_t e = index_soa_serial(ctx, upd_index_key, true, &serial_prev); + if (e.ret != KNOT_EOK) { + RedisModule_CloseKey(upd_index_key); + throw(KNOT_EINVAL, e.what); + } + + *counter += 1; + + foreach_in_zset(upd_index_key) { + RedisModuleString *el = RedisModule_ZsetRangeCurrentElement(upd_index_key, NULL); + diff_k diff_key = RedisModule_OpenKey(ctx, el, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModule_FreeString(ctx, el); + RedisModule_DeleteKey(diff_key); + RedisModule_CloseKey(diff_key); + } + + RedisModule_DeleteKey(upd_index_key); + RedisModule_CloseKey(upd_index_key); + + raise(upd_deep_delete(ctx, origin, txn, serial_prev, counter)); +} + +static uint8_t change_bit(uint8_t val, int idx, bool bitval) +{ + return (val & ~(1 << idx)) | (bitval << idx); +} + +static exception_t zone_purge(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + if (set_active_transaction(ctx, origin, txn) != KNOT_EOK) { + throw(KNOT_EINVAL, RDB_EZONE); + } + + RedisModuleString *soa_rrset_keyname = rrset_keyname_construct(ctx, origin, txn, origin, KNOT_RRTYPE_SOA); + rrset_k soa_rrset_key = RedisModule_OpenKey(ctx, soa_rrset_keyname, REDISMODULE_READ); + RedisModule_FreeString(ctx, soa_rrset_keyname); + if (soa_rrset_key == NULL) { + throw(KNOT_ESOAINVAL, RDB_ENOSOA); + } + rrset_v *rrset = RedisModule_ModuleTypeGetValue(soa_rrset_key); + if (rrset == NULL) { + RedisModule_CloseKey(soa_rrset_key); + throw(KNOT_ESOAINVAL, RDB_ENOSOA); + } + uint32_t serial = knot_soa_serial(rrset->rrs.rdata); + RedisModule_CloseKey(soa_rrset_key); + + delete_zone_index(ctx, origin, txn); + + size_t counter; + upd_deep_delete(ctx, origin, txn, serial, &counter); + + RedisModuleString *meta_k = upd_meta_keyname_construct(ctx, origin, txn->instance); + upd_meta_k meta_key = RedisModule_OpenKey(ctx, meta_k, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModule_FreeString(ctx, meta_k); + if (RedisModule_KeyType(meta_key) == REDISMODULE_KEYTYPE_STRING) { + size_t len = 0; + upd_meta_storage_t *meta = (upd_meta_storage_t *)RedisModule_StringDMA(meta_key, &len, REDISMODULE_WRITE); + RedisModule_Assert(len == sizeof(*meta)); + meta->depth = 0; + } + RedisModule_CloseKey(meta_key); + + if (zone_meta_release(ctx, origin, txn) != KNOT_EOK) { + throw(KNOT_EDENIED, RDB_ECORRUPTED); + } + + RedisModuleKey *zones_index = get_zones_index(ctx, REDISMODULE_READ | REDISMODULE_WRITE); + if (RedisModule_KeyType(zones_index) == REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_CloseKey(zones_index); + return_ok; + } + + RedisModuleString *origin_str = RedisModule_CreateString(ctx, (const char *)origin->data, origin->len); + if (origin_str == NULL) { + RedisModule_CloseKey(zones_index); + throw(KNOT_ENOMEM, RDB_EALLOC); + } + RedisModuleString *value = NULL; + if (RedisModule_HashGet(zones_index, REDISMODULE_HASH_NONE, origin_str, &value, NULL) != REDISMODULE_OK) { + RedisModule_FreeString(ctx, origin_str); + RedisModule_CloseKey(zones_index); + throw(KNOT_EMALF, RDB_ECORRUPTED); + } + if (value == NULL) { + RedisModule_FreeString(ctx, origin_str); + RedisModule_CloseKey(zones_index); + return_ok; + } + + size_t value_len = 0; + uint8_t val; + const char *val_ptr = RedisModule_StringPtrLen(value, &value_len); + if (value_len != sizeof(val)) { + RedisModule_CloseKey(zones_index); + throw(KNOT_EMALF, RDB_ECORRUPTED); + } + memcpy(&val, val_ptr, sizeof(val)); + RedisModule_FreeString(ctx, value); + val = change_bit(val, txn->instance - 1, false); + if (val == 0) { + RedisModule_HashSet(zones_index, REDISMODULE_HASH_NONE, origin_str, REDISMODULE_HASH_DELETE, NULL); + } else { + value = RedisModule_CreateString(ctx, (const char *)&val, sizeof(val)); + if (value == NULL) { + RedisModule_CloseKey(zones_index); + throw(KNOT_ENOMEM, RDB_EALLOC); + } + RedisModule_HashSet(zones_index, REDISMODULE_HASH_NONE, origin_str, value, NULL); + RedisModule_FreeString(ctx, value); + } + RedisModule_FreeString(ctx, origin_str); + RedisModule_CloseKey(zones_index); + + return_ok; +} + +static void zone_purge_v(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + exception_t e = zone_purge(ctx, origin, txn); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, RDB_EEVENT); + } else { + RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK); + } +} + +typedef struct { + RedisModuleCtx *ctx; + size_t count; + int ret; + bool txt; + bool instances; +} scan_ctx; + +static void zone_list_cb(RedisModuleKey *key, RedisModuleString *zone_name, + RedisModuleString *mask, void *privdata) +{ + scan_ctx *sctx = privdata; + if (sctx->txt) { + size_t len; + const char *dname = RedisModule_StringPtrLen(zone_name, &len); + char buf[KNOT_DNAME_TXT_MAXLEN + TXN_MAX * 3]; + if (knot_dname_to_str(buf, (knot_dname_t *)dname, sizeof(buf)) == NULL) { + sctx->ret = KNOT_EMALF; + RedisModule_ReplyWithError(sctx->ctx, RDB_ECORRUPTED); + ++(sctx->count); + return; + } + char *buf_ptr = buf + knot_dname_size((knot_dname_t *)dname) - 1; + size_t mask_len = 0; + const uint8_t *mask_p = (const uint8_t *)RedisModule_StringPtrLen(mask, &mask_len); + if (mask_len != sizeof(uint8_t)) { + sctx->ret = KNOT_EMALF; + RedisModule_ReplyWithError(sctx->ctx, RDB_ECORRUPTED); + ++(sctx->count); + return; + } + if (sctx->instances) { + int count = 0; + for (int it = 0; it < TXN_MAX; ++it) { + const char separator = (count) ? ',' : ':'; + if ((*mask_p) & (1 << it)) { + *(buf_ptr++) = separator; + *(buf_ptr++) = ' '; + *(buf_ptr++) = '0' + it + 1; + *buf_ptr = '\0'; + count++; + } + } + } + RedisModule_ReplyWithCString(sctx->ctx, buf); + } else { + if (sctx->instances) { + RedisModule_ReplyWithArray(sctx->ctx, 2); + RedisModule_ReplyWithString(sctx->ctx, zone_name); + RedisModule_ReplyWithString(sctx->ctx, mask); + } else { + RedisModule_ReplyWithString(sctx->ctx, zone_name); + } + } + ++(sctx->count); +} + +static void zone_list(RedisModuleCtx *ctx, bool instances, bool txt) +{ + RedisModuleKey *zones_index = get_zones_index(ctx, REDISMODULE_READ); + if (zones_index == NULL) { + RedisModule_ReplyWithEmptyArray(ctx); + return; + } + + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + + scan_ctx sctx = { + .ctx = ctx, + .txt = txt, + .instances = instances + }; + RedisModuleScanCursor *cursor = RedisModule_ScanCursorCreate(); + while (RedisModule_ScanKey(zones_index, cursor, zone_list_cb, &sctx) && sctx.ret == KNOT_EOK); + RedisModule_ReplySetArrayLength(ctx, sctx.count); + RedisModule_CloseKey(zones_index); + RedisModule_ScanCursorDestroy(cursor); +} + +static exception_t zone_meta_active_exchange(RedisModuleCtx *ctx, zone_meta_k key, + const arg_dname_t *origin, rdb_txn_t *txn) +{ + size_t len = 0; + zone_meta_storage_t *meta = (zone_meta_storage_t *)RedisModule_StringDMA(key, &len, REDISMODULE_WRITE); + if (len != sizeof(*meta)) { + throw(KNOT_EINVAL, RDB_ECORRUPTED); + } + + exception_t e = { .ret = KNOT_EOK }; + uint8_t active_old = meta->active; + if (active_old != ZONE_META_INACTIVE) { + rdb_txn_t txn_old = { + .instance = txn->instance, + .id = active_old + }; + e = zone_purge(ctx, origin, &txn_old); + meta->lock[active_old] = 0; + } + meta->active = txn->id; + raise(e); +} + +static void zone_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + zone_meta_k meta_key = zone_meta_get_when_open(ctx, origin, txn, REDISMODULE_READ | REDISMODULE_WRITE); + if (meta_key == NULL) { + RedisModule_ReplyWithError(ctx, RDB_ETXN); + return; + } + + uint32_t serial = 0; + index_k zone_index_key = get_zone_index(ctx, origin, txn, REDISMODULE_READ | REDISMODULE_WRITE); // NOTE for iteration need also key opened for writing + exception_t e = index_soa_serial(ctx, zone_index_key, false, &serial); + RedisModule_CloseKey(zone_index_key); + if (e.ret != KNOT_EOK) { + RedisModule_CloseKey(meta_key); + RedisModule_ReplyWithError(ctx, e.what); + return; + } + + e = zone_meta_active_exchange(ctx, meta_key, origin, txn); + RedisModule_CloseKey(meta_key); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + return; + } + + RedisModuleKey *zones_index = get_zones_index(ctx, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModuleString *origin_str = RedisModule_CreateString(ctx, (const char *)origin->data, origin->len); + if (origin_str == NULL) { + RedisModule_CloseKey(zones_index); + RedisModule_ReplyWithError(ctx, RDB_EALLOC); + return; + } + RedisModuleString *value = NULL; + uint8_t val = 0; + if (RedisModule_KeyType(zones_index) != REDISMODULE_KEYTYPE_EMPTY) { + if (RedisModule_HashGet(zones_index, REDISMODULE_HASH_NONE, origin_str, &value, NULL) != REDISMODULE_OK) { + RedisModule_FreeString(ctx, origin_str); + RedisModule_CloseKey(zones_index); + RedisModule_ReplyWithError(ctx, RDB_ECORRUPTED); + return; + } + if (value != NULL) { + size_t value_len = 0; + const char *val_ptr = RedisModule_StringPtrLen(value, &value_len); + if (value_len != sizeof(val)) { + RedisModule_FreeString(ctx, origin_str); + RedisModule_CloseKey(zones_index); + RedisModule_ReplyWithError(ctx, RDB_ECORRUPTED); + return; + } + memcpy(&val, val_ptr, sizeof(val)); + RedisModule_FreeString(ctx, value); + } + } + val = change_bit(val, txn->instance - 1, true); + value = RedisModule_CreateString(ctx, (const char *)&val, sizeof(val)); + if (value == NULL) { + RedisModule_FreeString(ctx, origin_str); + RedisModule_CloseKey(zones_index); + RedisModule_ReplyWithError(ctx, RDB_EALLOC); + return; + } + RedisModule_HashSet(zones_index, REDISMODULE_HASH_NONE, origin_str, value, NULL); + RedisModule_FreeString(ctx, origin_str); + RedisModule_FreeString(ctx, value); + RedisModule_CloseKey(zones_index); + + commit_event(ctx, RDB_EVENT_ZONE, origin, txn->instance, serial); + RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK); +} + +static void zone_abort(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + zone_meta_k meta_key = zone_meta_get_when_open(ctx, origin, txn, REDISMODULE_READ | REDISMODULE_WRITE); + if (meta_key == NULL) { + RedisModule_ReplyWithError(ctx, RDB_ETXN); + return; + } + + size_t len = 0; + zone_meta_storage_t *meta = (zone_meta_storage_t *)RedisModule_StringDMA(meta_key, &len, REDISMODULE_WRITE); + if (meta == NULL || len != sizeof(zone_meta_storage_t)) { + RedisModule_CloseKey(meta_key); + RedisModule_ReplyWithError(ctx, RDB_ECORRUPTED); + return; + } + meta->lock[txn->id] = 0; + RedisModule_CloseKey(meta_key); + + int ret = delete_zone_index(ctx, origin, txn); + if (ret == KNOT_EOK) { + RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK); + } else if (ret == KNOT_EEXIST) { + RedisModule_ReplyWithError(ctx, RDB_EZONE); + } else { + RedisModule_ReplyWithError(ctx, RDB_ECORRUPTED); + } +} + +static void zone_exists(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + int ret = set_active_transaction(ctx, origin, txn); + if (ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, RDB_EZONE); + return; + } + + index_k zone_index_key = get_zone_index(ctx, origin, txn, REDISMODULE_READ); + if (zone_index_key == NULL) { + RedisModule_ReplyWithError(ctx, RDB_ENOSOA); + return; + } + int zone_keytype = RedisModule_KeyType(zone_index_key); + if (zone_keytype == REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_CloseKey(zone_index_key); + RedisModule_ReplyWithError(ctx, RDB_ENOSOA); + return; + } else if (zone_keytype != REDISMODULE_KEYTYPE_ZSET) { + RedisModule_CloseKey(zone_index_key); + RedisModule_ReplyWithError(ctx, RDB_ECORRUPTED); + return; + } + + uint32_t serial = 0; + exception_t e = index_soa_serial(ctx, zone_index_key, false, &serial); + RedisModule_CloseKey(zone_index_key); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + return; + } + + RedisModule_ReplyWithLongLong(ctx, serial); +} + +static int dump_rrset(RedisModuleCtx *ctx, knot_rrset_t *rrset, char *buf, + size_t buf_size, long *count, dump_mode_t mode) +{ + const knot_dump_style_t style = KNOT_DUMP_STYLE_DEFAULT; + + knot_dname_txt_storage_t owner; + (void)knot_dname_to_str(owner, rrset->owner, sizeof(owner)); + + char rtype[16]; + (void)knot_rrtype_to_string(rrset->type, rtype, sizeof(rtype)); + + char ttl[16]; + if (rrset->type != KNOT_RRTYPE_RRSIG) { + if (rrset->ttl == TTL_EMPTY) { + strlcpy(ttl, TTL_EMPTY_STR, sizeof(ttl)); + } else { + (void)snprintf(ttl, sizeof(ttl), "%u", rrset->ttl); + } + } + + knot_rdata_t *rr = rrset->rrs.rdata; + for (uint16_t i = 0; i < rrset->rrs.count; i++) { + if (rrset->type == KNOT_RRTYPE_RRSIG) { + uint32_t orig_ttl = knot_rrsig_original_ttl(rr); + if (orig_ttl == TTL_EMPTY) { + strlcpy(ttl, TTL_EMPTY_STR, sizeof(ttl)); + } else { + (void)snprintf(ttl, sizeof(ttl), "%u", orig_ttl); + } + } + + int ret = knot_rrset_txt_dump_data(rrset, i, buf, buf_size, &style); + if (ret == KNOT_ESPACE) { + (*count)++; + RedisModule_ReplyWithError(ctx, RDB_EALLOC); + return -1; + } else if (ret < 0) { + (*count)++; + RedisModule_ReplyWithError(ctx, RDB_EMALF); + return -1; + } + + if (mode == DUMP_COMPACT) { + char *line = sprintf_alloc("%s %s %s %s", owner, ttl, rtype, buf); + if (line == NULL) { + (*count)++; + RedisModule_ReplyWithError(ctx, RDB_EALLOC); + return -1; + } + RedisModule_ReplyWithStringBuffer(ctx, line, strlen(line)); + free(line); + } else { + RedisModule_Assert(mode == DUMP_TXT); + RedisModule_ReplyWithArray(ctx, 4); + RedisModule_ReplyWithStringBuffer(ctx, owner, strlen(owner)); + RedisModule_ReplyWithStringBuffer(ctx, ttl, strlen(ttl)); + RedisModule_ReplyWithStringBuffer(ctx, rtype, strlen(rtype)); + RedisModule_ReplyWithStringBuffer(ctx, buf, strlen(buf)); + } + (*count)++; + + rr = knot_rdataset_next(rr); + } + + return 0; +} + +static bool meta_exists(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + RedisModule_Assert(txn->instance != 0); + + RedisModuleString *txn_k = zone_meta_keyname_construct(ctx, origin, txn->instance); + zone_meta_k key = RedisModule_OpenKey(ctx, txn_k, REDISMODULE_READ); + RedisModule_FreeString(ctx, txn_k); + if (key == NULL) { + return false; + } else if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_STRING) { + RedisModule_CloseKey(key); + return false; + } + size_t len = 0; + const zone_meta_storage_t *meta = (const zone_meta_storage_t *)RedisModule_StringDMA(key, &len, REDISMODULE_READ); + bool out = meta->lock[txn->id] != 0; + RedisModule_CloseKey(key); + return out; +} + +static void zone_load(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn, + const arg_dname_t *opt_owner, uint16_t *opt_rtype, dump_mode_t mode) +{ + if (txn->id == TXN_ID_ACTIVE) { + int ret = set_active_transaction(ctx, origin, txn); + if (ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, RDB_EZONE); + return; + } + } else if (meta_exists(ctx, origin, txn) == false) { + RedisModule_ReplyWithError(ctx, RDB_ECORRUPTED); + return; + } + + index_k index_key = get_zone_index(ctx, origin, txn, REDISMODULE_READ); + if (index_key == NULL) { + RedisModule_ReplyWithEmptyArray(ctx); + return; + } + int zone_keytype = RedisModule_KeyType(index_key); + if (zone_keytype != REDISMODULE_KEYTYPE_ZSET) { + RedisModule_CloseKey(index_key); + RedisModule_ReplyWithError(ctx, RDB_EMALF); + return; + } + + char buf[128 * 1024]; + + long count = 0; + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + foreach_in_zset (index_key) { + RedisModuleString *el = RedisModule_ZsetRangeCurrentElement(index_key, NULL); + rrset_k rrset_key = RedisModule_OpenKey(ctx, el, REDISMODULE_READ); + if (rrset_key == NULL) { + RedisModule_FreeString(ctx, el); + count++; + RedisModule_ReplyWithError(ctx, RDB_ECORRUPTED); + break; + } + + size_t key_strlen = 0; + const char *key_str = RedisModule_StringPtrLen(el, &key_strlen); + wire_ctx_t w = wire_ctx_init((uint8_t *)key_str, key_strlen); + wire_ctx_skip(&w, RDB_PREFIX_LEN + 2 + origin->len); + uint8_t owner_len = wire_ctx_read_u8(&w); + knot_dname_t *owner = w.position; + wire_ctx_skip(&w, owner_len); + uint16_t rtype = wire_ctx_read_u16(&w); + RedisModule_Assert(w.error == KNOT_EOK); + RedisModule_FreeString(ctx, el); + + if (opt_owner != NULL && + (opt_owner->len != owner_len || memcmp(owner, opt_owner->data, owner_len) != 0)) { + RedisModule_CloseKey(rrset_key); + continue; + } + if (opt_rtype != NULL && rtype != *opt_rtype) { + RedisModule_CloseKey(rrset_key); + continue; + } + + rrset_v *rrset = RedisModule_ModuleTypeGetValue(rrset_key); + + if (mode == DUMP_BIN) { + RedisModule_ReplyWithArray(ctx, 5); + RedisModule_ReplyWithStringBuffer(ctx, (char *)owner, owner_len); + RedisModule_ReplyWithLongLong(ctx, rtype); + RedisModule_ReplyWithLongLong(ctx, rrset->ttl); + RedisModule_ReplyWithLongLong(ctx, rrset->rrs.count); + RedisModule_ReplyWithStringBuffer(ctx, (const char *)rrset->rrs.rdata, rrset->rrs.size); + count++; + } else { + knot_rrset_t rrset_out; + knot_rrset_init(&rrset_out, owner, rtype, KNOT_CLASS_IN, rrset->ttl); + rrset_out.rrs = rrset->rrs; + if (dump_rrset(ctx, &rrset_out, buf, sizeof(buf), &count, mode) != 0) { + RedisModule_CloseKey(rrset_key); + break; + } + } + RedisModule_CloseKey(rrset_key); + } + RedisModule_ZsetRangeStop(index_key); + RedisModule_CloseKey(index_key); + RedisModule_ReplySetArrayLength(ctx, count); +} + +static void upd_meta_unlock(RedisModuleCtx *ctx, upd_meta_k key, uint8_t id) +{ + size_t len = 0; + upd_meta_storage_t *meta = (upd_meta_storage_t *)RedisModule_StringDMA(key, &len, REDISMODULE_WRITE); + RedisModule_Assert(len == sizeof(*meta)); + meta->lock[id] = 0; +} + +static exception_t upd_abort(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + upd_meta_k meta_key = upd_meta_get_when_open(ctx, origin, txn, REDISMODULE_READ | REDISMODULE_WRITE); + if (meta_key == NULL) { + throw(KNOT_ENOENT, RDB_ETXN); + } + + if (delete_upd_index(ctx, origin, txn) != KNOT_EOK) { + RedisModule_CloseKey(meta_key); + throw(KNOT_EINVAL, RDB_ECORRUPTED); + } + + upd_meta_unlock(ctx, meta_key, txn->id); + + RedisModule_CloseKey(meta_key); + return_ok; +} + +static void upd_abort_v(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +{ + exception_t e = upd_abort(ctx, origin, txn); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + } else { + RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK); + } +} + +static int upd_check(const diff_v *diff, const rrset_v *rrset, uint16_t rtype, + int64_t *serial_upd, const char **err) +{ + if (diff->rem_rrs.count > 0) { + if (rrset == NULL) { + *err = "failed to remove non-existent record"; + return KNOT_ESEMCHECK; + } else if (!knot_rdataset_subset(&diff->rem_rrs, &rrset->rrs)) { + *err = "failed to remove non-existent record"; + return KNOT_ESEMCHECK; + } else if (diff->rem_ttl != TTL_EMPTY && rrset->ttl != diff->rem_ttl) { + *err = "failed to remove record with non-matching TTL"; + return KNOT_ESEMCHECK; + } + } + + if (rrset != NULL) { + uint16_t rr_count = diff->add_rrs.count; + knot_rdata_t *rr = diff->add_rrs.rdata; + for (size_t i = 0; i < rr_count; ++i) { + if (find_rr_pos(&rrset->rrs, rr) != KNOT_ENOENT) { + *err = "failed to add existing record"; + return KNOT_ESEMCHECK; + } + rr = knot_rdataset_next(rr); + } + } + + if (rtype == KNOT_RRTYPE_SOA) { + RedisModule_Assert(rrset != NULL); + knot_rdataset_t soa_tmp; + if (knot_rdataset_copy(&soa_tmp, &rrset->rrs, &mm) != KNOT_EOK || + knot_rdataset_subtract(&soa_tmp, &diff->rem_rrs, &mm) != KNOT_EOK || + knot_rdataset_merge(&soa_tmp, &diff->add_rrs, &mm) != KNOT_EOK) { + *err = "failed to update SOA"; + return KNOT_ENOMEM; + } + + if (soa_tmp.count != 1) { + knot_rdataset_clear(&soa_tmp, &mm); + *err = "exactly one SOA expected"; + return KNOT_ESEMCHECK; + } + + uint32_t serial_new = knot_soa_serial(soa_tmp.rdata); + uint32_t serial_diff = serial_new - knot_soa_serial(rrset->rrs.rdata); + if (serial_diff == 0 || serial_diff >= 0x80000000U) { + knot_rdataset_clear(&soa_tmp, &mm); + *err = "new SOA serial not increased"; + return KNOT_ESEMCHECK; + } + *serial_upd = serial_new; + + knot_rdataset_clear(&soa_tmp, &mm); + } + + return KNOT_EOK; +} + +static exception_t upd_trim_history(RedisModuleCtx *ctx, const arg_dname_t *origin, + const rdb_txn_t *txn, const uint16_t depth) +{ + rdb_txn_t zone_txn = { + .instance = txn->instance, + .id = TXN_ID_ACTIVE + }; + int ret = set_active_transaction(ctx, origin, &zone_txn); + if (ret != KNOT_EOK) { + throw(KNOT_EEXIST, RDB_EZONE); + } + + RedisModuleString *soa_rrset_keyname = rrset_keyname_construct(ctx, origin, &zone_txn, origin, KNOT_RRTYPE_SOA); + rrset_k soa_rrset_key = RedisModule_OpenKey(ctx, soa_rrset_keyname, REDISMODULE_READ); + if (soa_rrset_key == NULL) { + throw(KNOT_EEXIST, RDB_ENOSOA); + } + RedisModule_FreeString(ctx, soa_rrset_keyname); + + rrset_v *rrset = RedisModule_ModuleTypeGetValue(soa_rrset_key); + if (rrset == NULL) { + RedisModule_CloseKey(soa_rrset_key); + throw(KNOT_EEXIST, RDB_ENOSOA); + } + + uint32_t serial_it = knot_soa_serial(rrset->rrs.rdata); + uint32_t serial_begin = serial_it; + RedisModule_CloseKey(soa_rrset_key); + + uint16_t counter = 0; + for (counter = 0; counter < depth; ++counter) { + if (counter != 0 && serial_begin == serial_it) { + break; + } + index_k upd_index_key = get_commited_upd_index(ctx, origin, txn, serial_it, REDISMODULE_READ | REDISMODULE_WRITE); + int upd_index_type = RedisModule_KeyType(upd_index_key); + if (upd_index_type == REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_CloseKey(upd_index_key); + return_ok; + } else if (upd_index_type != REDISMODULE_KEYTYPE_ZSET) { + RedisModule_CloseKey(upd_index_key); + throw(KNOT_EINVAL, RDB_ECORRUPTED); + } + + exception_t e = index_soa_serial(ctx, upd_index_key, true, &serial_it); + RedisModule_CloseKey(upd_index_key); + if (e.ret != KNOT_EOK) { + throw(KNOT_EINVAL, e.what); + } + } + + if (counter < depth) { + return_ok; + } + + size_t c; + exception_t e = upd_deep_delete(ctx, origin, txn, serial_it, &c); + if (e.ret != KNOT_EOK) { + raise(e); + } + + return_ok; +} + +static void upd_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *upd_txn) +{ + upd_meta_k meta_key = upd_meta_get_when_open(ctx, origin, upd_txn, REDISMODULE_READ | REDISMODULE_WRITE); + if (meta_key == NULL) { + RedisModule_ReplyWithError(ctx, RDB_ETXN); + return; + } + + int ret = get_id(ctx, origin, upd_txn); + if (ret < 0 || ret > UINT16_MAX) { + RedisModule_CloseKey(meta_key); + RedisModule_ReplyWithError(ctx, RDB_ETXN); + return; + } + uint16_t id = ret; + + rdb_txn_t zone_txn = { + .instance = upd_txn->instance + }; + ret = set_active_transaction(ctx, origin, &zone_txn); + if (ret != KNOT_EOK) { + RedisModule_CloseKey(meta_key); + RedisModule_ReplyWithError(ctx, RDB_EZONE); + return; + } + + // Check the update before its application. + int64_t serial_upd = -1; + index_k upd_key = get_upd_index(ctx, origin, upd_txn, id, REDISMODULE_READ | REDISMODULE_WRITE); + foreach_in_zset(upd_key) { + RedisModuleString *el = RedisModule_ZsetRangeCurrentElement(upd_key, NULL); + size_t el_len = 0; + const char *el_str = RedisModule_StringPtrLen(el, &el_len); + RedisModule_Assert(el_str != NULL && el_len > 0); + + wire_ctx_t w = wire_ctx_init((uint8_t *)el_str, el_len); + wire_ctx_skip(&w, RDB_PREFIX_LEN + 2 + origin->len); + uint8_t owner_len = wire_ctx_read_u8(&w); + arg_dname_t owner = { + .data = w.position, + .len = owner_len + }; + wire_ctx_skip(&w, owner.len); + uint16_t rtype = wire_ctx_read_u16(&w); + + diff_k diff_key = RedisModule_OpenKey(ctx, el, REDISMODULE_READ); + RedisModule_FreeString(ctx, el); + const diff_v *diff = RedisModule_ModuleTypeGetValue(diff_key); + RedisModule_Assert(diff != NULL); + + RedisModuleString *rrset_keystr = rrset_keyname_construct(ctx, origin, &zone_txn, &owner, rtype); + rrset_k rrset_key = RedisModule_OpenKey(ctx, rrset_keystr, REDISMODULE_READ); + RedisModule_FreeString(ctx, rrset_keystr); + rrset_v *rrset = RedisModule_ModuleTypeGetValue(rrset_key); + + const char *err = NULL; + ret = upd_check(diff, rrset, rtype, &serial_upd, &err); + RedisModule_CloseKey(rrset_key); + RedisModule_CloseKey(diff_key); + if (ret != KNOT_EOK) { + char msg[512], owner_str[256], rtype_str[16]; + (void)knot_dname_to_str(owner_str, owner.data, sizeof(owner_str)); + (void)knot_rrtype_to_string(rtype, rtype_str, sizeof(rtype_str)); + (void)snprintf(msg, sizeof(msg), RDB_E("%s, owner %s, type %s"), + err, owner_str, rtype_str); + RedisModule_CloseKey(upd_key); + RedisModule_CloseKey(meta_key); + RedisModule_ReplyWithError(ctx, msg); + return; + } + } + + // Check if SOA serial was explicitly incremented; compute new serial otherwise. + rrset_k soa_key = NULL; + rrset_v *soa_rrset = NULL; + uint32_t serial_new; + if (serial_upd == -1) { + index_k zone_key = get_zone_index(ctx, origin, &zone_txn, REDISMODULE_READ); + RedisModuleString *soa_keyname = index_soa_keyname(ctx, zone_key); + RedisModule_CloseKey(zone_key); + soa_key = RedisModule_OpenKey(ctx, soa_keyname, REDISMODULE_WRITE); + RedisModule_FreeString(ctx, soa_keyname); + RedisModule_Assert(RedisModule_ModuleTypeGetType(soa_key) == rdb_rrset_t); + soa_rrset = RedisModule_ModuleTypeGetValue(soa_key); + RedisModule_Assert(soa_rrset != NULL); + serial_new = knot_soa_serial(soa_rrset->rrs.rdata) + 1; + } else { + serial_new = serial_upd; + } + + // Delete collision updates + size_t len = 0; + upd_meta_storage_t *meta = (upd_meta_storage_t *)RedisModule_StringDMA(meta_key, &len, REDISMODULE_WRITE); + RedisModule_Assert(len == sizeof(*meta)); + + size_t counter = 0; + exception_t e = upd_deep_delete(ctx, origin, upd_txn, serial_new, &counter); + if (e.ret != KNOT_EOK) { + RedisModule_CloseKey(upd_key); + RedisModule_CloseKey(meta_key); + RedisModule_ReplyWithError(ctx, RDB_EHISTORY); + return; + } + meta->depth -= counter; + + // Commit the update. + index_k new_upd_key = get_commited_upd_index(ctx, origin, upd_txn, serial_new, REDISMODULE_READ | REDISMODULE_WRITE); + foreach_in_zset(upd_key) { + double score = 0.0; + RedisModuleString *el = RedisModule_ZsetRangeCurrentElement(upd_key, &score); + size_t el_len = 0; + const char *el_str = RedisModule_StringPtrLen(el, &el_len); + RedisModule_Assert(el_str != NULL && el_len > 0); + + RedisModule_Assert(RedisModule_ZsetAdd(new_upd_key, score, el, NULL) == REDISMODULE_OK); + + wire_ctx_t w = wire_ctx_init((uint8_t *)el_str, el_len); + wire_ctx_skip(&w, RDB_PREFIX_LEN + 2 + origin->len); + uint8_t owner_len = wire_ctx_read_u8(&w); + arg_dname_t owner = { + .data = w.position, + .len = owner_len + }; + wire_ctx_skip(&w, owner.len); + uint16_t rtype = wire_ctx_read_u16(&w); + + diff_k diff_key = RedisModule_OpenKey(ctx, el, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModule_FreeString(ctx, el); + diff_v *diff = RedisModule_ModuleTypeGetValue(diff_key); + RedisModule_Assert(diff != NULL); + + uint16_t rr_count = diff->rem_rrs.count; + knot_rdata_t *rr = diff->rem_rrs.rdata; + for (size_t i = 0; i < rr_count; ++i) { + exception_t ex = rdata_remove(ctx, origin, &zone_txn, &owner, &diff->rem_ttl, rtype, rr); + if (ex.ret != KNOT_EOK) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, RDB_ESTORE); + } + rr = knot_rdataset_next(rr); + } + + rr_count = diff->add_rrs.count; + rr = diff->add_rrs.rdata; + for (size_t i = 0; i < rr_count; ++i) { + exception_t ex = rdata_add(ctx, origin, &zone_txn, &owner, diff->add_ttl, rtype, rr); + if (ex.ret != KNOT_EOK) { + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, RDB_ESTORE); + } + rr = knot_rdataset_next(rr); + } + + RedisModule_CloseKey(diff_key); + } + ++meta->depth; + RedisModule_DeleteKey(upd_key); + RedisModule_CloseKey(upd_key); + + // Increment SOA serial and add a corresponding diff. + if (serial_upd == -1) { + RedisModuleString *soa_diff_keyname = diff_keyname_construct(ctx, origin, upd_txn, origin, KNOT_RRTYPE_SOA, id); + diff_k soa_diff_key = RedisModule_OpenKey(ctx, soa_diff_keyname, REDISMODULE_WRITE); + + diff_v *diff = RedisModule_Calloc(1, sizeof(diff_v)); + RedisModule_Assert(diff != NULL); + diff->add_ttl = soa_rrset->ttl; + diff->rem_ttl = soa_rrset->ttl; + RedisModule_Assert(soa_rrset != NULL); + (void)knot_rdataset_copy(&diff->rem_rrs, &soa_rrset->rrs, &mm); + knot_soa_serial_set(soa_rrset->rrs.rdata, serial_new); + (void)knot_rdataset_copy(&diff->add_rrs, &soa_rrset->rrs, &mm); + RedisModule_Assert(RedisModule_ModuleTypeSetValue(soa_diff_key, rdb_diff_t, diff) == REDISMODULE_OK); + RedisModule_Assert(RedisModule_ZsetAdd(new_upd_key, evaluate_score(KNOT_RRTYPE_SOA), soa_diff_keyname, NULL) == REDISMODULE_OK); + + RedisModule_FreeString(ctx, soa_diff_keyname); + RedisModule_CloseKey(soa_diff_key); + RedisModule_CloseKey(soa_key); + } + RedisModule_CloseKey(new_upd_key); + + e = upd_trim_history(ctx, origin, upd_txn, rdb_upd_history_len); + if (e.ret == KNOT_EOK) { + meta->depth = MIN(meta->depth, rdb_upd_history_len); + } + + upd_meta_unlock(ctx, meta_key, upd_txn->id); + RedisModule_CloseKey(meta_key); + + commit_event(ctx, RDB_EVENT_UPD, origin, upd_txn->instance, serial_new); + if (e.ret != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, e.what); + } else { + RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK); + } +} + +static int upd_dump(RedisModuleCtx *ctx, index_k index_key, const arg_dname_t *origin, + const arg_dname_t *opt_owner, const uint16_t *opt_rtype, dump_mode_t mode) +{ + RedisModule_Assert(RedisModule_KeyType(index_key) == REDISMODULE_KEYTYPE_ZSET); + + char buf[128 * 1024]; + + long count = 0; + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + foreach_in_zset (index_key) { + double score = 0.0; + RedisModuleString *el = RedisModule_ZsetRangeCurrentElement(index_key, &score); + diff_k diff_key = RedisModule_OpenKey(ctx, el, REDISMODULE_READ); + if (diff_key == NULL) { + RedisModule_FreeString(ctx, el); + continue; + } + + size_t key_strlen = 0; + const char *key_str = RedisModule_StringPtrLen(el, &key_strlen); + wire_ctx_t w = wire_ctx_init((uint8_t *)key_str, key_strlen); + wire_ctx_skip(&w, RDB_PREFIX_LEN + 2 + origin->len); + uint8_t owner_len = wire_ctx_read_u8(&w); + knot_dname_t *owner = w.position; + wire_ctx_skip(&w, owner_len); + uint16_t rtype = wire_ctx_read_u16(&w); + RedisModule_Assert(w.error == KNOT_EOK); + RedisModule_FreeString(ctx, el); + + if (opt_owner != NULL && + (opt_owner->len != owner_len || memcmp(owner, opt_owner->data, owner_len) != 0)) { + RedisModule_CloseKey(diff_key); + continue; + } + + if (opt_rtype != NULL && rtype != *opt_rtype) { + RedisModule_CloseKey(diff_key); + continue; + } + + diff_v *diff = RedisModule_ModuleTypeGetValue(diff_key); + if (mode == DUMP_BIN) { + RedisModule_ReplyWithArray(ctx, 8); + RedisModule_ReplyWithStringBuffer(ctx, (char *)owner, owner_len); + RedisModule_ReplyWithLongLong(ctx, rtype); + RedisModule_ReplyWithLongLong(ctx, diff->rem_ttl); + RedisModule_ReplyWithLongLong(ctx, diff->add_ttl); + RedisModule_ReplyWithLongLong(ctx, diff->rem_rrs.count); + RedisModule_ReplyWithStringBuffer(ctx, (const char *)diff->rem_rrs.rdata, diff->rem_rrs.size); + RedisModule_ReplyWithLongLong(ctx, diff->add_rrs.count); + RedisModule_ReplyWithStringBuffer(ctx, (const char *)diff->add_rrs.rdata, diff->add_rrs.size); + } else { + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + long count_sub = 0; + knot_rrset_t rrset_out; + knot_rrset_init(&rrset_out, owner, rtype, KNOT_CLASS_IN, diff->rem_ttl); + rrset_out.rrs = diff->rem_rrs; + if (dump_rrset(ctx, &rrset_out, buf, sizeof(buf), &count_sub, mode) != 0) { + RedisModule_CloseKey(diff_key); + break; + } + RedisModule_ReplySetArrayLength(ctx, count_sub); + + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + count_sub = 0; + knot_rrset_init(&rrset_out, owner, rtype, KNOT_CLASS_IN, diff->add_ttl); + rrset_out.rrs = diff->add_rrs; + if (dump_rrset(ctx, &rrset_out, buf, sizeof(buf), &count_sub, mode) != 0) { + RedisModule_CloseKey(diff_key); + break; + } + RedisModule_ReplySetArrayLength(ctx, count_sub); + } + count++; + RedisModule_CloseKey(diff_key); + } + RedisModule_ZsetRangeStop(index_key); + RedisModule_ReplySetArrayLength(ctx, count); + + return REDISMODULE_OK; +} + +static void upd_diff(RedisModuleCtx *ctx, const arg_dname_t *origin, const rdb_txn_t *txn, + const arg_dname_t *opt_owner, uint16_t *opt_rtype, dump_mode_t mode) +{ + int ret = get_id(ctx, origin, txn); + if (ret < 0 || ret > UINT16_MAX) { + RedisModule_ReplyWithError(ctx, RDB_ETXN); + return; + } + uint16_t id = ret; + index_k index_key = get_upd_index(ctx, origin, txn, id, REDISMODULE_READ | REDISMODULE_WRITE); + if (index_key == NULL) { + RedisModule_ReplyWithError(ctx, RDB_ECORRUPTED); + return; + } + + upd_dump(ctx, index_key, origin, opt_owner, opt_rtype, mode); + + RedisModule_CloseKey(index_key); +} + +/*! + * \notice The changesets are searched for in reverse order: first the surrent SOA is read from the zone and + * then every changeset is found by its own "serial to". From inside of the changeset, the + * "serial from" is gained, digging deeper into the history down to the given limit. Thanks to + * a trick with recursion, the changesets are in the end returned in chronological order. The + * serial-related params are as follows: + * + * \param serial_begin The current SOA serial of the zone, where the search starts. It is the newest (highest) serial. + * \param serial_end The serial where the search ends, specified by the user (caller of knot.upd.load). It is the oldest (lowest) serial. + * \param serial The "serial to" of the changeset that is being loaded. + */ +static exception_t upd_load_serial(RedisModuleCtx *ctx, size_t *counter, const arg_dname_t *origin, + const rdb_txn_t *txn, const uint32_t serial_begin, const uint32_t serial_end, + const uint32_t serial, const arg_dname_t *opt_owner, const uint16_t *opt_rtype, + const dump_mode_t mode) +{ + index_k upd_index_key = get_commited_upd_index(ctx, origin, txn, serial, REDISMODULE_READ); + if (upd_index_key == NULL) { + throw(KNOT_ENOENT, NULL); + } + + uint32_t serial_next = 0; + exception_t e = index_soa_serial(ctx, upd_index_key, true, &serial_next); + if (e.ret != KNOT_EOK) { + RedisModule_CloseKey(upd_index_key); + raise(e); + } + + if (serial_next != serial_end && serial_next != serial_begin) { + e = upd_load_serial(ctx, counter, origin, txn, serial_begin, + serial_end, serial_next, opt_owner, + opt_rtype, mode); + if (e.ret != KNOT_EOK) { + RedisModule_CloseKey(upd_index_key); + raise(e); + } + } + + upd_dump(ctx, upd_index_key, origin, opt_owner, opt_rtype, mode); + ++(*counter); + + RedisModule_CloseKey(upd_index_key); + + return_ok; +} + +static void upd_load(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn, + const uint32_t serial, const arg_dname_t *opt_owner, + const uint16_t *opt_rtype, dump_mode_t mode) +{ + if (set_active_transaction(ctx, origin, txn) != KNOT_EOK) { + RedisModule_ReplyWithError(ctx, RDB_EZONE); + return; + } + + RedisModuleString *soa_rrset_keyname = rrset_keyname_construct(ctx, origin, txn, origin, KNOT_RRTYPE_SOA); + rrset_k soa_rrset_key = RedisModule_OpenKey(ctx, soa_rrset_keyname, REDISMODULE_READ); + RedisModule_FreeString(ctx, soa_rrset_keyname); + rrset_v *rrset = RedisModule_ModuleTypeGetValue(soa_rrset_key); + if (rrset == NULL) { + RedisModule_CloseKey(soa_rrset_key); + RedisModule_ReplyWithError(ctx, RDB_ENOSOA); + return; + } + uint32_t serial_it = knot_soa_serial(rrset->rrs.rdata); + RedisModule_CloseKey(soa_rrset_key); + + size_t counter = 0; + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + exception_t e = upd_load_serial(ctx, &counter, origin, txn, serial_it, serial, + serial_it, opt_owner, opt_rtype, mode); + if (e.ret != KNOT_EOK && e.what != NULL) { + RedisModule_ReplyWithError(ctx, e.what); + counter++; + } + RedisModule_ReplySetArrayLength(ctx, counter); + + return; +} diff --git a/src/redis/knot.c b/src/redis/knot.c new file mode 100644 index 0000000000..5074b9ab22 --- /dev/null +++ b/src/redis/knot.c @@ -0,0 +1,886 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. and contributors + * SPDX-License-Identifier: LGPL-2.1-or-later + * For more information, see + */ + +#include +#include +#include +#include +#include + +#define REDISMODULE_MAIN // Fixes loading error undefined symbol: RedisModule_ReplySetArrayLength. +#include "contrib/redis/redismodule.h" +#include "redis/knot.h" +#include "redis/error.h" +#include "redis/libs.h" +#include "redis/arg.h" +#include "redis/type_diff.h" +#include "redis/type_rrset.h" +#include "redis/internal.h" + +#define register_command_txt(name, cb, rights) \ + RedisModule_CreateCommand(ctx, name, cb, rights, 1, 1, 1) == REDISMODULE_ERR || \ + (cmd = RedisModule_GetCommand(ctx, name)) == NULL || \ + RedisModule_SetCommandInfo(cmd, &cb##_info) == REDISMODULE_ERR + +#define register_command_bin(name, cb, rights) \ + RedisModule_CreateCommand(ctx, name, cb, rights, 1, 1, 1) == REDISMODULE_ERR + +static RedisModuleCommandArg instance_info_args[] = { + {"zone", REDISMODULE_ARG_TYPE_STRING, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_NONE}, + {"instance", REDISMODULE_ARG_TYPE_INTEGER, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_NONE}, + { 0 } +}; + +static RedisModuleCommandArg transaction_info_args[] = { + {"zone", REDISMODULE_ARG_TYPE_STRING, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_NONE}, + {"transaction", REDISMODULE_ARG_TYPE_INTEGER, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_NONE}, + { 0 } +}; + +static RedisModuleCommandArg store_txt_info_args[] = { + {"zone", REDISMODULE_ARG_TYPE_STRING, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_NONE}, + {"transaction", REDISMODULE_ARG_TYPE_INTEGER, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_NONE}, + {"data", REDISMODULE_ARG_TYPE_STRING, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_NONE}, + { 0 } +}; + +static RedisModuleCommandArg upd_load_txt_info_args[] = { + {"opt", REDISMODULE_ARG_TYPE_PURE_TOKEN, -1, "--compact", NULL, NULL, REDISMODULE_CMD_ARG_OPTIONAL}, + {"zone", REDISMODULE_ARG_TYPE_STRING, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_NONE}, + {"instance", REDISMODULE_ARG_TYPE_INTEGER, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_NONE}, + {"serial", REDISMODULE_ARG_TYPE_INTEGER, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_NONE}, + {"owner", REDISMODULE_ARG_TYPE_STRING, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_OPTIONAL}, + {"rtype", REDISMODULE_ARG_TYPE_STRING, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_OPTIONAL}, + { 0 } +}; + +static RedisModuleCommandArg zone_load_txt_info_args[] = { + {"opt", REDISMODULE_ARG_TYPE_PURE_TOKEN, -1, "--compact", NULL, NULL, REDISMODULE_CMD_ARG_OPTIONAL}, + {"zone", REDISMODULE_ARG_TYPE_STRING, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_NONE}, + {"instance", REDISMODULE_ARG_TYPE_INTEGER, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_NONE}, + {"owner", REDISMODULE_ARG_TYPE_STRING, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_OPTIONAL}, + {"rtype", REDISMODULE_ARG_TYPE_STRING, -1, NULL, NULL, NULL, REDISMODULE_CMD_ARG_OPTIONAL}, + { 0 } +}; + +static RedisModuleCommandArg zone_list_txt_info_args[] = { + {"opt", REDISMODULE_ARG_TYPE_PURE_TOKEN, -1, "--instances", NULL, NULL, REDISMODULE_CMD_ARG_OPTIONAL}, + { 0 } +}; + +static const RedisModuleCommandInfo zone_begin_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Create a zone full transaction", + .complexity = "O(1)", + .since = "7.0.0", + .arity = 3, + .args = instance_info_args, +}; + +static const RedisModuleCommandInfo zone_store_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Store records in a zone full transaction", + .complexity = "O(m), where m is the number of stored records", + .since = "7.0.0", + .arity = 4, + .args = store_txt_info_args, +}; + +static const RedisModuleCommandInfo zone_commit_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Commit a zone full transaction", + .complexity = "O(1)", + .since = "7.0.0", + .arity = 3, + .args = transaction_info_args, +}; + +static const RedisModuleCommandInfo zone_abort_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Abort a zone full transaction", + .complexity = "O(n), where n is the number of records in the transaction", + .since = "7.0.0", + .arity = 3, + .args = transaction_info_args, +}; + +static const RedisModuleCommandInfo zone_load_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Load a zone instance or full transaction", + .complexity = "O(n), where n is the number of records in the zone", + .since = "7.0.0", + .arity = -3, + .args = zone_load_txt_info_args, +}; + +static const RedisModuleCommandInfo zone_purge_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Purge a zone instance", + .complexity = "O(n), where n is the number of records in the zone and its updates", + .since = "7.0.0", + .arity = 3, + .args = instance_info_args, +}; + +static const RedisModuleCommandInfo zone_list_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "List zones stored in the database", + .complexity = "O(z), where z is the number of zones", + .since = "7.0.0", + .arity = -1, + .args = zone_list_txt_info_args, +}; + +static const RedisModuleCommandInfo upd_begin_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Create an zone update transaction", + .complexity = "O(1)", + .since = "7.0.0", + .arity = 3, + .args = instance_info_args, +}; + +static const RedisModuleCommandInfo upd_add_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Add records to a zone update transaction", + .complexity = "O(m), where m is the number of added records", + .since = "7.0.0", + .arity = 4, + .args = store_txt_info_args, +}; + +static const RedisModuleCommandInfo upd_remove_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Remove records from a zone update transaction", + .complexity = "O(m), where m is the number of removed records", + .since = "7.0.0", + .arity = 4, + .args = store_txt_info_args, +}; + +static const RedisModuleCommandInfo upd_commit_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Commit a zone update transaction to a zone", + .complexity = "O(u), where u is the number of records in the update", + .since = "7.0.0", + .arity = 3, + .args = transaction_info_args, +}; + +static const RedisModuleCommandInfo upd_abort_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Abort a zone update transaction", + .complexity = "O(u), where u is the number of records in the update", + .since = "7.0.0", + .arity = 3, + .args = transaction_info_args, +}; + +static const RedisModuleCommandInfo upd_diff_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Load the contents of a zone update transaction", + .complexity = "O(u), where u is the number of records in the update", + .since = "7.0.0", + .arity = -3, + .args = zone_load_txt_info_args, +}; + +static const RedisModuleCommandInfo upd_load_txt_info = { + .version = REDISMODULE_COMMAND_INFO_VERSION, + .summary = "Load zone updates since a specified serial", + .complexity = "O(u), where u is the number of records in the retrieved updates", + .since = "7.0.0", + .arity = -4, + .args = upd_load_txt_info_args, +}; + +static int zone_begin_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_INST_TXT(argv[2], txn); + + zone_begin_txt_format(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int zone_begin_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 3) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_INST(argv[2], txn); + + zone_begin_bin_format(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int zone_store_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_TXN_TXT(argv[2], txn); + + void *zone_data; + size_t data_len; + ARG_DATA(argv[3], data_len, zone_data, "zone data"); + + zone_store_txt_format(ctx, &origin, &txn, zone_data, data_len); + return REDISMODULE_OK; +} + +static int zone_store_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 8) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_TXN(argv[2], txn); + + arg_dname_t owner; + ARG_DNAME(argv[3], owner, "record owner"); + + uint16_t rtype; + ARG_NUM(argv[4], rtype, "record type"); + + uint32_t ttl; + ARG_NUM(argv[5], ttl, "TTL"); + + uint16_t rcount; + ARG_NUM(argv[6], rcount, "record count"); + + uint8_t *rdataset; + size_t rdataset_len; + ARG_DATA(argv[7], rdataset_len, rdataset, "rdataset"); + + zone_store_bin_format(ctx, &origin, &txn, &owner, rtype, ttl, rcount, rdataset, rdataset_len); + return REDISMODULE_OK; +} + +static int zone_commit_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_TXN_TXT(argv[2], txn); + + zone_commit(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int zone_commit_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 3) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_TXN(argv[2], txn); + + zone_commit(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int zone_abort_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_TXN_TXT(argv[2], txn); + + zone_abort(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int zone_abort_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 3) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_TXN(argv[2], txn); + + zone_abort(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int zone_exists_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 3) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_INST(argv[2], txn); + + zone_exists(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int zone_load_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + dump_mode_t mode; + ARG_OPT_TXT(mode, "compact", DUMP_TXT, DUMP_COMPACT); + + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_INST_TXN_TXT(argv[2], txn); + + arg_dname_t owner; + if (argc > 3) { + ARG_DNAME_TXT(argv[3], owner, &origin, "record owner"); + } + + uint16_t rtype; + if (argc > 4) { + ARG_RTYPE_TXT(argv[4], rtype); + } + + if (argc > 5) { + return RedisModule_WrongArity(ctx); + } + + zone_load(ctx, &origin, &txn, (argc >= 4) ? &owner : NULL, + (argc >= 5) ? &rtype : NULL, mode); + return REDISMODULE_OK; +} + +static int zone_load_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc < 3) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_INST_TXN(argv[2], txn); + + arg_dname_t owner; + if (argc > 3) { + ARG_DNAME(argv[3], owner, "record owner"); + } + + uint16_t rtype; + if (argc > 4) { + ARG_NUM(argv[4], rtype, "record type"); + } + + if (argc > 5) { + return RedisModule_WrongArity(ctx); + } + + zone_load(ctx, &origin, &txn, (argc >= 4) ? &owner : NULL, + (argc >= 5) ? &rtype : NULL, DUMP_BIN); + return REDISMODULE_OK; +} + +static int zone_purge_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_INST_TXT(argv[2], txn); + + zone_purge_v(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int zone_purge_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 3) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_INST(argv[2], txn); + + zone_purge_v(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int zone_list_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + bool instances; + ARG_OPT_TXT(instances, "instances", false, true); + + zone_list(ctx, instances, true); + return REDISMODULE_OK; +} + +static int zone_list_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 1) { + return RedisModule_WrongArity(ctx); + } + + zone_list(ctx, true, false); + return REDISMODULE_OK; +} + +static int upd_begin_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_INST_TXT(argv[2], txn); + + upd_begin_txt_format(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int upd_begin_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 3) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_INST(argv[2], txn); + + upd_begin_bin_format(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int upd_add_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_TXN_TXT(argv[2], txn); + + void *zone_data; + size_t data_len; + ARG_DATA(argv[3], data_len, zone_data, "zone data"); + + scanner_ctx_t s_ctx = { + .ctx = ctx, + .txn = &txn, + .dflt_ttl = rdb_default_ttl, + .mode = ADD + }; + + run_scanner(&s_ctx, &origin, zone_data, data_len); + return REDISMODULE_OK; +} + +static int upd_add_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 8) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_TXN(argv[2], txn); + + arg_dname_t owner; + ARG_DNAME(argv[3], owner, "record owner"); + + uint16_t rtype; + ARG_NUM(argv[4], rtype, "record type"); + + uint32_t ttl; + ARG_NUM(argv[5], ttl, "TTL"); + + uint16_t rcount; + ARG_NUM(argv[6], rcount, "record count"); + + uint8_t *rdata; + size_t rdata_len; + ARG_DATA(argv[7], rdata_len, rdata, "rdata"); + + knot_rdataset_t rdataset = { + .count = rcount, + .size = rdata_len, + .rdata = (knot_rdata_t *)rdata + }; + + upd_add_bin_format(ctx, &origin, &txn, &owner, ttl, rtype, &rdataset); + return REDISMODULE_OK; +} + +static int upd_remove_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_TXN_TXT(argv[2], txn); + + void *zone_data; + size_t data_len; + ARG_DATA(argv[3], data_len, zone_data, "zone data"); + + scanner_ctx_t s_ctx = { + .ctx = ctx, + .txn = &txn, + .dflt_ttl = TTL_EMPTY, + .mode = REM + }; + + run_scanner(&s_ctx, &origin, zone_data, data_len); + return REDISMODULE_OK; +} + +static int upd_remove_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 8) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_TXN(argv[2], txn); + + arg_dname_t owner; + ARG_DNAME(argv[3], owner, "record owner"); + + uint16_t rtype; + ARG_NUM(argv[4], rtype, "record type"); + + uint32_t ttl; + ARG_NUM(argv[5], ttl, "TTL"); + + uint16_t rcount; + ARG_NUM(argv[6], rcount, "record count"); + + uint8_t *rdata; + size_t rdata_len; + ARG_DATA(argv[7], rdata_len, rdata, "rdata"); + + knot_rdataset_t rdataset = { + .count = rcount, + .size = rdata_len, + .rdata = (knot_rdata_t *)rdata + }; + + upd_remove_bin_format(ctx, &origin, &txn, &owner, ttl, rtype, &rdataset); + return REDISMODULE_OK; +} + +static int upd_commit_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_TXN_TXT(argv[2], txn); + + upd_commit(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int upd_commit_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 3) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_TXN(argv[2], txn) + + upd_commit(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int upd_abort_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_TXN_TXT(argv[2], txn); + + upd_abort_v(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int upd_abort_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 3) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_TXN(argv[2], txn) + + upd_abort_v(ctx, &origin, &txn); + return REDISMODULE_OK; +} + +static int upd_diff_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + dump_mode_t mode; + ARG_OPT_TXT(mode, "compact", DUMP_TXT, DUMP_COMPACT); + + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_TXN_TXT(argv[2], txn); + + arg_dname_t owner; + if (argc > 3) { + ARG_DNAME_TXT(argv[3], owner, &origin, "record owner"); + } + + uint16_t rtype; + if (argc > 4) { + ARG_RTYPE_TXT(argv[4], rtype); + } + + if (argc > 5) { + return RedisModule_WrongArity(ctx); + } + + upd_diff(ctx, &origin, &txn, (argc >= 4) ? &owner : NULL, + (argc >= 5) ? &rtype : NULL, mode); + return REDISMODULE_OK; +} + +static int upd_diff_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc < 3) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_TXN(argv[2], txn); + + arg_dname_t owner; + if (argc > 3) { + ARG_DNAME(argv[3], owner, "record owner"); + } + + uint16_t rtype; + if (argc > 4) { + ARG_NUM(argv[4], rtype, "record type"); + } + + if (argc > 5) { + return RedisModule_WrongArity(ctx); + } + + upd_diff(ctx, &origin, &txn, (argc >= 4) ? &owner : NULL, + (argc >= 5) ? &rtype : NULL, DUMP_BIN); + return REDISMODULE_OK; +} + +static int upd_load_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + dump_mode_t mode; + ARG_OPT_TXT(mode, "compact", DUMP_TXT, DUMP_COMPACT); + + arg_dname_t origin; + ARG_DNAME_TXT(argv[1], origin, NULL, "zone origin"); + + rdb_txn_t txn; + ARG_INST_TXT(argv[2], txn); + + uint32_t serial; + ARG_NUM(argv[3], serial, "zone SOA serial"); + + arg_dname_t owner; + if (argc > 4) { + ARG_DNAME_TXT(argv[3], owner, &origin, "record owner"); + } + + uint16_t rtype; + if (argc > 5) { + ARG_RTYPE_TXT(argv[4], rtype); + } + + if (argc > 6) { + return RedisModule_WrongArity(ctx); + } + + upd_load(ctx, &origin, &txn, serial, (argc >= 5) ? &owner : NULL, + (argc >= 6) ? &rtype : NULL, mode); + return REDISMODULE_OK; +} + +static int upd_load_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc < 3) { + return RedisModule_WrongArity(ctx); + } + + arg_dname_t origin; + ARG_DNAME(argv[1], origin, "zone origin"); + + rdb_txn_t txn; + ARG_INST(argv[2], txn); + + uint32_t serial; + ARG_NUM(argv[3], serial, "zone SOA serial"); + + arg_dname_t owner; + if (argc > 4) { + ARG_DNAME(argv[3], owner, "record owner"); + } + + uint16_t rtype; + if (argc > 5) { + ARG_NUM(argv[4], rtype, "record type"); + } + + if (argc > 6) { + return RedisModule_WrongArity(ctx); + } + + upd_load(ctx, &origin, &txn, serial, (argc >= 5) ? &owner : NULL, + (argc >= 6) ? &rtype : NULL, DUMP_BIN); + return REDISMODULE_OK; +} + +#define LOAD_ERROR(ctx, msg) { \ + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_WARNING, RDB_E(msg)); \ + RedisModule_ReplyWithError(ctx, RDB_E(msg)); \ + return REDISMODULE_ERR; \ +} + +__attribute__((visibility("default"))) +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (RedisModule_Init(ctx, "knot", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR) { + LOAD_ERROR(ctx, "module already loaded"); + } + + for (int i = 0; i < argc; i += 2) { + long long num; + size_t key_len; + const char *key = RedisModule_StringPtrLen(argv[i], &key_len); + if (i + 1 >= argc) { + LOAD_ERROR(ctx, "missing configuration option value"); + } + if (strcmp(key, RDB_PARAM_DFLT_TTL) == 0) { + if (RedisModule_StringToLongLong(argv[i + 1], &num) == REDISMODULE_OK && + num <= INT32_MAX) { + rdb_default_ttl = num; + } else { + LOAD_ERROR(ctx, "invalid value of " RDB_PARAM_DFLT_TTL); + } + } else if (strcmp(key, RDB_PARAM_EVENT_AGE) == 0) { + if (RedisModule_StringToLongLong(argv[i + 1], &num) == REDISMODULE_OK) { + rdb_event_age = num; + } else { + LOAD_ERROR(ctx, "invalid value of " RDB_PARAM_EVENT_AGE); + } + } else if (strcmp(key, RDB_PARAM_UPD_DEPTH) == 0) { + if (RedisModule_StringToLongLong(argv[i + 1], &num) == REDISMODULE_OK) { + rdb_upd_history_len = num; + } else { + LOAD_ERROR(ctx, "invalid value of " RDB_PARAM_UPD_DEPTH); + } + } else { + LOAD_ERROR(ctx, "unknown configuration option"); + } + } + + rdb_rrset_t = RedisModule_CreateDataType(ctx, RRSET_NAME, RRSET_ENCODING_VERSION, &rrset_tm); + if (rdb_rrset_t == NULL) { + LOAD_ERROR(ctx, "failed to load type " RRSET_NAME); + } + + rdb_diff_t = RedisModule_CreateDataType(ctx, DIFF_NAME, DIFF_ENCODING_VERSION, &diff_tm); + if (rdb_diff_t == NULL) { + LOAD_ERROR(ctx, "failed to load type " DIFF_NAME); + } + + RedisModuleCommand *cmd = NULL; + if (register_command_txt("KNOT.ZONE.BEGIN", zone_begin_txt, "write fast") || + register_command_txt("KNOT.ZONE.STORE", zone_store_txt, "write fast") || + register_command_txt("KNOT.ZONE.COMMIT", zone_commit_txt, "write") || + register_command_txt("KNOT.ZONE.ABORT", zone_abort_txt, "write") || + register_command_txt("KNOT.ZONE.LOAD", zone_load_txt, "readonly") || + register_command_txt("KNOT.ZONE.PURGE", zone_purge_txt, "write") || + register_command_txt("KNOT.ZONE.LIST", zone_list_txt, "readonly") || + register_command_txt("KNOT.UPD.BEGIN", upd_begin_txt, "write fast") || + register_command_txt("KNOT.UPD.ADD", upd_add_txt, "write fast") || + register_command_txt("KNOT.UPD.REMOVE", upd_remove_txt, "write fast") || + register_command_txt("KNOT.UPD.COMMIT", upd_commit_txt, "write") || + register_command_txt("KNOT.UPD.ABORT", upd_abort_txt, "write") || + register_command_txt("KNOT.UPD.DIFF", upd_diff_txt, "readonly") || + register_command_txt("KNOT.UPD.LOAD", upd_load_txt, "readonly") || + register_command_bin(RDB_CMD_ZONE_EXISTS, zone_exists_bin, "readonly") || + register_command_bin(RDB_CMD_ZONE_BEGIN, zone_begin_bin, "write") || + register_command_bin(RDB_CMD_ZONE_STORE, zone_store_bin, "write") || + register_command_bin(RDB_CMD_ZONE_COMMIT, zone_commit_bin, "write") || + register_command_bin(RDB_CMD_ZONE_ABORT, zone_abort_bin, "write") || + register_command_bin(RDB_CMD_ZONE_LOAD, zone_load_bin, "readonly") || + register_command_bin(RDB_CMD_ZONE_PURGE, zone_purge_bin, "write") || + register_command_bin(RDB_CMD_ZONE_LIST, zone_list_bin, "readonly") || + register_command_bin(RDB_CMD_UPD_BEGIN, upd_begin_bin, "write") || + register_command_bin(RDB_CMD_UPD_ADD, upd_add_bin, "write") || + register_command_bin(RDB_CMD_UPD_REMOVE, upd_remove_bin, "write") || + register_command_bin(RDB_CMD_UPD_COMMIT, upd_commit_bin, "write") || + register_command_bin(RDB_CMD_UPD_ABORT, upd_abort_bin, "write") || + register_command_bin(RDB_CMD_UPD_DIFF, upd_diff_bin, "readonly") || + register_command_bin(RDB_CMD_UPD_LOAD, upd_load_bin, "readonly") || + register_command_bin("KNOT_BIN.AOF.RRSET", rrset_aof_rewrite, "write") || // Add "internal" with newer Redis. + register_command_bin("KNOT_BIN.AOF.DIFF", diff_aof_rewrite, "write")) // Add "internal" with newer Redis. + { + LOAD_ERROR(ctx, "failed to load commands"); + } + + RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "%s loaded with %s=%u %s=%u %s=%u", + PACKAGE_VERSION, + RDB_PARAM_DFLT_TTL, rdb_default_ttl, + RDB_PARAM_EVENT_AGE, rdb_event_age, + RDB_PARAM_UPD_DEPTH, rdb_upd_history_len); + + return REDISMODULE_OK; +} diff --git a/src/redis/knot.h b/src/redis/knot.h new file mode 100644 index 0000000000..8cf4103cb5 --- /dev/null +++ b/src/redis/knot.h @@ -0,0 +1,50 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. and contributors + * SPDX-License-Identifier: GPL-2.0-or-later + * For more information, see + */ + +#pragma once + +#include + +#define RDB_PARAM_DFLT_TTL "default-ttl" +#define RDB_PARAM_EVENT_AGE "max-event-age" +#define RDB_PARAM_UPD_DEPTH "max-update-depth" + +#define RDB_VERSION "\x01" +#define RDB_PREFIX "k" RDB_VERSION +#define RDB_PREFIX_LEN (sizeof(RDB_PREFIX) - 1) + +#define RDB_CMD_ZONE_EXISTS "KNOT_BIN.ZONE.EXISTS" +#define RDB_CMD_ZONE_BEGIN "KNOT_BIN.ZONE.BEGIN" +#define RDB_CMD_ZONE_STORE "KNOT_BIN.ZONE.STORE" +#define RDB_CMD_ZONE_COMMIT "KNOT_BIN.ZONE.COMMIT" +#define RDB_CMD_ZONE_ABORT "KNOT_BIN.ZONE.ABORT" +#define RDB_CMD_ZONE_LOAD "KNOT_BIN.ZONE.LOAD" +#define RDB_CMD_ZONE_PURGE "KNOT_BIN.ZONE.PURGE" +#define RDB_CMD_ZONE_LIST "KNOT_BIN.ZONE.LIST" +#define RDB_CMD_UPD_BEGIN "KNOT_BIN.UPD.BEGIN" +#define RDB_CMD_UPD_ADD "KNOT_BIN.UPD.ADD" +#define RDB_CMD_UPD_REMOVE "KNOT_BIN.UPD.REM" +#define RDB_CMD_UPD_COMMIT "KNOT_BIN.UPD.COMMIT" +#define RDB_CMD_UPD_ABORT "KNOT_BIN.UPD.ABORT" +#define RDB_CMD_UPD_DIFF "KNOT_BIN.UPD.DIFF" +#define RDB_CMD_UPD_LOAD "KNOT_BIN.UPD.LOAD" + +#define RDB_RETURN_OK "OK" + +#define RDB_EVENT_KEY (RDB_PREFIX "\x01") +#define RDB_EVENT_ARG_EVENT "e" +#define RDB_EVENT_ARG_ORIGIN "o" +#define RDB_EVENT_ARG_INSTANCE "i" +#define RDB_EVENT_ARG_SERIAL "s" + +typedef struct { + uint8_t instance; + uint8_t id; +} rdb_txn_t; + +typedef enum { + RDB_EVENT_ZONE = 1, + RDB_EVENT_UPD = 2, +} rdb_event_t; diff --git a/src/redis/libs.h b/src/redis/libs.h new file mode 100644 index 0000000000..7ce3a655f3 --- /dev/null +++ b/src/redis/libs.h @@ -0,0 +1,35 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. and contributors + * SPDX-License-Identifier: LGPL-2.1-or-later + * For more information, see + */ + +#pragma once + +#include "libknot/attribute.h" +#undef _public_ +#define _public_ _hidden_ + +#include "contrib/base32hex.c" +#include "contrib/base64.c" +#include "contrib/mempattern.c" +#include "contrib/musl/inet_ntop.c" +#include "contrib/openbsd/strlcat.c" +#include "contrib/openbsd/strlcpy.c" +#include "contrib/sockaddr.c" +#include "contrib/string.c" +#include "contrib/time.c" +#include "contrib/ucw/mempool.c" +#include "libdnssec/key/keytag.c" +#include "libknot/codes.c" +#include "libknot/descriptor.c" +#include "libknot/dname.c" +#include "libknot/mm_ctx.h" +#include "libknot/rdataset.c" +#include "libknot/rrset-dump.c" +#include "libknot/rrset.c" +#include "libknot/rrtype/naptr.c" +#include "libknot/rrtype/opt.c" +#include "libknot/rrtype/soa.h" +#include "libzscanner/error.c" +#include "libzscanner/functions.c" +#include "libzscanner/scanner.c.t0" diff --git a/src/redis/type_diff.h b/src/redis/type_diff.h new file mode 100644 index 0000000000..7c2d4323c6 --- /dev/null +++ b/src/redis/type_diff.h @@ -0,0 +1,192 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. and contributors + * SPDX-License-Identifier: LGPL-2.1-or-later + * For more information, see + */ + +#pragma once + +#define DIFF_ENCODING_VERSION 1 +#define DIFF_NAME "KnotRdiff" + +typedef struct { + knot_rdataset_t add_rrs; + knot_rdataset_t rem_rrs; + uint32_t add_ttl; + uint32_t rem_ttl; +} diff_v; + +static RedisModuleType *rdb_diff_t; + +static void *diff_load(RedisModuleIO *io, int encver) +{ + if (encver != DIFF_ENCODING_VERSION) { + RedisModule_LogIOError(io, REDISMODULE_LOGLEVEL_WARNING, RDB_ECOMPAT); + return NULL; + } + + diff_v *diff = RedisModule_Alloc(sizeof(diff_v)); + if (diff == NULL) { + RedisModule_LogIOError(io, REDISMODULE_LOGLEVEL_WARNING, RDB_EALLOC); + return NULL; + } + size_t len = 0; + diff->add_rrs.count = RedisModule_LoadUnsigned(io); + diff->add_rrs.rdata = (knot_rdata_t *)RedisModule_LoadStringBuffer(io, &len); + if (len > UINT32_MAX) { + RedisModule_LogIOError(io, REDISMODULE_LOGLEVEL_WARNING, RDB_EMALF); + RedisModule_Free(diff->add_rrs.rdata); + RedisModule_Free(diff); + return NULL; + } + diff->add_rrs.size = len; + + diff->rem_rrs.count = RedisModule_LoadUnsigned(io); + diff->rem_rrs.rdata = (knot_rdata_t *)RedisModule_LoadStringBuffer(io, &len); + if (len > UINT32_MAX) { + RedisModule_LogIOError(io, REDISMODULE_LOGLEVEL_WARNING, RDB_EMALF); + RedisModule_Free(diff->add_rrs.rdata); + RedisModule_Free(diff->rem_rrs.rdata); + RedisModule_Free(diff); + return NULL; + } + diff->rem_rrs.size = len; + + diff->add_ttl = RedisModule_LoadUnsigned(io); + diff->rem_ttl = RedisModule_LoadUnsigned(io); + + return diff; +} + +static void diff_save(RedisModuleIO *io, void *value) +{ + diff_v *diff = (diff_v *)value; + + RedisModule_SaveUnsigned(io, diff->add_rrs.count); + RedisModule_SaveStringBuffer(io, (const char *)diff->add_rrs.rdata, diff->add_rrs.size); + + RedisModule_SaveUnsigned(io, diff->rem_rrs.count); + RedisModule_SaveStringBuffer(io, (const char *)diff->rem_rrs.rdata, diff->rem_rrs.size); + + RedisModule_SaveUnsigned(io, diff->add_ttl); + RedisModule_SaveUnsigned(io, diff->rem_ttl); +} + +static size_t diff_mem_usage(const void *value) +{ + const diff_v *diff = (const diff_v *)value; + if (value == NULL) { + return 0UL; + } + return sizeof(*diff) + diff->add_rrs.size + diff->rem_rrs.size; +} + +static void diff_rewrite(RedisModuleIO *io, RedisModuleString *key, void *value) +{ + size_t key_strlen = 0; + const diff_v *diff = (const diff_v *)value; + const uint8_t *key_str = (const uint8_t *)RedisModule_StringPtrLen(key, &key_strlen); + RedisModule_EmitAOF(io, "KNOT_BIN.AOF.DIFF", "blblbll", + key_str, key_strlen, + (long long)diff->add_rrs.count, + diff->add_rrs.rdata, (long long)diff->add_rrs.size, + (long long)diff->rem_rrs.count, + diff->rem_rrs.rdata, (long long)diff->rem_rrs.size, + (long long)diff->add_ttl, + (long long)diff->rem_ttl); +} + +static void diff_free(void *value) +{ + diff_v *diff = (diff_v *)value; + RedisModule_Free(diff->add_rrs.rdata); + RedisModule_Free(diff->rem_rrs.rdata); + RedisModule_Free(diff); +} + +static int diff_aof_rewrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 7) { + return RedisModule_WrongArity(ctx); + } + + diff_v *diff = RedisModule_Calloc(1, sizeof(diff_v)); + if (diff == NULL) { + return RedisModule_ReplyWithError(ctx, RDB_EALLOC); + } + + RedisModuleKey *diff_key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE); + + long long add_rrs_count_val = 0; + int ret = RedisModule_StringToLongLong(argv[2], &add_rrs_count_val); + if (ret != REDISMODULE_OK) { + RedisModule_CloseKey(diff_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } else if (add_rrs_count_val < 0 || add_rrs_count_val > UINT16_MAX) { + RedisModule_CloseKey(diff_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } + diff->add_rrs.count = add_rrs_count_val; + + size_t add_rrs_len = 0; + diff->add_rrs.rdata = (knot_rdata_t *)RedisModule_StringPtrLen(argv[3], &add_rrs_len); + if (add_rrs_len > UINT32_MAX) { + RedisModule_CloseKey(diff_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } + diff->add_rrs.size = add_rrs_len; + + long long rem_rrs_count_val = 0; + ret = RedisModule_StringToLongLong(argv[4], &rem_rrs_count_val); + if (ret != REDISMODULE_OK) { + RedisModule_CloseKey(diff_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } else if (rem_rrs_count_val < 0 || rem_rrs_count_val > UINT16_MAX) { + RedisModule_CloseKey(diff_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } + diff->rem_rrs.count = rem_rrs_count_val; + + size_t rem_rrs_len = 0; + diff->rem_rrs.rdata = (knot_rdata_t *)RedisModule_StringPtrLen(argv[5], &rem_rrs_len); + if (rem_rrs_len > UINT32_MAX) { + RedisModule_CloseKey(diff_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } + diff->rem_rrs.size = rem_rrs_len; + + long long ttl_val = 0; + ret = RedisModule_StringToLongLong(argv[6], &ttl_val); + if (ret != REDISMODULE_OK) { + RedisModule_CloseKey(diff_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } else if (ttl_val < 0 || ttl_val > UINT32_MAX) { + RedisModule_CloseKey(diff_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } + diff->add_ttl = ttl_val; + + ttl_val = 0; + ret = RedisModule_StringToLongLong(argv[7], &ttl_val); + if (ret != REDISMODULE_OK) { + RedisModule_CloseKey(diff_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } else if (ttl_val < 0 || ttl_val > UINT32_MAX) { + RedisModule_CloseKey(diff_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } + diff->rem_ttl = ttl_val; + + RedisModule_ModuleTypeSetValue(diff_key, rdb_diff_t, diff); + RedisModule_CloseKey(diff_key); + + return RedisModule_ReplyWithNull(ctx); +} + +RedisModuleTypeMethods diff_tm = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .rdb_load = diff_load, + .rdb_save = diff_save, + .mem_usage = diff_mem_usage, + .aof_rewrite = diff_rewrite, + .free = diff_free +}; diff --git a/src/redis/type_rrset.h b/src/redis/type_rrset.h new file mode 100644 index 0000000000..434e0d8b79 --- /dev/null +++ b/src/redis/type_rrset.h @@ -0,0 +1,143 @@ +/* Copyright (C) CZ.NIC, z.s.p.o. and contributors + * SPDX-License-Identifier: LGPL-2.1-or-later + * For more information, see + */ + +#pragma once + +#define RRSET_ENCODING_VERSION 1 +#define RRSET_NAME "KnotRRset" + +typedef struct { + knot_rdataset_t rrs; + uint32_t ttl; +} rrset_v; + +static RedisModuleType *rdb_rrset_t; + +static void *rrset_load(RedisModuleIO *io, int encver) +{ + if (encver != RRSET_ENCODING_VERSION) { + RedisModule_LogIOError(io, REDISMODULE_LOGLEVEL_WARNING, RDB_ECOMPAT); + return NULL; + } + + rrset_v *rrset = RedisModule_Alloc(sizeof(rrset_v)); + if (rrset == NULL) { + RedisModule_LogIOError(io, REDISMODULE_LOGLEVEL_WARNING, RDB_EALLOC); + return NULL; + } + size_t len = 0; + rrset->rrs.count = RedisModule_LoadUnsigned(io); + rrset->rrs.rdata = (knot_rdata_t *)RedisModule_LoadStringBuffer(io, &len); + if (len > UINT32_MAX) { + RedisModule_LogIOError(io, REDISMODULE_LOGLEVEL_WARNING, RDB_EMALF); + RedisModule_Free(rrset->rrs.rdata); + RedisModule_Free(rrset); + return NULL; + } + rrset->rrs.size = len; + + rrset->ttl = RedisModule_LoadUnsigned(io); + + return rrset; +} + +static void rrset_save(RedisModuleIO *io, void *value) +{ + rrset_v *rrset = (rrset_v *)value; + + RedisModule_SaveUnsigned(io, rrset->rrs.count); + RedisModule_SaveStringBuffer(io, (const char *)rrset->rrs.rdata, rrset->rrs.size); + + RedisModule_SaveUnsigned(io, rrset->ttl); +} + +static size_t rrset_mem_usage(const void *value) +{ + const rrset_v *rrset = (const rrset_v *)value; + if (value == NULL) { + return 0UL; + } + return sizeof(*rrset) + rrset->rrs.size; +} + +static void rrset_rewrite(RedisModuleIO *io, RedisModuleString *key, void *value) +{ + size_t key_strlen = 0; + const rrset_v *rrset = (const rrset_v *)value; + const uint8_t *key_str = (const uint8_t *)RedisModule_StringPtrLen(key, &key_strlen); + RedisModule_EmitAOF(io, "KNOT_BIN.AOF.RRSET", "blbl", + key_str, key_strlen, + (long long)rrset->rrs.count, + rrset->rrs.rdata, rrset->rrs.size, + (long long)rrset->ttl); +} + +static void rrset_free(void *value) +{ + rrset_v *rrset = (rrset_v *)value; + RedisModule_Free(rrset->rrs.rdata); + RedisModule_Free(rrset); +} + +static int rrset_aof_rewrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 5) { + return RedisModule_WrongArity(ctx); + } + + rrset_v *rrset = RedisModule_Calloc(1, sizeof(rrset_v)); + if (rrset == NULL) { + return RedisModule_ReplyWithError(ctx, RDB_EALLOC); + } + + RedisModuleKey *rrset_key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE); + + long long count_val = 0; + int ret = RedisModule_StringToLongLong(argv[3], &count_val); + if (ret != REDISMODULE_OK) { + RedisModule_CloseKey(rrset_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } else if (count_val < 0 || count_val > UINT16_MAX) { + RedisModule_CloseKey(rrset_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } + rrset->rrs.count = count_val; + + size_t rdataset_strlen; + const char *rdataset_str = RedisModule_StringPtrLen(argv[4], &rdataset_strlen); + if (rdataset_strlen != 0) { + rrset->rrs.rdata = RedisModule_Alloc(rdataset_strlen); + rrset->rrs.size = rdataset_strlen; + memcpy(rrset->rrs.rdata, rdataset_str, rdataset_strlen); + } else { + rrset->rrs.rdata = NULL; + rrset->rrs.size = 0; + } + + long long ttl_val = 0; + ret = RedisModule_StringToLongLong(argv[2], &ttl_val); + if (ret != REDISMODULE_OK) { + RedisModule_CloseKey(rrset_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } else if (ttl_val < 0 || ttl_val > UINT32_MAX) { + RedisModule_CloseKey(rrset_key); + return RedisModule_ReplyWithError(ctx, RDB_EMALF); + } + rrset->ttl = ttl_val; + + RedisModule_ModuleTypeSetValue(rrset_key, rdb_rrset_t, rrset); + RedisModule_CloseKey(rrset_key); + + return RedisModule_ReplyWithNull(ctx); +} + +RedisModuleTypeMethods rrset_tm = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .rdb_load = rrset_load, + .rdb_save = rrset_save, + .mem_usage = rrset_mem_usage, + .aof_rewrite = rrset_rewrite, + .free = rrset_free +};