// payload is counted either as part of session wire buffer (for stream) or as part of iter ctx (for datagrams)
+/// Async-signal-safe snprintf-like formatting function, it supports:
+/// * %s takes (char *);
+/// * %u takes unsigned, %NUMu allowed for padding with spaces or zeroes;
+/// * %x takes unsigned, %NUMx allowed;
+/// * %f takes double, behaves like %.3f;
+/// * %r takes (struct sockaddr *).
+int sigsafe_format(char *str, size_t size, const char *fmt, ...) {
+ char *strp = str; // ptr just after last written char
+ char *stre = str + size; // ptr just after str buffer
+ const char digits[] ="0123456789abcdef";
+ va_list ap;
+ va_start(ap, fmt); // NOLINT, should be safe in GCC
+ while (*fmt && (stre-strp > 1)) {
+ const char *append_str = NULL;
+ int append_len = -1;
+ bool mod_zero = false;
+ int mod_int = 0;
+ int base = 10;
+ char tmpstr[50];
+
+ if (*fmt != '%') {
+ char *perc = strchr(fmt, '%');
+ append_str = fmt;
+ append_len = perc ? perc - fmt : strlen(fmt);
+ fmt += append_len;
+ } else while(fmt++, !append_str) {
+ switch(*fmt) {
+ case '%': // %%
+ append_str = "%";
+ break;
+ case 's': // just %s
+ append_str = va_arg(ap, char *); // NOLINT, should be safe in GCC
+ break;
+ case 'x': // %x, %#x, %0#x
+ base = 16; // passthrough
+ case 'u': { // %u, %#u, %0#u
+ unsigned num = va_arg(ap, unsigned); // NOLINT, should be safe in GCC
+ char *sp = tmpstr + sizeof(tmpstr);
+ *--sp = '\0';
+ while ((num > 0) || !*sp) {
+ *--sp = digits[num % base];
+ num /= base;
+ mod_int--;
+ }
+ while (mod_int-- > 0) {
+ *--sp = mod_zero ? '0' : ' ';
+ }
+ append_str = sp;
+ } break;
+ case 'f': { // just %f, behaves like %.3f
+ double valf = va_arg(ap, double); // NOLINT, should be safe in GCC
+ const char *sign = "";
+ if (valf < 0) { sign = "-"; valf *= -1; }
+ uint64_t vali = valf * 1000 + 0.5; // NOLINT(bugprone-incorrect-roundings), just minor imprecisions
+ // larger numbers, NaNs, ... are not handled
+ strp += sigsafe_format(strp, stre-strp, "%s%u.%03u", sign, (unsigned)(vali / 1000), (unsigned)(vali % 1000));
+ append_str = "";
+ } break;
+ case 'r': { // just %r, takes (struct sockaddr *)
+ struct sockaddr *addr = va_arg(ap, void *); // NOLINT, should be safe in GCC
+ if (!addr) {
+ append_str = "(null)";
+ break;
+ }
+ switch (addr->sa_family) {
+ case AF_UNIX:
+ append_str = ((struct sockaddr_un *)addr)->sun_path;
+ break;
+ case AF_INET: {
+ struct sockaddr_in *addr4 = (struct sockaddr_in *)addr;
+ uint8_t *ipv4 = (uint8_t *)&(addr4->sin_addr);
+ strp += sigsafe_format(strp, stre-strp, "%u.%u.%u.%u#%u", ipv4[0], ipv4[1], ipv4[2], ipv4[3], addr4->sin_port);
+ append_str = "";
+ } break;
+ case AF_INET6: {
+ struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)addr;
+ uint8_t *ipv6 = (uint8_t *)&(addr6->sin6_addr);
+ int mzb = -2, mze = 0; // maximal zero-filled gap begin (incl.) and end (excl.)
+ { // find longest gap
+ int zb = 0, ze = 0;
+ for (size_t i = 0; i < 16; i += 2) {
+ if (!ipv6[i] && !ipv6[i+1]) {
+ if (i == ze) {
+ ze += 2;
+ } else {
+ if (ze - zb > mze - mzb) {
+ mzb = zb; mze = ze;
+ }
+ zb = i; ze = i + 2;
+ }
+ }
+ }
+ if (ze - zb > mze - mzb) {
+ mzb = zb; mze = ze;
+ }
+ }
+ for (int i = -!mzb; i < 15; i++) {
+ if (i == mzb) i = mze - 1; // after ':' (possibly for i=-1), skip sth. and continue with ':' (possibly for i=15)
+ if (i%2) {
+ if (strp < stre) *strp++ = ':';
+ } else {
+ strp += sigsafe_format(strp, stre-strp, "%x", (ipv6[i] << 8) | ipv6[i+1]);
+ }
+ }
+ strp += sigsafe_format(strp, stre-strp, "#%u", addr6->sin6_port);
+ append_str = "";
+ } break;
+ case AF_UNSPEC:
+ append_str = "(unspec)";
+ break;
+ default:
+ append_str = "(unknown)";
+ break;
+ }
+ } break;
+ default:
+ if (('0' <= *fmt) && (*fmt <= '9')) {
+ if ((mod_int == 0) && (*fmt == '0')) {
+ mod_zero = true;
+ } else {
+ mod_int = mod_int * 10 + *fmt - '0';
+ }
+ } else {
+ append_str = "[ERR]";
+ }
+ break;
+ }
+ }
+
+ // append to str (without \0)
+ append_len = MIN(append_len >= 0 ? append_len : strlen(append_str), stre-strp-1);
+ memcpy(strp, append_str, append_len);
+ strp += append_len;
+ }
+ *strp = '\0';
+ va_end(ap); // NOLINT, should be safe in GCC
+ return strp-str;
+}
+
#define VERBOSE_LOG(...) kr_log_debug(DEFER, " | " __VA_ARGS__)
/// Like VERBOSE_LOG, but avoids evaluating the parameters if not logging.
#define VERBOSE_LOG_PRICY(...) { if (kr_log_is_debug(DEFER, NULL)) VERBOSE_LOG(__VA_ARGS__); }
+// Uses NON-STANDARD format string, see sigsafe_format above.
+#define SIGSAFE_LOG(max_size, ...) { \
+ char msg[max_size]; \
+ int len = sigsafe_format(msg, sizeof(msg), "[defer ] "__VA_ARGS__); \
+ write(kr_log_target == LOG_TARGET_STDOUT ? 1 : 2, msg, len); \
+}
+#define SIGSAFE_VERBOSE_LOG(max_size, ...) { \
+ if (kr_log_is_debug(DEFER, NULL)) /* NOLINT */\
+ { SIGSAFE_LOG(max_size, " | " __VA_ARGS__)}}
+
+
struct defer {
size_t capacity;
kru_price_t max_decay;
uint16_t *out_load, uint8_t *out_prefix)
{
uint16_t loads[kru_conf->prefixes_cnt];
- KRU.load_multi_prefix((struct kru *)defer->kru, kr_now(),
+ const uint64_t now = kr_now(); // NOLINT, async-signal-safe, uv_now only reads uint64_t
+ KRU.load_multi_prefix((struct kru *)defer->kru, now,
kru_conf->namespace, key, kru_conf->prefixes, prices, kru_conf->prefixes_cnt, loads);
int priority = 0;
uint8_t prefix;
kru_charge_classify(kru_conf, key, prices, &load, &prefix);
- VERBOSE_LOG_PRICY(
- " %s ADD %4.3f ms * %.2f -> load: %d on /%d\n",
- kr_straddr(&addr->ip), nsec / 1000000.0, pf16 / (float)(1<<16), load, prefix
- );
+ SIGSAFE_VERBOSE_LOG(KR_STRADDR_MAXLEN + 100,
+ " %r ADD %f ms * %f -> load: %u on /%u\n",
+ &addr->ip, nsec / 1000000.0, pf16 / (float)(1<<16), load, prefix);
}
/// Determine priority of the request in [0, QUEUES_CNT - 1];
uint64_t elapsed = 0;
if (defer_sample_state.is_accounting) {
elapsed = defer_get_stamp() - defer_sample_state.stamp;
- VERBOSE_LOG("SIGALRM %s, host %s used %.3f s of cpu time on ongoing operation\n",
+ SIGSAFE_VERBOSE_LOG(KR_STRADDR_MAXLEN + 100,
+ "SIGALRM %s, host %r used %f s of cpu time on ongoing operation.\n",
signum ? "received" : "initialized",
- kr_straddr(&defer_sample_state.addr.ip), elapsed / 1000000000.0); // XXX
+ &defer_sample_state.addr.ip, elapsed / 1000000000.0);
} else {
- VERBOSE_LOG("SIGALRM %s, no measuring in progress\n",
+ SIGSAFE_VERBOSE_LOG(KR_STRADDR_MAXLEN + 100,
+ "SIGALRM %s, no measuring in progress.\n",
signum ? "received" : "initialized");
}
int64_t rest_to_timeout_ms = defer->hard_timeout - elapsed / 1000000; // ms - ns
+
if (rest_to_timeout_ms <= 0) {
- uv_update_time(uv_default_loop()); // TODO more conceptual solution?
defer_charge(elapsed, &defer_sample_state.addr, defer_sample_state.stream);
- kr_log_crit(DEFER, "Host %s used %0.3f s of cpu time continuously, interrupting cresd.\n",
- kr_straddr(&defer_sample_state.addr.ip), elapsed / 1000000000.0);
- classify(&defer_sample_state.addr, defer_sample_state.stream); // XXX
- __sync_synchronize();
+ SIGSAFE_LOG(KR_STRADDR_MAXLEN + 100,
+ "Host %r used %f s of cpu time continuously, interrupting kresd.\n",
+ &defer_sample_state.addr.ip, elapsed / 1000000000.0);
abort();
}
alarm((rest_to_timeout_ms + 999) / 1000);