#define V4_PREFIXES (uint8_t[]) { 18, 20, 24, 32 }
#define V4_RATE_MULT (kru_price_t[]) { 768, 256, 32, 1 }
+#define V4_SUBPRIO (uint8_t[]) { 0, 1, 3, 7 }
#define V6_PREFIXES (uint8_t[]) { 32, 48, 56, 64, 128 }
#define V6_RATE_MULT (kru_price_t[]) { 64, 4, 3, 2, 1 }
+#define V6_SUBPRIO (uint8_t[]) { 2, 4, 5, 6, 7 }
+#define SUBPRIO_CNT 8
#define V4_PREFIXES_CNT (sizeof(V4_PREFIXES) / sizeof(*V4_PREFIXES))
#define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
#define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
+struct kru_conf {
+ uint8_t namespace;
+ size_t prefixes_cnt;
+ uint8_t *prefixes;
+ const kru_price_t *rate_mult;
+ const uint8_t *subprio;
+} const
+V4_CONF = {0, V4_PREFIXES_CNT, V4_PREFIXES, V4_RATE_MULT, V4_SUBPRIO},
+V6_CONF = {1, V6_PREFIXES_CNT, V6_PREFIXES, V6_RATE_MULT, V6_SUBPRIO};
+
#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<12, -1} // the last one should be UINT16_MAX
-#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) + 1) // +1 for unverified
+#define QUEUES_CNT ((sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) - 1) * SUBPRIO_CNT + 2)
+ // priority 0 has no subpriorities, +1 for unverified
#define PRIORITY_SYNC (-1) // no queue
#define PRIORITY_UDP (QUEUES_CNT - 1) // last queue
}
/// Print configuration into desc array.
-void defer_str_conf(char *desc, int desc_len) {
+void defer_str_conf(char *desc, int desc_len)
+{
int len = 0;
#define append(...) len += snprintf(desc + len, desc_len > len ? desc_len - len : 0, __VA_ARGS__)
#define append_time(prefix, ms, suffix) { \
else append(prefix "%7.1f s " suffix, ms / 1000); }
append( " Expected cpus/procs: %5d\n", defer->cpus);
+ const size_t thresholds = sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS);
append( " Max waiting requests:%7.1f MiB\n", MAX_WAITING_REQS_SIZE / 1024.0 / 1024.0);
append_time(" Request timeout: ", REQ_TIMEOUT / 1000000.0, "\n");
append_time(" Idle: ", IDLE_TIMEOUT / 1000000.0, "\n");
append_time(" UDP phase: ", PHASE_UDP_TIMEOUT / 1000000.0, "\n");
append_time(" Non-UDP phase: ", PHASE_NON_UDP_TIMEOUT / 1000000.0, "\n");
- append( " Priority levels: %5ld + UDP\n", QUEUES_CNT - 1);
+ append( " Priority levels: %5ld (%ld main levels, %d sublevels) + UDP\n", QUEUES_CNT - 1, thresholds, SUBPRIO_CNT);
size_t capacity_log = 0;
for (size_t c = defer->capacity - 1; c > 0; c >>= 1) capacity_log++;
append( " KRU capacity: %7.1f k (%0.1f MiB)\n", (1 << capacity_log) / 1000.0, size / 1000000.0);
bool uniform_thresholds = true;
- for (int i = 1; i < QUEUES_CNT - 2; i++)
- uniform_thresholds &= (LOADS_THRESHOLDS[i] == LOADS_THRESHOLDS[i-1] * LOADS_THRESHOLDS[0]);
- uniform_thresholds &= ((1<<16) == (int)LOADS_THRESHOLDS[QUEUES_CNT - 3] * LOADS_THRESHOLDS[0]);
+ for (int i = 1; i < thresholds - 1; i++)
+ uniform_thresholds &= (LOADS_THRESHOLDS[i] == LOADS_THRESHOLDS[i - 1] * LOADS_THRESHOLDS[0]);
+ uniform_thresholds &= ((1<<16) == (int)LOADS_THRESHOLDS[thresholds - 2] * LOADS_THRESHOLDS[0]);
append( " Decay: %7.3f %% per ms (32-bit max: %d)\n",
100.0 * MAX_DECAY / KRU_LIMIT, (kru_price_t)MAX_DECAY);
float half_life = -1.0 / log2f(1.0 - (float)MAX_DECAY / KRU_LIMIT);
append_time(" Half-life: ", half_life, "\n");
if (uniform_thresholds)
- append_time(" Priority rise in: ", half_life * 16 / (QUEUES_CNT - 1), "\n");
+ append_time(" Priority rise in: ", half_life * 16 / thresholds, "\n");
append_time(" Counter reset in: ", half_life * 16, "\n");
append(" Rate limits for crossing priority levels as single CPU utilization:\n");
for (int v = 0; v < 2; v++) {
for (int i = prefixes_cnt[v] - 1; i >= 0; i--) {
append("%9sv%d/%-3d: ", "", version[v], prefixes[v][i]);
- for (int j = 0; j < QUEUES_CNT - 1; j++) {
+ for (int j = 0; j < thresholds; j++) {
float needed_util = (float)MAX_DECAY / (1<<16) * LOADS_THRESHOLDS[j] / BASE_PRICE(1000000) * rate_mult[v][i];
append("%12.3f %%", needed_util * 100);
}
for (int v = 0; v < 2; v++) {
for (int i = prefixes_cnt[v] - 1; i >= 0; i--) {
append("%9sv%d/%-3d: ", "", version[v], prefixes[v][i]);
- for (int j = 0; j < QUEUES_CNT - 1; j++) {
+ for (int j = 0; j < thresholds; j++) {
float needed_time = (float)KRU_LIMIT / (1<<16) * LOADS_THRESHOLDS[j] / BASE_PRICE(1000000) * rate_mult[v][i];
if (needed_time < 1) {
append("%11.1f us", needed_time * 1000);
}
+/// Call KRU, return priority and as params load and prefix.
+static inline int kru_charge_classify(const struct kru_conf *kru_conf, uint8_t *key, kru_price_t *prices,
+ uint16_t *out_load, uint8_t *out_prefix)
+{
+ uint16_t loads[kru_conf->prefixes_cnt];
+ KRU.load_multi_prefix((struct kru *)defer->kru, kr_now(),
+ kru_conf->namespace, key, kru_conf->prefixes, prices, kru_conf->prefixes_cnt, loads);
+
+ int priority = 0;
+ int prefix_index = kru_conf->prefixes_cnt - 1;
+ for (int i = kru_conf->prefixes_cnt - 1, j = 0; i >= 0; i--) {
+ for (; LOADS_THRESHOLDS[j] < loads[i]; j++) {
+ prefix_index = i;
+ priority = 1 + j * SUBPRIO_CNT + kru_conf->subprio[i];
+ }
+ }
+ *out_load = loads[prefix_index];
+ *out_prefix = kru_conf->prefixes[prefix_index];
+ return priority;
+}
+
/// Increment KRU counters by given time.
void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
{
if (!stream) return; // UDP is not accounted in KRU
_Alignas(16) uint8_t key[16] = {0, };
- uint16_t max_load = 0;
- uint8_t prefix = 0;
- uint64_t base_price = BASE_PRICE(nsec);
-
+ const struct kru_conf *kru_conf;
if (addr->ip.sa_family == AF_INET6) {
memcpy(key, &addr->ip6.sin6_addr, 16);
-
- kru_price_t prices[V6_PREFIXES_CNT];
- for (size_t i = 0; i < V6_PREFIXES_CNT; i++) {
- uint64_t price = base_price / V6_RATE_MULT[i];
- prices[i] = price > (kru_price_t)-1 ? -1 : price;
- }
-
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
- 1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT, &prefix);
+ kru_conf = &V6_CONF;
} else if (addr->ip.sa_family == AF_INET) {
memcpy(key, &addr->ip4.sin_addr, 4);
-
- kru_price_t prices[V4_PREFIXES_CNT];
- for (size_t i = 0; i < V4_PREFIXES_CNT; i++) {
- uint64_t price = base_price / V4_RATE_MULT[i];
- prices[i] = price > (kru_price_t)-1 ? -1 : price;
- }
-
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
- 0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix);
+ kru_conf = &V4_CONF;
} else {
return;
}
+ uint64_t base_price = BASE_PRICE(nsec);
+ kru_price_t prices[kru_conf->prefixes_cnt];
+ for (size_t i = 0; i < kru_conf->prefixes_cnt; i++) {
+ uint64_t price = base_price / kru_conf->rate_mult[i];
+ prices[i] = price > (kru_price_t)-1 ? -1 : price;
+ }
+
+ uint16_t load;
+ uint8_t prefix;
+ kru_charge_classify(kru_conf, key, prices, &load, &prefix);
+
VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n",
- kr_straddr(&addr->ip), nsec / 1000000.0, max_load, prefix);
+ kr_straddr(&addr->ip), nsec / 1000000.0, load, prefix);
}
/// Determine priority of the request in [-1, QUEUES_CNT - 1].
}
_Alignas(16) uint8_t key[16] = {0, };
- uint16_t max_load = 0;
- uint8_t prefix = 0;
+ const struct kru_conf *kru_conf;
if (addr->ip.sa_family == AF_INET6) {
memcpy(key, &addr->ip6.sin6_addr, 16);
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
- 1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT, &prefix);
+ kru_conf = &V6_CONF;
} else if (addr->ip.sa_family == AF_INET) {
memcpy(key, &addr->ip4.sin_addr, 4);
- max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
- 0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
+ kru_conf = &V4_CONF;
}
- int priority = 0;
- for (; LOADS_THRESHOLDS[priority] < max_load; priority++);
+ uint16_t load;
+ uint8_t prefix;
+ int priority = kru_charge_classify(kru_conf, key, NULL, &load, &prefix);
- VERBOSE_LOG(" load %d on /%d\n", max_load, prefix);
+ VERBOSE_LOG(" load %d on /%d\n", load, prefix);
if ((phase & PHASE_NON_UDP) && (priority == 0) && (queue_len(queues[0]) == 0)) {
phase_set(PHASE_NON_UDP);