I/O path, a delay is injected before sending back a reply to the caller,
thus causing a rate-limit ceiling.
+ A configurable burst allowance is supported via a burst multiplier,
+ allowing short-term bursts above the steady-state rate while still
+ enforcing a long-term ceiling.
+
+ Rate-limiter state (token counters and timestamps) is periodically
+ persisted to a local TDB, allowing limits to be enforced consistently
+ across client reconnects and smbd restarts.
+
An example to smb.conf segment (zero value implies ignore-this-option):
[share]
vfs objects = aio_ratelimit ...
aio_ratelimit: read_iops_limit = 2000
- aio_ratelimit: read_bw_limit = 2000000
+ aio_ratelimit: read_bw_limit = 2M
aio_ratelimit: read_burst_mult = 15 # == 1.5x burst
aio_ratelimit: write_iops_limit = 0
- aio_ratelimit: write_bw_limit = 1000000
+ aio_ratelimit: write_bw_limit = 1M
aio_ratelimit: write_burst_mult = 15 # == 1.5x burst
...
Upon successful completion of async I/O request, tokens are produced based on
the time which elapsed from previous requests, and tokens are consumed based
- on actual I/O size. When current tokens value is negative, a delay is
+ on actual I/O size. When current token value is negative, a delay is
calculated and injected to in-flight request. The delay value (microseconds)
is calculated based on the current tokens deficit.
*/
#include "includes.h"
#include "lib/util/time.h"
#include "lib/util/tevent_unix.h"
+#include "lib/util/util_tdb.h"
+#include "tdb.h"
+#include "system/filesys.h"
#undef DBGC_CLASS
#define DBGC_CLASS DBGC_VFS
#define DELAY_SEC_MAX (100L)
+
/* Default burst multiplier (1.5x) */
#define BURST_MULT_DEF (15)
-/* Maximal value for iops_limit */
+
+/* Maximum value for iops_limit */
#define IOPS_LIMIT_MAX (1000000L)
-/* Maximal value for bw_limit */
+
+/* Maximum value for bw_limit */
#define BYTES_LIMIT_MAX (1L << 40)
-/* Module type-name in smb.conf & debug logging */
+
+/* Module name in smb.conf & debug logging */
#define MODULE_NAME "aio_ratelimit"
+/* How often to save token state to the local TDB, in microseconds */
+#define SAVE_INTERVAL_USEC (30 * 1000000L) /* 30 seconds */
+
+/* TDB schema version */
+#define RATELIMIT_TDB_VERSION 1
+
+static unsigned int ref_count = 0;
+static TDB_CONTEXT *ratelimit_tdb;
+
+/*
+ * Fixed-size image of one rate-limiter's state as stored in the TDB.
+ * It is filled from, and copied back into, the like-named fields of
+ * struct ratelimiter.
+ *
+ * NOTE(review): PACKED_STRUCT is not defined in the visible part of this
+ * file; confirm it expands to a packing attribute. If it were undefined,
+ * the closing line would declare a variable named PACKED_STRUCT instead.
+ */
+struct ratelimit_tdb_record {
+	/* Timestamp (microseconds) of the limiter's last refill */
+	uint64_t last_usec;
+	/* Remaining I/O-operation tokens */
+	float iops_tokens;
+	/* Remaining byte tokens */
+	float bytes_tokens;
+
+	/*
+	 * Padding up to a 64-byte record (64 minus the 16 bytes of live
+	 * fields above), so fields can be added without resizing records.
+	 */
+	uint8_t reserved[48];
+} PACKED_STRUCT;
+
/* Token-based rate-limiter control state using a token-bucket. */
struct ratelimiter {
- const char *oper;
- uint64_t last_us;
+ const char *op;
+ uint64_t last_usec;
+ uint64_t last_save_usec;
float iops_tokens;
float bytes_tokens;
int64_t iops_total;
float iops_capacity;
float bytes_capacity;
/*
- * burst_mult is kept as configuration policy.
+ * burst_mult is kept as a configuration policy.
* It allows capacity to be recalculated if limits
* are reconfigured in the future (e.g. reload, per-client limits).
*/
float burst_mult;
-
int snum;
};
return MIN(x, y);
}
-static uint64_t time_now_us(void)
+static uint64_t time_now_usec(void)
{
struct timespec ts;
return (uint64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}
+/*
+ * Verify, or initialise, the schema version stored under the "VERSION"
+ * key of the module-wide TDB.
+ *
+ * Returns true when the database carries RATELIMIT_TDB_VERSION, either
+ * because it was already present or because this call stored it.
+ * Returns false when the TDB is not open, the version record has an
+ * unexpected size, the stored version differs, or the version could not
+ * be written.
+ */
+static bool ratelimit_tdb_check_version(void)
+{
+	TDB_DATA key = string_tdb_data("VERSION");
+	TDB_DATA val = { .dptr = NULL, .dsize = 0 };
+	uint32_t version = 0;
+	int ret;
+
+	if (ratelimit_tdb == NULL) {
+		return false;
+	}
+
+	/* Check for existing version */
+	val = tdb_fetch(ratelimit_tdb, key);
+
+	if (val.dptr == NULL) {
+		/* No version key - this is a new TDB, write our version */
+		version = RATELIMIT_TDB_VERSION;
+		val = make_tdb_data((uint8_t *)&version, sizeof(version));
+		ret = tdb_store(ratelimit_tdb, key, val, TDB_INSERT);
+		if (ret == 0) {
+			DBG_DEBUG("[%s] Initialized TDB version %u\n",
+				  MODULE_NAME,
+				  version);
+			return true;
+		}
+
+		if (tdb_error(ratelimit_tdb) != TDB_ERR_EXISTS) {
+			DBG_ERR("[%s] Failed to store TDB version\n",
+				MODULE_NAME);
+			return false;
+		}
+
+		/*
+		 * TDB_INSERT refused because the key appeared between our
+		 * fetch and store (another opener of the same file got
+		 * there first). That is not an error: fall through and
+		 * validate the version that was stored.
+		 */
+		version = 0;
+		val = tdb_fetch(ratelimit_tdb, key);
+		if (val.dptr == NULL) {
+			DBG_ERR("[%s] Failed to store TDB version\n",
+				MODULE_NAME);
+			return false;
+		}
+	}
+
+	if (val.dsize != sizeof(uint32_t)) {
+		DBG_ERR("[%s] TDB version key has invalid size\n",
+			MODULE_NAME);
+		SAFE_FREE(val.dptr);
+		return false;
+	}
+
+	/* Copy out before releasing the fetched buffer */
+	memcpy(&version, val.dptr, sizeof(version));
+	SAFE_FREE(val.dptr);
+
+	if (version != RATELIMIT_TDB_VERSION) {
+		DBG_ERR("[%s] TDB version mismatch: found %u, expected %u\n",
+			MODULE_NAME,
+			version,
+			RATELIMIT_TDB_VERSION);
+		return false;
+	}
+
+	DBG_DEBUG("[%s] TDB version %u verified\n", MODULE_NAME, version);
+	return true;
+}
+
+/*
+ * Take a reference on the module-wide TDB, opening it on first use.
+ *
+ * Returns true and bumps ref_count when the TDB is (now) open and its
+ * version was accepted. Returns false, leaving ref_count untouched and
+ * ratelimit_tdb NULL, when the path cannot be allocated, the open fails
+ * or the version check fails.
+ */
+static bool ratelimit_tdb_init(void)
+{
+	TDB_CONTEXT *db = NULL;
+	char *path = NULL;
+	bool version_ok;
+
+	if (ratelimit_tdb == NULL) {
+		path = state_path(talloc_tos(), "aio_ratelimit.tdb");
+		if (path == NULL) {
+			DBG_ERR("[%s] Failed to allocate TDB path\n",
+				MODULE_NAME);
+			return false;
+		}
+
+		/* The open itself is done as root */
+		become_root();
+		db = tdb_open(path, 0, TDB_DEFAULT, O_RDWR | O_CREAT, 0600);
+		unbecome_root();
+
+		TALLOC_FREE(path);
+
+		if (db == NULL) {
+			DBG_NOTICE("[%s] Failed to open TDB, "
+				   "rate limiting will work without persistence\n",
+				   MODULE_NAME);
+			return false;
+		}
+
+		/* The version check reads the global handle, so publish it */
+		ratelimit_tdb = db;
+
+		version_ok = ratelimit_tdb_check_version();
+		if (!version_ok) {
+			DBG_ERR("[%s] TDB version check failed, closing TDB\n",
+				MODULE_NAME);
+			tdb_close(db);
+			ratelimit_tdb = NULL;
+			return false;
+		}
+
+		ref_count += 1;
+		DBG_DEBUG("[%s] Opened TDB, ref_count now %u\n",
+			  MODULE_NAME,
+			  ref_count);
+		return true;
+	}
+
+	/* Already open: just account for one more user */
+	ref_count += 1;
+	DBG_DEBUG("[%s] TDB already open: ref_count now %u\n",
+		  MODULE_NAME,
+		  ref_count);
+	return true;
+}
+
+/*
+ * Build the TDB key "share/<servicename>/<op>" for a rate limiter.
+ *
+ * The key string is allocated on mem_ctx; the callers in this file
+ * release it with TALLOC_FREE(key.dptr). On allocation failure, or when
+ * rl, rl->op or servicename is NULL, a key with dptr == NULL is returned,
+ * so callers only need to test key.dptr.
+ */
+static TDB_DATA ratelimit_make_tdb_key(TALLOC_CTX *mem_ctx,
+				       const struct ratelimiter *rl,
+				       const char *servicename)
+{
+	const TDB_DATA no_key = { .dptr = NULL, .dsize = 0 };
+	char *keystr = NULL;
+
+	/* Never hand NULL to a %s conversion */
+	if (rl == NULL || rl->op == NULL || servicename == NULL) {
+		return no_key;
+	}
+
+	keystr = talloc_asprintf(mem_ctx, "share/%s/%s", servicename, rl->op);
+	if (keystr == NULL) {
+		return no_key;
+	}
+
+	return string_tdb_data(keystr);
+}
+
+/*
+ * Store the limiter's token counters and last-refill timestamp in the
+ * module-wide TDB under its per-share, per-operation key.
+ *
+ * Best effort: does nothing when the TDB is not open, and only logs when
+ * the service name, the key or the store itself fails.
+ */
+static void ratelimit_save_tdb(struct ratelimiter *rl)
+{
+	const struct loadparm_substitution *lp_sub = NULL;
+	struct ratelimit_tdb_record record = {
+		.last_usec = rl->last_usec,
+		.iops_tokens = rl->iops_tokens,
+		.bytes_tokens = rl->bytes_tokens,
+	};
+	TDB_DATA key = { .dptr = NULL, .dsize = 0 };
+	TDB_DATA val = { .dptr = NULL, .dsize = 0 };
+	char *servicename = NULL;
+
+	/* Check first, so a disabled TDB costs no allocation at all */
+	if (ratelimit_tdb == NULL) {
+		return;
+	}
+
+	lp_sub = loadparm_s3_global_substitution();
+	servicename = lp_servicename(talloc_tos(), lp_sub, rl->snum);
+	if (servicename == NULL) {
+		DBG_WARNING("[%s] Failed to get service name for snum %d\n",
+			    MODULE_NAME,
+			    rl->snum);
+		return;
+	}
+
+	key = ratelimit_make_tdb_key(talloc_tos(), rl, servicename);
+	if (key.dptr == NULL) {
+		TALLOC_FREE(servicename);
+		return;
+	}
+
+	val = make_tdb_data((uint8_t *)&record, sizeof(record));
+
+	if (tdb_store(ratelimit_tdb, key, val, TDB_REPLACE) != 0) {
+		DBG_ERR("[%s] Failed to store TDB record for %s service=%s\n",
+			MODULE_NAME,
+			rl->op,
+			servicename);
+	} else {
+		DBG_DEBUG("[%s] saved TDB for %s service=%s "
+			  "tokens(i=%.2f,b=%.2f)\n",
+			  MODULE_NAME,
+			  rl->op,
+			  servicename,
+			  rl->iops_tokens,
+			  rl->bytes_tokens);
+	}
+
+	/* Release both temporaries rather than waiting for the frame */
+	TALLOC_FREE(key.dptr);
+	TALLOC_FREE(servicename);
+}
+
+/*
+ * tdb_parse_record() callback: copy a stored record into the rate
+ * limiter passed as private_data.
+ *
+ * Returns 0 after loading the token counters and timestamp, or -1
+ * (leaving the limiter untouched) when the value is not exactly
+ * sizeof(struct ratelimit_tdb_record) bytes. The key is not inspected.
+ */
+static int ratelimit_parse_tdb(TDB_DATA key, TDB_DATA val, void *private_data)
+{
+	struct ratelimit_tdb_record stored = {};
+	struct ratelimiter *target = private_data;
+
+	if (val.dsize == sizeof(stored)) {
+		/* Copy out of the record buffer before reading fields */
+		memcpy(&stored, val.dptr, sizeof(stored));
+
+		target->last_usec = stored.last_usec;
+		target->bytes_tokens = stored.bytes_tokens;
+		target->iops_tokens = stored.iops_tokens;
+
+		DBG_DEBUG("[%s] loaded TDB for %s tokens(i=%.2f,b=%.2f)\n",
+			  MODULE_NAME,
+			  target->op,
+			  target->iops_tokens,
+			  target->bytes_tokens);
+		return 0;
+	}
+
+	DBG_WARNING("[%s] TDB record size mismatch\n", MODULE_NAME);
+	return -1;
+}
+
+/*
+ * Restore the limiter's token counters and timestamp from the
+ * module-wide TDB, if a record exists for its share and operation.
+ *
+ * Best effort: the limiter is left as the caller initialised it when the
+ * TDB is not open, the service name or key cannot be built, or no valid
+ * record is found.
+ */
+static void ratelimit_load_tdb(struct ratelimiter *rl)
+{
+	const struct loadparm_substitution *lp_sub = NULL;
+	TDB_DATA key = { .dptr = NULL, .dsize = 0 };
+	char *servicename = NULL;
+	int ret;
+
+	/* Check first, so a disabled TDB costs no allocation at all */
+	if (ratelimit_tdb == NULL) {
+		return;
+	}
+
+	lp_sub = loadparm_s3_global_substitution();
+	servicename = lp_servicename(talloc_tos(), lp_sub, rl->snum);
+	if (servicename == NULL) {
+		DBG_WARNING("[%s] Failed to get service name for snum %d\n",
+			    MODULE_NAME,
+			    rl->snum);
+		return;
+	}
+
+	key = ratelimit_make_tdb_key(talloc_tos(), rl, servicename);
+	if (key.dptr == NULL) {
+		TALLOC_FREE(servicename);
+		return;
+	}
+
+	/* The callback fills rl directly; non-zero means nothing loaded */
+	ret = tdb_parse_record(ratelimit_tdb, key, ratelimit_parse_tdb, rl);
+	if (ret != 0) {
+		DBG_DEBUG("[%s] no existing TDB record for %s service=%s\n",
+			  MODULE_NAME,
+			  rl->op,
+			  servicename);
+	}
+
+	/* Release both temporaries rather than waiting for the frame */
+	TALLOC_FREE(key.dptr);
+	TALLOC_FREE(servicename);
+}
+
static void ratelimiter_init(struct ratelimiter *rl,
int snum,
- const char *oper_name,
+ const char *op,
int64_t iops_limit,
int64_t bw_limit,
float burst_mult)
{
ZERO_STRUCTP(rl);
-
- rl->oper = oper_name;
+ rl->op = op;
rl->snum = snum;
rl->iops_limit = iops_limit;
rl->iops_total = 0;
rl->bytes_total = 0;
- rl->iops_capacity = (float)iops_limit * burst_mult;
- rl->bytes_capacity = (float)bw_limit * burst_mult;
+ rl->iops_capacity = (float)(iops_limit)*burst_mult;
+ rl->bytes_capacity = (float)(bw_limit)*burst_mult;
- rl->last_us = 0;
+ rl->last_usec = 0;
+ rl->last_save_usec = rl->last_usec;
rl->iops_tokens = rl->iops_capacity;
rl->bytes_tokens = rl->bytes_capacity;
+ /* Load from global TDB if available */
+ ratelimit_load_tdb(rl);
+
DBG_DEBUG("[%s snum:%d %s] init ratelimiter:"
" iops_limit=%" PRId64 " bw_limit=%" PRId64
" burst_mult=%.2f\n",
MODULE_NAME,
rl->snum,
- rl->oper,
+ rl->op,
rl->iops_limit,
rl->bw_limit,
rl->burst_mult);
int64_t rate)
{
float refill;
- uint64_t max_refill_us;
+ uint64_t max_refill_usec;
/* If idle long enough to fill entire bucket, return full capacity */
- max_refill_us = (uint64_t)((capacity * 1e6f) / (float)rate);
- if (elapsed >= max_refill_us) {
+ max_refill_usec = (uint64_t)((capacity * 1e6f) / (float)rate);
+ if (elapsed >= max_refill_usec) {
return capacity;
}
static void ratelimiter_refill(struct ratelimiter *rl)
{
- uint64_t now = time_now_us();
- uint64_t elapsed = now - rl->last_us;
+ uint64_t now = time_now_usec();
+ uint64_t elapsed;
- if (rl->last_us == 0) {
- rl->last_us = now;
+ if (rl->last_usec == 0) {
+ rl->last_usec = now;
return;
}
+ if (now < rl->last_usec) {
+ DBG_DEBUG("[%s snum:%d %s] Stale timestamp detected "
+ "(system reboot?), resetting to full capacity\n",
+ MODULE_NAME,
+ rl->snum,
+ rl->op);
+ rl->iops_tokens = rl->iops_capacity;
+ rl->bytes_tokens = rl->bytes_capacity;
+ rl->last_usec = now;
+ return;
+ }
+
+ elapsed = now - rl->last_usec;
+
if (rl->iops_limit > 0) {
float refill;
rl->bytes_capacity);
}
- rl->last_us = now;
+ rl->last_usec = now;
}
/* Convert token deficit into a bounded delay in microseconds */
}
delay = (uint32_t)((deficit * 1e6f) / (float)rate);
- return MIN(delay, DELAY_SEC_MAX * 1000000L);
+ return minf(delay, DELAY_SEC_MAX * 1000000L);
}
static uint32_t ratelimiter_pre_io(struct ratelimiter *rl, int64_t nbytes)
float bw_deficit = 0.0f;
uint32_t delay_usec = 0;
uint32_t bw_delay = 0;
+ uint64_t now = 0;
if (!ratelimiter_enabled(rl)) {
return 0;
rl->iops_total += 1;
rl->bytes_total += nbytes;
+ now = time_now_usec();
+
+ if ((now - rl->last_save_usec) > SAVE_INTERVAL_USEC) {
+ ratelimit_save_tdb(rl);
+ rl->last_save_usec = now;
+ }
DBG_DEBUG("[%s snum:%d %s] delay_usec=%" PRIu32
" iops_tokens=%.2f bytes_tokens=%.2f\n",
MODULE_NAME,
rl->snum,
- rl->oper,
+ rl->op,
delay_usec,
rl->iops_tokens,
rl->bytes_tokens);
if (config == NULL) {
return -1;
}
+
vfs_aio_ratelimit_setup(config, SNUM(handle->conn));
SMB_VFS_HANDLE_SET_DATA(handle,
return ret;
}
- DBG_INFO("[%s] connect: service=%s snum=%d\n",
- MODULE_NAME,
- service,
- SNUM(handle->conn));
+ if (!ratelimit_tdb_init()) {
+ DBG_NOTICE("[%s] TDB init failed, continuing without "
+ "persistence\n",
+ MODULE_NAME);
+ }
+
+ DBG_DEBUG("[%s] connect: service=%s snum=%d\n",
+ MODULE_NAME,
+ service,
+ SNUM(handle->conn));
ret = vfs_aio_ratelimit_new_config(handle);
if (ret < 0) {
+/*
+ * Disconnect hook: persist both limiters, drop this connection's
+ * reference on the module-wide TDB (closing it when the count reaches
+ * zero), free the per-connection data and chain to the next module.
+ * When no per-connection data is attached, only the chaining is done.
+ */
static void vfs_aio_ratelimit_disconnect(struct vfs_handle_struct *handle)
{
- DBG_INFO("[%s] disconnect: snum=%d\n", MODULE_NAME, SNUM(handle->conn));
+	struct vfs_aio_ratelimit_config *config = NULL;
+
+	DBG_DEBUG("[%s] disconnect: snum=%d\n",
+		  MODULE_NAME,
+		  SNUM(handle->conn));
+
+	SMB_VFS_HANDLE_GET_DATA(handle,
+				config,
+				struct vfs_aio_ratelimit_config,
+				goto out);
+
+	/* Save state before disconnect */
+	ratelimit_save_tdb(&config->rd_ratelimiter);
+	ratelimit_save_tdb(&config->wr_ratelimiter);
+
+	/*
+	 * ratelimit_tdb_init() takes no reference when it fails, so a
+	 * connection that runs without persistence must not decrement:
+	 * ref_count is unsigned and would wrap around.
+	 * NOTE(review): a per-connection "holds a reference" flag would be
+	 * more precise; that needs a field in the config struct, which is
+	 * not visible here.
+	 */
+	if (ref_count > 0) {
+		ref_count--;
+	}
+
+	if (ref_count == 0 && ratelimit_tdb != NULL) {
+		DBG_DEBUG("[%s] No more connections, closing TDB\n",
+			  MODULE_NAME);
+		tdb_close(ratelimit_tdb);
+		ratelimit_tdb = NULL;
+	}
+
SMB_VFS_HANDLE_FREE_DATA(handle);
+
+out:
SMB_VFS_NEXT_DISCONNECT(handle);
}