From: Christos Tsantilas Date: Wed, 2 Mar 2011 16:40:40 +0000 (+0200) Subject: merge from trunk X-Git-Tag: take06~27^2~127^2~2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=995eb827aaa64d9bcd4110b756e04051ef9d03e9;p=thirdparty%2Fsquid.git merge from trunk --- 995eb827aaa64d9bcd4110b756e04051ef9d03e9 diff --cc src/mgr/Inquirer.cc index 8b9c54b8aa,4119045193..fcf3e333e2 --- a/src/mgr/Inquirer.cc +++ b/src/mgr/Inquirer.cc @@@ -10,12 -10,9 +10,12 @@@ #include "comm/Write.h" #include "CommCalls.h" #include "HttpReply.h" +#include "HttpRequest.h" - #include "ipc/Coordinator.h" + #include "ipc/UdsOp.h" #include "mgr/ActionWriter.h" - #include "mgr/Command.h" +#include "mgr/IntParam.h" #include "mgr/Inquirer.h" ++#include "mgr/Command.h" #include "mgr/Request.h" #include "mgr/Response.h" #include "SquidTime.h" @@@ -26,26 -21,14 +26,14 @@@ CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer); - Mgr::Inquirer::RequestsMap Mgr::Inquirer::TheRequestsMap; - unsigned int Mgr::Inquirer::LastRequestId = 0; - - /// compare Ipc::StrandCoord using kidId, for std::sort() below - static bool - LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2) - { - return c1.kidId < c2.kidId; - } - Mgr::Inquirer::Inquirer(Action::Pointer anAction, int aFd, + Mgr::Inquirer::Inquirer(Action::Pointer anAction, const Request &aCause, const Ipc::StrandCoords &coords): - AsyncJob("Mgr::Inquirer"), - Ipc::Inquirer(aCause.clone(), coords, anAction->atomic() ? 10 : 100), ++ Ipc::Inquirer(aCause.clone(), applyQueryParams(coords, aCause.params.queryParams), anAction->atomic() ? 10 : 100), aggrAction(anAction), - cause(aCause), - fd(aFd), - strands(applyQueryParams(coords, aCause.params.queryParams)), pos(strands.begin()), - requestId(0), closer(NULL), timeout(aggrAction->atomic() ? 10 : 100) + fd(Ipc::ImportFdIntoComm(aCause.fd, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket)) { - debugs(16, 5, HERE << "FD " << aFd << " action: " << aggrAction); + debugs(16, 5, HERE << "FD " << fd << " action: " << aggrAction); closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed", CommCbMemFunT(this, &Inquirer::noteCommClosed)); @@@ -82,25 -59,14 +64,26 @@@ voi Mgr::Inquirer::start() { debugs(16, 5, HERE); + Ipc::Inquirer::start(); Must(fd >= 0); Must(aggrAction != NULL); - + - std::auto_ptr reply(new HttpReply); - reply->setHeaders(HTTP_OK, NULL, "text/plain", -1, squid_curtime, squid_curtime); - reply->header.putStr(HDR_CONNECTION, "close"); // until we chunk response - std::auto_ptr replyBuf(reply->pack()); + std::auto_ptr replyBuf; + if (strands.empty()) { + LOCAL_ARRAY(char, url, MAX_URL); + snprintf(url, MAX_URL, "%s", aggrAction->command().params.httpUri.termedBuf()); + HttpRequest *req = HttpRequest::CreateFromUrl(url); + ErrorState *err = errorCon(ERR_INVALID_URL, HTTP_NOT_FOUND, req); + std::auto_ptr reply(err->BuildHttpReply()); + replyBuf.reset(reply->pack()); + errorStateFree(err); + } + else { + std::auto_ptr reply(new HttpReply); + reply->setHeaders(HTTP_OK, NULL, "text/plain", -1, squid_curtime, squid_curtime); + reply->header.putStr(HDR_CONNECTION, "close"); // until we chunk response + replyBuf.reset(reply->pack()); + } writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader", CommCbMemFunT(this, &Inquirer::noteWroteHeader)); Comm::Write(fd, replyBuf.get(), writer); @@@ -169,16 -95,19 +112,19 @@@ Mgr::Inquirer::noteCommClosed(const Com mustStop("commClosed"); } + bool + Mgr::Inquirer::aggregate(Ipc::Response::Pointer aResponse) + { + Mgr::Response& response = static_cast(*aResponse); + if (response.hasAction()) + aggrAction->add(response.getAction()); + return true; + } + void - Mgr::Inquirer::swanSong() + Mgr::Inquirer::sendResponse() { - debugs(16, 5, HERE); - removeTimeoutEvent(); - if (requestId > 0) { - DequeueRequest(requestId); - requestId = 0; - } - if (aggrAction->aggregatable()) { + if (!strands.empty() && aggrAction->aggregatable()) { removeCloseHandler(); AsyncJob::Start(new ActionWriter(aggrAction, fd)); fd = -1; // should not close fd because we passed it to ActionWriter @@@ -189,123 -117,5 +134,49 @@@ bool Mgr::Inquirer::doneAll() const { - return !writer && pos == strands.end(); - } - - /// returns and forgets the right Inquirer callback for strand request - AsyncCall::Pointer - Mgr::Inquirer::DequeueRequest(unsigned int requestId) - { - debugs(16, 3, HERE << " requestId " << requestId); - Must(requestId != 0); - AsyncCall::Pointer call; - RequestsMap::iterator request = TheRequestsMap.find(requestId); - if (request != TheRequestsMap.end()) { - call = request->second; - Must(call != NULL); - TheRequestsMap.erase(request); - } - return call; - } - - void - Mgr::Inquirer::HandleRemoteAck(const Mgr::Response& response) - { - Must(response.requestId != 0); - AsyncCall::Pointer call = DequeueRequest(response.requestId); - if (call != NULL) { - HandleAckDialer* dialer = dynamic_cast(call->getDialer()); - Must(dialer); - dialer->arg1 = response; - ScheduleCallHere(call); - } - } - - /// called when we are no longer waiting for the strand to respond - void - Mgr::Inquirer::removeTimeoutEvent() - { - if (eventFind(&Inquirer::RequestTimedOut, this)) - eventDelete(&Inquirer::RequestTimedOut, this); - } - - /// Mgr::Inquirer::requestTimedOut wrapper - void - Mgr::Inquirer::RequestTimedOut(void* param) - { - debugs(16, 3, HERE); - Must(param != NULL); - Inquirer* cmi = static_cast(param); - // use async call to enable job call protection that time events lack - CallJobHere(16, 5, cmi, Mgr::Inquirer, requestTimedOut); - } - - /// called when the strand failed to respond (or finish responding) in time - void - Mgr::Inquirer::requestTimedOut() - { - debugs(16, 3, HERE); - if (requestId != 0) { - DequeueRequest(requestId); - requestId = 0; - Must(!done()); // or we should not be called - ++pos; // advance after a failed inquiry - inquire(); - } - } - - const char* - Mgr::Inquirer::status() const - { - static MemBuf buf; - buf.reset(); - buf.Printf(" [FD %d, requestId %u]", fd, requestId); - buf.terminate(); - return buf.content(); + return !writer && Ipc::Inquirer::doneAll(); } + +Ipc::StrandCoords +Mgr::Inquirer::applyQueryParams(const Ipc::StrandCoords& aStrands, const QueryParams& aParams) +{ + Ipc::StrandCoords strands; + - QueryParam::Pointer processesParam = cause.params.queryParams.get("processes"); - QueryParam::Pointer workersParam = cause.params.queryParams.get("workers"); ++ Mgr::Request *cause = static_cast(request.getRaw()); ++ QueryParam::Pointer processesParam = cause->params.queryParams.get("processes"); ++ QueryParam::Pointer workersParam = cause->params.queryParams.get("workers"); + + if (processesParam == NULL || workersParam == NULL) { + if (processesParam != NULL) { + IntParam* param = dynamic_cast(processesParam.getRaw()); + if (param != NULL && param->type == QueryParam::ptInt) { + const std::vector& processes = param->value(); + for (Ipc::StrandCoords::const_iterator iter = aStrands.begin(); + iter != aStrands.end(); ++iter) + { + if (std::find(processes.begin(), processes.end(), iter->kidId) != processes.end()) + strands.push_back(*iter); + } + } + } else if (workersParam != NULL) { + IntParam* param = dynamic_cast(workersParam.getRaw()); + if (param != NULL && param->type == QueryParam::ptInt) { + const std::vector& workers = param->value(); + for (size_t i = 0; i < aStrands.size(); ++i) + { + if (std::find(workers.begin(), workers.end(), i + 1) != workers.end()) + strands.push_back(aStrands[i]); + } + } + } else { + strands = aStrands; + } - - // order by ascending kid IDs; useful for non-aggregatable stats - std::sort(strands.begin(), strands.end(), LesserStrandByKidId); + } + + debugs(0, 0, HERE << "strands kid IDs = "); + for (Ipc::StrandCoords::const_iterator iter = strands.begin(); iter != strands.end(); ++iter) { + debugs(0, 0, HERE << iter->kidId); + } + + return strands; +} diff --cc src/mgr/Inquirer.h index 92cc0a1c3e,03d7efeda1..c8e6cc8673 --- a/src/mgr/Inquirer.h +++ b/src/mgr/Inquirer.h @@@ -37,30 -28,18 +28,19 @@@ public protected: /* AsyncJob API */ virtual void start(); - virtual void swanSong(); virtual bool doneAll() const; - virtual const char *status() const; - private: - typedef UnaryMemFunT HandleAckDialer; + /* Ipc::Inquirer API */ + virtual void cleanup(); + virtual void sendResponse(); + virtual bool aggregate(Ipc::Response::Pointer aResponse); - void inquire(); + private: void noteWroteHeader(const CommIoCbParams& params); void noteCommClosed(const CommCloseCbParams& params); - - void handleRemoteAck(const Response& response); - - static AsyncCall::Pointer DequeueRequest(unsigned int requestId); - - static void RequestTimedOut(void* param); - void requestTimedOut(); - void removeTimeoutEvent(); - - void close(); void removeCloseHandler(); - + Ipc::StrandCoords applyQueryParams(const Ipc::StrandCoords& aStrands, + const QueryParams& aParams); - private: Action::Pointer aggrAction; //< action to aggregate diff --cc src/snmp/Inquirer.cc index 0000000000,6d748b0355..d3ab625e82 mode 000000,100644..100644 --- a/src/snmp/Inquirer.cc +++ b/src/snmp/Inquirer.cc @@@ -1,0 -1,102 +1,103 @@@ + /* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + + #include "config.h" + #include "base/TextException.h" + #include "CommCalls.h" + #include "ipc/UdsOp.h" + #include "snmp_core.h" + #include "snmp/Inquirer.h" + #include "snmp/Response.h" + #include "snmp/Request.h" + + + CBDATA_NAMESPACED_CLASS_INIT(Snmp, Inquirer); + + + Snmp::Inquirer::Inquirer(const Request& aRequest, const Ipc::StrandCoords& coords): + Ipc::Inquirer(aRequest.clone(), coords, 2), + aggrPdu(aRequest.pdu), + fd(ImportFdIntoComm(aRequest.fd, SOCK_DGRAM, IPPROTO_UDP, Ipc::fdnInSnmpSocket)) + { + debugs(49, 5, HERE); ++ + closer = asyncCall(49, 5, "Snmp::Inquirer::noteCommClosed", + CommCbMemFunT(this, &Inquirer::noteCommClosed)); + comm_add_close_handler(fd, closer); + } + + /// closes our copy of the client connection socket + void + Snmp::Inquirer::cleanup() + { + if (fd >= 0) { + if (closer != NULL) { + comm_remove_close_handler(fd, closer); + closer = NULL; + } + comm_close(fd); + fd = -1; + } + } + + void + Snmp::Inquirer::start() + { + debugs(49, 5, HERE); + Ipc::Inquirer::start(); + Must(fd >= 0); + inquire(); + } + + void + Snmp::Inquirer::handleException(const std::exception& e) + { + aggrPdu.errstat = SNMP_ERR_GENERR; + Ipc::Inquirer::handleException(e); + } + + bool + Snmp::Inquirer::aggregate(Response::Pointer aResponse) + { + Snmp::Response& response = static_cast(*aResponse); + bool error = response.pdu.errstat != SNMP_ERR_NOERROR; + if (error) { + aggrPdu = response.pdu; + } else { + aggrPdu.aggregate(response.pdu); + } + return !error; + } + + /// called when the some external force closed our socket + void + Snmp::Inquirer::noteCommClosed(const CommCloseCbParams& params) + { + debugs(49, 5, HERE); + Must(fd < 0 || fd == params.fd); + fd = -1; + mustStop("commClosed"); + } + + bool + Snmp::Inquirer::doneAll() const + { + return !writer && Ipc::Inquirer::doneAll(); + } + + void + Snmp::Inquirer::sendResponse() + { + debugs(49, 5, HERE); + aggrPdu.fixAggregate(); + aggrPdu.command = SNMP_PDU_RESPONSE; + u_char buffer[SNMP_REQUEST_SIZE]; + int len = sizeof(buffer); + Snmp::Request& req = static_cast(*request); + snmp_build(&req.session, &aggrPdu, buffer, &len); + comm_udp_sendto(fd, req.address, buffer, len); + }