static IOCB helperHandleRead;
static IOCB helperStatefulHandleRead;
-static void Enqueue(helper * hlp, Helper::Xaction *);
-static helper_server *GetFirstAvailable(const helper::Pointer &);
+static void Enqueue(Helper::Client *, Helper::Xaction *);
+static Helper::Session *GetFirstAvailable(const Helper::Client::Pointer &);
static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper::Pointer &);
-static void helperDispatch(helper_server * srv, Helper::Xaction * r);
+static void helperDispatch(Helper::Session *, Helper::Xaction *);
static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
-static void helperKickQueue(const helper::Pointer &);
+static void helperKickQueue(const Helper::Client::Pointer &);
static void helperStatefulKickQueue(const statefulhelper::Pointer &);
static void helperStatefulServerDone(helper_stateful_server * srv);
static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
-CBDATA_CLASS_INIT(helper_server);
+CBDATA_NAMESPACED_CLASS_INIT(Helper, Session);
CBDATA_CLASS_INIT(helper_stateful_server);
-InstanceIdDefinitions(HelperServerBase, "Hlpr");
+InstanceIdDefinitions(Helper::SessionBase, "Hlpr");
void
-HelperServerBase::initStats()
+Helper::SessionBase::initStats()
{
stats.uses=0;
stats.replies=0;
}
void
-HelperServerBase::closePipesSafely(const char *id_name)
+Helper::SessionBase::closePipesSafely(const char * const id_name)
{
#if _SQUID_WINDOWS_
shutdown(writePipe->fd, SD_BOTH);
}
void
-HelperServerBase::closeWritePipeSafely(const char *id_name)
+Helper::SessionBase::closeWritePipeSafely(const char * const id_name)
{
#if _SQUID_WINDOWS_
shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
}
void
-HelperServerBase::dropQueued()
+Helper::SessionBase::dropQueued()
{
while (!requests.empty()) {
// XXX: re-schedule these on another helper?
- Helper::Xaction *r = requests.front();
+ const auto r = requests.front();
requests.pop_front();
void *cbdata;
if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
}
}
-HelperServerBase::~HelperServerBase()
+Helper::SessionBase::~SessionBase()
{
if (rbuf) {
memFreeBuf(rbuf_sz, rbuf);
}
}
-helper_server::~helper_server()
+Helper::Session::~Session()
{
wqueue->clean();
delete wqueue;
}
void
-helper_server::dropQueued()
+Helper::Session::dropQueued()
{
- HelperServerBase::dropQueued();
+ SessionBase::dropQueued();
requestsIndex.clear();
}
}
void
-helperOpenServers(const helper::Pointer &hlp)
+helperOpenServers(const Helper::Client::Pointer &hlp)
{
char *s;
char *progname;
char *procname;
const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
char fd_note_buf[FD_DESC_SZ];
- helper_server *srv;
int nargs = 0;
int k;
pid_t pid;
++successfullyStarted;
++ hlp->childs.n_running;
++ hlp->childs.n_active;
- srv = new helper_server;
+ const auto srv = new Helper::Session;
srv->hIpc = hIpc;
srv->pid = pid;
srv->initStats();
if (wfd != rfd)
commSetNonBlocking(wfd);
- AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_server::HelperServerClosed", cbdataDialer(helper_server::HelperServerClosed, srv));
+ AsyncCall::Pointer closeCall = asyncCall(5,4, "Helper::Session::HelperServerClosed", cbdataDialer(Helper::Session::HelperServerClosed, srv));
comm_add_close_handler(rfd, closeCall);
if (hlp->timeout && hlp->childs.concurrency) {
- AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
- CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
+ AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
+ CommTimeoutCbPtrFun(Helper::Session::requestTimeout, srv));
commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
}
}
void
-helper::submitRequest(Helper::Xaction *r)
+Helper::Client::submitRequest(Helper::Xaction * const r)
{
- helper_server *srv;
-
- if ((srv = GetFirstAvailable(this)))
+ if (const auto srv = GetFirstAvailable(this))
helperDispatch(srv, r);
else
Enqueue(this, r);
/// handles helperSubmit() and helperStatefulSubmit() failures
static void
-SubmissionFailure(const helper::Pointer &hlp, HLPCB *callback, void *data)
+SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
{
auto result = Helper::Error;
if (!hlp) {
}
void
-helperSubmit(const helper::Pointer &hlp, const char *buf, HLPCB * callback, void *data)
+helperSubmit(const Helper::Client::Pointer &hlp, const char * const buf, HLPCB * const callback, void * const data)
{
if (!hlp || !hlp->trySubmit(buf, callback, data))
SubmissionFailure(hlp, callback, data);
/// whether queuing an additional request would overload the helper
bool
-helper::queueFull() const {
+Helper::Client::queueFull() const {
return stats.queue_size >= static_cast<int>(childs.queue_size);
}
bool
-helper::overloaded() const {
+Helper::Client::overloaded() const {
return stats.queue_size > static_cast<int>(childs.queue_size);
}
/// synchronizes queue-dependent measurements with the current queue state
void
-helper::syncQueueStats()
+Helper::Client::syncQueueStats()
{
if (overloaded()) {
if (overloadStart) {
/// returns true if and only if the submission should proceed
/// may kill Squid if the helper remains overloaded for too long
bool
-helper::prepSubmit()
+Helper::Client::prepSubmit()
{
// re-sync for the configuration may have changed since the last submission
syncQueueStats();
if (squid_curtime - overloadStart <= 180)
return true; // also OK: overload has not persisted long enough to panic
- if (childs.onPersistentOverload == Helper::ChildConfig::actDie)
+ if (childs.onPersistentOverload == ChildConfig::actDie)
fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
if (!droppedRequests) {
}
bool
-helper::trySubmit(const char *buf, HLPCB * callback, void *data)
+Helper::Client::trySubmit(const char * const buf, HLPCB * const callback, void * const data)
{
if (!prepSubmit())
return false; // request was dropped
/// dispatches or enqueues a helper requests; does not enforce queue limits
void
-helper::submit(const char *buf, HLPCB * callback, void *data)
+Helper::Client::submit(const char * const buf, HLPCB * const callback, void * const data)
{
- Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
+ const auto r = new Xaction(callback, data, buf);
submitRequest(r);
debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
}
}
void
-helper::packStatsInto(Packable *p, const char *label) const
+Helper::Client::packStatsInto(Packable * const p, const char * const label) const
{
if (label)
p->appendf("%s:\n", label);
"Request");
for (dlink_node *link = servers.head; link; link = link->next) {
- HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
+ const auto srv = static_cast<SessionBase *>(link->data);
assert(srv);
- Helper::Xaction *xaction = srv->requests.empty() ? nullptr : srv->requests.front();
+ const auto xaction = srv->requests.empty() ? nullptr : srv->requests.front();
double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
srv->index.value,
}
bool
-helper::willOverload() const {
+Helper::Client::willOverload() const {
return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
}
-helper::Pointer
-helper::Make(const char *name)
+Helper::Client::Pointer
+Helper::Client::Make(const char * const name)
{
- return new helper(name);
+ return new Client(name);
}
statefulhelper::Pointer
}
void
-helperShutdown(const helper::Pointer &hlp)
+helperShutdown(const Helper::Client::Pointer &hlp)
{
dlink_node *link = hlp->servers.head;
while (link) {
- helper_server *srv;
- srv = (helper_server *)link->data;
+ const auto srv = static_cast<Helper::Session *>(link->data);
link = link->next;
if (srv->flags.shutdown) {
}
}
-helper::~helper()
+Helper::Client::~Client()
{
/* note, don't free id_name, it probably points to static memory */
}
void
-helper::handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
+Helper::Client::handleKilledServer(SessionBase * const srv, bool &needsNewServers)
{
needsNewServers = false;
if (!srv->flags.shutdown) {
}
void
-helper::handleFewerServers(const bool madeProgress)
+Helper::Client::handleFewerServers(const bool madeProgress)
{
const auto needNew = childs.needNew();
}
void
-helper_server::HelperServerClosed(helper_server *srv)
+Helper::Session::HelperServerClosed(Session * const srv)
{
const auto hlp = srv->parent;
delete srv;
}
-// XXX: Almost duplicates helper_server::HelperServerClosed() because helperOpenServers() is not a virtual method of the `helper` class
-// TODO: Fix the `helper` class hierarchy to use virtual functions.
+// XXX: Almost duplicates Helper::Session::HelperServerClosed() because helperOpenServers() is not a virtual method of the `Helper::Client` class
+// TODO: Fix the `Helper::Client` class hierarchy to use virtual functions.
void
helper_stateful_server::HelperServerClosed(helper_stateful_server *srv)
{
}
Helper::Xaction *
-helper_server::popRequest(int request_number)
+Helper::Session::popRequest(const int request_number)
{
- Helper::Xaction *r = nullptr;
- helper_server::RequestIndex::iterator it;
+ Xaction *r = nullptr;
if (parent->childs.concurrency) {
// If concurrency supported retrieve request from ID
- it = requestsIndex.find(request_number);
+ const auto it = requestsIndex.find(request_number);
if (it != requestsIndex.end()) {
r = *(it->second);
requests.erase(it->second);
/// Calls back with a pointer to the buffer with the helper output
static void
-helperReturnBuffer(helper_server * srv, const helper::Pointer &hlp, char * msg, size_t msgSize, char * msgEnd)
+helperReturnBuffer(Helper::Session * srv, const Helper::Client::Pointer &hlp, char * const msg, const size_t msgSize, const char * const msgEnd)
{
if (Helper::Xaction *r = srv->replyXaction) {
const bool hasSpace = r->reply.accumulate(msg, msgSize);
static void
helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
{
- helper_server *srv = (helper_server *)data;
+ const auto srv = static_cast<Helper::Session *>(data);
const auto hlp = srv->parent;
assert(cbdataReferenceValid(data));
/// Handles a request when all running helpers, if any, are busy.
static void
-Enqueue(helper * hlp, Helper::Xaction * r)
+Enqueue(Helper::Client * const hlp, Helper::Xaction * const r)
{
hlp->queue.push(r);
++ hlp->stats.queue_size;
}
Helper::Xaction *
-helper::nextRequest()
+Helper::Client::nextRequest()
{
if (queue.empty())
return nullptr;
return r;
}
-static helper_server *
-GetFirstAvailable(const helper::Pointer &hlp)
+static Helper::Session *
+GetFirstAvailable(const Helper::Client::Pointer &hlp)
{
dlink_node *n;
- helper_server *srv;
- helper_server *selected = nullptr;
+ Helper::Session *selected = nullptr;
debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
if (hlp->childs.n_running == 0)
/* Find "least" loaded helper (approx) */
for (n = hlp->servers.head; n != nullptr; n = n->next) {
- srv = (helper_server *)n->data;
+ const auto srv = static_cast<Helper::Session *>(n->data);
if (selected && selected->stats.pending <= srv->stats.pending)
continue;
static void
helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
{
- helper_server *srv = (helper_server *)data;
+ const auto srv = static_cast<Helper::Session *>(data);
srv->writebuf->clean();
delete srv->writebuf;
}
static void
-helperDispatch(helper_server * srv, Helper::Xaction * r)
+helperDispatch(Helper::Session * const srv, Helper::Xaction * const r)
{
const auto hlp = srv->parent;
const uint64_t reqId = ++srv->nextRequestId;
}
r->request.Id = reqId;
- helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
+ const auto it = srv->requests.insert(srv->requests.end(), r);
r->request.dispatch_time = current_time;
if (srv->wqueue->isNull())
srv->wqueue->init();
if (hlp->childs.concurrency) {
- srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
+ srv->requestsIndex.insert(Helper::Session::RequestIndex::value_type(reqId, it));
assert(srv->requestsIndex.size() == srv->requests.size());
srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
} else
}
static void
-helperKickQueue(const helper::Pointer &hlp)
+helperKickQueue(const Helper::Client::Pointer &hlp)
{
- Helper::Xaction *r;
- helper_server *srv;
+ Helper::Xaction *r = nullptr;
+ Helper::Session *srv = nullptr;
while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
helperDispatch(srv, r);
}
void
-helper_server::checkForTimedOutRequests(bool const retry)
+Helper::Session::checkForTimedOutRequests(bool const retry)
{
assert(parent->childs.concurrency);
while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
- Helper::Xaction *r = requests.front();
+ const auto r = requests.front();
RequestIndex::iterator it;
it = requestsIndex.find(r->request.Id);
assert(it != requestsIndex.end());
}
void
-helper_server::requestTimeout(const CommTimeoutCbParams &io)
+Helper::Session::requestTimeout(const CommTimeoutCbParams &io)
{
debugs(26, 3, io.conn);
- helper_server *srv = static_cast<helper_server *>(io.data);
+ const auto srv = static_cast<Session *>(io.data);
srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
- debugs(84, 3, io.conn << " establish new helper_server::requestTimeout");
- AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
- CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
+ debugs(84, 3, io.conn << " establish a new timeout");
+ AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
+ CommTimeoutCbPtrFun(Session::requestTimeout, srv));
const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
#include <queue>
#include <unordered_map>
+class CommTimeoutCbParams;
+class MemBuf;
class Packable;
class wordlist;
Helper::Request request;
Helper::Reply reply;
};
-}
-class HelperServerBase;
+class SessionBase;
+
/**
* Managers a set of individual helper processes with a common queue of requests.
*
* If an overloaded helper has been overloaded for 3+ minutes, an attempt to use
* it results in on-persistent-overload action, which may kill worker.
*/
-class helper: public RefCountable
+class Client: public RefCountable
{
public:
- using Pointer = RefCount<helper>;
+ using Pointer = RefCount<Client>;
- /// \returns a newly created instance of the named helper
+ /// \returns a newly created instance of the named helper client
/// \param name admin-visible helper category (with this process lifetime)
static Pointer Make(const char *name);
- ~helper();
+ virtual ~Client();
/// \returns next request in the queue, or nil.
- Helper::Xaction *nextRequest();
+ Xaction *nextRequest();
/// If possible, submit request. Otherwise, either kill Squid or return false.
bool trySubmit(const char *buf, HLPCB * callback, void *data);
/// Submits a request to the helper or add it to the queue if none of
/// the servers is available.
- void submitRequest(Helper::Xaction *r);
+ void submitRequest(Xaction *);
/// Dump some stats about the helper state to a Packable object
void packStatsInto(Packable *p, const char *label = nullptr) const;
/// already overloaded helpers return true
bool willOverload() const;
- /// Updates interall statistics and start new helper server processes after
+ /// Updates internal statistics and starts new helper processes after
/// an unexpected server exit
- /// \param needsNewServers true if new servers must started, false otherwise
- void handleKilledServer(HelperServerBase *srv, bool &needsNewServers);
+ /// \param needsNewServers true if new helper(s) must be started, false otherwise
+ void handleKilledServer(SessionBase *, bool &needsNewServers);
- /// Reacts to unexpected server death(s), including a failure to start server(s)
- /// and an unexpected exit of a previously started server. \sa handleKilledServer()
- /// \param madeProgress whether the died server(s) responded to any requests
+ /// Reacts to unexpected helper process death(s), including a failure to start helper(s)
+ /// and an unexpected exit of a previously started helper. \sa handleKilledServer()
+ /// \param madeProgress whether the died helper(s) responded to any requests
void handleFewerServers(bool madeProgress);
public:
wordlist *cmdline = nullptr;
dlink_list servers;
- std::queue<Helper::Xaction *> queue;
+ std::queue<Xaction *> queue;
const char *id_name = nullptr;
- Helper::ChildConfig childs; ///< Configuration settings for number running.
+ ChildConfig childs; ///< Configuration settings for number running.
int ipc_type = 0;
Ip::Address addr;
unsigned int droppedRequests = 0; ///< requests not sent during helper overload
} stats;
protected:
- friend void helperSubmit(const helper::Pointer &, const char *buf, HLPCB * callback, void *data);
-
/// \param name admin-visible helper category (with this process lifetime)
- explicit helper(const char *name): id_name(name) {}
+ explicit Client(const char * const name): id_name(name) {}
bool queueFull() const;
bool overloaded() const;
void submit(const char *buf, HLPCB * callback, void *data);
};
-class statefulhelper : public helper
+} // namespace Helper
+
+// TODO: Rename to a *Client.
+class statefulhelper: public Helper::Client
{
public:
using Pointer = RefCount<statefulhelper>;
typedef std::unordered_map<Helper::ReservationId, helper_stateful_server *> Reservations;
- inline ~statefulhelper() {}
+ ~statefulhelper() override = default;
-public:
static Pointer Make(const char *name);
/// reserve the given server
private:
friend void helperStatefulSubmit(const statefulhelper::Pointer &, const char *buf, HLPCB *, void *cbData, const Helper::ReservationId &);
- explicit statefulhelper(const char *name): helper(name) {}
+ explicit statefulhelper(const char * const name): Helper::Client(name) {}
/// \return the previously reserved server (if the reservation is still valid) or nil
helper_stateful_server *findServer(const Helper::ReservationId & reservation);
Reservations reservations;
};
-/// represents a single helper process abstraction
-class HelperServerBase: public CbdataParent
+namespace Helper
+{
+
+/// represents a single helper process
+class SessionBase: public CbdataParent
{
public:
- ~HelperServerBase() override;
+ ~SessionBase() override;
+
/** Closes pipes to the helper safely.
* Handles the case where the read and write pipes are the same FD.
*
/// whether the server is locked for exclusive use by a client
virtual bool reserved() = 0;
- /// dequeues and sends a Helper::Unknown answer to all queued requests
+ /// dequeues and sends an Unknown answer to all queued requests
virtual void dropQueued();
public:
/// Helper program identifier; does not change when contents do,
/// including during assignment
- const InstanceId<HelperServerBase> index;
+ const InstanceId<SessionBase> index;
+
int pid;
Ip::Address addr;
Comm::ConnectionPointer readPipe;
bool shutdown;
} flags;
- typedef std::list<Helper::Xaction *> Requests;
+ using Requests = std::list<Xaction *>;
Requests requests; ///< requests in order of submission/expiration
struct {
void initStats();
};
-class MemBuf;
-class CommTimeoutCbParams;
-
-// TODO: Rename to StatelessHelperServer and rename HelperServerBase to HelperServer.
-/// represents a single "stateless helper" process
-class helper_server : public HelperServerBase
+/// represents a single "stateless helper" process;
+/// supports concurrent helper requests
+class Session: public SessionBase
{
- CBDATA_CHILD(helper_server);
+ CBDATA_CHILD(Session);
public:
uint64_t nextRequestId;
MemBuf *wqueue;
MemBuf *writebuf;
- helper::Pointer parent;
+ Client::Pointer parent;
/// The helper request Xaction object for the current reply .
/// A helper reply may be distributed to more than one of the retrieved
/// packets from helper. This member stores the Xaction object as long as
/// the end-of-message for current reply is not retrieved.
- Helper::Xaction *replyXaction;
+ Xaction *replyXaction;
/// Whether to ignore current message, because it is timed-out or other reason
bool ignoreToEom;
typedef std::map<uint64_t, Requests::iterator> RequestIndex;
RequestIndex requestsIndex; ///< maps request IDs to requests
- ~helper_server() override;
+ ~Session() override;
+
/// Search in queue for the request with requestId, return the related
/// Xaction object and remove it from queue.
/// If concurrency is disabled then the requestId is ignored and the
/// Xaction of the next request in queue is retrieved.
- Helper::Xaction *popRequest(int requestId);
+ Xaction *popRequest(int requestId);
/// Run over the active requests lists and forces a retry, or timedout reply
/// or the configured "on timeout response" for timedout requests.
void checkForTimedOutRequests(bool const retry);
- /*HelperServerBase API*/
+ /* SessionBase API */
bool reserved() override {return false;}
void dropQueued() override;
static void requestTimeout(const CommTimeoutCbParams &io);
/// close handler to handle exited server processes
- static void HelperServerClosed(helper_server *srv);
+ static void HelperServerClosed(Session *);
};
-// TODO: Rename to StatefulHelperServer and rename HelperServerBase to HelperServer.
-/// represents a single "stateful helper" process
-class helper_stateful_server : public HelperServerBase
+} // namespace Helper
+
+// TODO: Rename to a *Session, matching renamed statefulhelper.
+/// represents a single "stateful helper" process;
+/// supports exclusive transaction reservations
+class helper_stateful_server: public Helper::SessionBase
{
CBDATA_CHILD(helper_stateful_server);
time_t reservationStart; ///< when the last `reservation` was made
};
-void helperOpenServers(const helper::Pointer &);
+void helperOpenServers(const Helper::Client::Pointer &);
void helperStatefulOpenServers(const statefulhelper::Pointer &);
-void helperSubmit(const helper::Pointer &, const char *buf, HLPCB *, void *cbData);
+void helperSubmit(const Helper::Client::Pointer &, const char *buf, HLPCB *, void *cbData);
void helperStatefulSubmit(const statefulhelper::Pointer &, const char *buf, HLPCB *, void *cbData, uint64_t reservation);
-void helperShutdown(const helper::Pointer &);
+void helperShutdown(const Helper::Client::Pointer &);
void helperStatefulShutdown(const statefulhelper::Pointer &);
#endif /* SQUID_HELPER_H */