#include "comm/Write.h"
#include "CommCalls.h"
#include "HttpReply.h"
- #include "ipc/Coordinator.h"
+#include "HttpRequest.h"
+ #include "ipc/UdsOp.h"
#include "mgr/ActionWriter.h"
- #include "mgr/Command.h"
+#include "mgr/IntParam.h"
#include "mgr/Inquirer.h"
++#include "mgr/Command.h"
#include "mgr/Request.h"
#include "mgr/Response.h"
#include "SquidTime.h"
CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer);
- Mgr::Inquirer::RequestsMap Mgr::Inquirer::TheRequestsMap;
- unsigned int Mgr::Inquirer::LastRequestId = 0;
-
- /// compare Ipc::StrandCoord using kidId, for std::sort() below
- static bool
- LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
- {
- return c1.kidId < c2.kidId;
- }
- Mgr::Inquirer::Inquirer(Action::Pointer anAction, int aFd,
+ Mgr::Inquirer::Inquirer(Action::Pointer anAction,
const Request &aCause, const Ipc::StrandCoords &coords):
- AsyncJob("Mgr::Inquirer"),
- Ipc::Inquirer(aCause.clone(), coords, anAction->atomic() ? 10 : 100),
++ Ipc::Inquirer(aCause.clone(), applyQueryParams(coords, aCause.params.queryParams), anAction->atomic() ? 10 : 100),
aggrAction(anAction),
- cause(aCause),
- fd(aFd),
- strands(applyQueryParams(coords, aCause.params.queryParams)), pos(strands.begin()),
- requestId(0), closer(NULL), timeout(aggrAction->atomic() ? 10 : 100)
+ fd(Ipc::ImportFdIntoComm(aCause.fd, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket))
{
- debugs(16, 5, HERE << "FD " << aFd << " action: " << aggrAction);
+ debugs(16, 5, HERE << "FD " << fd << " action: " << aggrAction);
closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
Mgr::Inquirer::start()
{
debugs(16, 5, HERE);
+ Ipc::Inquirer::start();
Must(fd >= 0);
Must(aggrAction != NULL);
-
+
- std::auto_ptr<HttpReply> reply(new HttpReply);
- reply->setHeaders(HTTP_OK, NULL, "text/plain", -1, squid_curtime, squid_curtime);
- reply->header.putStr(HDR_CONNECTION, "close"); // until we chunk response
- std::auto_ptr<MemBuf> replyBuf(reply->pack());
+ std::auto_ptr<MemBuf> replyBuf;
+ if (strands.empty()) {
+ LOCAL_ARRAY(char, url, MAX_URL);
+ snprintf(url, MAX_URL, "%s", aggrAction->command().params.httpUri.termedBuf());
+ HttpRequest *req = HttpRequest::CreateFromUrl(url);
+ ErrorState *err = errorCon(ERR_INVALID_URL, HTTP_NOT_FOUND, req);
+ std::auto_ptr<HttpReply> reply(err->BuildHttpReply());
+ replyBuf.reset(reply->pack());
+ errorStateFree(err);
+ }
+ else {
+ std::auto_ptr<HttpReply> reply(new HttpReply);
+ reply->setHeaders(HTTP_OK, NULL, "text/plain", -1, squid_curtime, squid_curtime);
+ reply->header.putStr(HDR_CONNECTION, "close"); // until we chunk response
+ replyBuf.reset(reply->pack());
+ }
writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
CommCbMemFunT<Inquirer, CommIoCbParams>(this, &Inquirer::noteWroteHeader));
Comm::Write(fd, replyBuf.get(), writer);
mustStop("commClosed");
}
+ bool
+ Mgr::Inquirer::aggregate(Ipc::Response::Pointer aResponse)
+ {
+ Mgr::Response& response = static_cast<Response&>(*aResponse);
+ if (response.hasAction())
+ aggrAction->add(response.getAction());
+ return true;
+ }
+
void
- Mgr::Inquirer::swanSong()
+ Mgr::Inquirer::sendResponse()
{
- debugs(16, 5, HERE);
- removeTimeoutEvent();
- if (requestId > 0) {
- DequeueRequest(requestId);
- requestId = 0;
- }
- if (aggrAction->aggregatable()) {
+ if (!strands.empty() && aggrAction->aggregatable()) {
removeCloseHandler();
AsyncJob::Start(new ActionWriter(aggrAction, fd));
fd = -1; // should not close fd because we passed it to ActionWriter
bool
Mgr::Inquirer::doneAll() const
{
- return !writer && pos == strands.end();
- }
-
- /// returns and forgets the right Inquirer callback for strand request
- AsyncCall::Pointer
- Mgr::Inquirer::DequeueRequest(unsigned int requestId)
- {
- debugs(16, 3, HERE << " requestId " << requestId);
- Must(requestId != 0);
- AsyncCall::Pointer call;
- RequestsMap::iterator request = TheRequestsMap.find(requestId);
- if (request != TheRequestsMap.end()) {
- call = request->second;
- Must(call != NULL);
- TheRequestsMap.erase(request);
- }
- return call;
- }
-
- void
- Mgr::Inquirer::HandleRemoteAck(const Mgr::Response& response)
- {
- Must(response.requestId != 0);
- AsyncCall::Pointer call = DequeueRequest(response.requestId);
- if (call != NULL) {
- HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
- Must(dialer);
- dialer->arg1 = response;
- ScheduleCallHere(call);
- }
- }
-
- /// called when we are no longer waiting for the strand to respond
- void
- Mgr::Inquirer::removeTimeoutEvent()
- {
- if (eventFind(&Inquirer::RequestTimedOut, this))
- eventDelete(&Inquirer::RequestTimedOut, this);
- }
-
- /// Mgr::Inquirer::requestTimedOut wrapper
- void
- Mgr::Inquirer::RequestTimedOut(void* param)
- {
- debugs(16, 3, HERE);
- Must(param != NULL);
- Inquirer* cmi = static_cast<Inquirer*>(param);
- // use async call to enable job call protection that time events lack
- CallJobHere(16, 5, cmi, Mgr::Inquirer, requestTimedOut);
- }
-
- /// called when the strand failed to respond (or finish responding) in time
- void
- Mgr::Inquirer::requestTimedOut()
- {
- debugs(16, 3, HERE);
- if (requestId != 0) {
- DequeueRequest(requestId);
- requestId = 0;
- Must(!done()); // or we should not be called
- ++pos; // advance after a failed inquiry
- inquire();
- }
- }
-
- const char*
- Mgr::Inquirer::status() const
- {
- static MemBuf buf;
- buf.reset();
- buf.Printf(" [FD %d, requestId %u]", fd, requestId);
- buf.terminate();
- return buf.content();
+ return !writer && Ipc::Inquirer::doneAll();
}
- QueryParam::Pointer processesParam = cause.params.queryParams.get("processes");
- QueryParam::Pointer workersParam = cause.params.queryParams.get("workers");
+
+Ipc::StrandCoords
+Mgr::Inquirer::applyQueryParams(const Ipc::StrandCoords& aStrands, const QueryParams& aParams)
+{
+ Ipc::StrandCoords strands;
+
-
- // order by ascending kid IDs; useful for non-aggregatable stats
- std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
++ Mgr::Request *cause = static_cast<Mgr::Request *>(request.getRaw());
++ QueryParam::Pointer processesParam = cause->params.queryParams.get("processes");
++ QueryParam::Pointer workersParam = cause->params.queryParams.get("workers");
+
+ if (processesParam == NULL || workersParam == NULL) {
+ if (processesParam != NULL) {
+ IntParam* param = dynamic_cast<IntParam*>(processesParam.getRaw());
+ if (param != NULL && param->type == QueryParam::ptInt) {
+ const std::vector<int>& processes = param->value();
+ for (Ipc::StrandCoords::const_iterator iter = aStrands.begin();
+ iter != aStrands.end(); ++iter)
+ {
+ if (std::find(processes.begin(), processes.end(), iter->kidId) != processes.end())
+ strands.push_back(*iter);
+ }
+ }
+ } else if (workersParam != NULL) {
+ IntParam* param = dynamic_cast<IntParam*>(workersParam.getRaw());
+ if (param != NULL && param->type == QueryParam::ptInt) {
+ const std::vector<int>& workers = param->value();
+ for (size_t i = 0; i < aStrands.size(); ++i)
+ {
+ if (std::find(workers.begin(), workers.end(), i + 1) != workers.end())
+ strands.push_back(aStrands[i]);
+ }
+ }
+ } else {
+ strands = aStrands;
+ }
+ }
+
+ debugs(0, 0, HERE << "strands kid IDs = ");
+ for (Ipc::StrandCoords::const_iterator iter = strands.begin(); iter != strands.end(); ++iter) {
+ debugs(0, 0, HERE << iter->kidId);
+ }
+
+ return strands;
+}
--- /dev/null
+ /*
+ * $Id$
+ *
+ * DEBUG: section 49 SNMP Interface
+ *
+ */
+
+ #include "config.h"
+ #include "base/TextException.h"
+ #include "CommCalls.h"
+ #include "ipc/UdsOp.h"
+ #include "snmp_core.h"
+ #include "snmp/Inquirer.h"
+ #include "snmp/Response.h"
+ #include "snmp/Request.h"
+
+
+ CBDATA_NAMESPACED_CLASS_INIT(Snmp, Inquirer);
+
+
+ Snmp::Inquirer::Inquirer(const Request& aRequest, const Ipc::StrandCoords& coords):
+ Ipc::Inquirer(aRequest.clone(), coords, 2),
+ aggrPdu(aRequest.pdu),
+ fd(ImportFdIntoComm(aRequest.fd, SOCK_DGRAM, IPPROTO_UDP, Ipc::fdnInSnmpSocket))
+ {
+ debugs(49, 5, HERE);
++
+ closer = asyncCall(49, 5, "Snmp::Inquirer::noteCommClosed",
+ CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
+ comm_add_close_handler(fd, closer);
+ }
+
+ /// closes our copy of the client connection socket
+ void
+ Snmp::Inquirer::cleanup()
+ {
+ if (fd >= 0) {
+ if (closer != NULL) {
+ comm_remove_close_handler(fd, closer);
+ closer = NULL;
+ }
+ comm_close(fd);
+ fd = -1;
+ }
+ }
+
+ void
+ Snmp::Inquirer::start()
+ {
+ debugs(49, 5, HERE);
+ Ipc::Inquirer::start();
+ Must(fd >= 0);
+ inquire();
+ }
+
+ void
+ Snmp::Inquirer::handleException(const std::exception& e)
+ {
+ aggrPdu.errstat = SNMP_ERR_GENERR;
+ Ipc::Inquirer::handleException(e);
+ }
+
+ bool
+ Snmp::Inquirer::aggregate(Response::Pointer aResponse)
+ {
+ Snmp::Response& response = static_cast<Snmp::Response&>(*aResponse);
+ bool error = response.pdu.errstat != SNMP_ERR_NOERROR;
+ if (error) {
+ aggrPdu = response.pdu;
+ } else {
+ aggrPdu.aggregate(response.pdu);
+ }
+ return !error;
+ }
+
+ /// called when the some external force closed our socket
+ void
+ Snmp::Inquirer::noteCommClosed(const CommCloseCbParams& params)
+ {
+ debugs(49, 5, HERE);
+ Must(fd < 0 || fd == params.fd);
+ fd = -1;
+ mustStop("commClosed");
+ }
+
+ bool
+ Snmp::Inquirer::doneAll() const
+ {
+ return !writer && Ipc::Inquirer::doneAll();
+ }
+
+ void
+ Snmp::Inquirer::sendResponse()
+ {
+ debugs(49, 5, HERE);
+ aggrPdu.fixAggregate();
+ aggrPdu.command = SNMP_PDU_RESPONSE;
+ u_char buffer[SNMP_REQUEST_SIZE];
+ int len = sizeof(buffer);
+ Snmp::Request& req = static_cast<Snmp::Request&>(*request);
+ snmp_build(&req.session, &aggrPdu, buffer, &len);
+ comm_udp_sendto(fd, req.address, buffer, len);
+ }