From: Jim Jagielski Date: Tue, 2 Jun 2026 10:04:18 +0000 (+0000) Subject: [PATCH] mpm_motorz: performance, bug fixes, concurrency hardening, async HTTP/2 X-Git-Url: http://git.ipfire.org/gitweb/index.cgi?a=commitdiff_plain;h=51651aec5397f9ba2b474ba7f39ddb7278369365;p=thirdparty%2Fapache%2Fhttpd.git [PATCH] mpm_motorz: performance, bug fixes, concurrency hardening, async HTTP/2 Performance: - Accept drain loop: motorz_io_accept now drains the kernel accept queue in one poll wakeup (do/while until EAGAIN, admission-disabled, or die_now) instead of one connection per round-trip through apr_pollset_poll. Eliminates O(N) poll wakeups for N burst connections. - Hot-path log levels: 25 APLOG_DEBUG calls on the per-request path demoted to APLOG_TRACE6/TRACE7/TRACE8, matching event MPM practice. Error and admission- control events remain at DEBUG. - Redundant pollset_remove removed: the defensive apr_pollset_remove in motorz_io_process is gone from the common dispatch path (connection is already claimed before reaching here); isolated to the clogging-filter branch where it is actually needed. - Admission control: added active-thread saturation check (active >= threads_per_child) alongside the existing idle==0 and pending>=hi checks, catching slow-client / keep-alive-heavy saturation where the task queue appears empty but all workers are blocked in I/O. apr_size_t underflow fix: read total before idle, clamp active = (total > idle) ? total - idle : 0, preventing spurious disable during graceful restart. - Hysteresis low-water mark tightened from 50% to 75% of ThreadsPerChild, so the listener re-enables sooner after a burst subsides. Bug fixes: - Clogging-filter timer race (use-after-free): the SSL/clogging path in motorz_io_process bypassed motorz_conn_claim(). A pending timer could fire concurrently while the worker was inside ap_run_process_connection(), dispatching a timeout worker on the same scon. 
Fixed by replacing the bare pollset_remove with motorz_conn_claim(), which atomically disarms both the pollset entry and the timer under poller->mtx. - motorz_resume_suspended: restore c->sbh before ap_run_resume_connection(). motorz_suspend_connection() NULLs c->sbh (matching event's notify_suspend); without this fix any module calling ap_update_child_status(c->sbh) after resume dereferenced NULL. - SERVER_BUSY_READ scoreboard update: add ap_update_child_status(scon->sbh, SERVER_BUSY_READ, NULL) before ap_run_process_connection(), matching event MPM and fixing misleading mod_status output where motorz connections showed as SERVER_READY throughout the read/process phase. - requests_this_child data race: written by the accepting poller thread, read by the supervisor main thread. Declared volatile to prevent stale cached reads across threads. - conn_id always zero: every connection was created with conn_id=0, breaking %{connection} log formats and any module keying state on c->id. Now derived as ID_FROM_CHILD_THREAD(my_child_num, atomic_seq), matching worker/event MPM formula for globally unique IDs across children and connections. - ap_create_sb_handle hardcoded child 0: all connections reported as child 0 in the scoreboard, making mod_status show all activity in slot [0][0]. Now passes my_child_num so each child's activity appears in the correct slot. - Worker threads not drained on exit: apr_thread_pool_destroy() is now called in clean_child_exit() before apr_pool_destroy(pchild), joining all workers and preventing use-after-free in ap_log_error / apr_pool_clear after the pool is torn down. - ThreadsPerChild 1 throughput collapse: the hysteresis low-water mark becomes (1*3)/4=0, so listeners only re-enable when the task queue is completely empty. Added startup/runtime warnings (APLOGNO 10555/10556) advising ThreadsPerChild >= 4. next-number advanced to 10557. 
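The admission-control items in the Performance list (the active-thread saturation check and the apr_size_t underflow clamp) can be sketched in C as follows. This is a minimal illustration under stated assumptions, not the code in motorz.c: the pool_counts struct, active_threads(), pool_saturated(), and the pending_hi parameter are hypothetical names, and plain size_t stands in for apr_size_t.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical snapshot of the worker-pool counters. The field names are
 * illustrative only and are not taken from motorz.c. */
typedef struct {
    size_t total;    /* threads currently in the pool (read first)  */
    size_t idle;     /* idle threads (read second)                  */
    size_t pending;  /* queued tasks not yet picked up by a worker  */
} pool_counts;

/* active = total - idle, clamped at 0. The two counters are read at
 * different instants, so idle can momentarily exceed total; without the
 * clamp the unsigned subtraction would wrap to a huge value. */
static size_t active_threads(const pool_counts *pc)
{
    return (pc->total > pc->idle) ? pc->total - pc->idle : 0;
}

/* Returns 1 when the listeners should be disabled: no idle worker, the
 * queue is at or above its high-water mark, or every worker is busy. */
static int pool_saturated(const pool_counts *pc,
                          size_t threads_per_child,
                          size_t pending_hi)
{
    assert(threads_per_child > 0);
    return pc->idle == 0
        || pc->pending >= pending_hi
        || active_threads(pc) >= threads_per_child;
}
```

With total = 2 and idle = 3 (a racy snapshot of the kind described for graceful restart), an unclamped subtraction would wrap around and report saturation; the clamped version yields 0 active threads and leaves the listeners enabled.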
Multi-poller scale-out (PollersPerChild): - motorz_core_t no longer holds a single pollset/timeout_ring/mtx/recycle-list; these are moved to per-poller motorz_poller_t contexts. - Each poller owns its pollset, skiplist timer ring, ring mutex, and lock-free MPSC transaction-pool recycle list, so pollers never contend with each other. - Connections are sharded round-robin to pollers at accept time (scon->poller); pool recycling returns to the accepting poller's free-list (scon->pool_poller). - PollersPerChild directive added (0 = auto from online CPUs, capped at 8). - Listener admission control and the pipe-of-death/generation supervision are isolated to poller 0 and the main-thread supervisor respectively. - AP_MPMQ_CAN_SUSPEND / motorz_resume_suspended hook wired in for full async-suspend support (fix for CONN_STATE_SUSPENDED lifecycle). - Non-blocking lingering close: replaced blocking ap_lingering_close() with motorz_start_lingering_close() / motorz_lingering_close() that hand the draining socket back to the poll loop with a bounded linger timeout. - Pool cleanup (motorz_conn_pool_cleanup) cancels the timer under poller->mtx so pool destruction never leaves a dangling skiplist entry. Async HTTP/2 handoff -- ENABLED (MOTORZ_ENABLE_ASYNC 1): - motorz reports AP_MPMQ_IS_ASYNC=1 / AP_MPMQ_CAN_WAITIO=1. motorz_io_process() implements CONN_STATE_ASYNC_WAITIO: arm the pollset for read/write per c->cs->sense under Timeout and re-dispatch into PROCESSING, mirroring event. New APLOGNOs 10557-10559. - Clogging-filter branch honors the hook-returned connection state (WRITE_COMPLETION / ASYNC_WAITIO / SUSPENDED) and maps KEEPALIVE to WRITE_COMPLETION instead of force-closing to LINGER. h2 c2 connections set clogging_input_filters unconditionally, so the old behavior collapsed h2 keep-alive into one-shot connections. - Forward-declare motorz_update_listeners() (called from motorz_io_accept before its definition; an implicit declaration is a hard error under strict C). 
Replaced a dead duplicate clean_child_exit prototype. The async-handoff churn bug -- FIXED in mod_http2 (h2_session.c): Under async handoff mod_http2 hands the master (c1) connection back to the MPM between requests; motorz re-dispatches it on a fresh worker. Under rapid HTTP/2 connection churn this raced mod_http2's stream lifecycle: a client's graceful GOAWAY drove the c1 session straight to DONE -> CONN_STATE_LINGER, and the MPM close ran m_stream_cleanup()/h2_c2_abort() on any stream whose secondary connection (c2) had emitted its response but not yet called c2_prod_done() -- silently dropping that response (~0.2-3% under h2load -n.. -c50 -m1). The fix establishes the invariant "a c1 connection is closed only after every stream's c2 has finished and flushed", in two points in h2_session.c: * h2_session_ev_remote_goaway(): a graceful GOAWAY (error code 0) with streams still in flight no longer transits to H2_SESSION_ST_DONE. It RSTs only the unprocessed streams and keeps the session running so the in-flight streams complete and their c2 output is written. (An error GOAWAY, or one with no open streams, still goes to DONE immediately.) This also matches RFC 9113: a peer GOAWAY stops new streams, it does not abort streams at or below its last-stream-id. * H2_SESSION_ST_IDLE handling: once those streams drain (open_streams == 0) and the remote has shut down, send our GOAWAY and go to DONE from IDLE. Reaching DONE only here -- after the c2s are done and flushed -- keeps the close from racing an in-flight c2. This benefits mpm_event too and is a conformance improvement, not just a motorz workaround. MOTORZ_ENABLE_ASYNC remains a single flip point (set 0 to fall back to advertising IS_ASYNC=0) should a regression ever reappear. Hardening of the fix (this change): - motorz.c, smoke.sh: corrected stale "async DISABLED" comments that still described the old workaround while the code already enabled async. 
- h2_session.c: documented the liveness bound -- keeping the session alive on a graceful GOAWAY cannot pin c1 open indefinitely, since a wedged c2 is bounded by its own request Timeout, which drops open_streams and lets c1 reach IDLE. - h2_session.h: documented the open_streams threading invariant the fix now relies on (c1-thread only; async re-dispatch is successive not concurrent, so no atomics/volatile needed; decrements to 0 only after each c2 has flushed). Tests (server/mpm/motorz/test/): setup.sh configures+builds httpd; run-all.sh runs the smoke, HTTP/1.1, and HTTP/2-over-TLS suites; bench.sh compares motorz vs event throughput. The async assertions expect async ON (CONN_STATE_ASYNC_WAITIO arms / "returning to mpm c1 monitoring" appears). The churn regression measures the fix correctly (two pitfalls, documented in MOTORZ.README and encoded in the tests): * Assert on RESPONSE LOSS (started - succeeded), NOT on h2load's "failed" total. "failed" also counts connection-establishment errors (ephemeral port / accept-queue pressure on busy loopback) which are environmental and appear with and without the fix; only started > succeeded is this bug. * Measure at LogLevel info, NOT trace8. The bug is a Heisenbug; trace8 slows the hot path enough to hide it, so a churn assertion run under trace8 passes even with the fix removed (vacuous). The load-bearing churn regression is in run-http2.sh at info; smoke.sh runs at trace8 for its state-machine traces, so its churn check is a gross-sanity pass only. Full analysis, reproduction recipe, and the fix are in server/mpm/motorz/MOTORZ.README ("HTTP/2 async handoff"). Official mod_http2 pytest suite (test/modules/http2/): test_h2_106_02 now skips on MPMs that do not register ServerLimit (i.e. mpm_motorz, whose static fixed-size process pool makes StartServers the hard daemon limit, so ServerLimit is meaningless and unregistered). 
The test's ServerLimit/MaxConnectionsPerChild config is a syntax error there; prefork/worker/event still run it unchanged. MaxConnectionsPerChild itself IS supported by motorz (a core directive honored by the supervisor). Validated: full http2 suite green on both event and motorz (only known flaky proxy-backend tests aside), and the motorz custom suite (smoke/http1/http2) passes with 0 churn response-loss. Docs and packaging: add the mpm_motorz manual page (docs/manual/mod/motorz.xml + .meta, registered in allmodules.xml) documenting the threading model, async handling, admission control, and the PollersPerChild directive; add a CHANGES entry; and insert motorz into the default-MPM fallback chain (server/mpm/ config2.m4) between event and worker. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1934868 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index 0139c07e8f..e62d80d187 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,13 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.1 + *) mpm_motorz, mod_http2: Rework the MotorZ MPM with + multi-poller scale-out (new PollersPerChild directive) and async + keep-alive/HTTP/2 handoff, plus concurrency hardening and bug + fixes. Includes a required mod_http2 fix so a client's graceful + GOAWAY does not drop an in-flight response under async MPMs, and + a self-contained motorz test suite. [Jim Jagielski] + *) mod_http2: improved early cleanup of streams. 
[Stefan Eissing] diff --git a/docs/log-message-tags/next-number b/docs/log-message-tags/next-number index 37eec71096..b3d29a4262 100644 --- a/docs/log-message-tags/next-number +++ b/docs/log-message-tags/next-number @@ -1 +1 @@ -10547 +10557 diff --git a/docs/manual/mod/allmodules.xml b/docs/manual/mod/allmodules.xml index 2149c70674..9f114e808e 100644 --- a/docs/manual/mod/allmodules.xml +++ b/docs/manual/mod/allmodules.xml @@ -138,6 +138,7 @@ mod_xml2enc.xml mpm_common.xml event.xml + motorz.xml mpm_netware.xml mpmt_os2.xml prefork.xml diff --git a/docs/manual/mod/motorz.xml b/docs/manual/mod/motorz.xml new file mode 100644 index 0000000000..8950e47673 --- /dev/null +++ b/docs/manual/mod/motorz.xml @@ -0,0 +1,258 @@ + + + + + + + + +motorz +A lean, fast, self-contained event-driven Multi-Processing Module +built on the APR pollset and thread pool especially suited as a reverse proxy +MPM +motorz.c +mpm_motorz_module + + +

The motorz Multi-Processing Module (MPM) is an + asynchronous, event-driven implementation. It combines a + prefork-style fixed pool of child processes with an event core built on + APR's pollset and a shared thread pool. Each child + runs one or more dedicated poller threads that watch sockets + and timers, dispatching ready I/O events and expired timers to a pool of + worker threads. The workers never poll; they only process the + connection/request work pushed to them.

+ +

The design goal is a fast, efficient, single, compact MPM that runs on modern + Unix platforms by leaning on APR as much as possible, while still supporting + the asynchronous connection handling needed for efficient keep-alive and + HTTP/2.

+ +

To use the motorz MPM, add + --with-mpm=motorz to the configure + script's arguments when building the httpd, or build + it as a loadable module with + --enable-mpms-shared=motorz.

+ +
+ +The event MPM +The worker MPM +The prefork MPM +Setting which addresses and ports Apache HTTP Server uses + +
How it Works +

motorz uses prefork as the framework for process + management and an event core for connection handling. A single control + process (the parent) launches a fixed number of child processes, as set + by the StartServers directive. + Unlike worker and event, the number of + children does not float with load: motorz maintains a + static pool, replacing children one-for-one as they exit. Concurrency + within a host is scaled by adding worker threads + (ThreadsPerChild) and, where + the poll/dispatch path is the bottleneck, poller threads + (PollersPerChild), rather than by spawning more + processes.

+ +

Each child process runs:

+
    +
  • One or more poller threads. Each poller owns its + own pollset, timer ring (with a guarding mutex) and lock-free + transaction-pool recycle list, so pollers never contend with one + another. A poller polls, dispatches ready I/O events and expired + timers to the worker pool, and (for the poller that owns the listening + sockets) accepts new connections. The number of pollers is controlled + by PollersPerChild.
+ +
  • A shared pool of worker threads + (ThreadsPerChild) that run + the actual connection and request processing pushed to them. Workers + never poll.
+ +
  • A supervisor (the child's main thread) that + watches MaxConnectionsPerChild + and the pipe-of-death / generation, signals the pollers to wind down, + and then joins them on exit.
+
+ +

A connection is sharded to one poller at accept time (round-robin) and + bound to it for its whole lifetime: it re-arms in, and times out on, that + poller's pollset and timer ring. Using multiple pollers lifts the + single-poll-thread throughput ceiling, so accept, event dispatch and timer + expiry scale with PollersPerChild instead of being + serialized on one thread.
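The round-robin sharding described above can be pictured with a small C sketch. It is an illustration only: shard_state and pick_poller() are hypothetical names, and C11 atomics are used to keep the example self-contained, whereas the MPM itself would be expected to use APR primitives. The atomic counter is an assumption made for safety; it is not strictly needed if only one poller accepts.

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical per-child sharding state (names are illustrative). */
typedef struct {
    unsigned int npollers;  /* resolved PollersPerChild, always >= 1 */
    atomic_uint  next;      /* count of connections accepted so far  */
} shard_state;

/* Pick the poller index for a newly accepted connection. The connection
 * would then stay bound to that poller for its whole lifetime. */
static unsigned int pick_poller(shard_state *s)
{
    assert(s->npollers > 0);
    /* fetch-and-increment returns the previous value; unsigned overflow
     * wraps, so the counter never needs to be reset. */
    return atomic_fetch_add(&s->next, 1u) % s->npollers;
}
```

With three pollers, successive accepts map to pollers 0, 1, 2, 0, and so on, which spreads connections evenly without any per-connection load measurement.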

+ +

While the parent process is usually started as root under + Unix in order to bind to port 80, the child processes and threads are + launched by the server as a less-privileged user. The + User and + Group directives are used to set + the privileges of the Apache HTTP Server child processes. The child + processes must be able to read all the content that will be served, but + should have as few privileges beyond that as possible.

+ +

MaxConnectionsPerChild + controls how frequently the server recycles processes by retiring old ones + and launching new ones.

+
+ +
Asynchronous connection handling +

motorz reports itself as an asynchronous MPM. When a + worker finishes the active phase of a connection (for example, an + HTTP keep-alive connection between requests, or a connection waiting on + further I/O), it hands the socket back to its poller rather than holding a + worker thread idle. The poller waits for the next event on that socket, + bounded by the configured Timeout, + and re-dispatches the connection to a worker only when there is work to do. + This frees worker threads from idle keep-alive connections and is what + allows efficient HTTP/2 handling, where the master connection is handed + back to the MPM between requests.

+ +

Lingering close is also non-blocking: instead of blocking a worker for + the duration of the lingering-close timeout, the draining socket is handed + back to the poll loop with a bounded linger timeout, so the worker is + returned to the pool immediately.

+ +

Modules that take a connection fully asynchronous (suspending it and + resuming it later) are supported; a suspended connection is parked and + re-armed on its owning poller when resumed.

+
+ +
Admission control +

To keep a child safe under overload, motorz applies + listener backpressure. When the worker pool saturates, the poller that + owns the listening sockets removes them from its pollset and stops + accepting; it re-adds them once the backlog drains. This keeps the work + queue and per-connection memory bounded rather than growing without limit. + The decision is based on the worker pool's idle, pending and active-thread + counts, with hysteresis to avoid flapping the listeners on and off.

+ + ThreadsPerChild and admission control +

Because the admission-control low-water mark is three quarters of + ThreadsPerChild, rounded down, very small + values are problematic: with ThreadsPerChild 1 the mark is zero, so the listeners + re-enable only when the work queue is completely empty, which severely + degrades throughput. A ThreadsPerChild value of at least 4 + is strongly recommended; the server logs a warning for smaller values.

+
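The low-water arithmetic behind this note can be checked with a short C sketch. The 3/4 factor and the (1*3)/4 = 0 result come from the commit message; the function names and the exact comparison used to re-enable the listeners are assumptions made for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Low-water mark: 75% of ThreadsPerChild using integer division,
 * so the result is rounded down. */
static size_t low_water_mark(size_t threads_per_child)
{
    assert(threads_per_child > 0);
    return (threads_per_child * 3) / 4;
}

/* Hypothetical re-enable test: the listeners come back once the number
 * of pending tasks has fallen to the low-water mark or below. */
static int may_reenable(size_t pending, size_t threads_per_child)
{
    return pending <= low_water_mark(threads_per_child);
}
```

For ThreadsPerChild 1 the mark is (1*3)/4 = 0, so a single queued task keeps the listeners disabled. For ThreadsPerChild 4 the mark is 3, and for ThreadsPerChild 64 it is 48.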
+
+ +
Relationship to other MPMs +

motorz uses prefork for process management and an APR + thread pool for workers, with pollers dispatching work to that pool. This + is distinct from event's listener/worker/fdqueue design, + in which the worker threads themselves re-arm a shared, thread-safe + pollset.

+ +

Whether additional pollers help depends on the workload. If the worker + threads are the CPU bottleneck—typical for real request + processing—the poller threads are not the limiting factor, and a + PollersPerChild beyond one or two yields little. The + multiple-poller design removes motorz's + structural single-thread ceiling, but per-host throughput is still + governed by worker CPU.

+ + No ServerLimit / dynamic process scaling +

Unlike worker and event, + motorz does not scale the number of child processes with + load and does not provide a separate + ServerLimit ceiling. The process + pool is fixed at StartServers, + which therefore acts as the hard daemon limit, and there are no + MinSpareThreads / + MaxSpareThreads / + MaxRequestWorkers controls. + Scale concurrency with + ThreadsPerChild (and, if the + poll path saturates, PollersPerChild).

+
+
+ +CoreDumpDirectory + +EnableExceptionHook + +Group + +Listen + +ListenBacklog + +MaxConnectionsPerChild + +MaxMemFree + +PidFile + +ScoreBoardFile + +SendBufferSize + +StartServers + +ThreadLimit + +ThreadsPerChild + +ThreadStackSize + +User + + + +PollersPerChild +Number of poll threads per child process +PollersPerChild number +PollersPerChild 0 +server config +motorz + + +

The PollersPerChild directive sets the number of + poller threads created in each child process. Each poller owns its own + pollset, timer ring and connection-recycle list, and handles a shard of + the child's connections, so adding pollers raises the rate at which a + single child can accept connections and dispatch I/O events and timer + expiries.

+ +

A value of 0 (the default) means auto: the number + of pollers is derived from the number of online CPUs, capped at a built-in + maximum of 8. In all cases the number of pollers is clamped so that it never + exceeds ThreadsPerChild and is + never less than one.
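The resolution rules in this paragraph can be written out as a small C sketch. It is a hedged illustration, not the MPM's code: resolve_pollers() and POLLERS_AUTO_CAP are hypothetical names, the cap of 8 is taken from the commit message, and applying that cap only in auto mode is an assumption.

```c
#include <assert.h>

/* Built-in cap for auto mode ("capped at 8" per the commit message). */
#define POLLERS_AUTO_CAP 8

/* Resolve the effective number of pollers for one child process.
 * configured == 0 means "auto": derive the count from the online CPUs. */
static int resolve_pollers(int configured, int online_cpus,
                           int threads_per_child)
{
    int n;

    assert(threads_per_child >= 1);

    if (configured == 0) {
        n = online_cpus;
        if (n > POLLERS_AUTO_CAP) {
            n = POLLERS_AUTO_CAP;
        }
    }
    else {
        n = configured;
    }

    /* Never more pollers than worker threads, never fewer than one. */
    if (n > threads_per_child) {
        n = threads_per_child;
    }
    if (n < 1) {
        n = 1;
    }
    return n;
}
```

Under these assumptions, a 16-CPU host with ThreadsPerChild 64 resolves PollersPerChild 0 to 8 pollers, while the same host with ThreadsPerChild 2 resolves to 2.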

+ +

Because event dispatch is rarely the bottleneck for real request + processing—worker CPU usually is—values beyond one or two + seldom improve throughput. Raising PollersPerChild + is mainly useful for workloads dominated by very high connection churn or + large numbers of idle, event-driven connections, where the poll/accept + path itself becomes the limit.

+ + Example + +StartServers 2 +ThreadsPerChild 64 +ThreadLimit 64 +PollersPerChild 2 + + +
+
+ +
diff --git a/docs/manual/mod/motorz.xml.meta b/docs/manual/mod/motorz.xml.meta new file mode 100644 index 0000000000..0d8012243a --- /dev/null +++ b/docs/manual/mod/motorz.xml.meta @@ -0,0 +1,12 @@ + + + + + motorz + /mod/ + .. + + + en + + diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 3affaf2a20..a2ddab8d57 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -1536,7 +1536,42 @@ static void h2_session_ev_remote_goaway(h2_session *session, int arg, const char session->remote.accepting = 0; session->remote.shutdown = 1; cleanup_unprocessed_streams(session); - transit(session, "remote goaway", H2_SESSION_ST_DONE); + if (arg == 0 && session->open_streams > 0) { + /* Graceful client GOAWAY while we are still processing streams it + * sent us. Do NOT go to DONE here: that makes h2_c1_run() put the + * connection into CONN_STATE_LINGER and the MPM close it, which + * runs h2_mplx_c1_destroy() and h2_c2_abort()s any stream whose + * secondary connection (c2) has not yet flushed its response onto + * c1 -- silently dropping that response. The window between a c2 + * finishing its output and signalling done is small, but an async + * MPM that hands c1 back to a fresh worker between events (e.g. + * mpm_motorz) drives the close into it far more often than + * mpm_event, whose scheduling happens to let c2 drain first. + * + * Instead keep the session running. The remaining streams complete + * normally, their output is written, and once open_streams reaches + * 0 we finish from the IDLE state below (or via NO_MORE_STREAMS), + * i.e. only after every c2 is done and flushed. This also honors + * RFC 9113: a peer's GOAWAY does not abort streams at or below its + * last-stream-id, it just stops new ones. + * + * Liveness: keeping the session running here does NOT risk pinning + * c1 open indefinitely on a slow or wedged c2. 
While draining we + * return to ST_BUSY/ST_WAIT and poll the mplx with session->s->timeout + * (see h2_session_process()), and each c2 runs its request under the + * same server Timeout. A c2 that stops making progress is aborted by + * its own timeout, which drops open_streams and lets us reach IDLE + * and finish. The new dependency this introduces -- relative to the + * old straight-to-DONE behaviour -- is exactly that c1 teardown now + * waits on c2 progress/timeout instead of racing ahead of it; that + * is the point of the fix, and it is bounded by Timeout. */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c1, + H2_SSSN_MSG(session, + "remote goaway, draining open streams")); + } + else { + transit(session, "remote goaway", H2_SESSION_ST_DONE); + } } } @@ -1944,6 +1979,24 @@ apr_status_t h2_session_process(h2_session *session, int async, case H2_SESSION_ST_IDLE: ap_assert(session->open_streams == 0); + if (session->remote.shutdown) { + /* The client sent a GOAWAY and all streams it sent us have now + * been processed and their output written (open_streams == 0). + * It will not open new streams, so there is nothing to wait + * for: send our GOAWAY and finish. Reaching DONE only now -- as + * opposed to when the client's GOAWAY arrived -- guarantees the + * connection is closed only after every c2 is done and flushed, + * which is what keeps async handoff (e.g. mpm_motorz) from + * dropping the last response under HTTP/2 connection churn. + * (Checked before the want_read assert below: after receiving a + * GOAWAY nghttp2 may no longer want to read.) 
*/ + if (!session->local.shutdown) { + h2_session_shutdown(session, 0, "done", 0); + } + transit(session, "remote goaway, streams drained", + H2_SESSION_ST_DONE); + break; + } ap_assert(nghttp2_session_want_read(session->ngh2)); if (!h2_session_want_send(session)) { /* Give any new incoming request a short grace period to diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index 497f7e1936..fab2001a99 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -91,7 +91,20 @@ typedef struct h2_session { struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */ struct h2_stream_monitor *monitor;/* monitor callbacks for streams */ - unsigned int open_streams; /* number of streams processing */ + unsigned int open_streams; /* number of streams processing. + * c1-thread only: written via + * h2_mplx_c1_stream_cleanup() on H2_SS_CLEANUP + * and read throughout h2_session_process(), + * all on the c1 connection thread. An async + * MPM (e.g. motorz) re-dispatches c1 to a + * fresh worker between events -- successive, + * never concurrent -- so no atomics/volatile + * are needed; the MPM pollset handoff + * establishes the happens-before. Decrements + * to 0 only after each stream's c2 has + * finished and flushed; the graceful-GOAWAY + * drain in h2_session_process() relies on + * this (see h2_session_ev_remote_goaway). */ unsigned int streams_done; /* number of http/2 streams handled */ unsigned int responses_submitted; /* number of http/2 responses submitted */ diff --git a/server/mpm/config2.m4 b/server/mpm/config2.m4 index e8ea42e14a..3de3dbc1d6 100644 --- a/server/mpm/config2.m4 +++ b/server/mpm/config2.m4 @@ -8,9 +8,9 @@ APACHE_HELP_STRING(--with-mpm=MPM,Choose the process model for Apache to use by default_mpm=$withval AC_MSG_RESULT($withval); ],[ - dnl Order of preference for default MPM: + dnl Order of preference for default MPM: dnl The Windows and OS/2 MPMs are used on those platforms. 
- dnl Everywhere else: event, worker, prefork + dnl Everywhere else: event, motorz, worker, prefork if ap_mpm_is_supported "winnt"; then default_mpm=winnt AC_MSG_RESULT(winnt) @@ -20,12 +20,15 @@ APACHE_HELP_STRING(--with-mpm=MPM,Choose the process model for Apache to use by elif ap_mpm_is_supported "event"; then default_mpm=event AC_MSG_RESULT(event) + elif ap_mpm_is_supported "motorz"; then + default_mpm=motorz + AC_MSG_RESULT(motorz - event is not supported) elif ap_mpm_is_supported "worker"; then default_mpm=worker - AC_MSG_RESULT(worker - event is not supported) + AC_MSG_RESULT(worker - event and motorz are not supported) else default_mpm=prefork - AC_MSG_RESULT(prefork - event and worker are not supported) + AC_MSG_RESULT(prefork - event, motorz and worker are not supported) fi ]) diff --git a/server/mpm/motorz/MOTORZ.README b/server/mpm/motorz/MOTORZ.README index bda07adb77..35d2f450d1 100644 --- a/server/mpm/motorz/MOTORZ.README +++ b/server/mpm/motorz/MOTORZ.README @@ -8,3 +8,260 @@ APR Thread pool is used. MotorZ uses Prefork as the framework and Simple for the actual event structure. + + +Threading model +=============== + +Each child process runs: + + - N poll threads ("pollers", PollersPerChild; default: one per online CPU, + capped). Each poller owns its OWN pollset, timer ring (+ guarding mutex) + and lock-free transaction-pool recycle list, so pollers never contend with + each other. A poller polls, dispatches ready I/O events and expired timers + to the worker pool, and (for the listener-owning poller) accepts. + - A shared APR thread pool of worker threads (ThreadsPerChild) that run the + actual connection/request processing pushed to them; workers never poll. + - A supervisor (the child's main thread) that watches MaxRequestsPerChild and + the pipe-of-death / generation, signalling the pollers to wind down, then + joins them on exit. 
+ +A connection is sharded to one poller at accept time (round-robin) and bound to +it for its whole lifetime: it re-arms in and times out on that poller's +pollset/ring. (Its transaction pool is recycled back to the ACCEPTING poller's +free-list, which is the single consumer of that list -- recycling is not +sharded, only I/O is.) Multiple pollers lift the old single-poll-thread +throughput ceiling: accept + event dispatch + timer expiry now scale with +PollersPerChild instead of being serialized on one thread. + +Admission control (listener backpressure) keeps the child safe under overload: +when the worker pool saturates, the listener-owning poller removes the +listening sockets from its pollset (motorz_update_listeners / +motorz_disable_listeners) and re-adds them once the backlog drains, so the work +queue and per-connection memory stay bounded rather than growing without limit. + +Relationship to mpm_event +------------------------- + +MotorZ uses Prefork for process management and an APR thread pool for workers, +with pollers dispatching to that pool -- distinct from mpm_event's +listener/worker/fdqueue design where workers themselves re-arm a shared +thread-safe pollset. In practice, whether more pollers help depends on the +workload: if the worker threads are the CPU bottleneck (typical for real +request processing), the poll threads are not the limit and PollersPerChild +beyond 1-2 yields little. The multiple-poller design removes motorz's +*structural* single-thread ceiling, but per-box throughput is still governed by +worker CPU. For the broadest, most battle-tested high-concurrency async +behavior, mpm_event remains the reference; MotorZ stays a lean, self-contained +alternative. + + +HTTP/2 async handoff (ENABLED -- mod_http2 close-ordering fixed) +=============================================================== + +Status: motorz reports AP_MPMQ_IS_ASYNC = 1 and AP_MPMQ_CAN_WAITIO = 1 +(MOTORZ_ENABLE_ASYNC = 1 in motorz.c). 
Async was previously disabled to work
+around a real defect in the interaction between motorz's async connection
+handoff and mod_http2's stream lifecycle that dropped a small fraction of
+requests under HTTP/2 connection churn. That defect has now been fixed in
+mod_http2 (see "The fix" below), so async is safe to advertise: the c1
+connection is no longer closed until every secondary connection (c2) has
+finished and flushed its response. CONN_STATE_ASYNC_WAITIO in
+motorz_io_process() is now actually exercised by mod_http2 (it only requests
+WAITIO of an async MPM).
+
+The symptom / root cause / diagnosis below are retained as the rationale for
+the fix and as a guide if a regression ever reappears.
+
+Symptom
+-------
+Under HTTP/2 with many short-lived, rapidly churning connections (the worst case
+is one request per connection at high concurrency, e.g. `h2load -n 30000 -c 50
+-m 1`), motorz intermittently dropped ~0.2-3% of requests. h2load reports them
+as "Process Request Failure"; the responses are lost. All h2 error codes are 0
+(graceful), there is no TCP RST, no server-side 4xx/5xx, and -- crucially -- no
+data race on motorz's own per-connection state (the claim/ownership model holds;
+verified with ThreadSanitizer). mpm_event does NOT exhibit this; motorz does.
+It is timing-sensitive (a Heisenbug): trace8 logging, lldb, and Guard Malloc all
+slow the hot path enough to hide it; it reproduces under TSan and at info level.
+
+Root cause: a c1/c2 close-ordering race in mod_http2, exposed by async handoff
+---------------------------------------------------------------------------
+mod_http2 runs the HTTP/2 master connection ("c1") on the MPM-provided thread
+and runs each request stream on a SECONDARY connection ("c2") on its OWN worker
+thread pool (h2_workers.c). A stream's response is produced by its c2 worker and
+written out through c1.
+
+Two facts combine to make the bug:
+
+  1. A stream is considered "running" until its c2 worker calls c2_prod_done(),
+     which sets conn_ctx->done = 1 (h2_mplx.c). h2_mplx.c:stream_is_running() ==
+     (started && !done). There is a WINDOW between the c2 submitting its final
+     output + EOS to its output beam (which lets c1/nghttp2 see the stream as
+     CLOSED -> CLEANUP) and the c2 worker actually returning and calling
+     c2_prod_done(). During that window the stream is CLOSED at the protocol
+     level but still "running".
+
+  2. When the c1 session ends (e.g. the client sends GOAWAY after its last
+     request -- normal for one-request-per-connection churn), h2_c1_run()
+     (h2_c1.c) sets c->cs->state = CONN_STATE_LINGER for the ST_DONE/ST_CLEANUP
+     session states. The MPM then performs a lingering close, whose pre_close
+     hook (ap_prep_lingering_close -> ap_run_pre_close_connection -> h2_c1_pre_close
+     -> h2_session_pre_close) sends GOAWAY and tears the session down.
+     m_stream_cleanup() (h2_mplx.c), for any stream still "running" at that
+     moment, calls h2_c2_abort() -- which aborts the c2 output beam
+     ("(53)Software caused connection abort"), discarding the in-flight response.
+
+So if the c1 close runs DURING the window in (1), the just-finished stream's
+response is aborted instead of flushed. mpm_event keeps the c1 connection
+scheduled such that the c2 reaches done=1 first (its trace shows nearly all
+streams "c2 is done, move to spurge", almost none "c2 is running, abort").
+motorz, dispatching c1 work to a fresh pool thread on each async re-entry,
+drives the close ahead of c2_prod_done far more often (many "c2 is running,
+abort", and GOAWAYs logged with reason='timeout' -- the reason string is just
+"state was IDLE at close", NOT an actual timeout: elapsed times were ~tens of ms
+against a 10s Timeout).
+
+Why disabling async fixes it
+----------------------------
+With AP_MPMQ_IS_ASYNC = 0, mod_http2's h2_c1_run() does NOT return the c1
+connection to the MPM between requests. Instead it LOOPS on h2_session_process(),
+holding one worker thread and driving mod_http2's own multiplexer pollset
+(h2_mplx_c1_poll, which polls both the c1 socket and the c2 beam pipes) until the
+session is genuinely done -- i.e. until every stream's c2 has completed and its
+output has been written. The early c1 close-vs-c2 race cannot occur because the
+same thread that would close the connection is the one pumping the c2 output to
+completion first. Measured: zero dropped requests, and HTTP/2 throughput is
+unchanged (mod_http2 pins a worker per active c1 connection either way).
+
+Cost of the workaround
+----------------------
+The trade-off is HTTP/1.1, not HTTP/2. With async on, an idle keep-alive
+connection is handed back to the MPM and frees its worker; with async off it
+holds a worker for the connection's lifetime, so the count of concurrent
+kept-alive HTTP/1.1 connections is bounded by ThreadsPerChild. (No requests are
+lost -- HTTP/1.1 correctness is unaffected; only keep-alive worker-occupancy
+scaling regresses.) This is an acceptable trade to be correct under HTTP/2;
+revisit if/when motorz targets large HTTP/1.1 keep-alive fan-out.
+
+The fix (implemented in mod_http2)
+----------------------------------
+The fix lives in mod_http2, not motorz, and follows the second candidate
+approach above: never let the c1 session reach DONE -> LINGER (which is what
+lets the MPM close the connection) while a stream it is processing still has a
+c2 whose response has not been written. The invariant established is: "a c1
+connection is only closed once every stream's c2 has finished and its output is
+flushed." Two small changes in h2_session.c do this for the path that actually
+triggered the loss -- the client's graceful GOAWAY:
+
+  * h2_session_ev_remote_goaway(): a graceful GOAWAY (error code 0) with streams
+    still in flight (open_streams > 0) no longer transits straight to
+    H2_SESSION_ST_DONE. It records the remote shutdown, RSTs only the
+    unprocessed streams (as before), and keeps the session running so the
+    in-flight streams complete and their c2 output is written. (An error GOAWAY,
+    or one with no streams in flight, still goes to DONE immediately.) This also
+    matches RFC 9113: a peer GOAWAY stops new streams, it does not abort streams
+    at or below its last-stream-id.
+
+  * H2_SESSION_ST_IDLE handling in h2_session_process(): once those streams have
+    drained (open_streams == 0) and the remote has shut down, the session sends
+    our GOAWAY and goes to DONE from IDLE instead of parking the connection back
+    on the MPM to wait for new streams that a departing client will never open.
+    Reaching DONE only here -- after the c2s are done and flushed -- is what
+    keeps the close from racing an in-flight c2.
+
+With this in place the abort-on-close in m_stream_cleanup()/h2_mplx_c1_destroy()
+is no longer reached while a c2's response is unsent, so MOTORZ_ENABLE_ASYNC=1
+is lossless under churn. The churn regression (server/mpm/motorz/test/smoke.sh
+and run-http2.sh) asserts it at n=.. c=50 m=1, and the async assertions there
+have been inverted to expect async ON (CONN_STATE_ASYNC_WAITIO arms / "returning
+to mpm c1 monitoring" appears).
+
+Measuring the regression correctly (two pitfalls)
+.................................................
+The committed tests learned two lessons the hard way; preserve them if you edit:
+
+  1. Assert on RESPONSE LOSS, not on h2load's "failed" total. h2load's "failed"
+     counts BOTH connection-establishment errors AND dropped responses:
+         failed = (total - started) + (started - succeeded)
+                  \__ connection setup _/ \__ THIS bug ___/
+     The first term is environmental -- ephemeral-port exhaustion and accept-
+     queue pressure when hammering loopback at high -c (it appears with the fix,
+     without the fix, and even on mpm_event; on a busy macOS loopback a few
+     hundred per 30000 is normal). Only the second term, started - succeeded
+     (responses dropped on connections that DID start), is the close-ordering
+     bug. The tests compute `lost = started - succeeded` and assert that is 0;
+     asserting "failed == 0" or "started == total" gives flaky failures that
+     have nothing to do with this fix.
+
+  2. Measure at LogLevel info, NOT trace8. This is a Heisenbug: trace8 slows the
+     hot path enough to hide it (the same reason it hides under lldb / Guard
+     Malloc, noted below). A churn assertion run under trace8 passes even with
+     the fix deliberately removed -- i.e. it is vacuous. smoke.sh runs at trace8
+     for its state-machine traces, so its churn check is only a gross sanity
+     pass; the load-bearing, non-vacuous churn regression is the one in
+     run-http2.sh, which runs at info. (Verified: defeating the fix and
+     re-running at info brings response loss back; under trace8 it does not.)
+
+How to trigger the bug (step by step)
+-------------------------------------
+The bug is masked while async is OFF, so you must first turn it back on:
+
+  1. Re-enable async: set MOTORZ_ENABLE_ASYNC to 1 in motorz.c, rebuild
+     (`make`). (motorz_query will now report AP_MPMQ_IS_ASYNC=1 and
+     AP_MPMQ_CAN_WAITIO=1, so mod_http2 takes the async c1 hand-back path.)
+
+  2. Build at a NORMAL/fast level. Do NOT use trace8/trace2 logging, lldb, or
+     Guard Malloc while trying to TRIGGER it -- the bug is a Heisenbug and any
+     of those slow the hot path enough to hide it. (Use trace2 only afterwards
+     to observe the close path, see below.)
+
+  3. Config that maximizes the race (TLS vhost with `Protocols h2 http/1.1`):
+       StartServers      1
+       PollersPerChild   1     ; fewer pollers => worse (more serialized on
+                               ; poller 0), so 1 is the most reliable trigger
+       ThreadsPerChild   128   ; large, so the failures are NOT worker
+                               ; starvation -- rules that out as a cause
+       ThreadLimit       128
+       LogLevel          info
+     (Any document works; a tiny static file is fine.)
+
+  4. Drive maximum HTTP/2 CONNECTION CHURN with h2load -- the trigger is many
+     short connections each doing very few streams, NOT many streams per conn:
+
+       h2load -n 30000 -c 50 -m 1 https://localhost:PORT/
+
+     -m 1 (one stream per connection) is the strongest trigger; -m 25 still
+     fails sometimes; -m >= 50 essentially hides it. Run it 5-10 times: roughly
+     1 in 3-5 runs drops a few hundred requests (h2load: "Process Request
+     Failure", "failed"/"errored" > 0, and "started" < "total"). Higher -c (e.g.
+     100) raises the hit rate. The committed test harness encodes this:
+     `server/mpm/motorz/test/smoke.sh` and `run-http2.sh` (which currently
+     assert 0 failures *because async is off* -- with async on they will fail,
+     which IS the reproduction).
+
+  5. The failures are graceful: no TCP RST, no 4xx/5xx, no crash -- just lost
+     responses. (Very rarely the child SIGBUSes; that is the same race hitting
+     freed memory, not a separate bug.)
+
+How it was diagnosed (for whoever picks this up)
+------------------------------------------------
+  * Confirm the close path: with async on, LogLevel trace2 (GLOBAL -- a
+    per-module "http2:trace2" spec does NOT emit on this build), small runs
+    (e.g. n=2000) until a fail, then grep the failing connection id for
+    "c2 is running, abort", "Software caused connection abort", and
+    "GOAWAY[... reason='timeout'". Those three together are the signature.
+  * Confirm it is the async handoff (not anything else): flip
+    MOTORZ_ENABLE_ASYNC back to 0 -> failures vanish entirely (0/N runs). That
+    single toggle is the proof, and is the shipped workaround.
+  * Confirm it is NOT a motorz data race: ThreadSanitizer build
+    (`make EXTRA_CFLAGS="-fsanitize=thread -g -O1" LDFLAGS="-fsanitize=thread"`,
+    run with `TSAN_OPTIONS=log_path=...`) shows no race on motorz's scon
+    fields; the only h2 races are the by-design c2->aborted signal flag and
+    benign lock-free free-list idioms.
+  * Contrast with mpm_event: run the identical h2load churn against event ->
+    0 failures, and its trace shows nearly all streams "c2 is done, move to
+    spurge" (clean) vs motorz's many "c2 is running, abort".
+  * Earlier dead ends (do NOT re-chase): the scoreboard slot-0 contention and
+    the c2->cs conn_state aliasing were both real races but NOT the cause --
+    fixing either did not stop the dropped requests.
diff --git a/server/mpm/motorz/motorz.c b/server/mpm/motorz/motorz.c
index 8feff2965c..8b5b4fb62b 100644
--- a/server/mpm/motorz/motorz.c
+++ b/server/mpm/motorz/motorz.c
@@ -16,13 +16,76 @@
 #include "motorz.h"
+/* Upper bound on the number of transaction pools kept on the recycle
+ * free-list (motorz_ptrans_get/put). Beyond this, freed pools are destroyed
+ * outright so a burst of connections does not pin memory forever.
+ */
+#define MAX_RECYCLED_POOLS 64
+
+/* Lingering close timeouts. Not exported by the core, so mirror the values
+ * connection.c uses (also mirrored this way by mpm_event). MAX_SECS_TO_LINGER
+ * bounds the whole non-blocking drain; SECONDS_TO_LINGER is the shortened
+ * period used when a module requested it (e.g. DoS mitigation).
+ */
+#ifndef MAX_SECS_TO_LINGER
+#define MAX_SECS_TO_LINGER 30
+#endif
+#define SECONDS_TO_LINGER 2
+
 /**
  * config globals
  */
 static motorz_core_t *g_motorz_core;
 static int threads_per_child = 16;
 static int ap_num_kids = DEFAULT_START_DAEMON;
-static int thread_limit = MAX_THREAD_LIMIT/10;
+/* Number of poll threads per child (#2 / scaling). 0 means "auto": derive from
+ * online CPUs in child_main, capped so a small box doesn't over-thread. Each
+ * poller owns its own pollset/timer-ring/recycle-list and a shard of the
+ * connections, lifting the single-poll-thread throughput ceiling.
+ */
+static int num_pollers = 0;
+#define MOTORZ_MAX_POLLERS 8
+
+/* Async HTTP/2 handoff is ENABLED (MOTORZ_ENABLE_ASYNC 1).
+ *
+ * When motorz advertises AP_MPMQ_IS_ASYNC=1, mod_http2 hands the master (c1)
+ * connection back to the MPM between requests; motorz then re-dispatches it on
+ * a fresh worker thread when its socket is readable. This previously raced
+ * mod_http2's stream lifecycle under rapid HTTP/2 connection churn: motorz
+ * could drive the c1 close/cleanup faster than a just-finished stream's
+ * secondary (c2) worker called c2_prod_done(), so the stream was still
+ * "running" at cleanup and its in-flight response got aborted -- the client
+ * saw a dropped request.
+ *
+ * That race is now FIXED in mod_http2 (h2_session.c): a graceful client GOAWAY
+ * with streams still in flight no longer transits the session straight to DONE;
+ * the session keeps running until those streams' c2s have finished and flushed
+ * (open_streams == 0), and only then -- from the IDLE state -- sends our GOAWAY
+ * and closes. The c1 connection is therefore handed to LINGER only after every
+ * c2 is done, so async handoff is lossless under churn. The full analysis,
+ * reproduction recipe, and the fix are in MOTORZ.README ("HTTP/2 async
+ * handoff").
+ *
+ * This remains a single flip point: set to 0 to fall back to the old workaround
+ * (report IS_ASYNC=0 so mod_http2 keeps c1 on one worker, driving its own
+ * multiplexer pollset until every c2 completes) should a regression ever
+ * reappear. It gates both AP_MPMQ_IS_ASYNC and AP_MPMQ_CAN_WAITIO
+ * (CONN_STATE_ASYNC_WAITIO is only meaningful when async).
+ */
+#define MOTORZ_ENABLE_ASYNC 1
+/* Upper bound for ThreadsPerChild; matches worker/event in using
+ * DEFAULT_THREAD_LIMIT rather than an arbitrary fraction of MAX_THREAD_LIMIT.
+ */
+static int thread_limit = DEFAULT_THREAD_LIMIT;
+
+/* Unique connection ID: child_slot * thread_limit + per-child sequence number.
+ * Mirrors the formula used by the worker and event MPMs so that c->id values
+ * are globally unique across children and connections within a child.
+ * conn_seq is a per-child atomic counter; thread_limit slots per child ensures
+ * no overlap between children.
+ */
+#define ID_FROM_CHILD_THREAD(c, t) ((long)(c) * (long)thread_limit + (long)(t))
+static apr_uint32_t conn_seq = 0;
 /* one_process --- debugging mode variable; can be set from the command line
  * with the -X flag. If set, this gets you the child_main loop running
@@ -42,6 +105,20 @@ static apr_pool_t *pchild; /* Pool for httpd child stuff */
 static pid_t ap_my_pid; /* it seems silly to call getpid all the time */
 static pid_t parent_pid;
 static int my_child_num;
+/* Number of connections accepted by this child so far; compared against
+ * ap_max_requests_per_child. Written by the accepting poller thread
+ * (motorz_io_accept) and read by the supervisor on the main thread
+ * (motorz_supervise). volatile ensures neither side caches a stale value;
+ * a torn read is harmless for a monotone counter used only for a soft cap.
+ */
+static volatile int requests_this_child;
+/* Set to stop the child's main loop. volatile because it's updated from a
+ * signal handler (stop_listening), from poller threads, and from the
+ * supervisor. On ARM (Apple Silicon) the poller->mtx lock/unlock performed
+ * on every poll-loop iteration provides acquire/release barriers, so the
+ * practical visibility lag is bounded by the 500ms poll timeout at worst.
+ */
+static int volatile die_now = 0;
 static motorz_child_bucket *all_buckets, /* All listeners buckets */
                            *my_bucket;   /* Current child bucket */
@@ -49,23 +126,133 @@ static void clean_child_exit(int code) __attribute__ ((noreturn));
 static apr_status_t motorz_io_process(motorz_conn_t *scon);
-static void clean_child_exit(int code) __attribute__ ((noreturn));
-
-static apr_pollset_t *motorz_pollset;
-static apr_skiplist *motorz_timer_ring;
+static void motorz_pollset_del(motorz_poller_t *poller, motorz_conn_t *scon);
+static void motorz_conn_claim(motorz_poller_t *poller, motorz_conn_t *scon);
+static void motorz_conn_done(motorz_conn_t *scon);
+static void motorz_start_lingering_close(motorz_conn_t *scon);
+static apr_status_t motorz_lingering_close(motorz_conn_t *scon);
+static void motorz_update_listeners(motorz_poller_t *poller);
 static motorz_core_t *motorz_core_get(void)
 {
     return g_motorz_core;
 }
+/* Obtain a transaction pool for a new connection, reusing one from the
+ * recycle free-list if available, otherwise creating a fresh one with its
+ * own allocator (so per-connection memory is released as a unit and the
+ * allocator's free blocks can be reused).
+ *
+ * SINGLE-CONSUMER: the lock-free CAS pop below is only safe with one popper,
+ * because it dereferences first->next without atomicity (see mpm_fdqueue.c's
+ * ap_queue_info_pop_pool and its PR caveat). This MUST be called only from the
+ * owning poller's thread (its sole caller is motorz_io_accept, which runs on
+ * that poller). Each poller has its own free-list, so "one popper" holds.
+ * Concurrent lock-free pushes (motorz_ptrans_put, from any worker) are fine.
+ */
+static apr_pool_t *motorz_ptrans_get(motorz_poller_t *poller)
+{
+    apr_pool_t *ptrans;
+
+    for (;;) {
+        motorz_recycled_pool *first = poller->recycled_pools;
+        if (first == NULL) {
+            break;
+        }
+        if (apr_atomic_casptr((void *)&poller->recycled_pools,
+                              first->next, first) == first) {
+            apr_atomic_dec32(&poller->num_recycled);
+            /* The node lived inside the pool it describes; the pool is now
+             * ours to hand out (it will be cleared again on next reuse).
+             */
+            return first->pool;
+        }
+        /* CAS lost a race with another pop... but there is only one popper, so
+         * this only happens transiently vs. a push changing the head; retry.
+         */
+    }
+
+    {
+        apr_allocator_t *allocator;
+        apr_allocator_create(&allocator);
+        apr_allocator_max_free_set(allocator, ap_max_mem_free);
+        apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
+        apr_allocator_owner_set(allocator, ptrans);
+        apr_pool_tag(ptrans, "transaction");
+    }
+    return ptrans;
+}
+
+/* Return a finished connection's transaction pool to the recycle free-list,
+ * or destroy it if the list is already at MAX_RECYCLED_POOLS. Clearing the
+ * pool runs all its cleanups (closing the socket, de-registering timers) and
+ * resets it for reuse.
+ *
+ * MULTI-PRODUCER: the lock-free CAS push is safe from any thread (workers and
+ * the poll thread), concurrently with each other and with a single popper.
+ */
+static void motorz_ptrans_put(motorz_poller_t *poller, apr_pool_t *ptrans)
+{
+    motorz_recycled_pool *node;
+
+    /* Bound the free-list. apr_atomic_read32 + inc is not a strict CAS, so the
+     * count may momentarily overshoot MAX_RECYCLED_POOLS under concurrency;
+     * that is harmless (it just caps roughly).
+     */
+    if (apr_atomic_read32(&poller->num_recycled) >= MAX_RECYCLED_POOLS) {
+        apr_pool_destroy(ptrans);
+        return;
+    }
+    apr_atomic_inc32(&poller->num_recycled);
+
+    /* Clear (don't destroy) to keep the allocator and its free blocks; this
+     * also runs the pool's cleanups (closing the socket, de-registering any
+     * timer). Then carve the list node out of the now-empty pool.
+     */
+    apr_pool_clear(ptrans);
+    apr_pool_tag(ptrans, "transaction");
+    node = apr_palloc(ptrans, sizeof(*node));
+    node->pool = ptrans;
+
+    for (;;) {
+        /* Save the current head in a local before the CAS: node->next must not
+         * be re-read after a successful CAS, as a concurrent pusher may have
+         * already changed it (see mpm_fdqueue.c push_pool, PR 44402).
+         */
+        motorz_recycled_pool *next = poller->recycled_pools;
+        node->next = next;
+        if (apr_atomic_casptr((void *)&poller->recycled_pools, node, next) == next) {
+            break;
+        }
+    }
+}
+
 static int timer_comp(void *a, void *b)
 {
-    apr_time_t t1 = (apr_time_t) (((motorz_timer_t *) a)->expires);
-    apr_time_t t2 = (apr_time_t) (((motorz_timer_t *) b)->expires);
+    motorz_timer_t *ta = (motorz_timer_t *) a;
+    motorz_timer_t *tb = (motorz_timer_t *) b;
+    apr_time_t t1 = ta->expires;
+    apr_time_t t2 = tb->expires;
     AP_DEBUG_ASSERT(t1);
     AP_DEBUG_ASSERT(t2);
-    return ((t1 < t2) ? -1 : 1);
+    /* Identity match: required so that apr_skiplist_remove() (which relies on
+     * the compare function returning 0) can locate the exact timer node. We
+     * must never return 0 for two *distinct* timers, otherwise
+     * apr_skiplist_insert() would drop duplicates (timers created within the
+     * same microsecond) and remove() could delete the wrong connection's
+     * timer. Equal expiry on distinct timers therefore falls back to a stable
+     * total order on the timer address.
+     */
+    if (ta == tb) {
+        return 0;
+    }
+    if (t1 < t2) {
+        return -1;
+    }
+    if (t1 > t2) {
+        return 1;
+    }
+    return (ta < tb) ? -1 : 1;
 }
 static apr_status_t motorz_conn_pool_cleanup(void *baton)
@@ -73,11 +260,11 @@ static apr_status_t motorz_conn_pool_cleanup(void *baton)
     motorz_conn_t *scon = (motorz_conn_t *)baton;
     if (scon->timer.expires) {
-        motorz_core_t *mz = scon->mz;
+        motorz_poller_t *poller = scon->poller;
-        apr_thread_mutex_lock(mz->mtx);
-        apr_skiplist_remove(mz->timeout_ring, &scon->timer, NULL);
-        apr_thread_mutex_unlock(mz->mtx);
+        apr_thread_mutex_lock(poller->mtx);
+        apr_skiplist_remove(poller->timeout_ring, &scon->timer, NULL);
+        apr_thread_mutex_unlock(poller->mtx);
     }
     return APR_SUCCESS;
@@ -110,31 +297,53 @@ static void motorz_io_timeout_cb(motorz_core_t *mz, void *baton)
     motorz_conn_t *scon = (motorz_conn_t *) baton;
     conn_rec *c = scon->c;
-    scon->cs.state = CONN_STATE_LINGER;
-    ap_lingering_close(c);
-    ap_log_error(APLOG_MARK, APLOG_WARNING, 0, ap_server_conf, APLOGNO(02842)
-                 "io timeout hit (?) scon: %pp, c: %pp", scon, c);
+    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(02842)
+                 "io timeout hit scon: %pp, c: %pp", scon, c);
+
+    /* The keep-alive/write timeout expired. Begin a non-blocking lingering
+     * close rather than blocking this worker; scon is handed to the poll loop
+     * or torn down inside, and is invalid afterwards. The timer has already
+     * been popped from the ring by the caller.
+     */
+    motorz_start_lingering_close(scon);
 }
 static void *motorz_io_setup_conn(apr_thread_t *thread, void *baton)
 {
     apr_status_t status;
     ap_sb_handle_t *sbh;
-    long conn_id = 0;
+    long conn_id;
     motorz_sb_t *sb;
     motorz_conn_t *scon = (motorz_conn_t *) baton;
-    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03316)
+    ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf, APLOGNO(03316)
                  "motorz_io_setup_conn(): entered");
-    ap_create_sb_handle(&sbh, scon->pool, 0, 0);
+    /* Derive a unique connection ID matching worker/event's formula.
+     * apr_atomic_inc32 returns the value BEFORE increment, so add 1 to get
+     * the sequence number for this connection (sequence starts at 1).
+     * my_child_num is set once at child startup and read-only from here.
+     */
+    conn_id = ID_FROM_CHILD_THREAD(my_child_num,
+                  (apr_uint32_t)apr_atomic_inc32(&conn_seq) + 1);
+    ap_create_sb_handle(&sbh, scon->pool, my_child_num, 0);
     scon->sbh = sbh;
     scon->ba = apr_bucket_alloc_create(scon->pool);
     scon->c = ap_run_create_connection(scon->pool, ap_server_conf, scon->sock,
                                        conn_id, sbh, scon->ba);
-    /* XXX: handle failure */
+    if (scon->c == NULL) {
+        /* create_connection failed (e.g. a module declined or hit a resource
+         * limit). There is no conn_rec to process or linger-close; just
+         * release the transaction pool, which closes the accepted socket via
+         * its pool cleanup.
+         */
+        ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf, APLOGNO(10547)
+                     "motorz_io_setup_conn: ap_run_create_connection failed");
+        motorz_conn_done(scon);
+        return NULL;
+    }
     scon->c->cs = &scon->cs;
     sb = apr_pcalloc(scon->pool, sizeof(motorz_sb_t));
@@ -153,77 +362,138 @@ static void *motorz_io_setup_conn(apr_thread_t *thread, void *baton)
     ap_update_vhost_given_ip(scon->c);
     status = ap_pre_connection(scon->c, scon->sock);
-    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03317)
+    ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf, APLOGNO(03317)
                  "motorz_io_setup_conn(): did pre-conn");
     if (status != OK && status != DONE) {
-        ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(02843)
+        ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf, APLOGNO(02843)
                      "motorz_io_setup_conn: connection aborted");
     }
+    /* pfd is initialized here to ensure reqevents == 0, so the defensive
+     * pollset_remove guard in motorz_io_process is a no-op on this first call.
+     */
+    scon->pfd.reqevents = 0;
     scon->cs.state = CONN_STATE_PROCESSING;
     scon->cs.sense = CONN_SENSE_DEFAULT;
     status = motorz_io_process(scon);
-    if (1) {
-        ap_log_error(APLOG_MARK, APLOG_DEBUG, status, ap_server_conf, APLOGNO(02844)
-                     "motorz_io_setup_conn: motorz_io_process status: %d", (int)status);
-    }
+    ap_log_error(APLOG_MARK, APLOG_TRACE8, status, ap_server_conf, APLOGNO(02844)
+                 "motorz_io_setup_conn: motorz_io_process status: %d", (int)status);
     return NULL;
 }
-static apr_status_t motorz_io_user(motorz_core_t *mz, motorz_sb_t *sb)
+static apr_status_t motorz_io_user(motorz_poller_t *poller, motorz_sb_t *sb)
 {
-    /* TODO */
+    /* PT_USER poll events are not implemented yet. Nothing currently
+     * registers a PT_USER descriptor in the pollset, so reaching here means
+     * an unexpected event; log it rather than silently dropping it.
+     */
+    ap_log_error(APLOG_MARK, APLOG_WARNING, 0, ap_server_conf, APLOGNO(10548)
+                 "motorz_io_user: PT_USER poll events are not implemented");
     return APR_SUCCESS;
 }
-static apr_status_t motorz_io_accept(motorz_core_t *mz, motorz_sb_t *sb)
+static apr_status_t motorz_io_accept(motorz_poller_t *poller, motorz_sb_t *sb)
 {
+    motorz_core_t *mz = poller->mz;
     apr_status_t rv;
     apr_pool_t *ptrans;
-    apr_socket_t *socket;
+    apr_socket_t *socket = NULL;
     ap_listen_rec *lr = (ap_listen_rec *) sb->baton;
-    apr_allocator_t *allocator;
+    motorz_conn_t *scon;
+
+    ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf, APLOGNO(03318)
+                 "motorz_io_accept(): entered");
+
+    /* Drain the kernel accept queue in one poll wakeup instead of returning
+     * to apr_pollset_poll() for each connection. Without this, N queued
+     * connections require N round-trips through the poll loop, costing O(N)
+     * wakeups under burst. The loop stops when accept() returns EAGAIN (queue
+     * empty), on a fatal error, when admission control disables the listener,
+     * or when the child is shutting down.
+     *
+     * ap_unixd_accept() outcome buckets:
+     *   - APR_SUCCESS + socket set: a connection was accepted;
+     *   - APR_EGENERAL: fatal/resource condition (E[MN]FILE, ENETDOWN, etc.) --
+     *     stop gracefully rather than spin;
+     *   - any other non-success (EAGAIN, EINTR, ECONNABORTED, ...): transient,
+     *     log and stop draining.
+     * socket == NULL on every non-SUCCESS path.
+     */
+    do {
+        ptrans = motorz_ptrans_get(poller);
+        socket = NULL;
+        rv = lr->accept_func((void *)&socket, lr, ptrans);
+
+        if (rv == APR_SUCCESS && socket != NULL) {
+            static apr_uint32_t rr;
+            motorz_poller_t *target;
+
+            scon = apr_pcalloc(ptrans, sizeof(motorz_conn_t));
+            scon->pool = ptrans;
+            scon->sock = socket;
+            scon->mz = mz;
+
+            /* Shard I/O across pollers round-robin. The accepting poller is
+             * always poller 0, so this counter needs no atomics.
+             */
+            target = mz->pollers[rr % (apr_uint32_t)mz->num_pollers];
+            rr++;
+            scon->poller = target;
-    apr_allocator_create(&allocator);
-    apr_allocator_max_free_set(allocator, ap_max_mem_free);
-    apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
-    apr_allocator_owner_set(allocator, ptrans);
-    apr_pool_tag(ptrans, "transaction");
+            /* Recycling is NOT sharded: the ptrans came from THIS poller's
+             * free-list (its single-consumer pop home). Return it here.
+ */ + scon->pool_poller = poller; - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03318) - "motorz_io_accept(): entered"); + requests_this_child++; - rv = lr->accept_func((void *)&socket, lr, ptrans); - if (rv != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, APLOGNO(02845) - "motorz_io_accept failed"); - clean_child_exit(APEXIT_CHILDSICK); - } - else if (ap_accept_error_is_nonfatal(rv)) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, ap_server_conf, - "accept() on client socket failed"); - } + apr_pool_cleanup_register(scon->pool, scon, motorz_conn_pool_cleanup, + apr_pool_cleanup_null); - else { - motorz_conn_t *scon = apr_pcalloc(ptrans, sizeof(motorz_conn_t)); - scon->pool = ptrans; - scon->sock = socket; - scon->mz = mz; + rv = apr_thread_pool_push(mz->workers, + motorz_io_setup_conn, + scon, + APR_THREAD_TASK_PRIORITY_HIGHEST, NULL); + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, + APLOGNO(03319) + "motorz_io_accept: could not queue connection to " + "worker pool"); + motorz_ptrans_put(poller, ptrans); + } - apr_pool_cleanup_register(scon->pool, scon, motorz_conn_pool_cleanup, - apr_pool_cleanup_null); + /* Re-check admission after each accept: if the worker pool has + * become saturated, motorz_update_listeners() will remove the + * listener from the pollset and set listeners_disabled, which + * terminates the drain loop below. + */ + motorz_update_listeners(poller); + } + else { + /* Nothing accepted (EAGAIN/EINTR/error): recycle the pool. 
*/ + motorz_ptrans_put(poller, ptrans); + + if (rv == APR_EGENERAL) { + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, + APLOGNO(02845) + "motorz_io_accept: accept failed, shutting down " + "child gracefully"); + mz->mpm->mpm_state = AP_MPMQ_STOPPING; + die_now = 1; + } + else if (ap_accept_error_is_nonfatal(rv)) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, ap_server_conf, + APLOGNO(10549) + "accept() on client socket failed"); + } - rv = apr_thread_pool_push(mz->workers, - motorz_io_setup_conn, - scon, - APR_THREAD_TASK_PRIORITY_HIGHEST, NULL); - } - ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, ap_server_conf, APLOGNO(03319) - "motorz_io_accept(): exited: %d", (int)rv); + break; + } + } while (!poller->listeners_disabled && !die_now); - return rv; + return APR_SUCCESS; } static void *motorz_timer_invoke(apr_thread_t *thread, void *baton) @@ -233,26 +503,37 @@ static void *motorz_timer_invoke(apr_thread_t *thread, void *baton) scon->c->current_thread = thread; - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03320) + ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf, APLOGNO(03320) "motorz_timer_invoke(): entered"); ep->cb(ep->mz, ep->baton); - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03321) + ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf, APLOGNO(03321) "motorz_timer_invoke(): exited"); return NULL; } -static apr_status_t motorz_timer_event_process(motorz_core_t *mz, motorz_timer_t *te) +static apr_status_t motorz_timer_event_process(motorz_poller_t *poller, motorz_timer_t *te) { motorz_conn_t *scon = (motorz_conn_t *)te->baton; scon->timer.expires = 0; - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03322) + ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf, APLOGNO(03322) "motorz_timer_event_process(): entered"); - return apr_thread_pool_push(mz->workers, + /* Claim the connection on the poll thread before dispatching the timeout + * (fix #5). 
The timer has already been popped from the ring by the caller + * (so there is nothing to remove there -- and we must not take poller->mtx + * here as the caller holds it), but the connection's descriptor may still + * be armed in the pollset; disarm it so a concurrent/subsequent poll + * cannot dispatch the same scon while the timeout worker is closing it. + * apr_pollset_remove() takes only the (leaf) pollset lock, so calling it + * under poller->mtx introduces no lock-ordering inversion. + */ + motorz_pollset_del(poller, scon); + + return apr_thread_pool_push(poller->mz->workers, motorz_timer_invoke, te, APR_THREAD_TASK_PRIORITY_NORMAL, NULL); } @@ -263,22 +544,37 @@ static void *motorz_io_invoke(apr_thread_t *thread, void *baton) motorz_conn_t *scon = (motorz_conn_t *) sb->baton; apr_status_t rv; - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03323) + ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf, APLOGNO(03323) "motorz_io_invoke(): entered"); scon->c->current_thread = thread; rv = motorz_io_process(scon); if (rv != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, ap_server_conf, APLOGNO(02846) + ap_log_error(APLOG_MARK, APLOG_TRACE8, rv, ap_server_conf, APLOGNO(02846) "motorz_io_invoke: motorz_io_process failed (?)"); } return NULL; } -static apr_status_t motorz_io_event_process(motorz_core_t *mz, motorz_sb_t *sb) +static apr_status_t motorz_io_event_process(motorz_poller_t *poller, motorz_sb_t *sb) { - return apr_thread_pool_push(mz->workers, + motorz_conn_t *scon = (motorz_conn_t *) sb->baton; + + /* Take ownership of this connection on the poll thread before handing it + * to a worker: disarm its pollset entry and cancel any pending timeout + * (fix #5). This guarantees the poll thread cannot dispatch the same scon + * again -- neither re-reported by the pollset nor via timer expiry -- + * until the worker re-arms it at the end of motorz_io_process(). 
Without + * this, two workers could race on one scon and, now that the transaction + * pool is freed on teardown, that race is a use-after-free. + * + * The identity-correct timer_comp (fix #3) is what makes the targeted + * skiplist removal reliable. + */ + motorz_conn_claim(poller, scon); + + return apr_thread_pool_push(poller->mz->workers, motorz_io_invoke, sb, APR_THREAD_TASK_PRIORITY_NORMAL, NULL); } @@ -286,107 +582,411 @@ static apr_status_t motorz_io_event_process(motorz_core_t *mz, motorz_sb_t *sb) static apr_status_t motorz_io_callback(void *baton, const apr_pollfd_t *pfd) { apr_status_t status = APR_SUCCESS; - motorz_core_t *mz = (motorz_core_t *) baton; + motorz_poller_t *poller = (motorz_poller_t *) baton; motorz_sb_t *sb = pfd->client_data; if (sb->type == PT_ACCEPT) { - status = motorz_io_accept(mz, sb); + status = motorz_io_accept(poller, sb); } else if (sb->type == PT_CSD) { - status = motorz_io_event_process(mz, sb); + status = motorz_io_event_process(poller, sb); } else if (sb->type == PT_USER) { - status = motorz_io_user(mz, sb); + status = motorz_io_user(poller, sb); } return status; } -static void motorz_register_timeout(motorz_conn_t *scon, - motorz_timer_cb cb, - apr_interval_time_t relative_time) +/* Insert/refresh scon's timer in the ring. CALLER MUST HOLD mz->mtx. + * + * Everything that touches the sort key (expires) and the ring must happen + * under mz->mtx. In particular: + * + * - If this connection's timer is still linked in the ring from an earlier + * registration (expires != 0 is, under the lock, exactly the "in ring" + * predicate), remove it first -- using its *current* expiry as the key, + * before we overwrite it. + * - Only then mutate expires and re-insert. + * + * Re-inserting the same node, or mutating a linked node's sort key in place, + * corrupts the skiplist and sends apr_skiplist_insert()'s insert_compare() + * into an infinite loop *while holding mz->mtx*, which deadlocks the entire + * child. 
(Found by a load test with StartServers 1 and MaxRequestsPerChild + * churn.) + */ +static void motorz_register_timeout_locked(motorz_conn_t *scon, + motorz_timer_cb cb, + apr_interval_time_t relative_time) { apr_time_t t = apr_time_now() + relative_time; motorz_timer_t *elem = &scon->timer; - motorz_core_t *mz = scon->mz; + motorz_poller_t *poller = scon->poller; + + if (elem->expires) { + apr_skiplist_remove(poller->timeout_ring, elem, NULL); + } elem->expires = t; elem->cb = cb; elem->baton = scon; elem->pool = scon->pool; - elem->mz = mz; + elem->mz = poller->mz; + elem->poller = poller; - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03324) - "motorz_register_timer(): insert ELEM: %pp", elem); - - apr_thread_mutex_lock(mz->mtx); #ifdef AP_DEBUG - ap_assert(apr_skiplist_insert(mz->timeout_ring, elem)); + ap_assert(apr_skiplist_insert(poller->timeout_ring, elem)); #else - apr_skiplist_insert(mz->timeout_ring, elem); + apr_skiplist_insert(poller->timeout_ring, elem); #endif - apr_thread_mutex_unlock(mz->mtx); +} + +/* Hand a connection back to the poll thread: arm its pollset entry for + * 'reqevents' AND register its timeout, atomically under poller->mtx. This is + * the ONLY safe way for a worker to release a connection it still holds a + * pointer to: once either the timer or the pollset entry is armed, the poll + * thread may fire the timeout (or a readable event) and tear the connection + * down -- freeing scon. Doing both under one lock, and touching scon nowhere + * after a successful return, closes the use-after-free window. MUST be the + * worker's last action on scon. Returns the pollset_add status: on success + * scon may already have been freed by a concurrent timeout, so the caller + * must not deref it; on failure the rollback below has fully disarmed scon + * and the caller tears it down.
+ */ +static apr_status_t motorz_conn_register(motorz_conn_t *scon, + apr_int16_t reqevents, + motorz_timer_cb cb, + apr_interval_time_t timeout) +{ + motorz_poller_t *poller = scon->poller; + apr_status_t rv; + + apr_thread_mutex_lock(poller->mtx); + scon->pfd.reqevents = reqevents; + scon->cs.sense = CONN_SENSE_DEFAULT; + motorz_register_timeout_locked(scon, cb, timeout); + rv = apr_pollset_add(poller->pollset, &scon->pfd); + if (rv != APR_SUCCESS) { + /* Roll back the timer so the half-armed connection isn't left + * reachable via the ring with no pollset entry; the caller will tear + * it down. + */ + if (scon->pfd.reqevents != 0) { + scon->pfd.reqevents = 0; + } + apr_skiplist_remove(poller->timeout_ring, &scon->timer, NULL); + scon->timer.expires = 0; + } + apr_thread_mutex_unlock(poller->mtx); + return rv; +} + +/* Remove scon's descriptor from the pollset if it is currently armed, and + * mark it disarmed. Does NOT touch the timer ring. Safe to call with or + * without holding poller->mtx: the pollset is created + * APR_POLLSET_THREADSAFE, and APR's pollset lock is a leaf lock -- + * poller->mtx is never acquired while it is held -- so no lock-ordering + * inversion is possible. + * + * Some pollset backends (kqueue, epoll) automatically drop a descriptor when + * its socket is closed, so APR_NOTFOUND is an acceptable, non-error result. + */ +static void motorz_pollset_del(motorz_poller_t *poller, motorz_conn_t *scon) +{ + if (scon->pfd.reqevents != 0) { + apr_status_t rv = apr_pollset_remove(poller->pollset, &scon->pfd); + if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) { + ap_log_error(APLOG_MARK, APLOG_TRACE1, rv, ap_server_conf, + "motorz_pollset_del: apr_pollset_remove failure"); + } + scon->pfd.reqevents = 0; + } +} + +/* Claim a connection on behalf of a worker, on the poll/main thread, before + * dispatching it.
This is the heart of the per-connection ownership model + * (fix #5): it makes the connection invisible to the poll thread for as long + * as a worker owns it, so the same scon can never be dispatched twice (once + * for an I/O event and again for a timeout, or re-reported by a + * level-triggered pollset before the worker has run). + * + * It removes scon's descriptor from the pollset and cancels any pending + * timeout. The worker re-arms the connection (pollset_add + register_timeout) + * only at the very end of motorz_io_process(), at which point ownership + * returns to the poll thread. Normally called on the poll thread; the + * clogging-filter branch of motorz_io_process() also calls it from a worker. + */ +static void motorz_conn_claim(motorz_poller_t *poller, motorz_conn_t *scon) +{ + motorz_pollset_del(poller, scon); + + if (scon->timer.expires) { + apr_thread_mutex_lock(poller->mtx); + apr_skiplist_remove(poller->timeout_ring, &scon->timer, NULL); + scon->timer.expires = 0; + apr_thread_mutex_unlock(poller->mtx); + } +} + +/* Terminal teardown for a connection: remove it from the pollset (if still + * registered) and recycle its transaction pool. Clearing the pool (inside + * motorz_ptrans_put) releases the conn_rec, bucket allocator and scoreboard + * handle allocated within it, and fires motorz_conn_pool_cleanup(), which + * de-registers any pending timer from the ring under poller->mtx. + * + * This MUST be called exactly once per connection, on every path that ends + * it (lingering close, abort, or fired timeout). It runs on a worker-pool + * thread; removing from the pollset concurrently with the polling thread is + * safe because the pollset is created APR_POLLSET_THREADSAFE. By the time a + * worker reaches a terminal state the connection has already been claimed + * (disarmed) by the poll thread, so motorz_pollset_del() is normally a no-op + * here -- it remains as a defensive backstop.
+ */ +static void motorz_conn_done(motorz_conn_t *scon) +{ + motorz_poller_t *poller = scon->poller; + motorz_poller_t *pool_poller = scon->pool_poller; + apr_pool_t *ptrans = scon->pool; + + ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, + "motorz_conn_done(): scon: %pp", scon); + + /* Disarm on the I/O poller (its pollset), then recycle to the accepting + * poller's free-list (its single-consumer pop home -- not the I/O poller). + */ + motorz_pollset_del(poller, scon); + + /* scon lives in ptrans, so it (and scon->pool) are invalid afterwards. */ + motorz_ptrans_put(pool_poller, ptrans); +} + +/* Timer callback for a lingering close that ran out of time: force the + * connection closed. Mirrors motorz_io_timeout_cb but for the linger phase. + */ +static void motorz_linger_timeout_cb(motorz_core_t *mz, void *baton) +{ + motorz_conn_t *scon = (motorz_conn_t *) baton; + + ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, + "motorz_linger_timeout_cb(): scon: %pp", scon); + + /* The timer has already been popped from the ring; tear down. */ + motorz_conn_done(scon); +} + +/* Drain and discard any data the peer is still sending, without blocking. + * Called (on a worker thread) when a lingering socket is readable or its + * linger timer fires. Returns when the peer has closed/erred (-> teardown) + * or there is nothing more to read right now (-> re-arm in the pollset). + */ +static apr_status_t motorz_lingering_close(motorz_conn_t *scon) +{ + apr_socket_t *csd = scon->sock; + char dummybuf[512]; + apr_size_t nbytes; + apr_status_t rv; + + do { + nbytes = sizeof(dummybuf); + rv = apr_socket_recv(csd, dummybuf, &nbytes); + } while (rv == APR_SUCCESS); + + if (!APR_STATUS_IS_EAGAIN(rv)) { + /* Peer closed, reset, or hard error: we are done. */ + motorz_conn_done(scon); + return APR_SUCCESS; + } + + /* Nothing left to read for now; wait for more readability, bounded by the + * linger timeout. 
A readable PT_CSD dispatch goes through + * motorz_conn_claim(), which cancels this connection's timer, so we must + * (re)register the linger timeout here alongside (re)arming the pollset. + * This means a peer that keeps dribbling data resets the deadline each + * time -- the same bounded imprecision mpm_event accepts for its linger + * queues, and exactly the slow-drain case the timeout exists to cap. + * Honour a module's request for a shortened linger period. + * + * Arm pollset + timer atomically (motorz_conn_register); after it returns + * scon may already have been freed by a concurrent timeout, so we must not + * touch it again -- including on the error path, where the rollback inside + * motorz_conn_register has disarmed it and we just close. + */ + { + apr_interval_time_t linger = + apr_table_get(scon->c->notes, "short-lingering-close") + ? apr_time_from_sec(SECONDS_TO_LINGER) + : apr_time_from_sec(MAX_SECS_TO_LINGER); + rv = motorz_conn_register(scon, + APR_POLLIN | APR_POLLHUP | APR_POLLERR, + motorz_linger_timeout_cb, linger); + } + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_TRACE1, rv, ap_server_conf, + "motorz_lingering_close: apr_pollset_add failed; closing"); + motorz_conn_done(scon); + } + return APR_SUCCESS; +} + +/* Begin a non-blocking lingering close (fix #3/A3). Runs on a worker thread, + * but unlike the old inline ap_lingering_close() it never blocks the worker + * for up to MAX_SECS_TO_LINGER: it shuts the write side down, then arms a + * linger timeout and hands the socket back to the poll loop, which drives the + * drain via motorz_lingering_close() as data arrives. + * + * Pre-condition: scon has already been claimed (not in the pollset, no timer). + */ +static void motorz_start_lingering_close(motorz_conn_t *scon) +{ + conn_rec *c = scon->c; + apr_socket_t *csd = scon->sock; + + scon->cs.state = CONN_STATE_LINGER; + + /* ap_start_lingering_close() flushes and shuts down the write side. 
A + * true return means there is nothing to linger over (aborted or no + * half-close needed), so close immediately. + */ + if (ap_start_lingering_close(c)) { + motorz_conn_done(scon); + return; + } + + scon->linger_started = 1; + + /* All draining from here is non-blocking. */ + apr_socket_timeout_set(csd, 0); + apr_socket_opt_set(csd, APR_INCOMPLETE_READ, 0); + + /* First drain attempt. If the peer still has data to send, + * motorz_lingering_close() arms both the pollset and the linger timeout; + * otherwise it tears the connection down here. We deliberately do not + * pre-register a timer (the drain owns that), so scon->timer is inserted + * into the ring at most once at a time. + */ + motorz_lingering_close(scon); +} + +/* Park a connection that a process_connection hook left in + * CONN_STATE_SUSPENDED (A4). Ownership passes to the module, which interacts + * with the MPM only through the suspend/resume_connection hooks until it calls + * ap_mpm_resume_suspended() -> motorz_resume_suspended(). The connection is + * intentionally left out of the pollset and timer ring (it has been claimed), + * and its transaction pool is NOT recycled, so nothing here tears it down -- + * which is what previously leaked. Runs on a worker thread. + */ +static void motorz_suspend_connection(motorz_conn_t *scon) +{ + conn_rec *c = scon->c; + + ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, + "motorz_suspend_connection(): scon: %pp", scon); + + c->suspended_baton = scon; + scon->suspended = 1; + ap_run_suspend_connection(c, scon->r); + /* sbh is owned by the (now parked) connection; drop our reference like + * mpm_event's notify_suspend() does. 
+ */ + c->sbh = NULL; } static apr_status_t motorz_io_process(motorz_conn_t *scon) { apr_status_t rv; - motorz_core_t *mz; conn_rec *c; - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03325) + ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf, APLOGNO(03325) "motorz_io_process(): entered"); + /* A connection already in non-blocking lingering close (its socket became + * readable again, or it was re-dispatched) just continues draining. It + * has been claimed, so its pollset entry/timer were cleared; the drain + * re-arms them or tears down. + */ + if (scon->linger_started) { + return motorz_lingering_close(scon); + } + if (scon->c->clogging_input_filters && !scon->c->aborted) { /* Since we have an input filter which 'clogs' the input stream, * like mod_ssl used to, lets just do the normal read from input * filters, like the Worker MPM does. Filters that need to write * where they would otherwise read, or read where they would * otherwise write, should set the sense appropriately. + * + * This path bypasses the normal motorz_conn_claim() that precedes + * every other call to motorz_io_process(). Do a full claim now: + * disarm the pollset entry AND cancel any pending timer under the + * poller mutex. Without the timer cancel, a concurrent timer expiry + * can dispatch a timeout worker on the same scon while this worker + * is inside ap_run_process_connection() -- a use-after-free race. */ + motorz_conn_claim(scon->poller, scon); ap_run_process_connection(scon->c); - if (scon->cs.state != CONN_STATE_SUSPENDED) { + /* The process_connection hooks set the next connection state on + * return; honor it and let the dispatch below act on it, mirroring + * the event MPM (see event.c:process_socket()). 
Async modules reach + * this clogging path too: mod_http2's secondary (c2) connections set + * clogging_input_filters unconditionally, and come back wanting either + * to wait for I/O (CONN_STATE_ASYNC_WAITIO), flush + * (CONN_STATE_WRITE_COMPLETION), or suspend -- all of which must be + * preserved rather than force-closed. + * + * A hook-returned CONN_STATE_KEEPALIVE is mapped to + * CONN_STATE_WRITE_COMPLETION (as event does) so it flushes any + * pending output and then waits for the next request: passing bare + * KEEPALIVE through to the dispatch below would hit the + * KEEPALIVE -> PROCESSING entry transition and synchronously re-run + * ap_run_process_connection() instead of returning to the poller. + * + * Anything left unfinished -- still CONN_STATE_PROCESSING because a + * hook returned DECLINED or OK without setting a state, as a non-async + * module would -- gets a lingering close, like the worker MPM. That + * also keeps us out of the CONN_STATE_PROCESSING branch below. + */ + if (scon->cs.state == CONN_STATE_KEEPALIVE) { + scon->cs.state = CONN_STATE_WRITE_COMPLETION; + } + else if (scon->cs.state != CONN_STATE_ASYNC_WAITIO + && scon->cs.state != CONN_STATE_WRITE_COMPLETION + && scon->cs.state != CONN_STATE_SUSPENDED) { scon->cs.state = CONN_STATE_LINGER; } } - mz = scon->mz; c = scon->c; if (!c->aborted) { - if (scon->pfd.reqevents != 0) { - /* - * Some of the pollset backends, like KQueue or Epoll - * automagically remove the FD if the socket is closed, - * therefore, we can accept _SUCCESS or _NOTFOUND, - * and we still want to keep going - */ - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03326) - "motorz_io_process(): apr_pollset_remove"); - - rv = apr_pollset_remove(mz->pollset, &scon->pfd); - if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) { - ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(02847) - "motorz_io_process: apr_pollset_remove failure"); - /*AP_DEBUG_ASSERT(rv == APR_SUCCESS);*/ - } - 
scon->pfd.reqevents = 0; - } + /* On the normal dispatch path (from motorz_io_event_process or + * motorz_io_setup_conn), the connection has already been claimed -- + * pollset entry removed and reqevents cleared -- before reaching here. + * No redundant apr_pollset_remove() is needed or performed. + */ if (scon->cs.state == CONN_STATE_KEEPALIVE) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03327) - "motorz_io_process(): Set to CONN_STATE_PROCESSING"); + ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf, + APLOGNO(03327) + "motorz_io_process(): keepalive -> processing"); + scon->cs.state = CONN_STATE_PROCESSING; + } + else if (scon->cs.state == CONN_STATE_ASYNC_WAITIO) { + /* The socket this connection was waiting on (CONN_STATE_ASYNC_WAITIO, + * armed below) became readable/writable, so we were re-dispatched: + * re-enter the process_connection hooks, mirroring how event's loop + * maps ASYNC_WAITIO back to PROCESSING. (A Timeout expiry does not + * arrive here -- motorz_io_timeout_cb lingers/closes directly.) 
+ */ + ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf, + APLOGNO(10559) + "motorz_io_process(): async waitio -> processing"); scon->cs.state = CONN_STATE_PROCESSING; } read_request: if (scon->cs.state == CONN_STATE_PROCESSING) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03328) - "motorz_io_process(): CONN_STATE_PROCESSING"); + ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf, + APLOGNO(03328) "motorz_io_process(): processing"); if (!c->aborted) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03329) - "motorz_io_process(): !aborted"); + ap_update_child_status(scon->sbh, SERVER_BUSY_READ, NULL); ap_run_process_connection(c); /* state will be updated upon return * fall thru to either wait for readability/timeout or @@ -394,41 +994,90 @@ read_request: */ } else { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03330) - "motorz_io_process(): aborted"); + ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf, + APLOGNO(03330) + "motorz_io_process(): aborted -> linger"); scon->cs.state = CONN_STATE_LINGER; } } + if (scon->cs.state == CONN_STATE_SUSPENDED) { + /* A module has taken the connection asynchronous (A4). Park it; + * ownership returns only via motorz_resume_suspended(). Do not + * re-arm the pollset/timer or tear it down. + */ + ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, + APLOGNO(10550) + "motorz_io_process(): suspended"); + motorz_suspend_connection(scon); + return APR_SUCCESS; + } + + if (scon->cs.state == CONN_STATE_ASYNC_WAITIO) { + /* A process_connection hook wants the MPM to wait for the + * connection to become readable or writable (per c->cs->sense, + * defaulting to read) within the configured Timeout, and then + * re-enter the hooks. 
This is the same wait the WANT_READ + * workaround does through WRITE_COMPLETION, but explicit and + * without first checking ap_run_output_pending() -- the hook has + * told us it is done writing and is now waiting on I/O. Arm the + * pollset + timer atomically and do not touch scon afterwards (a + * concurrent timeout may free it). On failure scon is already + * disarmed by the rollback; close it. + */ + apr_int16_t reqevents; + + ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf, + APLOGNO(10557) + "motorz_io_process(): async waitio"); + + ap_update_child_status(scon->sbh, SERVER_BUSY_READ, NULL); + + reqevents = + (scon->cs.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT + : APR_POLLIN) + | APR_POLLHUP | APR_POLLERR; + rv = motorz_conn_register(scon, reqevents, + motorz_io_timeout_cb, + motorz_get_timeout(scon)); + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_WARNING, rv, + ap_server_conf, APLOGNO(10558) + "apr_pollset_add: failed in async waitio"); + motorz_conn_done(scon); + } + return APR_SUCCESS; + } + if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) { int pending; - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03331) - "motorz_io_process(): CONN_STATE_WRITE_COMPLETION"); + ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf, + APLOGNO(03331) + "motorz_io_process(): write completion"); ap_update_child_status(scon->sbh, SERVER_BUSY_WRITE, NULL); pending = ap_run_output_pending(c); if (pending == OK) { - /* Still in WRITE_COMPLETION_STATE: - * Set a write timeout for this connection, and let the - * event thread poll for writeability. + /* Still in WRITE_COMPLETION_STATE: set a write timeout and let + * the poll thread wait for writeability. Arm pollset + timer + * atomically and do not touch scon afterwards (it may be freed + * by a concurrent timeout). On failure scon is already + * disarmed by the rollback; close it. 
*/ - motorz_register_timeout(scon, - motorz_io_timeout_cb, - motorz_get_timeout(scon)); - - scon->pfd.reqevents = ( - scon->cs.sense == CONN_SENSE_WANT_READ ? APR_POLLIN : - APR_POLLOUT) | APR_POLLHUP | APR_POLLERR; - scon->cs.sense = CONN_SENSE_DEFAULT; - - rv = apr_pollset_add(mz->pollset, &scon->pfd); - + apr_int16_t reqevents = + (scon->cs.sense == CONN_SENSE_WANT_READ ? APR_POLLIN + : APR_POLLOUT) + | APR_POLLHUP | APR_POLLERR; + rv = motorz_conn_register(scon, reqevents, + motorz_io_timeout_cb, + motorz_get_timeout(scon)); if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO(02849) "apr_pollset_add: failed in write completion"); + motorz_conn_done(scon); } return APR_SUCCESS; } @@ -447,42 +1096,97 @@ read_request: } if (scon->cs.state == CONN_STATE_LINGER) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03332) - "motorz_io_process(): CONN_STATE_LINGER"); - ap_lingering_close(c); + ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf, + APLOGNO(03332) "motorz_io_process(): linger"); + /* Begin a non-blocking lingering close instead of blocking this + * worker for up to MAX_SECS_TO_LINGER (A3). scon may be torn down + * or handed back to the poll loop inside; invalid afterwards. + */ + motorz_start_lingering_close(scon); + return APR_SUCCESS; } if (scon->cs.state == CONN_STATE_KEEPALIVE) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03333) - "motorz_io_process(): CONN_STATE_KEEPALIVE"); - motorz_register_timeout(scon, - motorz_io_timeout_cb, - motorz_get_keep_alive_timeout(scon)); - - scon->pfd.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR; - scon->cs.sense = CONN_SENSE_DEFAULT; - - rv = apr_pollset_add(mz->pollset, &scon->pfd); - + ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf, + APLOGNO(03333) "motorz_io_process(): keepalive"); + /* Arm pollset + keep-alive timer atomically; do not touch scon + * afterwards (a concurrent timeout may free it). 
On failure scon + * is already disarmed by the rollback; close it. + */ + rv = motorz_conn_register(scon, + APR_POLLIN | APR_POLLHUP | APR_POLLERR, + motorz_io_timeout_cb, + motorz_get_keep_alive_timeout(scon)); if (rv != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(02850) - "process_socket: apr_pollset_add failure in read request line"); - return rv; + ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, + APLOGNO(02850) + "process_socket: apr_pollset_add failure in " + "read request line"); + motorz_conn_done(scon); + return APR_SUCCESS; } } } else { - ap_lingering_close(c); + /* Aborted: begin (non-blocking) lingering close. */ + motorz_start_lingering_close(scon); + return APR_SUCCESS; } return APR_SUCCESS; } -static apr_status_t motorz_pollset_cb(motorz_core_t *mz, apr_interval_time_t timeout) +/* mpm_resume_suspended hook (A4): a module that previously suspended this + * connection is handing it back. Recover scon from the suspended_baton, run + * the resume_connection hooks, and re-inject it into the worker pool to + * continue in write-completion (flush, then keep-alive or close). May be + * called from a module's own thread, so we hand off rather than process + * inline. + */ +static apr_status_t motorz_resume_suspended(conn_rec *c) +{ + motorz_conn_t *scon = (motorz_conn_t *) c->suspended_baton; + motorz_core_t *mz; + + if (scon == NULL || !scon->suspended) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c, APLOGNO(10551) + "motorz_resume_suspended: connection not suspended"); + return APR_EGENERAL; + } + mz = scon->mz; + + c->suspended_baton = NULL; + scon->suspended = 0; + + /* Restore sbh before running resume hooks: motorz_suspend_connection + * NULLed c->sbh (matching event's notify_suspend), but any module or + * filter calling ap_update_child_status(c->sbh, ...) after resume would + * dereference NULL without this. 
scon->sbh is valid for the connection's + * lifetime (it lives in scon->pool which is not recycled during suspend). + */ + c->sbh = scon->sbh; + ap_run_resume_connection(c, scon->r); + + /* Continue where a normal request would after processing: flush pending + * output, then decide keep-alive vs. close. + */ + scon->cs.state = CONN_STATE_WRITE_COMPLETION; + scon->cs.sense = CONN_SENSE_DEFAULT; + + return apr_thread_pool_push(mz->workers, motorz_io_invoke, + scon->pfd.client_data, + APR_THREAD_TASK_PRIORITY_NORMAL, NULL); +} + +/* One poll thread per poller drives accept/dispatch/timer work for the + * connections bound to it; workers only process. Each child runs num_pollers + * of these in parallel. See "Scaling / architecture limits" in MOTORZ.README. + */ +static apr_status_t motorz_pollset_cb(motorz_poller_t *poller, apr_interval_time_t timeout) { apr_status_t rc; const apr_pollfd_t *out_pfd = NULL; apr_int32_t num = 0; - rc = apr_pollset_poll(mz->pollset, timeout, &num, &out_pfd); + rc = apr_pollset_poll(poller->pollset, timeout, &num, &out_pfd); if (rc != APR_SUCCESS) { if (APR_STATUS_IS_EINTR(rc) || APR_STATUS_IS_TIMEUP(rc)) { return APR_SUCCESS; @@ -491,7 +1195,7 @@ static apr_status_t motorz_pollset_cb(motorz_core_t *mz, apr_interval_time_t tim } } while (num > 0) { - rc = motorz_io_callback(mz, out_pfd); + rc = motorz_io_callback(poller, out_pfd); if (rc != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_CRIT, rc, NULL, APLOGNO(03334) "Call to motorz_io_callback() failed"); @@ -523,21 +1227,26 @@ static apr_status_t motorz_setup_workers(motorz_core_t *mz) return APR_SUCCESS; } -static int motorz_setup_pollset(motorz_core_t *mz) +static int motorz_setup_pollset(motorz_poller_t *poller) { int i; apr_status_t rv; int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL}; + /* The pollset is mutated (apr_pollset_{add,remove}) from worker-pool + * threads while this poller's thread is blocked in apr_pollset_poll(), so + * it MUST be 
thread-safe. All the preferred backends below + * (kqueue/port/epoll) support APR_POLLSET_THREADSAFE. + */ for (i = 0; i < sizeof(good_methods) / sizeof(good_methods[0]); i++) { - rv = apr_pollset_create_ex(&mz->pollset, + rv = apr_pollset_create_ex(&poller->pollset, 512, - mz->pool, - APR_POLLSET_NODEFAULT, + poller->pool, + APR_POLLSET_NODEFAULT | APR_POLLSET_THREADSAFE, good_methods[i]); if (rv == APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, ap_server_conf, APLOGNO(02852) - "motorz_setup_pollset: apr_pollset_create_ex using %s", apr_pollset_method_name(mz->pollset)); + "motorz_setup_pollset: apr_pollset_create_ex using %s", apr_pollset_method_name(poller->pollset)); break; } @@ -545,17 +1254,17 @@ static int motorz_setup_pollset(motorz_core_t *mz) if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_INFO, rv, ap_server_conf, APLOGNO(02853) "motorz_setup_pollset: apr_pollset_create_ex failed for all possible backends!"); - rv = apr_pollset_create(&mz->pollset, + rv = apr_pollset_create(&poller->pollset, 512, - mz->pool, - 0); + poller->pool, + APR_POLLSET_THREADSAFE); } if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, APLOGNO(02854) "motorz_setup_pollset: apr_pollset_create failed for all possible backends!"); } ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03335) - "motorz_setup_pollset: Using %s", apr_pollset_method_name(mz->pollset)); + "motorz_setup_pollset: Using %s", apr_pollset_method_name(poller->pollset)); return rv; } @@ -588,6 +1297,19 @@ static void clean_child_exit(int code) apr_signal(SIGHUP, SIG_IGN); apr_signal(SIGTERM, SIG_IGN); + /* Drain the worker thread pool before tearing down pools. Without this, + * worker threads executing motorz_io_process or motorz_conn_done (which + * call ap_log_error and apr_pool_clear) may still be running when pchild + * and its log state are destroyed, causing use-after-free crashes. 
+ * apr_thread_pool_destroy() joins all worker threads before returning. + * mz->workers is NULL only if motorz_setup_workers() was never called + * (i.e. we're exiting very early, before child_main set up workers). + */ + if (mz->workers) { + apr_thread_pool_destroy(mz->workers); + mz->workers = NULL; + } + if (pchild) { apr_pool_destroy(pchild); } @@ -662,8 +1384,20 @@ static int motorz_query(int query_code, int *result, apr_status_t *rv) *rv = APR_SUCCESS; switch(query_code){ case AP_MPMQ_IS_ASYNC: + /* See MOTORZ_ENABLE_ASYNC at the top of this file: async HTTP/2 handoff + * is disabled pending a mod_http2 c1/c2 close-ordering fix. */ + *result = MOTORZ_ENABLE_ASYNC; + break; + case AP_MPMQ_CAN_SUSPEND: *result = 1; break; + case AP_MPMQ_CAN_WAITIO: + /* CONN_STATE_ASYNC_WAITIO is only requested by modules when the MPM is + * async; motorz honors it (polls per c->cs->sense under Timeout and + * re-enters the process_connection hooks -- see motorz_io_process()), + * but gate it on MOTORZ_ENABLE_ASYNC so it tracks IS_ASYNC. */ + *result = MOTORZ_ENABLE_ASYNC; + break; case AP_MPMQ_MAX_DAEMON_USED: *result = ap_num_kids; break; @@ -727,9 +1461,6 @@ static void just_die(int sig) clean_child_exit(0); } -/* volatile because it's updated from a signal handler */ -static int volatile die_now = 0; - static void stop_listening(int sig) { motorz_core_t *mz = motorz_core_get(); @@ -747,9 +1478,330 @@ static void stop_listening(int sig) * they are really private to child_main. */ -static int requests_this_child; static int num_listensocks = 0; +/* Listener admission control (#1). The listener pollfds live in the poller + * that owns the listeners (poller 0); only that poller toggles them, on its + * own thread, so no locking is needed. The hysteresis band is derived from + * threads_per_child in child_main. 
+ */ +static apr_size_t motorz_throttle_hi; +static apr_size_t motorz_throttle_lo; + +/* Stop accepting: remove the listener sockets from the poller's pollset so it + * stops dispatching new connections while workers are saturated. Runs on the + * owning poller's thread only; idempotent. + */ +static void motorz_disable_listeners(motorz_poller_t *poller) +{ + int i; + + if (poller->listeners_disabled) { + return; + } + for (i = 0; i < poller->num_listener_pfds; i++) { + apr_status_t rv = apr_pollset_remove(poller->pollset, + poller->listener_pfds[i]); + if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) { + ap_log_error(APLOG_MARK, APLOG_TRACE1, rv, ap_server_conf, + "motorz_disable_listeners: apr_pollset_remove failed"); + } + } + poller->listeners_disabled = 1; + if (my_child_num >= 0) { + ap_scoreboard_image->parent[my_child_num].not_accepting = 1; + } + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(10552) + "Workers busy, not accepting new connections in this child"); +} + +/* Resume accepting: re-add the listener sockets to the poller's pollset. Runs + * on the owning poller's thread only; idempotent; a no-op while shutting down. + */ +static void motorz_enable_listeners(motorz_poller_t *poller) +{ + int i; + + if (!poller->listeners_disabled || die_now) { + return; + } + for (i = 0; i < poller->num_listener_pfds; i++) { + apr_status_t rv = apr_pollset_add(poller->pollset, + poller->listener_pfds[i]); + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_TRACE1, rv, ap_server_conf, + "motorz_enable_listeners: apr_pollset_add failed"); + } + } + poller->listeners_disabled = 0; + if (my_child_num >= 0) { + ap_scoreboard_image->parent[my_child_num].not_accepting = 0; + } + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(10553) + "Accepting new connections again in this child"); +} + +/* Reconsider admission once per poll-loop iteration (owning poller's thread). 
+ * Disable listeners when the worker pool is saturated and re-enable once it has + * drained. Three complementary saturation signals: + * + * 1. idle == 0: no thread is free to pick up a new connection right now. + * 2. pending >= throttle_hi: the push queue has a full wave of unstarted tasks + * (each accepted connection becomes one task), so we are ahead of the workers. + * 3. active >= threads_per_child: all threads are occupied, including those + * blocked in I/O waits -- catches the slow-client / keep-alive-heavy case + * where the task queue looks empty but workers are fully tied up. + * + * The hysteresis band (hi/lo) on the pending count avoids enable/disable + * flapping. A poller that does not own listeners (num_listener_pfds == 0) no-ops. + */ +static void motorz_update_listeners(motorz_poller_t *poller) +{ + apr_size_t idle, pending, active; + + if (poller->num_listener_pfds == 0) { + return; + } + /* Read total before idle: if a thread exits between the two reads, + * reading idle first risks unsigned underflow (idle > total -> wrap). + * Clamp the subtraction to zero so a transient race never yields a + * spuriously huge 'active' value that trips the saturation check. + */ + { + apr_size_t total; + total = apr_thread_pool_threads_count(poller->mz->workers); + idle = apr_thread_pool_idle_count(poller->mz->workers); + active = (total > idle) ? (total - idle) : 0; + } + pending = apr_thread_pool_tasks_count(poller->mz->workers); + + if (!poller->listeners_disabled) { + if (idle == 0 + || pending >= motorz_throttle_hi + || active >= (apr_size_t)threads_per_child) { + motorz_disable_listeners(poller); + } + } + else if (idle > 0 + && pending <= motorz_throttle_lo + && active < (apr_size_t)threads_per_child) { + motorz_enable_listeners(poller); + } +} + +/* Create and initialize one poller context (its own pool, pollset, timer ring + * and ring mutex). The recycle free-list and listener state start zeroed. 
+ * 'owns_listeners' marks the poller that holds the accept sockets. + */ +static motorz_poller_t *motorz_poller_create(motorz_core_t *mz, int index) +{ + apr_status_t rv; + motorz_poller_t *poller = apr_pcalloc(mz->pool, sizeof(*poller)); + + poller->mz = mz; + poller->index = index; + apr_pool_create(&poller->pool, mz->pool); + apr_pool_tag(poller->pool, "motorz-poller"); + + rv = apr_thread_mutex_create(&poller->mtx, APR_THREAD_MUTEX_DEFAULT, + poller->pool); + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, APLOGNO(02966) + "motorz_poller_create: apr_thread_mutex_create failed"); + clean_child_exit(APEXIT_CHILDSICK); + } + + apr_skiplist_init(&poller->timeout_ring, poller->pool); + apr_skiplist_set_compare(poller->timeout_ring, timer_comp, timer_comp); + + rv = motorz_setup_pollset(poller); + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, APLOGNO(02869) + "Couldn't setup pollset in child; check system or user limits"); + clean_child_exit(APEXIT_CHILDSICK); /* assume temporary resource issue */ + } + + return poller; +} + +/* Add this child's listening sockets to 'poller' and capture them so admission + * control can pause/resume accepting (#1). Only the listener-owning poller + * calls this. 
+ */ +static void motorz_poller_add_listeners(motorz_poller_t *poller) +{ + apr_status_t status; + ap_listen_rec *lr; + int i; + + poller->listener_pfds = apr_pcalloc(poller->pool, + num_listensocks * sizeof(apr_pollfd_t *)); + poller->num_listener_pfds = 0; + poller->listeners_disabled = 0; + + for (lr = my_bucket->listeners, i = num_listensocks; i--; lr = lr->next) { + apr_pollfd_t *pfd = apr_pcalloc(poller->pool, sizeof *pfd); + motorz_sb_t *sb = apr_pcalloc(poller->pool, sizeof(motorz_sb_t)); + + pfd->desc_type = APR_POLL_SOCKET; + pfd->desc.s = lr->sd; + pfd->reqevents = APR_POLLIN; + pfd->p = poller->pool; + pfd->client_data = sb; + + sb->type = PT_ACCEPT; + sb->baton = lr; + + poller->listener_pfds[poller->num_listener_pfds++] = pfd; + + status = apr_socket_opt_set(pfd->desc.s, APR_SO_NONBLOCK, 1); + if (status != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_CRIT, status, NULL, APLOGNO(02870) + "apr_socket_opt_set(APR_SO_NONBLOCK = 1) failed on %pI", + lr->bind_addr); + clean_child_exit(0); + } + + status = apr_pollset_add(poller->pollset, pfd); + if (status != APR_SUCCESS) { + /* If the child processed a SIGWINCH before setting up the + * pollset, this error path is expected and harmless, + * since the listener fd was already closed; so don't + * pollute the logs in that case. + */ + if (!die_now) { + ap_log_error(APLOG_MARK, APLOG_EMERG, status, ap_server_conf, APLOGNO(02871) + "Couldn't add listener to pollset; check system or user limits"); + clean_child_exit(APEXIT_CHILDSICK); + } + clean_child_exit(0); + } + + lr->accept_func = ap_unixd_accept; + } +} + +/* One poller's poll loop: poll, dispatch ready events to workers, expire + * timers, reconsider admission. Runs until die_now / shutdown / restart. Each + * poller runs this on its own thread; the child's main thread is the + * supervisor (motorz_supervise) that owns the MaxRequestsPerChild / pod / + * generation checks and sets die_now. 
A fatal poll error sets die_now and + * returns rather than exiting the process, so the other pollers can wind down + * and the supervisor can clean up. + */ +static void *APR_THREAD_FUNC motorz_poller_main(apr_thread_t *thread, void *baton) +{ + motorz_poller_t *poller = (motorz_poller_t *) baton; + motorz_core_t *mz = poller->mz; + apr_status_t status; + + while (!die_now + && !mz->mpm->shutdown_pending + && !mz->mpm->restart_pending) { + apr_time_t tnow = apr_time_now(); + motorz_timer_t *te; + apr_interval_time_t timeout = apr_time_from_msec(500); + + apr_thread_mutex_lock(poller->mtx); + te = apr_skiplist_peek(poller->timeout_ring); + + if (te) { + if (tnow < te->expires) { + timeout = (te->expires - tnow); + if (timeout > apr_time_from_msec(500)) { + timeout = apr_time_from_msec(500); + } + } + else { + timeout = 0; + } + } + apr_thread_mutex_unlock(poller->mtx); + + status = motorz_pollset_cb(poller, timeout); + + tnow = apr_time_now(); + + if (status != APR_SUCCESS) { + if (!APR_STATUS_IS_EINTR(status) && !APR_STATUS_IS_TIMEUP(status)) { + ap_log_error(APLOG_MARK, APLOG_CRIT, status, NULL, APLOGNO(03117) + "motorz_main_loop: apr_pollcb_poll failed"); + die_now = 1; + break; + } + } + + apr_thread_mutex_lock(poller->mtx); + + /* Now iterate any expired timers and push them to the worker + * pool. The loop is driven entirely off a fresh peek taken under + * the lock rather than the 'te' cached before the poll: while the + * lock was dropped for polling, a worker thread may have inserted + * a timer that is now the earliest in the ring. Peeking and + * popping the minimum in lock-step keeps the popped node and the + * processed node consistent. + */ + while ((te = apr_skiplist_peek(poller->timeout_ring)) + && te->expires < tnow) { + apr_skiplist_pop(poller->timeout_ring, NULL); + motorz_timer_event_process(poller, te); + } + + apr_thread_mutex_unlock(poller->mtx); + + /* Admission control (#1): pause/resume accepting based on worker-pool + * saturation. 
Done here, on the poll thread and outside poller->mtx, + * once per iteration. While listeners are disabled the loop still wakes + * via the 500ms timeout floor and timer expiries, bounding resume + * latency. No-op on pollers that do not own the listeners. + */ + motorz_update_listeners(poller); + } + + return NULL; +} + +/* Child supervisor loop, run on the child's main thread while the poller + * threads do the I/O. Watches MaxRequestsPerChild and the pipe-of-death / + * generation change, setting die_now so the pollers wind down. Returns when + * the child should exit. + */ +static void motorz_supervise(motorz_core_t *mz, ap_sb_handle_t *sbh) +{ + while (!die_now + && !mz->mpm->shutdown_pending + && !mz->mpm->restart_pending) { + + /* requests_this_child is bumped per accepted connection by the + * listener-owning poller; once the cap is reached, wind down. + */ + if (ap_max_requests_per_child > 0 + && requests_this_child >= ap_max_requests_per_child) { + die_now = 1; + break; + } + + ap_update_child_status(sbh, SERVER_READY, NULL); + + if (ap_mpm_pod_check(my_bucket->pod) == APR_SUCCESS) { /* idle kill? */ + die_now = 1; + } + else if (mz->mpm->my_generation != + ap_scoreboard_image->global->running_generation) { /* restart? */ + /* yeah, this could be non-graceful restart, in which case the + * parent will kill us soon enough, but why bother checking? + */ + die_now = 1; + } + else { + /* Nothing to do; sleep briefly so we don't spin. The pollers run + * independently, so the supervisor only needs coarse latency. 
+ */ + apr_sleep(apr_time_from_msec(100)); + } + } +} + static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) { #if APR_HAS_THREADS @@ -758,9 +1810,9 @@ static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) #endif apr_status_t status; int i; - ap_listen_rec *lr; ap_sb_handle_t *sbh; const char *lockfile; + motorz_poller_t *poller; /* for benefit of any hooks that run as this child initializes */ mz->mpm->mpm_state = AP_MPMQ_STARTING; @@ -815,8 +1867,6 @@ static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) ap_update_child_status(sbh, SERVER_READY, NULL); - apr_skiplist_init(&mz->timeout_ring, mz->pool); - apr_skiplist_set_compare(mz->timeout_ring, timer_comp, timer_comp); status = motorz_setup_workers(mz); if (status != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_CRIT, status, ap_server_conf, APLOGNO(02868) @@ -824,50 +1874,49 @@ static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) clean_child_exit(APEXIT_CHILDSICK); } - status = motorz_setup_pollset(mz); - if (status != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_EMERG, status, ap_server_conf, APLOGNO(02869) - "Couldn't setup pollset in child; check system or user limits"); - clean_child_exit(APEXIT_CHILDSICK); /* assume temporary resource issue */ - } - - for (lr = my_bucket->listeners, i = num_listensocks; i--; lr = lr->next) { - apr_pollfd_t *pfd = apr_pcalloc(mz->pool, sizeof *pfd); - motorz_sb_t *sb = apr_pcalloc(mz->pool, sizeof(motorz_sb_t)); - - pfd->desc_type = APR_POLL_SOCKET; - pfd->desc.s = lr->sd; - pfd->reqevents = APR_POLLIN; - pfd->p = mz->pool; - pfd->client_data = sb; - - sb->type = PT_ACCEPT; - sb->baton = lr; - - status = apr_socket_opt_set(pfd->desc.s, APR_SO_NONBLOCK, 1); - if (status != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_CRIT, status, NULL, APLOGNO(02870) - "apr_socket_opt_set(APR_SO_NONBLOCK = 1) failed on %pI", - lr->bind_addr); - clean_child_exit(0); - } + /* 
Admission-control hysteresis band: pause accepting once the pending + * backlog reaches a full wave (threads_per_child) and resume once it + * drains to 75%. The 75% low-water mark (vs. the old 50%) re-enables the + * listener sooner, reducing latency spikes at the cost of slightly more + * frequent enable/disable transitions -- a good trade under variable load. + */ + motorz_throttle_hi = threads_per_child; + motorz_throttle_lo = (threads_per_child * 3) / 4; - status = apr_pollset_add(mz->pollset, pfd); - if (status != APR_SUCCESS) { - /* If the child processed a SIGWINCH before setting up the - * pollset, this error path is expected and harmless, - * since the listener fd was already closed; so don't - * pollute the logs in that case. */ - if (!die_now) { - ap_log_error(APLOG_MARK, APLOG_EMERG, status, ap_server_conf, APLOGNO(02871) - "Couldn't add listener to pollset; check system or user limits"); - clean_child_exit(APEXIT_CHILDSICK); - } - clean_child_exit(0); + /* Resolve the poller count: explicit PollersPerChild, else auto from online + * CPUs (capped). Never more pollers than worker threads, and at least 1. + */ + mz->num_pollers = num_pollers; + if (mz->num_pollers <= 0) { +#ifdef _SC_NPROCESSORS_ONLN + long ncpu = sysconf(_SC_NPROCESSORS_ONLN); + mz->num_pollers = (ncpu > 0) ? (int)ncpu : 1; +#else + mz->num_pollers = 1; +#endif + if (mz->num_pollers > MOTORZ_MAX_POLLERS) { + mz->num_pollers = MOTORZ_MAX_POLLERS; } + } + if (mz->num_pollers > threads_per_child) { + mz->num_pollers = threads_per_child; + } + if (mz->num_pollers < 1) { + mz->num_pollers = 1; + } - lr->accept_func = ap_unixd_accept; + /* Create N pollers, each on its own thread, supervised by this thread. + * Poller 0 owns the listening sockets (and thus does the accepting); + * Stage 3 will shard accepted connections across all pollers. With a + * single poller this is behaviourally identical to the old design. 
+ */ + mz->pollers = apr_pcalloc(mz->pool, + mz->num_pollers * sizeof(motorz_poller_t *)); + for (i = 0; i < mz->num_pollers; i++) { + mz->pollers[i] = motorz_poller_create(mz, i); } + /* Listeners live in poller 0. */ + motorz_poller_add_listeners(mz->pollers[0]); mz->mpm->mpm_state = AP_MPMQ_RUNNING; @@ -875,72 +1924,28 @@ static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) * {shutdown,restart}_pending are set when a signal is received while * running in single process mode. */ - while (!die_now - && !mz->mpm->shutdown_pending - && !mz->mpm->restart_pending) { - /* - * (Re)initialize this child to a pre-connection state. - */ - - if ((ap_max_requests_per_child > 0 - && requests_this_child++ >= ap_max_requests_per_child)) { - clean_child_exit(0); + for (i = 0; i < mz->num_pollers; i++) { + poller = mz->pollers[i]; + status = apr_thread_create(&poller->thread, NULL, + motorz_poller_main, poller, pchild); + if (status != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_EMERG, status, ap_server_conf, APLOGNO(10554) + "child_main: apr_thread_create failed for poller %d", i); + die_now = 1; + clean_child_exit(APEXIT_CHILDSICK); } + } - ap_update_child_status(sbh, SERVER_READY, NULL); - { - apr_time_t tnow = apr_time_now(); - motorz_timer_t *te; - apr_interval_time_t timeout = apr_time_from_msec(500); - - apr_thread_mutex_lock(mz->mtx); - te = apr_skiplist_peek(mz->timeout_ring); - - if (te) { - if (tnow < te->expires) { - timeout = (te->expires - tnow); - if (timeout > apr_time_from_msec(500)) { - timeout = apr_time_from_msec(500); - } - } - else { - timeout = 0; - } - } - apr_thread_mutex_unlock(mz->mtx); - - status = motorz_pollset_cb(mz, timeout); - - tnow = apr_time_now(); - - if (status != APR_SUCCESS) { - if (!APR_STATUS_IS_EINTR(status) && !APR_STATUS_IS_TIMEUP(status)) { - ap_log_error(APLOG_MARK, APLOG_CRIT, status, NULL, APLOGNO(03117) - "motorz_main_loop: apr_pollcb_poll failed"); - clean_child_exit(0); - } - } - - 
apr_thread_mutex_lock(mz->mtx); + /* Supervise on this thread; returns when the child should wind down. */ + motorz_supervise(mz, sbh); - /* now iterate any timers and push to worker pool */ - while (te && te->expires < tnow) { - apr_skiplist_pop(mz->timeout_ring, NULL); - motorz_timer_event_process(mz, te); - te = apr_skiplist_peek(mz->timeout_ring); - } - - apr_thread_mutex_unlock(mz->mtx); - } - if (ap_mpm_pod_check(my_bucket->pod) == APR_SUCCESS) { /* selected as idle? */ - die_now = 1; - } - else if (mz->mpm->my_generation != - ap_scoreboard_image->global->running_generation) { /* restart? */ - /* yeah, this could be non-graceful restart, in which case the - * parent will kill us soon enough, but why bother checking? - */ - die_now = 1; + /* die_now is now set; join the poller threads so their pollsets/rings are + * quiescent before we tear the child down. + */ + for (i = 0; i < mz->num_pollers; i++) { + if (mz->pollers[i]->thread) { + apr_status_t pstatus; + apr_thread_join(&pstatus, mz->pollers[i]->thread); } } @@ -1477,8 +2482,9 @@ static int motorz_pre_config(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp) mz->mpm = ap_unixd_mpm_get_retained_data(); mz->mpm->baton = mz; mz->max_daemons_limit = -1; - mz->timeout_ring = motorz_timer_ring; - mz->pollset = motorz_pollset; + /* Pollsets, timer rings and their mutexes are now per-poller and are + * created per child in motorz_poller_create(); nothing to seed here. 
+ */ } else if (mz->mpm->baton != mz) { /* If the MPM changes on restart, be ungraceful */ @@ -1503,12 +2509,7 @@ static int motorz_pre_config(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp) } apr_pool_create(&mz->pool, ap_pglobal); apr_pool_tag(mz->pool, "motorz-mpm-core"); - rv = apr_thread_mutex_create(&mz->mtx, 0, mz->pool); - if (rv != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, APLOGNO(02966) - "motorz_pre_config: apr_thread_mutex_create failed"); - return rv; - } + /* Per-poller ring mutexes are created in motorz_poller_create(). */ } parent_pid = ap_my_pid = getpid(); @@ -1614,6 +2615,26 @@ static int motorz_check_config(apr_pool_t *p, apr_pool_t *plog, threads_per_child = 1; } + /* Warn about ThreadsPerChild 1: the admission-control low-water mark + * becomes (1*3)/4 = 0, so listeners only re-enable when the task queue + * is completely empty, causing severe throughput degradation under any + * sustained load. ThreadsPerChild >= 4 is strongly recommended. + */ + if (threads_per_child == 1) { + if (startup) { + ap_log_error(APLOG_MARK, APLOG_WARNING | APLOG_STARTUP, 0, NULL, + APLOGNO(10555) + "WARNING: ThreadsPerChild 1 causes severe throughput " + "degradation in motorz due to admission-control " + "hysteresis. Use ThreadsPerChild >= 4."); + } + else { + ap_log_error(APLOG_MARK, APLOG_WARNING, 0, s, APLOGNO(10556) + "ThreadsPerChild 1 causes severe throughput " + "degradation in motorz. 
Use ThreadsPerChild >= 4."); + } + } + return OK; } @@ -1635,6 +2656,8 @@ static void motorz_hooks(apr_pool_t *p) ap_hook_mpm(motorz_run, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_mpm_query(motorz_query, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_mpm_get_name(motorz_get_name, NULL, NULL, APR_HOOK_MIDDLE); + ap_hook_mpm_resume_suspended(motorz_resume_suspended, NULL, NULL, + APR_HOOK_MIDDLE); } static const char *set_daemons_to_start(cmd_parms *cmd, void *dummy, const char *arg) @@ -1669,6 +2692,17 @@ static const char *set_thread_limit (cmd_parms *cmd, void *dummy, const char *ar return NULL; } +static const char *set_pollers_per_child(cmd_parms *cmd, void *dummy, + const char *arg) +{ + const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY); + if (err != NULL) { + return err; + } + num_pollers = atoi(arg); + return NULL; +} + static const command_rec motorz_cmds[] = { LISTEN_COMMANDS, AP_INIT_TAKE1("StartServers", set_daemons_to_start, NULL, RSRC_CONF, @@ -1677,6 +2711,8 @@ AP_INIT_TAKE1("ThreadsPerChild", set_threads_per_child, NULL, RSRC_CONF, "Number of threads each child creates"), AP_INIT_TAKE1("ThreadLimit", set_thread_limit, NULL, RSRC_CONF, "Maximum number of worker threads per child process for this run of Apache - Upper limit for ThreadsPerChild"), +AP_INIT_TAKE1("PollersPerChild", set_pollers_per_child, NULL, RSRC_CONF, + "Number of poll threads per child process (0 = auto from online CPUs)"), AP_GRACEFUL_SHUTDOWN_TIMEOUT_COMMAND, { NULL } }; diff --git a/server/mpm/motorz/motorz.h b/server/mpm/motorz/motorz.h index a54357b120..66d48ef4f6 100644 --- a/server/mpm/motorz/motorz.h +++ b/server/mpm/motorz/motorz.h @@ -15,6 +15,7 @@ */ #include "apr.h" +#include "apr_atomic.h" #include "apr_portable.h" #include "apr_strings.h" #include "apr_thread_proc.h" @@ -113,6 +114,8 @@ * allocated on first call to pre-config hook; located on * subsequent calls to pre-config hook */ +typedef struct motorz_poller_t motorz_poller_t; + typedef struct motorz_core_t motorz_core_t; 
struct motorz_core_t { ap_unixd_mpm_retained_data *mpm; @@ -126,10 +129,48 @@ struct motorz_core_t { */ int max_daemons_limit; apr_pool_t *pool; - apr_thread_mutex_t *mtx; + /* Worker thread pool, shared by all pollers in this child. */ + apr_thread_pool_t *workers; + /* Per-child array of pollers. Each owns its own pollset + timer ring + + * recycle list, so the single-poll-thread throughput ceiling scales with + * num_pollers. A connection is bound to one poller for its lifetime + * (scon->poller) and always re-arms there. See MOTORZ.README. + */ + motorz_poller_t **pollers; + int num_pollers; +}; + +typedef struct motorz_recycled_pool motorz_recycled_pool; +struct motorz_recycled_pool { + apr_pool_t *pool; + motorz_recycled_pool *next; +}; + +/* One poll thread's context. Each poller owns its pollset, timer ring and the + * mutex guarding that ring, plus its own lock-free transaction-pool recycle + * list and listener-admission state -- so pollers do not contend with each + * other. A connection is permanently bound to the poller that accepted it + * (scon->poller), which is what makes connection sharding across pollers + * correct: it always re-arms in, and recycles to, its own poller. + */ +struct motorz_poller_t { + motorz_core_t *mz; /* back-pointer to the shared child core */ + int index; /* 0 .. num_pollers-1 */ + apr_pool_t *pool; /* subpool of mz->pool for this poller */ + apr_thread_t *thread; /* the poll thread running motorz_poller_main */ apr_pollset_t *pollset; apr_skiplist *timeout_ring; - apr_thread_pool_t *workers; + apr_thread_mutex_t *mtx; /* guards this poller's timeout_ring */ + /* Lock-free (CAS) recycle free-list: multi-producer push (any worker), but + * single-consumer pop (THIS poller's thread only) -- the same MPSC + * contract as mpm_fdqueue.c's ap_queue_info_{push,pop}_pool. + */ + struct motorz_recycled_pool *volatile recycled_pools; + apr_uint32_t num_recycled; + /* Listener admission control (poller that owns the listeners only). 
*/ + apr_pollfd_t **listener_pfds; + int num_listener_pfds; + int listeners_disabled; }; typedef struct motorz_child_bucket motorz_child_bucket; @@ -168,6 +209,7 @@ struct motorz_timer_t void *baton; apr_pool_t *pool; motorz_core_t *mz; + motorz_poller_t *poller; /* the poller whose ring this timer is in */ }; typedef struct motorz_conn_t motorz_conn_t; @@ -175,6 +217,16 @@ struct motorz_conn_t { apr_pool_t *pool; motorz_core_t *mz; + /* The poller this connection does its I/O on for its whole lifetime: it + * re-arms in, and times out on, this poller's pollset/ring. Sharded across + * pollers at accept time to spread load. */ + motorz_poller_t *poller; + /* The poller that owns this connection's transaction-pool recycling -- the + * poller that accepted it (and popped its ptrans). Recycling must return to + * the same single-consumer free-list it came from, which (unlike I/O) is + * NOT sharded: only the accepting poller pops, so only it may be the home. + */ + motorz_poller_t *pool_poller; apr_socket_t *sock; apr_bucket_alloc_t *ba; ap_sb_handle_t *sbh; @@ -184,6 +236,8 @@ struct motorz_conn_t request_rec *r; /** is the current conn_rec suspended? */ int suspended; + /** has ap_start_lingering_close() been called for this conn? */ + int linger_started; /** poll file descriptor information */ apr_pollfd_t pfd; /** public parts of the connection state */ diff --git a/server/mpm/motorz/test/README.md b/server/mpm/motorz/test/README.md new file mode 100644 index 0000000000..a777cb535f --- /dev/null +++ b/server/mpm/motorz/test/README.md @@ -0,0 +1,187 @@ +# motorz MPM test harness + +Self-contained smoke / regression tests for the experimental **motorz** MPM +(`server/mpm/motorz/motorz.c`). These drive a real `httpd` built from this tree +against a throwaway `ServerRoot` in a temp dir; nothing is installed and no +existing config is touched. + +## Build & configure (do this first) + +The tests run a real `httpd` built from this tree. 
The easiest way to get a +correctly-configured build is the bootstrap script — it runs `buildconf` (only +if `./configure` is missing), `./configure` with the right flags, `make`, and +then verifies every module the tests need is present: + +```sh +server/mpm/motorz/test/setup.sh # configure (if needed) + build + verify +server/mpm/motorz/test/setup.sh --reconfigure # force a fresh ./configure +server/mpm/motorz/test/setup.sh --jobs 8 # control make parallelism +``` + +It does **not** install httpd; the tests use the freshly built `./httpd` in +place. On success it prints `setup OK` and the run-all command. + +### Doing it by hand instead + +```sh +# from the build top. (Run ./buildconf first only on a fresh git checkout that +# has no ./configure; needs autoconf + libtool + python3.) +./configure --with-included-apr \ + --enable-mpms-shared='event motorz' \ + --enable-so --enable-unixd \ + --enable-authz_core --enable-authz_host --enable-log_config \ + --enable-mime --enable-dir \ + --enable-socache_shmcb --enable-ssl --enable-http2 +make +``` + +`--with-included-apr` uses the bundled APR/APR-Util (no system APR needed). +`event` is built alongside `motorz` because `bench.sh` compares against it. +(`mod_ssl`/`mod_http2` are also part of the default `most` module set, so a +plain `./configure --enable-mpms-shared='event motorz' --with-included-apr` +usually yields them too; the explicit flags above just make it deterministic.) + +### What the tests need + +The harness finds the `httpd` binary at the top of the build tree and locates +the shared module `.so` files under `modules/` and `server/mpm/`. Required for +the HTTP/1.1 suite + smoke: `mod_mpm_motorz`, `mod_unixd`, `mod_authz_core`, +`mod_authz_host`, `mod_log_config`, `mod_mime`, `mod_dir`. 
The HTTP/2 suite +additionally needs `mod_ssl`, `mod_http2`, `mod_socache_shmcb` (building these +requires OpenSSL headers and `libnghttp2`), plus an `openssl` CLI and a `curl` +built with HTTP/2 — it **self-skips** (exit 0) with a clear message if any are +missing. If [`h2load`](https://nghttp2.org/) (from nghttp2) is on `PATH`, the +HTTP/2 suite adds real multiplexed load tests; otherwise those are skipped (not +failed). `bench.sh` needs `mod_mpm_event` and `ab`/`h2load`. + +## Running + +```sh +server/mpm/motorz/test/run-all.sh # smoke + both suites +server/mpm/motorz/test/smoke.sh # fast change-mapped smoke test only +server/mpm/motorz/test/run-http1.sh # HTTP/1.1 only +server/mpm/motorz/test/run-http2.sh # HTTP/2-over-TLS only +server/mpm/motorz/test/bench.sh # motorz vs event throughput comparison +``` + +### bench.sh — motorz vs event + +Runs identical `ab` (HTTP/1.1) and `h2load` (HTTP/2) workloads against the same +build configured first with the event MPM, then motorz, with matched tunables, +and prints a req/s comparison table. + +**Critical:** worker threads must exceed peak connection concurrency — under +HTTP/2 each connection holds a worker for its lifetime, so too few workers +starves the pool and h2load reports spurious failures on *both* MPMs. `bench.sh` +defaults `THREADS=128` (≥ the c=50 default) and **flags any run with +failed/errored requests** (its req/s is then not a valid figure). Env knobs: +`REQS CONC H2_REQS H2_CONC H2_STREAMS H2_BIG_REQS THREADS PORT`. + +Observed (12-core arm64, 1 child, adequate workers): motorz tracks event within +~1–2% on all four workloads. Note: motorz shows an *intermittent* small h2 +failure rate under rapid connection churn at high concurrency that event does +not — see the project memory note; not a throughput issue. + +Environment knobs: + +- `PORT` / `TLS_PORT` — listen ports (default 8099 / 8443). 
+- `KEEP=1` — keep the temp `ServerRoot` after the run for inspection + (path is printed); otherwise it is removed on exit. + +Exit status is non-zero if any assertion fails. + +## What is covered + +**`smoke.sh`** — fast (~10 s) robust smoke test whose checks map one-to-one to +the changes made on this branch, so a failure points straight at what broke: + +1. **forward-decl of `motorz_update_listeners()`** — motorz loads, parses + config, and serves (the bug was a C89 implicit-declaration / link error); +2. **clogging-input-filters branch honors hook state** — h2 keep-alive reuses + one connection instead of collapsing to one-shot (h2 c2 connections set + `clogging_input_filters` unconditionally); +3. **async HTTP/2 handoff is enabled** (`MOTORZ_ENABLE_ASYNC 1`) — asserts + motorz *does* return the c1 connection to MPM monitoring (async hand-back) + and that HTTP/2 connection churn (`h2load -n 10000 -c 50 -m 1`) still drops + **0** requests. This is the regression test for the dropped-request bug, + which is now fixed in mod_http2 (c1 is closed only after every c2 is done and + flushed); see MOTORZ.README "HTTP/2 async handoff" for the close-ordering + issue and the fix. + +Runs h2-over-TLS when ssl/http2/openssl/h2-curl are present, else an +HTTP/1.1-only subset. 
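A minimal sketch of how a zero-drop assertion like the churn check in item 3 could be derived from `h2load` output, assuming the summary contains a `requests:` line with `N failed` and `N errored` fields. The function name `h2load_is_clean` and the sample text are hypothetical; they are not taken from `lib.sh` or `smoke.sh`.

```sh
# Hypothetical helper (not from lib.sh): succeed only if the h2load summary in
# $1 reports zero failed and zero errored requests.
h2load_is_clean() {
    # Assumed summary shape: "requests: N total, ..., N failed, N errored, ..."
    summary=$(printf '%s\n' "$1" | grep '^requests:') || return 2
    failed=$(printf '%s\n' "$summary" | sed -n 's/.* \([0-9][0-9]*\) failed.*/\1/p')
    errored=$(printf '%s\n' "$summary" | sed -n 's/.* \([0-9][0-9]*\) errored.*/\1/p')
    # A missing field counts as a failure rather than silently passing.
    [ "${failed:-1}" -eq 0 ] && [ "${errored:-1}" -eq 0 ]
}

# Synthetic sample output, for illustration only.
sample='requests: 10000 total, 10000 started, 10000 done, 10000 succeeded, 0 failed, 0 errored, 0 timeout'
if h2load_is_clean "$sample"; then
    echo "churn run clean"
else
    echo "churn run dropped requests"
fi
```

Treating an unparseable summary as a failure keeps a crashed or truncated run from being reported as a pass.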
+ +**`run-http1.sh`** — exercises the connection state machine in +`motorz_io_process()`: + +- basic GET / 404 / body correctness +- **keep-alive reuse** (5 requests, asserts a single TCP connection) — the + KEEPALIVE → WRITE_COMPLETION path +- 200KB body (write-completion cycling) +- concurrency / load: keep-alive correctness is checked with **curl** (3000 + reused requests, all 200) because `ab` miscounts a server-closed keep-alive + connection's final read as a non-2xx failure — a known `ab` artifact, more + frequent with `MOTORZ_ENABLE_ASYNC=0` since idle keep-alive closes via the + blocking path; `ab` is still used for the completion count and the + non-keep-alive lingering-close path (no `-k`, so no artifact) +- slow client: a partial request line completed after a pause (read-wait path) +- **lifecycle**: graceful restart under load, and 5× restart churn under + continuous load — the skiplist/worker-drain scenario this branch hardened +- error-log scan for crash/crit/emerg/assert/deadlock + +**`run-http2.sh`** — mod_http2 + mod_ssl on motorz: + +- h2 ALPN negotiation (`http_version == 2`) +- request multiplexing over a single connection (10 streams, 1 connect) +- large multi-frame body +- **`h2load` load tests** (when available): 5000 req / 20 clients / 25 streams, + 1000 req of the 200KB body / 50 streams (flow control), and a rate-limited + run with idle gaps — each asserting **zero** failed/errored and all-2xx +- inspects the `motorz_io_process()` state traces, asserting PROCESSING is + driven (with async on, an idle c1 is handed back as `CONN_STATE_ASYNC_WAITIO` + rather than via WRITE_COMPLETION; HTTP/1.1 still covers WRITE_COMPLETION) +- **asserts async is enabled**: the `CONN_STATE_ASYNC_WAITIO` arm (`AH10557`) + appears and h2 logs "returning to mpm c1 monitoring" (see below) +- h2 still negotiated after a graceful restart +- error-log scan + +### Two-phase logging (why) + +The state traces require global `LogLevel trace8`, but the `h2load` phase 
+pushes thousands of requests — at trace8 the error_log would balloon to +**gigabytes** and make the log greps crawl. So the suite runs the load phase at +`LogLevel info` (quiet) and only switches to `trace8` (via `set_loglevel()`, +which rewrites the marked LogLevel line and gracefully restarts) for the light +trace phase. Trace scans read only past a marker line written at the switch, +keeping them fast. + +### Async is enabled; the suite asserts it is on + +motorz reports `AP_MPMQ_IS_ASYNC = 1` (`MOTORZ_ENABLE_ASYNC 1` in `motorz.c`). +The HTTP/2 churn bug it used to expose is fixed in mod_http2 (a graceful client +GOAWAY no longer tears the session down while a stream's c2 is still finishing; +see MOTORZ.README "HTTP/2 async handoff"). Consequences the suite asserts: +mod_http2 requests `CONN_STATE_ASYNC_WAITIO` of an async MPM, so the WAITIO arm +trace `AH10557` **does** appear, and h2 **does** log "returning to mpm c1 +monitoring" — while the churn run stays at 0 dropped requests. The suite drives +the WAITIO path (an idle raw-h2 `openssl s_client` connection — preface + empty +`SETTINGS`, no request) via `trigger_async_waitio()` in `lib.sh` and confirms it +arms ≥ 1 time. + +This positively exercises the branch added in this work. The suite also +confirms (via the `info`-level log) that motorz's `AP_MPMQ_IS_ASYNC = 1` drives +mod_http2's async idle-return ("returning to mpm c1 monitoring"), rather than +the prefork-style blocking poll. 
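The marker-bounded trace scans and the async assertions described above can be sketched as follows. This is a minimal illustration, assuming the error log is a plain text file and the marker is a fixed string; `count_after_marker`, the marker text, and the sample log lines are hypothetical and not taken from `lib.sh`.

```sh
# Hypothetical helper (not from lib.sh): print how many lines in log file $1
# match the regex $3 after the LAST line containing the fixed string $2.
# Prints 0 if the marker never appears.
count_after_marker() {
    awk -v marker="$2" -v pat="$3" '
        index($0, marker) { seen = 1; n = 0; next }  # restart the count at each marker
        seen && $0 ~ pat  { n++ }                    # count only past the marker
        END               { print n + 0 }
    ' "$1"
}

# Synthetic log, for illustration only.
log=$(mktemp)
printf '%s\n' \
    'AH10557: entry logged before the switch' \
    '=== trace phase start ===' \
    'AH10557: entry logged after the switch' \
    'returning to mpm c1 monitoring' > "$log"

waitio=$(count_after_marker "$log" '=== trace phase start ===' 'AH10557')
handback=$(count_after_marker "$log" '=== trace phase start ===' 'returning to mpm c1 monitoring')
if [ "$waitio" -ge 1 ] && [ "$handback" -ge 1 ]; then
    echo "async path exercised"
fi
rm -f "$log"
```

Scanning only past the marker means entries from the quiet load phase never satisfy, or slow down, the trace-phase assertions.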
+ +## Layout + +``` +setup.sh configure + build httpd with the modules the tests need +lib.sh shared helpers (build-tree discovery, config gen, assert API, + set_loglevel, trigger_async_waitio, h2load_stat) +smoke.sh fast change-mapped smoke test +run-http1.sh HTTP/1.1 functional + lifecycle suite +run-http2.sh HTTP/2-over-TLS suite (+ h2load load tests) +run-all.sh runs smoke + both suites, aggregates pass/fail +bench.sh motorz vs event throughput comparison (not in run-all) +``` diff --git a/server/mpm/motorz/test/bench.sh b/server/mpm/motorz/test/bench.sh new file mode 100755 index 0000000000..87910122cc --- /dev/null +++ b/server/mpm/motorz/test/bench.sh @@ -0,0 +1,214 @@ +#!/bin/sh +# +# motorz vs event -- head-to-head MPM performance comparison. +# +# Runs identical workloads against the SAME httpd build configured first with +# the event MPM, then the motorz MPM, with matched tunables (one child, +# ThreadsPerChild $THREADS, quiet logging) so the only variable is the MPM. +# +# Workloads: +# - HTTP/1.1 keep-alive (ab -k) small body +# - HTTP/1.1 no keep-alive (ab) small body (accept/linger cost) +# - HTTP/2 multiplexed (h2load) small body +# - HTTP/2 large body (h2load) 200 KB (write/flow control) +# +# Reports req/s and mean latency for each, side by side. NOT a rigorous +# microbenchmark -- it's a practical apples-to-apples smoke of relative +# throughput on this machine. Run it a couple of times; numbers vary. +# +# Usage: server/mpm/motorz/test/bench.sh [REQS=50000 CONC=50 DUR=...] +# Env: PORT (TLS+plain reuse one port per run), KEEP=1 + +. 
"$(dirname "$0")/lib.sh" + +require_httpd + +REQS="${REQS:-50000}" # ab request count (keep-alive run) +REQS_NOKA="${REQS_NOKA:-20000}" +CONC="${CONC:-50}" +H2_REQS="${H2_REQS:-50000}" +H2_CONC="${H2_CONC:-50}" +H2_STREAMS="${H2_STREAMS:-25}" +H2_BIG_REQS="${H2_BIG_REQS:-5000}" +# Worker threads MUST exceed the peak concurrent CONNECTION count: under HTTP/2 +# each connection holds a worker for its lifetime (the h2 c1 connection is +# dispatched to a worker), so fewer workers than connections starves the pool +# and h2load reports spurious failures/resets on BOTH MPMs -- making any req/s +# number meaningless. Size generously above max(CONC, H2_CONC). +THREADS="${THREADS:-128}" +PORT="${PORT:-8551}" + +have_ab=0; command -v ab >/dev/null 2>&1 && have_ab=1 +have_h2load=0; command -v h2load >/dev/null 2>&1 && have_h2load=1 +have_ssl=1 +for n in mod_ssl.so mod_http2.so mod_socache_shmcb.so; do + find "$TOP/modules" -name "$n" -path '*/.libs/*' 2>/dev/null | grep -q . || have_ssl=0 +done +[ "$have_h2load" -eq 1 ] && [ "$have_ssl" -eq 1 ] && do_h2=1 || do_h2=0 + +make_rundir +trap cleanup EXIT INT TERM +openssl req -x509 -newkey rsa:2048 -nodes -keyout "$RUNDIR/key.pem" \ + -out "$RUNDIR/cert.pem" -days 2 -subj "/CN=localhost" >/dev/null 2>&1 + +# Result accumulators (one column per MPM), kept as text rows we print at the end. 
+RESULTS_FILE="$RUNDIR/results.txt"
+: > "$RESULTS_FILE"
+record() { printf '%s\t%s\t%s\n' "$1" "$2" "$3" >> "$RESULTS_FILE"; }  # workload, mpm, "val"
+
+# --- write a config for the given MPM ($1 = event|motorz) -------------------
+write_conf() {
+    local mpm="$1" mpm_line tune
+    if [ "$mpm" = event ]; then
+        mpm_line="$(load mpm_event_module mod_mpm_event.so)"
+        tune="StartServers 1
+ServerLimit 1
+ThreadLimit $THREADS
+ThreadsPerChild $THREADS
+MinSpareThreads $THREADS
+MaxSpareThreads $THREADS
+MaxRequestWorkers $THREADS"
+    else
+        mpm_line="$(load mpm_motorz_module mod_mpm_motorz.so)"
+        tune="StartServers 1
+ThreadLimit $THREADS
+ThreadsPerChild $THREADS
+PollersPerChild 2"
+    fi
+    # Both MPMs run ONE child with $THREADS worker threads, sized above the peak
+    # connection concurrency so neither starves under h2 (see THREADS note).
+    {
+        echo "ServerRoot \"$RUNDIR\""
+        echo "ServerName localhost"
+        echo "PidFile \"$RUNDIR/httpd.pid\""
+        echo "Listen $PORT"
+        echo "$mpm_line"
+        load unixd_module mod_unixd.so
+        load authz_core_module mod_authz_core.so
+        load authz_host_module mod_authz_host.so
+        load log_config_module mod_log_config.so
+        load mime_module mod_mime.so
+        load dir_module mod_dir.so
+        if [ "$do_h2" -eq 1 ]; then
+            load socache_shmcb_module mod_socache_shmcb.so
+            load ssl_module mod_ssl.so
+            load http2_module mod_http2.so
+        fi
+        echo "$tune"
+        echo "ErrorLog \"$RUNDIR/logs/error_log\""
+        echo "LogLevel error"  # quiet: logging must not skew the numbers
+        echo "TypesConfig /dev/null"
+        echo "AddType text/html .html"
+        echo "AddType text/plain .txt"
+        echo "EnableSendfile On"
+        echo "DocumentRoot \"$RUNDIR/htdocs\""
+        echo "DirectoryIndex index.html"
+        echo ""
+        echo " Require all granted"
+        echo ""
+        if [ "$do_h2" -eq 1 ]; then
+            echo "Protocols h2 http/1.1"
+            echo "SSLSessionCache \"shmcb:$RUNDIR/logs/sc(512000)\""
+            echo ""
+            echo " ServerName localhost"
+            echo " SSLEngine on"
+            echo " SSLCertificateFile \"$RUNDIR/cert.pem\""
+            echo " SSLCertificateKeyFile \"$RUNDIR/key.pem\""
+            echo " Protocols h2 http/1.1"
+            echo ""
+        fi
+    } > "$RUNDIR/httpd.conf"
+}
+
+# extract "Requests per second" and "Time per request (mean)" from ab output
+ab_rps() { printf '%s\n' "$1" | awk '/Requests per second/{print $4; exit}'; }
+ab_mean() { printf '%s\n' "$1" | awk '/Time per request/ && /\(mean\)/{print $4; exit}'; }
+# h2load: "finished in Xs, NNN.NN req/s, ..." and the req mean from the table
+h2_rps() { printf '%s\n' "$1" | awk -F',' '/req\/s/{for(i=1;i<=NF;i++) if($i ~ /req\/s/){gsub(/[^0-9.]/,"",$i); print $i; exit}}'; }
+# Warn loudly if an h2load run had ANY failed/errored requests -- the req/s of
+# such a run is not a valid throughput figure (it raced a starved pool or a
+# crash). $1=output $2=workload $3=mpm.
+h2_check_clean() {
+    local f e
+    f=$(printf '%s\n' "$1" | sed -nE 's/.* ([0-9]+) failed.*/\1/p')
+    e=$(printf '%s\n' "$1" | sed -nE 's/.* ([0-9]+) errored.*/\1/p')
+    if [ "${f:-0}" -ne 0 ] || [ "${e:-0}" -ne 0 ]; then
+        echo " !! WARNING: $2/$3 had ${f:-0} failed / ${e:-0} errored -- req/s is NOT valid"
+        record "${2}-FAILED" "$3" "${f:-0}/${e:-0}"
+    fi
+}
+
+run_mpm() {
+    local mpm="$1" base out rps mean
+    echo
+    echo "######## $mpm ########"
+    write_conf "$mpm"
+    "$HTTPD" -f "$RUNDIR/httpd.conf" -t >/dev/null 2>&1 \
+        || { echo " config invalid for $mpm; skipping"; return; }
+    start_httpd
+    # give the freshly started server a moment to settle before applying load
+    sleep 0.5
+
+    if [ "$have_ab" -eq 1 ]; then
+        base="http://127.0.0.1:$PORT"
+        echo "-- HTTP/1.1 keep-alive ($REQS req, c=$CONC)"
+        out=$(ab -n "$REQS" -c "$CONC" -k -q "$base/" 2>&1)
+        rps=$(ab_rps "$out"); mean=$(ab_mean "$out")
+        echo " req/s=$rps mean(ms/req-across-conc)=$mean"
+        record "h1-keepalive" "$mpm" "$rps"
+
+        echo "-- HTTP/1.1 no keep-alive ($REQS_NOKA req, c=$CONC)"
+        out=$(ab -n "$REQS_NOKA" -c "$CONC" -q "$base/" 2>&1)
+        rps=$(ab_rps "$out")
+        echo " req/s=$rps"
+        record "h1-no-keepalive" "$mpm" "$rps"
+    else
+        echo "-- ab not found; skipping HTTP/1.1 throughput"
+    fi
+
+    if [ "$do_h2" -eq 1 ]; then
+        base="https://localhost:$PORT"
+        echo "-- HTTP/2 ($H2_REQS req, c=$H2_CONC, m=$H2_STREAMS, small body)"
+        out=$(h2load -n "$H2_REQS" -c "$H2_CONC" -m "$H2_STREAMS" "$base/" 2>&1)
+        rps=$(h2_rps "$out"); h2_check_clean "$out" "h2-small" "$mpm"
+        echo " req/s=$rps ($(h2load_stat "$out" '^requests:'))"
+        record "h2-small" "$mpm" "$rps"
+
+        echo "-- HTTP/2 large body ($H2_BIG_REQS req, c=$H2_CONC, m=$H2_STREAMS, 200KB)"
+        out=$(h2load -n "$H2_BIG_REQS" -c "$H2_CONC" -m "$H2_STREAMS" "$base/big.txt" 2>&1)
+        rps=$(h2_rps "$out"); h2_check_clean "$out" "h2-large" "$mpm"
+        echo " req/s=$rps"
+        record "h2-large" "$mpm" "$rps"
+    else
+        echo "-- h2load/ssl unavailable; skipping HTTP/2 throughput"
+    fi
+
+    stop_httpd
+}
+
+echo "==== motorz vs event MPM benchmark ===="
+echo "host: $(uname -sm), cpus: $(sysctl -n hw.ncpu 2>/dev/null || nproc 2>/dev/null)"
+echo "settings: 1 child, ThreadsPerChild=$THREADS, LogLevel error"
+echo "tools: ab=$have_ab h2load=$have_h2load ssl=$have_ssl"
+
+run_mpm event
+run_mpm motorz
+
+# ---- comparison table ------------------------------------------------------
+echo
+echo "==== req/s comparison (higher is better) ===="
+awk -F'\t' '
+    { v[$1"|"$2]=$3; if(!seen[$1]++) order[++n]=$1 }
+    END {
+        printf "%-18s %12s %12s %10s\n", "workload", "event", "motorz", "motorz/event"
+        printf "%-18s %12s %12s %10s\n", "--------", "-----", "------", "-----------"
+        for(i=1;i<=n;i++){
+            w=order[i]; e=v[w"|event"]; m=v[w"|motorz"]
+            ratio = (e+0>0 && m+0>0) ? sprintf("%.2fx", m/e) : "n/a"
+            printf "%-18s %12s %12s %10s\n", w, (e==""?"-":e), (m==""?"-":m), ratio
+        }
+    }
+' "$RESULTS_FILE"
+echo
+echo "(req/s; ratio = motorz relative to event. Re-run a few times -- single"
+echo " runs are noisy. LogLevel was 'error' so logging did not skew results.)"
diff --git a/server/mpm/motorz/test/lib.sh b/server/mpm/motorz/test/lib.sh
new file mode 100644
index 0000000000..3fd4e54215
--- /dev/null
+++ b/server/mpm/motorz/test/lib.sh
@@ -0,0 +1,177 @@
+# Shared helpers for the motorz MPM test harness.
+#
+# Sourced by run-http1.sh, run-http2.sh and bench.sh. Not executable on its own.
+#
+# Resolves the httpd build tree, locates the shared MPM/module .so files,
+# generates a throwaway ServerRoot under a temp dir, and provides start/stop
+# plus a tiny assertion API. Everything lives under a per-run temp dir that is
+# removed on exit unless KEEP=1 is set in the environment.
+
+set -u
+
+# --- locate the build tree -------------------------------------------------
+# This script lives in /server/mpm/motorz/test/. The top of the
+# build tree is four levels up.
+TEST_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+TOP="$(cd "$TEST_DIR/../../../.." && pwd)"
+
+HTTPD="$TOP/httpd"
+PORT="${PORT:-8099}"  # http1 default; http2 suite overrides to 8443
+TLS_PORT="${TLS_PORT:-8443}"
+
+PASS=0
+FAIL=0
+RUNDIR=""
+
+fail() { echo " FAIL: $*"; FAIL=$((FAIL + 1)); }
+pass() { echo " ok: $*"; PASS=$((PASS + 1)); }
+
+# assert_eq