#define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
#define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
-#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1} // the last one should be UINT16_MAX
-#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) - 1)
+#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<11, -1} // the last one should be UINT16_MAX
+#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS)) // -1 for synchronous, +1 for unverified
+#define UNVERIFIED_PRIORITY 1 // -1 synchronous, 1 async UDP, {0, 2, 3} other async
#define KRU_CAPACITY (1<<10)
#define MAX_DECAY (KRU_LIMIT * 0.0006929) // -> halving counters in 1s
#define MAX_WAITING_REQS 10000 // if exceeded, process single deferred request immediatelly in poll phase
#define VERBOSE_LOG(...) kr_log_notice(DEVEL, "defer | " __VA_ARGS__)
-// #define VERBOSE_LOG(...)
+//#define VERBOSE_LOG(...)
struct defer {
size_t capacity;
max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix);
+ } else {
+ return;
}
VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n",
/// Determine priority of the request in [-1, QUEUES_CNT - 1].
/// Lower value has higher priority, -1 should be synchronous.
-static inline int classify(const union kr_sockaddr *addr)
+static inline int classify(const union kr_sockaddr *addr, bool stream)
{
+ if (!stream) {
+ VERBOSE_LOG(" unverified address\n");
+ return UNVERIFIED_PRIORITY; // UDP
+ }
+
_Alignas(16) uint8_t key[16] = {0, };
uint16_t max_load = 0;
uint8_t prefix = 0;
VERBOSE_LOG(" load %d on /%d\n", max_load, prefix);
- return threshold_index - 1;
+ int priority = threshold_index - 1;
+ if (priority >= UNVERIFIED_PRIORITY)
+ priority++;
+
+ return priority;
}
struct protolayer_iter_ctx *ctx = pop_query();
if (ctx == NULL) return;
- defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
+ defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
struct pl_defer_iter_data *iter_data = protolayer_iter_data_get_current(ctx);
uint64_t age_ns = defer_sample_state.stamp - iter_data->req_stamp;
return;
}
- int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr);
+ int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
if (priority > queue_ix) { // priority dropped (got higher value)
VERBOSE_LOG(" PUSH to %d\n", priority);
push_query(ctx, priority);
if (ctx->session->outgoing)
return protolayer_continue(ctx);
- defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr);
+ defer_sample_addr((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
struct pl_defer_iter_data *data = iter_data;
data->req_stamp = defer_sample_state.stamp;
VERBOSE_LOG(" %s UNWRAP\n",
kr_straddr(ctx->comm->comm_addr));
- int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr);
+ int priority = classify((const union kr_sockaddr *)ctx->comm->comm_addr, ctx->session->stream);
if (priority == -1) {
VERBOSE_LOG(" CONTINUE\n");
defer_sample_restart();
}
defer_sample_stop(); // TODO skip calling and use just restart elsewhere?
- udp_queue_send_all();
+ udp_queue_send_all(); // TODO keep here or call after processing each priority level?
+ // (or after UNVERIFIED_PRIORITY but beware future QUIC)
if (waiting_requests > 0) {
VERBOSE_LOG(" %d waiting\n", waiting_requests);
}