InstanceIdDefinitions(HelperServerBase, "Hlpr");
+void
+HelperServerBase::initStats()
+{
+ stats.uses=0;
+ stats.replies=0;
+ stats.pending=0;
+ stats.releases=0;
+ stats.timedout = 0;
+}
+
void
HelperServerBase::closePipesSafely(const char *id_name)
{
}
}
-HelperServerBase::HelperServerBase(const char *desc, Ip::Address &ip, int aPid, void *aIpc, int rfd, int wfd) :
- pid(aPid),
- addr(ip),
- readPipe(new Comm::Connection),
- writePipe(new Comm::Connection),
- hIpc(aIpc),
- rbuf(static_cast<char *>(memAllocBuf(ReadBufSize, &rbuf_sz)))
-{
- readPipe->fd = rfd;
- readPipe->noteStart();
- fd_note(readPipe->fd, desc);
-
- writePipe->fd = wfd;
- writePipe->noteStart();
- fd_note(writePipe->fd, desc);
-}
-
HelperServerBase::~HelperServerBase()
{
if (rbuf) {
}
}
-helper_server::helper_server(helper *hlp, int aPid, void *aIpc, int rfd, int wfd) :
- HelperServerBase(hlp->cmdline->key, hlp->addr, aPid, aIpc, rfd, wfd),
- parent(cbdataReference(hlp))
-{
-}
-
helper_server::~helper_server()
{
wqueue->clean();
requestsIndex.clear();
}
-helper_stateful_server::helper_stateful_server(statefulhelper *hlp, int aPid, void *aIpc, int rfd, int wfd) :
- HelperServerBase(hlp->cmdline->key, hlp->addr, aPid, aIpc, rfd, wfd),
- parent(cbdataReference(hlp))
-{
-}
-
helper_stateful_server::~helper_stateful_server()
{
/* TODO: walk the local queue of requests and carry them all out */
char *procname;
const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
char fd_note_buf[FD_DESC_SZ];
+ helper_server *srv;
int nargs = 0;
int k;
pid_t pid;
++ hlp->childs.n_running;
++ hlp->childs.n_active;
- auto *srv = new helper_server(hlp, pid, hIpc, rfd, wfd);
+ srv = new helper_server;
+ srv->hIpc = hIpc;
+ srv->pid = pid;
+ srv->initStats();
+ srv->addr = hlp->addr;
+ srv->readPipe = new Comm::Connection;
+ srv->readPipe->fd = rfd;
+ srv->writePipe = new Comm::Connection;
+ srv->writePipe->fd = wfd;
+ srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
+ srv->wqueue = new MemBuf;
+ srv->roffset = 0;
+ srv->nextRequestId = 0;
+ srv->replyXaction = NULL;
+ srv->ignoreToEom = false;
+ srv->parent = cbdataReference(hlp);
dlinkAddTail(srv, &srv->link, &hlp->servers);
if (rfd == wfd) {
++ hlp->childs.n_running;
++ hlp->childs.n_active;
- auto *srv = new helper_stateful_server(hlp, pid, hIpc, rfd, wfd);
+ helper_stateful_server *srv = new helper_stateful_server;
+ srv->hIpc = hIpc;
+ srv->pid = pid;
+ srv->initStats();
+ srv->addr = hlp->addr;
+ srv->readPipe = new Comm::Connection;
+ srv->readPipe->fd = rfd;
+ srv->writePipe = new Comm::Connection;
+ srv->writePipe->fd = wfd;
+ srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
+ srv->roffset = 0;
+ srv->parent = cbdataReference(hlp);
+ srv->reservationStart = 0;
+
dlinkAddTail(srv, &srv->link, &hlp->servers);
if (rfd == wfd) {
#include <queue>
#include <unordered_map>
-class CommTimeoutCbParams;
-class HelperServerBase;
-class MemBuf;
class Packable;
class wordlist;
};
}
+class HelperServerBase;
/**
* Managers a set of individual helper processes with a common queue of requests.
*
CBDATA_CLASS(helper);
public:
- explicit helper(const char *name) : id_name(name) {}
- virtual ~helper();
+ inline helper(const char *name) :
+ cmdline(NULL),
+ id_name(name),
+ ipc_type(0),
+ droppedRequests(0),
+ overloadStart(0),
+ last_queue_warn(0),
+ last_restart(0),
+ timeout(0),
+ retryTimedOut(false),
+ retryBrokenHelper(false),
+ eom('\n') {
+ memset(&stats, 0, sizeof(stats));
+ }
+ ~helper();
/// \returns next request in the queue, or nil.
Helper::Xaction *nextRequest();
void handleKilledServer(HelperServerBase *srv, bool &needsNewServers);
public:
- wordlist *cmdline = nullptr;
+ wordlist *cmdline;
dlink_list servers;
std::queue<Helper::Xaction *> queue;
- const char *id_name = nullptr;
- Helper::ChildConfig childs; ///< configuration settings for number running
- int ipc_type = 0;
+ const char *id_name;
+ Helper::ChildConfig childs; ///< Configuration settings for number running.
+ int ipc_type;
Ip::Address addr;
- unsigned int droppedRequests = 0; ///< requests not sent during helper overload
- time_t overloadStart = 0; ///< when the helper became overloaded (zero if it is not)
- time_t last_queue_warn = 0;
- time_t last_restart = 0;
- time_t timeout = 0; ///< requests timeout
- bool retryTimedOut = false; ///< whether the timed-out requests must retried
- bool retryBrokenHelper = false; ///< whether the requests must retried on BH replies
- SBuf onTimedOutResponse; ///< the response to use when helper response timedout
- char eom = '\n'; ///< the char which marks the end of (response) message
+ unsigned int droppedRequests; ///< requests not sent during helper overload
+ time_t overloadStart; ///< when the helper became overloaded (zero if it is not)
+ time_t last_queue_warn;
+ time_t last_restart;
+ time_t timeout; ///< Requests timeout
+ bool retryTimedOut; ///< Whether the timed-out requests must retried
+ bool retryBrokenHelper; ///< Whether the requests must retried on BH replies
+ SBuf onTimedOutResponse; ///< The response to use when helper response timedout
+ char eom; ///< The char which marks the end of (response) message, normally '\n'
struct _stats {
- int requests = 0;
- int replies = 0;
- int timedout = 0;
- int queue_size = 0;
- int avg_svc_time = 0;
+ int requests;
+ int replies;
+ int timedout;
+ int queue_size;
+ int avg_svc_time;
} stats;
protected:
public:
typedef std::unordered_map<Helper::ReservationId, helper_stateful_server *> Reservations;
- explicit statefulhelper(const char *name) : helper(name) {}
- virtual ~statefulhelper() = default;
+ inline statefulhelper(const char *name) : helper(name) {}
+ inline ~statefulhelper() {}
public:
/// reserve the given server
void submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation);
bool trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation);
- /// reserved servers indexed by reservation IDs
+ ///< reserved servers indexed by reservation IDs
Reservations reservations;
};
class HelperServerBase: public CbdataParent
{
public:
- HelperServerBase(const char *desc, Ip::Address &, int aPid, void *aIpc, int rfd, int wfd);
virtual ~HelperServerBase();
-
/** Closes pipes to the helper safely.
* Handles the case where the read and write pipes are the same FD.
*
/// Helper program identifier; does not change when contents do,
/// including during assignment
const InstanceId<HelperServerBase> index;
- int pid = 0;
+ int pid;
Ip::Address addr;
Comm::ConnectionPointer readPipe;
Comm::ConnectionPointer writePipe;
- void *hIpc = nullptr;
+ void *hIpc;
- char *rbuf = nullptr;
- size_t rbuf_sz = 0;
- size_t roffset = 0;
+ char *rbuf;
+ size_t rbuf_sz;
+ size_t roffset;
struct timeval dispatch_time;
struct timeval answer_time;
dlink_node link;
struct _helper_flags {
- bool writing = false;
- bool closing = false;
- bool shutdown = false;
+ bool writing;
+ bool closing;
+ bool shutdown;
} flags;
typedef std::list<Helper::Xaction *> Requests;
Requests requests; ///< requests in order of submission/expiration
struct {
- uint64_t uses = 0; ///< requests sent to this helper
- uint64_t replies = 0; ///< replies received from this helper
- uint64_t pending = 0; ///< queued lookups waiting to be sent to this helper
- uint64_t releases = 0; ///< times release() has been called on this helper (if stateful)
- uint64_t timedout = 0; ///< requests which timed-out
+ uint64_t uses; //< requests sent to this helper
+ uint64_t replies; //< replies received from this helper
+ uint64_t pending; //< queued lookups waiting to be sent to this helper
+ uint64_t releases; //< times release() has been called on this helper (if stateful)
+ uint64_t timedout; //< requests which timed-out
} stats;
+ void initStats();
};
+class MemBuf;
+class CommTimeoutCbParams;
+
// TODO: Rename to StatelessHelperServer and rename HelperServerBase to HelperServer.
/// represents a single "stateless helper" process
class helper_server : public HelperServerBase
CBDATA_CHILD(helper_server);
public:
- helper_server(helper *hlp, int pid, void *hIpc, int rfd, int wfd);
- virtual ~helper_server();
+ uint64_t nextRequestId;
- uint64_t nextRequestId = 0;
+ MemBuf *wqueue;
+ MemBuf *writebuf;
- MemBuf *wqueue = nullptr;
- MemBuf *writebuf = nullptr;
-
- helper *parent = nullptr;
+ helper *parent;
/// The helper request Xaction object for the current reply .
/// A helper reply may be distributed to more than one of the retrieved
/// packets from helper. This member stores the Xaction object as long as
/// the end-of-message for current reply is not retrieved.
- Helper::Xaction *replyXaction = nullptr;
+ Helper::Xaction *replyXaction;
/// Whether to ignore current message, because it is timed-out or other reason
- bool ignoreToEom = false;
+ bool ignoreToEom;
// STL says storing std::list iterators is safe when changing the list
typedef std::map<uint64_t, Requests::iterator> RequestIndex;
RequestIndex requestsIndex; ///< maps request IDs to requests
+ virtual ~helper_server();
/// Search in queue for the request with requestId, return the related
/// Xaction object and remove it from queue.
/// If concurrency is disabled then the requestId is ignored and the
CBDATA_CHILD(helper_stateful_server);
public:
- helper_stateful_server(statefulhelper *hlp, int pid, void *hIpc, int rfd, int wfd);
virtual ~helper_stateful_server();
-
void reserve();
void clearReservation();
/// close handler to handle exited server processes
static void HelperServerClosed(helper_stateful_server *srv);
- statefulhelper *parent = nullptr;
+ statefulhelper *parent;
// Reservations temporary lock the server for an exclusive "client" use. The
// client keeps the reservation ID as a proof of her reservation. If a
// reservation expires, and the server is reserved for another client, then
// the reservation ID presented by the late client will not match ours.
Helper::ReservationId reservationId; ///< "confirmation ID" of the last
- time_t reservationStart = 0; ///< when the last reservation was made
+ time_t reservationStart; ///< when the last `reservation` was made
};
/* helper.c */