size_t size;
};
-/// Return whether we're using optimized variant right now.
-static bool using_avx2(void)
-{
- bool result = (KRU.initialize == KRU_AVX2.initialize);
- kr_require(result || KRU.initialize == KRU_GENERIC.initialize);
- return result;
-}
-
/// Print configuration into desc array.
void defer_str_conf(char *desc, int desc_len)
{
.max_decay = MAX_DECAY,
.log_period = log_period,
.cpus = cpus,
- .using_avx2 = using_avx2(),
+ .using_avx2 = kru_using_avx2(),
};
size_t capacity_log = 0;
sizeof(header.cpus),
"detected padding with undefined data inside mmapped header");
- ret = mmapped_init(&defer_mmapped, mmap_file, size, &header, header_size);
- if (ret == MMAPPED_WAS_FIRST) {
+ ret = mmapped_init(&defer_mmapped, mmap_file, size, &header, header_size, false);
+ if (ret == MMAPPED_PENDING) {
kr_log_info(DEFER, "Initializing defer...\n");
defer = defer_mmapped.mem;
defer->log_time = kr_now() - log_period;
- ret = mmapped_init_continue(&defer_mmapped);
+ ret = mmapped_init_finish(&defer_mmapped);
if (ret != 0) goto fail;
kr_log_info(DEFER, "Defer initialized (%s).\n", (defer->using_avx2 ? "AVX2" : "generic"));
defer_str_conf(desc, sizeof(desc));
kr_log_info(DEFER, "Defer configuration:\n%s", desc);
}
- } else if (ret == 0) {
+ } else if (ret == MMAPPED_EXISTING) {
defer = defer_mmapped.mem;
kr_log_info(DEFER, "Using existing defer data (%s).\n", (defer->using_avx2 ? "AVX2" : "generic"));
- } else goto fail;
+ } else {
+ kr_assert(ret < 0); // no other combinations of mmapped state flags are allowed in non-persistent case
+ goto fail;
+ }
for (size_t i = 0; i < QUEUES_CNT; i++)
queue_init(queues[i]);
struct ratelimiting *ratelimiting = NULL;
struct mmapped ratelimiting_mmapped = {0};
-/// return whether we're using optimized variant right now
-static bool using_avx2(void)
-{
- bool result = (KRU.initialize == KRU_AVX2.initialize);
- kr_require(result || KRU.initialize == KRU_GENERIC.initialize);
- return result;
-}
-
int ratelimiting_init(const char *mmap_file, size_t capacity, uint32_t instant_limit,
uint32_t rate_limit, uint16_t slip, uint32_t log_period, bool dry_run)
{
.log_period = log_period,
.slip = slip,
.dry_run = dry_run,
- .using_avx2 = using_avx2()
+ .using_avx2 = kru_using_avx2()
};
size_t header_size = offsetof(struct ratelimiting, using_avx2) + sizeof(header.using_avx2);
sizeof(header.dry_run),
"detected padding with undefined data inside mmapped header");
- int ret = mmapped_init(&ratelimiting_mmapped, mmap_file, size, &header, header_size);
- if (ret == MMAPPED_WAS_FIRST) {
+ int ret = mmapped_init(&ratelimiting_mmapped, mmap_file, size, &header, header_size, false);
+ if (ret == MMAPPED_PENDING) {
kr_log_info(SYSTEM, "Initializing rate-limiting...\n");
ratelimiting = ratelimiting_mmapped.mem;
ratelimiting->v6_prices[i] = base_price / V6_RATE_MULT[i];
}
- ret = mmapped_init_continue(&ratelimiting_mmapped);
+ ret = mmapped_init_finish(&ratelimiting_mmapped);
if (ret != 0) goto fail;
kr_log_info(SYSTEM, "Rate-limiting initialized (%s).\n", (ratelimiting->using_avx2 ? "AVX2" : "generic"));
return 0;
- } else if (ret == 0) {
+ } else if (ret == MMAPPED_EXISTING) {
ratelimiting = ratelimiting_mmapped.mem;
kr_log_info(SYSTEM, "Using existing rate-limiting data (%s).\n", (ratelimiting->using_avx2 ? "AVX2" : "generic"));
return 0;
- } // else fail
+ } else {
+ kr_assert(ret < 0); // no other combinations of mmapped state flags are allowed in non-persistent case
+ // fail
+ }
fail:
kr_cache_emergency_file_to_remove = fpath;
}
- if (ret == 0 && opts->maxsize) {
- size_t maxsize = cache->api->get_maxsize(cache->db);
- if (maxsize > opts->maxsize) kr_log_warning(CACHE,
+ size_t maxsize = 0;
+ if (ret == 0) {
+ maxsize = cache->api->get_maxsize(cache->db);
+ if (opts->maxsize && (maxsize > opts->maxsize)) kr_log_warning(CACHE,
"Warning: real cache size is %zu instead of the requested %zu bytes."
" To reduce the size you need to remove the file '%s' by hand.\n",
- maxsize, opts->maxsize, fpath);
+ maxsize, opts->maxsize, fpath); // TODO remove file instead
}
if (ret != 0)
return ret;
cache->ttl_min = KR_CACHE_DEFAULT_TTL_MIN;
cache->ttl_max = KR_CACHE_DEFAULT_TTL_MAX;
kr_cache_make_checkpoint(cache);
+
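+	// Initialize the cache "top" (tracking of recently accessed keys); its KRU is sized from the cache size.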
+ char *top_path = kr_absolutize_path(opts->path, "top");
+ if (kr_fails_assert(top_path)) {
+		ret = kr_error(errno ? errno : ENOMEM); // kr_error(0) would be mistaken for success
+ }
+ if (ret == 0) {
+ ret = kr_cache_top_init(&cache->top, top_path, maxsize);
+ free(top_path);
+ }
+ if (ret != 0) {
+ cache->api->close(cache->db, &cache->stats);
+ return ret;
+ }
+
return 0;
}
void kr_cache_close(struct kr_cache *cache)
{
+ kr_cache_top_deinit(&cache->top);
kr_cache_check_health(cache, -1);
if (cache_isvalid(cache)) {
cache_op(cache, close);
rdataset_dematerialize(rds_sigs, eh->data + rr_ssize);
if (kr_fails_assert(entry_h_consistent_E(val_new_entry, rr->type)))
return kr_error(EINVAL);
- kr_cache_top_access(cache->top, key.data, key.len, "stash_rrset");
+ kr_cache_top_access(&cache->top, key.data, key.len, "stash_rrset");
#if 0 /* Occasionally useful when debugging some kinds of changes. */
{
VERBOSE_MSG(qry, "=> EL write failed (ret: %d)\n", ret);
return kr_ok();
}
- kr_cache_top_access(cache->top, key.data, key.len, "stash_nsec_p");
+ kr_cache_top_access(&cache->top, key.data, key.len, "stash_nsec_p");
if (log_refresh_by) {
VERBOSE_MSG(qry, "=> nsec_p stashed for %s (refresh by %d, hash: %x)\n",
log_dname, log_refresh_by, log_hash);
.raw_data = val.data,
.raw_bound = knot_db_val_bound(val),
};
- kr_cache_top_access(cache->top, key.data, key.len, "peek_exact_real"); // hits only
+ kr_cache_top_access(&cache->top, key.data, key.len, "peek_exact_real"); // hits only
return kr_ok();
}
int kr_cache_peek_exact(struct kr_cache *cache, const knot_dname_t *name, uint16_t type,
#include "lib/cache/cdb_api.h"
#include "lib/defines.h"
#include "contrib/ucw/config.h" /*uint*/
+#include "lib/cache/top.h"
#include "lib/module.h"
/* Prototypes for the 'cache' module implementation. */
const struct kr_cdb_api *api; /**< Storage engine */
struct kr_cdb_stats stats;
uint32_t ttl_min, ttl_max; /**< TTL limits; enforced primarily in iterator actually. */
- struct kr_cache_top *top;
+ union kr_cache_top top;
/* A pair of stamps for detection of real-time shifts during runtime. */
struct timeval checkpoint_walltime; /**< Wall time on the last check-point. */
} txn;
bool is_cache; /**< cache vs. rules; from struct kr_cdb_opts::is_cache */
- struct kr_cache_top *top; // TODO remove
+ union kr_cache_top *top; // TODO remove
/* Cached part of struct stat for data.mdb. */
dev_t st_dev;
eh->has_optout = qf->DNSSEC_OPTOUT;
memcpy(eh->data, &pkt_size, sizeof(pkt_size));
memcpy(eh->data + sizeof(pkt_size), pkt->wire, pkt_size);
- kr_cache_top_access(cache->top, key.data, key.len, "stash_pkt");
+ kr_cache_top_access(&cache->top, key.data, key.len, "stash_pkt");
WITH_VERBOSE(qry) {
auto_free char *type_str = kr_rrtype_text(pkt_type),
success:
- kr_cache_top_access(cache->top, key_nsec.data, key_nsec.len, "leq_nsec1"); // hits only
+ kr_cache_top_access(&cache->top, key_nsec.data, key_nsec.len, "leq_nsec1"); // hits only
return NULL;
}
knot_db_val_t val = { NULL, 0 };
knot_db_val_t wild_low_kwz = { NULL, 0 };
uint32_t new_ttl;
- kr_cache_top_access(cache->top, key.data, key.len, "nsec1_src_synth"); // TODO remove, probably redundant, hit (exact/cover) or miss
+ kr_cache_top_access(&cache->top, key.data, key.len, "nsec1_src_synth"); // TODO remove, probably redundant, hit (exact/cover) or miss
const char *err = find_leq_NSEC1(cache, qry, key, k, &val,
&exact_match, &wild_low_kwz, NULL, &new_ttl);
if (err) {
success:
- kr_cache_top_access(cache->top, key_found.data, key_found.len, "leq_nsec3"); // hits only
+ kr_cache_top_access(&cache->top, key_found.data, key_found.len, "leq_nsec3"); // hits only
return NULL;
}
ret = found_exact_hit(qry, pkt, val, lowest_rank);
}
if (!ret) {
- kr_cache_top_access(cache->top, key.data, key.len, "peek_nosync:exact"); // hits only
+ kr_cache_top_access(&cache->top, key.data, key.len, "peek_nosync:exact"); // hits only
return KR_STATE_DONE;
} else if (kr_fails_assert(ret == kr_error(ENOENT))) {
VERBOSE_MSG(qry, "=> exact hit error: %d %s\n", ret, kr_strerror(ret));
ret = entry2answer(&ans, AR_SOA, eh, knot_db_val_bound(val),
k->zname, KNOT_RRTYPE_SOA, new_ttl);
if (ret) return ctx->state;
- kr_cache_top_access(cache->top, key.data, key.len, "peek_nosync:SOA"); // hits only
+ kr_cache_top_access(&cache->top, key.data, key.len, "peek_nosync:SOA"); // hits only
}
/* Find our target RCODE. */
ret, (int)new_ttl);
if (ret) return kr_error(ret);
ans->rcode = PKT_NOERROR;
- kr_cache_top_access(cache->top, key.data, key.len, "try_wild"); // hits only
+ kr_cache_top_access(&cache->top, key.data, key.len, "try_wild"); // hits only
return kr_ok();
}
success:
k->zlf_len = zlf_len;
- kr_cache_top_access(cache->top, key.data, key.len, "closest_NS"); // hits only
+ kr_cache_top_access(&cache->top, key.data, key.len, "closest_NS"); // hits only
return kr_ok();
}
#include <stdio.h>
#include <limits.h>
+#include "lib/utils.h"
#include "lib/defines.h"
#include "lib/cache/top.h"
+#include "lib/mmapped.h"
+#include "lib/kru.h"
// #ifdef LOG_GRP_MDB
#define VERBOSE_LOG(...) printf("GC KRU " __VA_ARGS__)
-struct kr_cache_top {
- bool using_avx2; // required consistency to use the same hash function
- // --- header end ---
- size_t capacity;
- uint32_t instant_limit; // warn about different settings, but require explicit file removal?
- uint32_t rate_limit;
- uint32_t time_offset; // to be set on reinit according to last_change timestamp
- _Atomic uint32_t last_change;
+#define FILE_FORMAT_VERSION 1 // fail if different
+
+#define TICK_MSEC 1000
+#define BASE_PRICE (((kru_price_t)1) << (KRU_PRICE_BITS - 16)) // increment by ones (16-bit)
+ // -> instant limit: ~2^16
+#define MAX_DECAY (BASE_PRICE / 2) // per sec
+ // -> rate limit: 1/2 per sec (more frequent accesses are incomparable)
+ // -> half-life: ~ 25h 14min
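+	// (derivation: the load decays proportionally to its value, i.e. exponentially;
+	//  half-life = ln(2) * instant_limit / rate_limit = ln(2) * 2^16 * 2 s =~ 90852 s =~ 25 h 14 min)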
+
+struct top_data {
+ uint32_t version;
+ uint32_t base_price;
+ uint32_t max_decay;
_Alignas(64) uint8_t kru[];
};
-bool kr_cache_top_init(void) {
-
- return true;
+static inline uint32_t ticks_now(void)
+{
+	// TODO use clock_gettime directly or maintain a time offset
+	return kr_now() / TICK_MSEC; // does not survive restarts/reboots
}
-void kr_cache_top_deinit(void) {
+int kr_cache_top_init(union kr_cache_top *top, char *mmap_file, size_t cache_size) {
+ size_t size = 0, capacity_log = 0;
+ VERBOSE_LOG("INIT, cache size %d\n", cache_size);
+
+ if (cache_size > 0) {
+ const size_t capacity = 2<<19; // TODO calculate from cache_size
+ for (size_t c = capacity - 1; c > 0; c >>= 1) capacity_log++;
+
+ size = offsetof(struct top_data, kru) + KRU.get_size(capacity_log);
+ } // else use existing file settings
+
+ struct top_data header = {
+ .version = (FILE_FORMAT_VERSION << 1) | kru_using_avx2(),
+ .base_price = BASE_PRICE,
+ .max_decay = MAX_DECAY
+ };
+ size_t header_size = offsetof(struct top_data, max_decay) + sizeof(header.max_decay);
+ static_assert( // no padding up to .max_decay
+ offsetof(struct top_data, max_decay) ==
+ sizeof(header.version) +
+ sizeof(header.base_price),
+ "detected padding with undefined data inside mmapped header");
+
+ if (cache_size == 0) {
+ header_size = offsetof(struct top_data, base_price);
+ }
+
+ VERBOSE_LOG("INIT mmapped_init\n");
+	int state = mmapped_init(&top->mmapped, mmap_file, size, &header, header_size, true); // mmaps memory into top->data
+ bool using_existing = false;
+
+ // try using existing data
+ if ((state >= 0) && (state & MMAPPED_EXISTING)) {
+ if (!KRU.check_size((struct kru *)top->data->kru, top->mmapped.size - offsetof(struct top_data, kru))) {
+ VERBOSE_LOG("INIT reset, wrong size\n");
+ state = mmapped_init_reset(&top->mmapped, mmap_file, size, &header, header_size);
+ } else {
+ using_existing = true;
+ VERBOSE_LOG("INIT finish existing\n");
+ state = mmapped_init_finish(&top->mmapped);
+ }
+ }
+
+ // initialize new instance
+ if ((state >= 0) && !(state & MMAPPED_EXISTING) && (state & MMAPPED_PENDING)) {
+ bool succ = KRU.initialize((struct kru *)top->data->kru, capacity_log, top->data->max_decay);
+ if (!succ) {
+ state = kr_error(EINVAL);
+ goto fail;
+ }
+ kr_assert(KRU.check_size((struct kru *)top->data->kru, top->mmapped.size - offsetof(struct top_data, kru)));
+
+ VERBOSE_LOG("INIT finish new\n");
+ state = mmapped_init_finish(&top->mmapped);
+ }
+
+ if (state < 0) goto fail;
+ kr_assert(state == 0);
+
+ kr_log_info(CACHE, "Cache top initialized %s (%s).\n",
+ using_existing ? "using existing data" : "as empty",
+ (kru_using_avx2() ? "AVX2" : "generic"));
+ return 0;
+
+fail:
+ VERBOSE_LOG("INIT error, deinit\n");
+ mmapped_deinit(&top->mmapped);
+ kr_log_crit(SYSTEM, "Initialization of cache top failed.\n");
+ return state;
+}
+
+void kr_cache_top_deinit(union kr_cache_top *top) {
+ mmapped_deinit(&top->mmapped); // sets top->data to NULL
}
/* text mode: '\0' -> '|'
return str;
}
-void kr_cache_top_access(struct kr_cache_top *top, void *key, size_t len, char *debug_label) {
-
+void kr_cache_top_access(union kr_cache_top *top, void *key, size_t len, char *debug_label)
+{
VERBOSE_LOG("ACCESS %-19s %s\n", debug_label, str_key(key, len));
+	KRU.load_bytes((struct kru *)top->data->kru, ticks_now(), (uint8_t *)key, len, top->data->base_price);
}
// temporary logging one level below _access
-void kr_cache_top_access_cdb(struct kr_cache_top *top, void *key, size_t len, char *debug_label) {
+void kr_cache_top_access_cdb(union kr_cache_top *top, void *key, size_t len, char *debug_label)
+{
VERBOSE_LOG("ACCESS %-17s %s\n", debug_label, str_key(key, len));
}
-uint16_t kr_cache_top_load(void *key, size_t len) {
- uint16_t load = 0;
+uint16_t kr_cache_top_load(union kr_cache_top *top, void *key, size_t len) {
+	uint16_t load = KRU.load_bytes((struct kru *)top->data->kru, ticks_now(), (uint8_t *)key, len, 0);
VERBOSE_LOG("LOAD %s -> %d\n", str_key(key, len), load);
return load;
* SPDX-License-Identifier: GPL-3.0-or-later
*/
-struct kr_cache_top;
+#pragma once
+#include "lib/mmapped.h"
+
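+// Both members alias the same pointer: .data is the typed view of the shared memory
+// kept in .mmapped.mem, which is the first member of struct mmapped (checked below).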
+union kr_cache_top {
+ struct mmapped mmapped;
+ struct top_data *data;
+};
+static_assert(&((union kr_cache_top *)0)->mmapped.mem == (void *)&((union kr_cache_top *)0)->data);
+
KR_EXPORT
-bool kr_cache_top_init(void);
+int kr_cache_top_init(union kr_cache_top *top, char *mmap_file, size_t cache_size);
KR_EXPORT
-void kr_cache_top_deinit(void);
+void kr_cache_top_deinit(union kr_cache_top *top);
KR_EXPORT
-void kr_cache_top_access_cdb(struct kr_cache_top *top, void *key, size_t len, char *debug_label); // temporal, TODO remove
+void kr_cache_top_access_cdb(union kr_cache_top *top, void *key, size_t len, char *debug_label); // temporary, TODO remove
KR_EXPORT
-void kr_cache_top_access(struct kr_cache_top *top, void *key, size_t len, char *debug_label);
+void kr_cache_top_access(union kr_cache_top *top, void *key, size_t len, char *debug_label);
KR_EXPORT
-uint16_t kr_cache_top_load(void *key, size_t len);
+uint16_t kr_cache_top_load(union kr_cache_top *top, void *key, size_t len);
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
+#include <assert.h>
#define ALIGNED_CPU_CACHE _Alignas(64)
/// Calculate size of the KRU structure.
size_t (*get_size)(int capacity_log);
+	/// Verify that the given KRU structure matches exactly the given memory size;
+	/// the check itself accesses at most the first size bytes of kru.
+	/// If false is returned, the memory is corrupted and calling other methods may cause SIGSEGV.
+ bool (*check_size)(struct kru *kru, size_t size);
+
/// Determine if a key should get limited (and update the KRU).
/// key needs to be aligned to a multiple of 16 bytes.
bool (*limited)(struct kru *kru, uint32_t time_now, uint8_t key[static const 16], kru_price_t price);
/// The key of i-th query consists of prefixes[i] bits of key, prefixes[i], and namespace; as above.
void (*load_multi_prefix)(struct kru *kru, uint32_t time_now,
uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint16_t *loads_out);
+
+	/// Charge price (may be zero) for a key given as an arbitrary unaligned byte string
+	/// (up to 255 bytes) and return the resulting load.
+	uint16_t (*load_bytes)(struct kru *kru, uint32_t time_now, uint8_t *key, uint8_t key_size, kru_price_t price);
};
// The functions are stored this way to make it easier to switch
// implementation based on detected CPU.
extern struct kru_api KRU;
extern const struct kru_api KRU_GENERIC, KRU_AVX2;
+
+/// Return whether we're using optimized variant right now.
+static inline bool kru_using_avx2(void)
+{
+ bool result = (KRU.initialize == KRU_AVX2.initialize);
+ assert(result || KRU.initialize == KRU_GENERIC.initialize);
+ return result;
+}
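+
+/* -- example use of the byte-keyed API (sketch; kru, now, key and price are illustrative) --
+	struct kru *kru = ...; // sized via KRU.get_size, set up via KRU.initialize
+	uint8_t key[] = "arbitrary cache key";
+	KRU.load_bytes(kru, now, key, sizeof(key), price); // charge price to the key
+	uint16_t load = KRU.load_bytes(kru, now, key, sizeof(key), 0); // read-only query
+*/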
}
}
+static_assert(LOADS_LEN == 15 && TABLE_COUNT == 2, "");
+// So, the pair of cache lines hold up to 2*15 elements.
+// Let's say that we can reliably store 16 = 1 << (1+3).
+// (probably more but certainly not 1 << 5)
+enum { LOADS_CAPACITY_SHIFT = 1 + 3 };
+
/// Convert capacity_log to loads_bits
static inline int32_t capacity2loads(int capacity_log)
{
- static_assert(LOADS_LEN == 15 && TABLE_COUNT == 2, "");
- // So, the pair of cache lines hold up to 2*15 elements.
- // Let's say that we can reliably store 16 = 1 << (1+3).
- // (probably more but certainly not 1 << 5)
- const int shift = 1 + 3;
- int loads_bits = capacity_log - shift;
+ int loads_bits = capacity_log - LOADS_CAPACITY_SHIFT;
// Let's behave reasonably for weird capacity_log values.
return loads_bits > 0 ? loads_bits : 1;
}
+ sizeof(struct load_cl) * TABLE_COUNT * (1 << loads_bits);
}
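+
+/// Recompute the expected total size from the stored loads_bits and compare it with the given size.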
+static bool kru_check_size(struct kru *kru, size_t size) {
+ if (size < sizeof(struct kru)) return false;
+ return size == kru_get_size(kru->loads_bits + LOADS_CAPACITY_SHIFT);
+}
+
static bool kru_initialize(struct kru *kru, int capacity_log, kru_price_t max_decay)
{
if (!kru) {
ctx->id = hash;
}
+/// Phase 1/3 of a query -- hash, prefetch, ctx init. Based on a byte-string key of arbitrary length.
+static inline void kru_limited_prefetch_bytes(struct kru *kru, uint32_t time_now, uint8_t *key, size_t key_size, kru_price_t price, struct query_ctx *ctx)
+{
+	// Obtain hash of the key.
+ hash_t hash;
+#if !USE_AES
+ hash = SipHash(&kru->hash_key, SIPHASH_RC, SIPHASH_RF, key, key_size);
+#else
+	// TODO use an AES-based hash here as well; trivial polynomial hash as a temporary placeholder:
+ hash = 3;
+ for (size_t i = 0; i < key_size; i++) {
+ hash = hash * 257 + key[i];
+ }
+#endif
+
+ // Choose the cache-lines to operate on
+ const uint32_t loads_mask = (1 << kru->loads_bits) - 1;
+ // Fetch the two cache-lines in parallel before we really touch them.
+ for (int li = 0; li < TABLE_COUNT; ++li) {
+ struct load_cl * const l = &kru->load_cls[hash & loads_mask][li];
+ __builtin_prefetch(l, 0); // hope for read-only access
+ hash >>= kru->loads_bits;
+ ctx->l[li] = l;
+ }
+
+ ctx->time_now = time_now;
+ ctx->price = price;
+ ctx->id = hash;
+}
+
/// Phase 2/3 of a query -- returns answer with no state modification (except update_time).
static inline bool kru_limited_fetch(struct kru *kru, struct query_ctx *ctx)
{
return max_load;
}
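+
+/// Single-key variant of the loads above: hash the whole byte string, charge the price
+/// (if non-zero), and return the resulting load.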
+static uint16_t kru_load_bytes(struct kru *kru, uint32_t time_now, uint8_t *key, uint8_t key_size, kru_price_t price)
+{
+ struct query_ctx ctx;
+
+ kru_limited_prefetch_bytes(kru, time_now, key, key_size, price, &ctx);
+ kru_limited_fetch(kru, &ctx);
+
+ if (price) {
+ kru_limited_update(kru, &ctx, true);
+ }
+
+ return ctx.final_load_value;
+}
+
/// Update limiting and return true iff it hit the limit instead.
static bool kru_limited(struct kru *kru, uint32_t time_now, uint8_t key[static 16], kru_price_t price)
{
#define KRU_API_INITIALIZER { \
.get_size = kru_get_size, \
+ .check_size = kru_check_size, \
.initialize = kru_initialize, \
.limited = kru_limited, \
.limited_multi_or = kru_limited_multi_or, \
.limited_multi_prefix_or = kru_limited_multi_prefix_or, \
.load_multi_prefix = kru_load_multi_prefix, \
.load_multi_prefix_max = kru_load_multi_prefix_max, \
+ .load_bytes = kru_load_bytes, \
}
return fcntl(fd, (wait ? F_SETLKW : F_SETLK), &fl) != -1;
}
-int mmapped_init(struct mmapped *mmapped, const char *mmap_file, size_t size, void *header, size_t header_size)
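+/// Common failure path: unmap, unlock and close whatever is open;
+/// returns ret, or kr_error(errno) if ret == 0.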
+static inline int fail(struct mmapped *mmapped, int ret)
{
- int ret = 0;
- int fd = mmapped->fd = open(mmap_file, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
- if (fd == -1) {
- ret = kr_error(errno);
- kr_log_crit(SYSTEM, "Cannot open file %s with shared data: %s\n",
- mmap_file, strerror(errno));
- goto fail;
+ if (!ret) ret = kr_error(errno);
+ if (mmapped->mem) {
+ munmap(mmapped->mem, mmapped->size);
+ mmapped->mem = NULL;
}
+ if (mmapped->fd >= 0) {
+ fcntl_flock_whole(mmapped->fd, F_UNLCK, false);
+ close(mmapped->fd);
+ mmapped->fd = -1;
+ }
+ return ret;
+}
- // try to acquire write lock; copy header on success
- if (fcntl_flock_whole(fd, F_WRLCK, false)) {
- if (ftruncate(fd, 0) == -1 || ftruncate(fd, size) == -1) { // get all zeroed
- ret = kr_error(errno);
- kr_log_crit(SYSTEM, "Cannot change size of file %s containing shared data: %s\n",
- mmap_file, strerror(errno));
- goto fail;
- }
- mmapped->mem = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
- if (mmapped->mem == MAP_FAILED) goto fail_errno;
+int mmapped_init_reset(struct mmapped *mmapped, const char *mmap_file, size_t size, void *header, size_t header_size)
+{
+ kr_require(mmapped->fd);
- memcpy(mmapped->mem, header, header_size);
+ if (!size) { // reset not allowed
+ kr_log_crit(SYSTEM, "File %s does not contain data in required format.\n", mmap_file);
+ errno = ENOTRECOVERABLE;
+ return fail(mmapped, 0);
+ }
- return MMAPPED_WAS_FIRST;
+ if (!mmapped->write_lock) {
+ kr_log_crit(SYSTEM, "Another instance of kresd uses file %s with different configuration.\n", mmap_file);
+ errno = ENOTRECOVERABLE;
+ return fail(mmapped, 0);
}
- // wait for acquiring shared lock; check header on success
- if (!fcntl_flock_whole(fd, F_RDLCK, true)) goto fail_errno;
+ if (mmapped->mem) {
+ munmap(mmapped->mem, mmapped->size);
+ mmapped->mem = NULL;
+ }
- struct stat s;
- bool succ = (fstat(fd, &s) == 0);
- if (!succ) goto fail_errno;
- if (s.st_size != size) goto fail_header_mismatch;
+ kr_assert(size >= header_size);
- mmapped->mem = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
- if (mmapped->mem == MAP_FAILED) goto fail_errno;
- if (memcmp(mmapped->mem, header, header_size) != 0) {
- munmap(mmapped->mem, size);
- goto fail_header_mismatch;
+ if (ftruncate(mmapped->fd, 0) == -1 || ftruncate(mmapped->fd, size) == -1) { // get all zeroed
+ int ret = kr_error(errno);
+ kr_log_crit(SYSTEM, "Cannot change size of file %s containing shared data: %s\n",
+ mmap_file, strerror(errno));
+ return fail(mmapped, ret);
}
- return 0;
+ mmapped->size = size;
+ mmapped->mem = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, mmapped->fd, 0);
+ if (mmapped->mem == MAP_FAILED) return fail(mmapped, 0);
+
+ memcpy(mmapped->mem, header, header_size);
+ return MMAPPED_PENDING;
+}
+
+int mmapped_init(struct mmapped *mmapped, const char *mmap_file, size_t size, void *header, size_t header_size, bool persistent)
+{
+	mmapped->persistent = persistent; // used by mmapped_deinit
+
+	// open file
+	int ret = 0;
+ mmapped->fd = open(mmap_file, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
+ if (mmapped->fd == -1) {
+ ret = kr_error(errno);
+ kr_log_crit(SYSTEM, "Cannot open file %s with shared data: %s\n",
+ mmap_file, strerror(errno));
+ return fail(mmapped, ret);
+ }
+
+ // try to acquire write lock; wait for shared lock otherwise
+ if (fcntl_flock_whole(mmapped->fd, F_WRLCK, false)) {
+ mmapped->write_lock = true;
+ } else if (fcntl_flock_whole(mmapped->fd, F_RDLCK, true)) {
+ mmapped->write_lock = false;
+ } else {
+ return fail(mmapped, 0);
+ }
-fail_header_mismatch:
- kr_log_crit(SYSTEM, "Another instance of kresd uses file %s with different configuration.\n", mmap_file);
- errno = ENOTRECOVERABLE;
+ // get file size
+ {
+ struct stat s;
+ bool succ = (fstat(mmapped->fd, &s) == 0);
+ if (!succ) return fail(mmapped, 0);
+ mmapped->size = s.st_size;
+ }
-fail_errno:
- ret = kr_error(errno);
+ // reinit if non-persistent or wrong size
+ if ((!persistent && mmapped->write_lock) || (size && (mmapped->size != size)) || (mmapped->size < header_size)) {
+ return mmapped_init_reset(mmapped, mmap_file, size, header, header_size);
+ }
-fail:
- if (fd >= 0) {
- fcntl_flock_whole(fd, F_UNLCK, false);
- close(fd);
+ // mmap
+ mmapped->mem = mmap(NULL, mmapped->size, PROT_READ | PROT_WRITE, MAP_SHARED, mmapped->fd, 0);
+ if (mmapped->mem == MAP_FAILED) {
+ return fail(mmapped, 0);
}
- mmapped->mem = NULL;
- return ret;
+
+ // check header
+ if (memcmp(mmapped->mem, header, header_size) != 0) {
+ return mmapped_init_reset(mmapped, mmap_file, size, header, header_size);
+ }
+
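+	// Existing data accepted; PENDING is additionally set iff we still hold the write lock,
+	// which can happen only in the persistent case.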
+ return MMAPPED_EXISTING | (mmapped->write_lock ? MMAPPED_PENDING : 0);
}
-int mmapped_init_continue(struct mmapped *mmapped)
+int mmapped_init_finish(struct mmapped *mmapped)
{
+	kr_require(mmapped->fd >= 0);
+	if (!mmapped->write_lock) return 0; // initialization was already finished
if (!fcntl_flock_whole(mmapped->fd, F_RDLCK, false)) return kr_error(errno);
+ mmapped->write_lock = false;
return 0;
}
void mmapped_deinit(struct mmapped *mmapped)
{
if (mmapped->mem == NULL) return;
- int fd = mmapped->fd;
munmap(mmapped->mem, mmapped->size);
mmapped->mem = NULL;
- fcntl_flock_whole(fd, F_UNLCK, false);
+ fcntl_flock_whole(mmapped->fd, F_UNLCK, false);
- // remove file data unless it is still locked by other processes
- if (fcntl_flock_whole(fd, F_WRLCK, false)) {
+ // remove file data if non-persistent unless it is still locked by other processes
+ if (!mmapped->persistent && fcntl_flock_whole(mmapped->fd, F_WRLCK, false)) {
/* If the configuration is updated at runtime, manager may remove the file
* and the new processes create it again while old processes are still using the old data.
* Here we keep zero-size file not to accidentally remove the new file instead of the old one.
* Still truncating the file will cause currently starting processes waiting for read lock on the same file to fail,
* but such processes are not expected to exist. */
- ftruncate(fd, 0);
+ ftruncate(mmapped->fd, 0);
- fcntl_flock_whole(fd, F_UNLCK, false);
+ fcntl_flock_whole(mmapped->fd, F_UNLCK, false);
}
- close(fd);
+ close(mmapped->fd);
}
* SPDX-License-Identifier: GPL-3.0-or-later
*/
+#pragma once
#include "lib/defines.h"
-#define MMAPPED_WAS_FIRST 1
+// All functions here return a combination of the following flags or kr_error(...).
+enum mmapped_state {
+ MMAPPED_PENDING = 1, // write lock acquired, (re)initialize and call finish
+ MMAPPED_EXISTING = 2, // using existing data, check consistency
+};
struct mmapped {
void *mem;
size_t size;
int fd;
+ bool write_lock;
+ bool persistent;
};
/* Initialize/Use file data as mmapped memory.
*
- * If write flock can be acquired, the file is resized, zeroed and mmapped,
- * header is copied at its beginning and MMAPPED_WAS_FIRST is returned;
- * you should finish initialization and call mmapped_init_continue to degrade flock to shared.
- * Otherwise, it waits for shared flock, calls mmap, verifies that header is byte-wise identical and returns zero.
- * On header mismatch, kr_error(ENOTRECOVERABLE) is returned; on a system error, kr_error(errno) is returned. */
+ * If write flock can be acquired and persistency is not requested, the file is resized, zeroed and mmapped,
+ * header is copied at its beginning and MMAPPED_PENDING is returned;
+ * you should finish initialization and call mmapped_init_finish to degrade flock to shared.
+ *
+ * Otherwise, it either acquires write flock or waits for shared flock,
+ * calls mmap, verifies that header is byte-wise identical
+ * and returns MMAPPED_EXISTING, possibly ORed with MMAPPED_PENDING based on the lock type.
+ *
+ * On header mismatch, either the outcome is the same as in the first case (if write flock was acquired),
+ * or kr_error(ENOTRECOVERABLE) is returned;
+ * on a system error, kr_error(errno) is returned. */
KR_EXPORT
-int mmapped_init(struct mmapped *mmapped, const char *mmap_file, size_t size, void *header, size_t header_size);
+int mmapped_init(struct mmapped *mmapped, const char *mmap_file, size_t size, void *header, size_t header_size, bool persistent);
+
+/* Reinitialize mmapped data (incl. size) as in the first case of mmapped_init.
+ *
+ * To be called if existing mmapped file data cannot be used and we still own write flock
+ * (i.e. MMAPPED_PENDING flag was returned from the last mmapped_ call).
+ * Possible return values are the same as in mmapped_init.
+ *
+ * If MMAPPED_PENDING was not set, kr_error(ENOTRECOVERABLE) is returned. */
+int mmapped_init_reset(struct mmapped *mmapped, const char *mmap_file, size_t size, void *header, size_t header_size);
-/* Degrade flock to shared after getting MMAPPED_WAS_FIRST from mmapped_init.
+/* Degrade flock to shared after getting MMAPPED_PENDING; no-op if MMAPPED_PENDING wasn't set.
*
* Returns zero on success and kr_error(errno) on system error. */
KR_EXPORT
-int mmapped_init_continue(struct mmapped *mmapped);
+int mmapped_init_finish(struct mmapped *mmapped);
/* Free mmapped memory and, unless the underlying file is used by other processes, truncate it to zero size. */
KR_EXPORT
void mmapped_deinit(struct mmapped *mmapped);
+
+/* -- example usage, persistent case --
+ mmapped_init
+ if (>=0 && EXISTING) {
+ if (!valid) {
+ mmapped_init_reset
+ }
+ mmapped_init_finish
+ }
+ if (>=0 && !EXISTING && PENDING) { // == PENDING
+ // init
+ mmapped_init_finish
+ }
+ if (>=0 && !EXISTING && !PENDING) { // == 0
+ // done
+ }
+*/
+
+/* -- example usage, non-persistent case --
+ mmapped_init
+ if (>=0 && EXISTING) { // == EXISTING
+ if (!valid) {
+ // fail
+ }
+ mmapped_init_finish // not needed
+ } else if (>=0 && PENDING) { // == PENDING
+ // init
+ mmapped_init_finish
+ }
+ if (<0) fail
+ // done
+*/
+
state = default_rtt_state;
} else { // memcpy is safe for unaligned case (on non-x86)
memcpy(&state, value.data, sizeof(state));
- kr_cache_top_access(cache->top, key.data, key.len, "get_rtt");
+ kr_cache_top_access(&cache->top, key.data, key.len, "get_rtt");
}
free(key.data);
int ret = cache->api->write(db, stats, &key, &value, 1);
kr_cache_commit(cache);
- kr_cache_top_access(cache->top, key.data, key.len, "put_rtt");
+ kr_cache_top_access(&cache->top, key.data, key.len, "put_rtt");
free(key.data);
return ret;
}
// TODO this is just an example, make this more clever
-category_t kr_gc_categorize(gc_record_info_t * info, void *key, size_t key_len)
+category_t kr_gc_categorize(union kr_cache_top *top, gc_record_info_t * info, void *key, size_t key_len)
{
category_t res;
if (!info->valid)
return CATEGORIES - 1;
- uint16_t load = kr_cache_top_load(key, key_len); // TODO use it
+ uint16_t load = kr_cache_top_load(top, key, key_len); // TODO use it
switch (info->no_labels) {
case 0: /* root zone */
#pragma once
#include "kr_cache_gc.h"
+#include "lib/cache/top.h"
typedef uint8_t category_t;
#define CATEGORIES 100 // number of categories
-category_t kr_gc_categorize(gc_record_info_t * info, void *key, size_t key_len);
+category_t kr_gc_categorize(union kr_cache_top *top, gc_record_info_t * info, void *key, size_t key_len);
#include <lib/cache/impl.h>
#include <lib/defines.h>
#include "lib/cache/cdb_lmdb.h"
+#include "lib/cache/top.h"
#include "lib/generic/array.h"
#include "lib/utils.h"
typedef struct {
size_t categories_sizes[CATEGORIES];
size_t records;
+ union kr_cache_top *top;
} ctx_compute_categories_t;
int cb_compute_categories(const knot_db_val_t * key, gc_record_info_t * info,
void *vctx)
{
ctx_compute_categories_t *ctx = vctx;
- category_t cat = kr_gc_categorize(info, key->data, key->len);
+ category_t cat = kr_gc_categorize(ctx->top, info, key->data, key->len);
ctx->categories_sizes[cat] += info->entry_size;
ctx->records++;
return KNOT_EOK;
size_t cfg_temp_keys_space;
size_t used_space;
size_t oversize_records;
+ union kr_cache_top *top;
} ctx_delete_categories_t;
int cb_delete_categories(const knot_db_val_t * key, gc_record_info_t * info,
void *vctx)
{
ctx_delete_categories_t *ctx = vctx;
- category_t cat = kr_gc_categorize(info, key->data, key->len);
+ category_t cat = kr_gc_categorize(ctx->top, info, key->data, key->len);
if (cat >= ctx->limit_category) {
knot_db_val_t *todelete = dbval_copy(key);
size_t used = ctx->used_space + key->len + sizeof(*key);
{ 0 }, timer_rw_txn = { 0 };
kr_timer_start(&timer_analyze);
- ctx_compute_categories_t cats = { { 0 }
+ ctx_compute_categories_t cats = { { 0 },
+ .top = &(*state)->kres_db.top,
};
ret = kr_gc_cache_iter(db, cfg, cb_compute_categories, &cats);
if (ret != KNOT_EOK) {
//// 3. pass whole cache again to collect a list of keys that should be deleted.
kr_timer_start(&timer_choose);
- ctx_delete_categories_t to_del = { 0 };
+ ctx_delete_categories_t to_del = { .top = &(*state)->kres_db.top };
to_del.cfg_temp_keys_space = cfg->temp_keys_space;
to_del.limit_category = limit_category;
ret = kr_gc_cache_iter(db, cfg, cb_delete_categories, &to_del);