* instead of their equivalents in the C++ stdlib :P
* Everything here is an unstable internal API of public-inbox and
* NOT intended for ordinary users; only public-inbox hackers
+ *
+ * As of public-inbox 2.2+, lei forks a subprocess on every request
+ * to scale to local demand. Public-facing use is prefork-only,
+ * since we expect public-facing servers to have dedicated resources
+ * allocated for it and local user software to have more dynamic
+ * resource usage.
*/
#ifndef _ALL_SOURCE
# define _ALL_SOURCE
} else if (r < 0) {
switch (errno) {
case EINTR: goto again;
+ case EAGAIN: exit(0); // spurious wakeup for lei dynfork
case EBADF: if (sock_fd < 0) exit(0);
// fall-through
default: err(EXIT_FAILURE, "recvmsg");
}
}
-static void recv_loop(void) // worker process loop
+static bool recv_once(void)
{
static char rbuf[4096 * 33]; // per-process
+ size_t len = sizeof(rbuf);
+ CLEANUP_REQ struct req req = {};
+
+ if (!recv_req(&req, rbuf, &len))
+ return false;
+ if (req.fp[1])
+ stderr_set(req.fp[1]);
+
+ // MY_ARG_MAX limits the return value of SPLIT2ARGV
+ req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
+ dispatch(&req);
+ ERR_CLOSE(req.fp[0], 0);
+ if (req.fp[1]) {
+ stderr_restore();
+ ERR_CLOSE(req.fp[1], 0);
+ }
+ return true;
+}
+
+static void recv_loop(void) // worker process loop
+{
struct sigaction sa = {};
sa.sa_handler = sigw;
CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL));
while (sock_fd == 0) {
- size_t len = sizeof(rbuf);
- CLEANUP_REQ struct req req = {};
-
- if (!recv_req(&req, rbuf, &len))
+ if (!recv_once())
continue;
- if (req.fp[1])
- stderr_set(req.fp[1]);
-
- // MY_ARG_MAX limits the return value of SPLIT2ARGV
- req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
- dispatch(&req);
- ERR_CLOSE(req.fp[0], 0);
- if (req.fp[1]) {
- stderr_restore();
- ERR_CLOSE(req.fp[1], 0);
- }
if (worker_needs_reopen) {
worker_needs_reopen = 0;
reopen_logs();
worker_pids[nr] = pid;
}
+#define XSIGNAL(signame, handler) do { \
+ if (signal(signame, handler) == SIG_ERR) \
+ err(EXIT_FAILURE, "signal(" #signame ", " #handler ")"); \
+} while (0)
+
static void start_worker(unsigned nr)
{
pid_t pid = fork();
cleanup_pids();
xclose(pipefds[0]);
xclose(pipefds[1]);
- if (signal(SIGCHLD, SIG_DFL) == SIG_ERR)
- err(EXIT_FAILURE, "signal CHLD");
- if (signal(SIGTTIN, SIG_IGN) == SIG_ERR)
- err(EXIT_FAILURE, "signal TTIN");
- if (signal(SIGTTOU, SIG_IGN) == SIG_ERR)
- err(EXIT_FAILURE, "signal TTIN");
+ XSIGNAL(SIGCHLD, SIG_DFL);
+ XSIGNAL(SIGTTIN, SIG_IGN);
+ XSIGNAL(SIGTTOU, SIG_IGN);
recv_loop();
exit(0);
}
_exit(EXIT_FAILURE);
}
-static void reaped_worker(pid_t pid, int st)
+static void reaped_prefork_worker(pid_t pid, int st)
{
unsigned long nr = 0;
for (; nr < nworker_hwm; nr++) {
start_workers();
}
+static void reaped_dynfork_worker(pid_t pid, int st) // lei-only
+{
+ if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
+ alive = false;
+ else if (st)
+ warnx("worker died $?=%d alive=%d", st, (int)alive);
+}
+
static void do_sigchld(void)
{
while (1) {
int st;
pid_t pid = waitpid(-1, &st, WNOHANG);
+
if (pid > 0) {
- reaped_worker(pid, st);
+ if (lei)
+ reaped_dynfork_worker(pid, st);
+ else
+ reaped_prefork_worker(pid, st);
} else if (pid == 0) {
return;
} else {
return ret;
}
+static void prefork_loop(void)
+{
+ struct rlimit rl;
+
+ if (getrlimit(RLIMIT_NOFILE, &rl))
+ err(EXIT_FAILURE, "getrlimit");
+ my_fd_max = rl.rlim_cur;
+ if (my_fd_max < 72)
+ warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max);
+ my_fd_max -= 64;
+ nworker_hwm = nworker;
+ worker_pids = (pid_t *)xcalloc(nworker, sizeof(pid_t));
+
+ start_workers();
+
+ while (alive || living_workers()) {
+ char sbuf[64];
+ ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf));
+ if (n < 0) {
+ if (errno == EINTR) continue;
+ err(EXIT_FAILURE, "read");
+ } else if (n == 0) {
+ errx(EXIT_FAILURE, "read EOF");
+ }
+ do_sigchld();
+ for (ssize_t i = 0; i < n; i++) {
+ switch (sbuf[i]) {
+ case '.': break; // do_sigchld already called
+ case '-': do_sigttou(); break;
+ case '+': do_sigttin(); break;
+ case '#': parent_reopen_logs(); break;
+ default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
+ }
+ }
+ }
+}
+
+static void xsetfl(int fd, int newflags)
+{
+ int fl = fcntl(fd, F_GETFL);
+ if (fl == -1) err(EXIT_FAILURE, "F_GETFL");
+ if (fcntl(fd, F_SETFL, fl | newflags))
+ err(EXIT_FAILURE, "F_SETFL");
+}
+
+static void dynfork_dispatch(void) // lei-only
+{
+ sigset_t old;
+ CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, &old));
+ pid_t pid = fork();
+ if (pid < 0)
+ err(EXIT_FAILURE, "fork");
+ if (pid > 0) {
+ CHECK(int, 0, sigprocmask(SIG_SETMASK, &old, NULL));
+ return; // don't even care about the PID
+ }
+ if (pid == 0) {
+ xclose(pipefds[0]);
+ xclose(pipefds[1]);
+ XSIGNAL(SIGCHLD, SIG_DFL);
+ XSIGNAL(SIGTTIN, SIG_IGN);
+ XSIGNAL(SIGTTOU, SIG_IGN);
+ XSIGNAL(SIGUSR1, SIG_DFL);
+ XSIGNAL(SIGTERM, SIG_DFL);
+ (void)recv_once();
+ exit(0);
+ }
+}
+
+static void dynfork_check_sigs(void)
+{
+ char sbuf[64];
+ ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf));
+ if (n < 0) {
+ if (errno == EAGAIN) return; // spurious wakeup
+ err(EXIT_FAILURE, "read");
+ } else if (n == 0) {
+ errx(EXIT_FAILURE, "read EOF");
+ }
+ do_sigchld();
+ for (ssize_t i = 0; i < n; i++) {
+ switch (sbuf[i]) {
+ case '.': break; // do_sigchld already called
+ case '-': break; // ignore (TTOU|TTIN)
+ case '+': break;
+ case '#': reopen_logs(); break;
+ default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
+ }
+ }
+}
+
+static void dynfork_loop(void) // for lei
+{
+ struct pollfd pfd[2];
+ pfd[0].fd = sock_fd;
+ pfd[1].fd = pipefds[0];
+ pfd[0].events = pfd[1].events = POLLIN;
+
+ xsetfl(pipefds[0], O_NONBLOCK);
+ xsetfl(sock_fd, O_NONBLOCK);
+ my_fd_max = 128; // just keep srch_init happy
+
+ while (alive) {
+ int rc = poll(pfd, MY_ARRAY_SIZE(pfd), -1);
+
+ if (rc < 0) {
+ if (errno == EINTR) continue;
+ err(EXIT_FAILURE, "poll");
+ } else {
+ if (pfd[0].revents & POLLIN)
+ dynfork_dispatch();
+ if (pfd[1].revents & POLLIN)
+ dynfork_check_sigs();
+ }
+ }
+}
+
int main(int argc, char *argv[])
{
int c;
socklen_t slen = (socklen_t)sizeof(c);
- stdout_path = getenv("STDOUT_PATH");
- stderr_path = getenv("STDERR_PATH");
- struct rlimit rl;
if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen))
err(EXIT_FAILURE, "getsockopt");
if (c != SOCK_SEQPACKET)
errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET");
- if (getrlimit(RLIMIT_NOFILE, &rl))
- err(EXIT_FAILURE, "getrlimit");
- my_fd_max = rl.rlim_cur;
- if (my_fd_max < 72)
- warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max);
- my_fd_max -= 64;
+ stdout_path = getenv("STDOUT_PATH");
+ stderr_path = getenv("STDERR_PATH");
thread_fp = new ThreadFieldProcessor();
xh_date_init();
}
CHECK(int, 0, sigdelset(&workerset, SIGTERM));
CHECK(int, 0, sigdelset(&workerset, SIGCHLD));
- nworker_hwm = nworker;
- worker_pids = (pid_t *)xcalloc(nworker, sizeof(pid_t));
if (pipe(pipefds)) err(EXIT_FAILURE, "pipe");
- int fl = fcntl(pipefds[1], F_GETFL);
- if (fl == -1) err(EXIT_FAILURE, "F_GETFL");
- if (fcntl(pipefds[1], F_SETFL, fl | O_NONBLOCK))
- err(EXIT_FAILURE, "F_SETFL");
-
+ xsetfl(pipefds[1], O_NONBLOCK);
CHECK(int, 0, sigdelset(&pset, SIGCHLD));
CHECK(int, 0, sigdelset(&pset, SIGTTIN));
CHECK(int, 0, sigdelset(&pset, SIGTTOU));
CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL));
CHECK(int, 0, sigprocmask(SIG_SETMASK, &pset, NULL));
-
- start_workers();
-
- char sbuf[64];
- while (alive || living_workers()) {
- ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf));
- if (n < 0) {
- if (errno == EINTR) continue;
- err(EXIT_FAILURE, "read");
- } else if (n == 0) {
- errx(EXIT_FAILURE, "read EOF");
- }
- do_sigchld();
- for (ssize_t i = 0; i < n; i++) {
- switch (sbuf[i]) {
- case '.': break; // do_sigchld already called
- case '-': do_sigttou(); break;
- case '+': do_sigttin(); break;
- case '#': parent_reopen_logs(); break;
- default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
- }
- }
- }
-
+ if (lei)
+ dynfork_loop();
+ else
+ prefork_loop();
return 0;
}