]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
merge from trunk
authorChristos Tsantilas <chtsanti@users.sourceforge.net>
Wed, 2 Mar 2011 16:40:40 +0000 (18:40 +0200)
committerChristos Tsantilas <chtsanti@users.sourceforge.net>
Wed, 2 Mar 2011 16:40:40 +0000 (18:40 +0200)
1  2 
src/mgr/Inquirer.cc
src/mgr/Inquirer.h
src/snmp/Inquirer.cc

index 8b9c54b8aaf9bee206db2fcae9cdc6c5eca23d68,411904519361e515b64c3dec96288862c9e80eb2..fcf3e333e2a18a5fe21526401d19066d77bfedf1
  #include "comm/Write.h"
  #include "CommCalls.h"
  #include "HttpReply.h"
- #include "ipc/Coordinator.h"
 +#include "HttpRequest.h"
+ #include "ipc/UdsOp.h"
  #include "mgr/ActionWriter.h"
- #include "mgr/Command.h"
 +#include "mgr/IntParam.h"
  #include "mgr/Inquirer.h"
++#include "mgr/Command.h"
  #include "mgr/Request.h"
  #include "mgr/Response.h"
  #include "SquidTime.h"
  
  CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer);
  
- Mgr::Inquirer::RequestsMap Mgr::Inquirer::TheRequestsMap;
- unsigned int Mgr::Inquirer::LastRequestId = 0;
- /// compare Ipc::StrandCoord using kidId, for std::sort() below
- static bool
- LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
- {
-     return c1.kidId < c2.kidId;
- }
  
- Mgr::Inquirer::Inquirer(Action::Pointer anAction, int aFd,
+ Mgr::Inquirer::Inquirer(Action::Pointer anAction,
                          const Request &aCause, const Ipc::StrandCoords &coords):
-         AsyncJob("Mgr::Inquirer"),
 -        Ipc::Inquirer(aCause.clone(), coords, anAction->atomic() ? 10 : 100),
++        Ipc::Inquirer(aCause.clone(), applyQueryParams(coords, aCause.params.queryParams), anAction->atomic() ? 10 : 100),
          aggrAction(anAction),
-         cause(aCause),
-         fd(aFd),
-         strands(applyQueryParams(coords, aCause.params.queryParams)), pos(strands.begin()),
-         requestId(0), closer(NULL), timeout(aggrAction->atomic() ? 10 : 100)
+         fd(Ipc::ImportFdIntoComm(aCause.fd, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket))
  {
-     debugs(16, 5, HERE << "FD " << aFd << " action: " << aggrAction);
+     debugs(16, 5, HERE << "FD " << fd << " action: " << aggrAction);
  
      closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
                         CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
@@@ -82,25 -59,14 +64,26 @@@ voi
  Mgr::Inquirer::start()
  {
      debugs(16, 5, HERE);
+     Ipc::Inquirer::start();
      Must(fd >= 0);
      Must(aggrAction != NULL);
-  
 -    std::auto_ptr<HttpReply> reply(new HttpReply);
 -    reply->setHeaders(HTTP_OK, NULL, "text/plain", -1, squid_curtime, squid_curtime);
 -    reply->header.putStr(HDR_CONNECTION, "close"); // until we chunk response
 -    std::auto_ptr<MemBuf> replyBuf(reply->pack());
 +    std::auto_ptr<MemBuf> replyBuf;
 +    if (strands.empty()) {
 +        LOCAL_ARRAY(char, url, MAX_URL);
 +        snprintf(url, MAX_URL, "%s", aggrAction->command().params.httpUri.termedBuf());
 +        HttpRequest *req = HttpRequest::CreateFromUrl(url);
 +        ErrorState *err = errorCon(ERR_INVALID_URL, HTTP_NOT_FOUND, req);
 +        std::auto_ptr<HttpReply> reply(err->BuildHttpReply());
 +        replyBuf.reset(reply->pack());
 +        errorStateFree(err);
 +    }
 +    else {
 +        std::auto_ptr<HttpReply> reply(new HttpReply);
 +        reply->setHeaders(HTTP_OK, NULL, "text/plain", -1, squid_curtime, squid_curtime);
 +        reply->header.putStr(HDR_CONNECTION, "close"); // until we chunk response
 +        replyBuf.reset(reply->pack());
 +    }
      writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
                         CommCbMemFunT<Inquirer, CommIoCbParams>(this, &Inquirer::noteWroteHeader));
      Comm::Write(fd, replyBuf.get(), writer);
@@@ -169,16 -95,19 +112,19 @@@ Mgr::Inquirer::noteCommClosed(const Com
      mustStop("commClosed");
  }
  
+ bool
+ Mgr::Inquirer::aggregate(Ipc::Response::Pointer aResponse)
+ {
+     Mgr::Response& response = static_cast<Response&>(*aResponse);
+     if (response.hasAction())
+         aggrAction->add(response.getAction());
+     return true;
+ }
  void
- Mgr::Inquirer::swanSong()
+ Mgr::Inquirer::sendResponse()
  {
-     debugs(16, 5, HERE);
-     removeTimeoutEvent();
-     if (requestId > 0) {
-         DequeueRequest(requestId);
-         requestId = 0;
-     }
 -    if (aggrAction->aggregatable()) {
 +    if (!strands.empty() && aggrAction->aggregatable()) {
          removeCloseHandler();
          AsyncJob::Start(new ActionWriter(aggrAction, fd));
          fd = -1; // should not close fd because we passed it to ActionWriter
  bool
  Mgr::Inquirer::doneAll() const
  {
-     return !writer && pos == strands.end();
- }
- /// returns and forgets the right Inquirer callback for strand request
- AsyncCall::Pointer
- Mgr::Inquirer::DequeueRequest(unsigned int requestId)
- {
-     debugs(16, 3, HERE << " requestId " << requestId);
-     Must(requestId != 0);
-     AsyncCall::Pointer call;
-     RequestsMap::iterator request = TheRequestsMap.find(requestId);
-     if (request != TheRequestsMap.end()) {
-         call = request->second;
-         Must(call != NULL);
-         TheRequestsMap.erase(request);
-     }
-     return call;
- }
- void
- Mgr::Inquirer::HandleRemoteAck(const Mgr::Response& response)
- {
-     Must(response.requestId != 0);
-     AsyncCall::Pointer call = DequeueRequest(response.requestId);
-     if (call != NULL) {
-         HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
-         Must(dialer);
-         dialer->arg1 = response;
-         ScheduleCallHere(call);
-     }
- }
- /// called when we are no longer waiting for the strand to respond
- void
- Mgr::Inquirer::removeTimeoutEvent()
- {
-     if (eventFind(&Inquirer::RequestTimedOut, this))
-         eventDelete(&Inquirer::RequestTimedOut, this);
- }
- /// Mgr::Inquirer::requestTimedOut wrapper
- void
- Mgr::Inquirer::RequestTimedOut(void* param)
- {
-     debugs(16, 3, HERE);
-     Must(param != NULL);
-     Inquirer* cmi = static_cast<Inquirer*>(param);
-     // use async call to enable job call protection that time events lack
-     CallJobHere(16, 5, cmi, Mgr::Inquirer, requestTimedOut);
- }
- /// called when the strand failed to respond (or finish responding) in time
- void
- Mgr::Inquirer::requestTimedOut()
- {
-     debugs(16, 3, HERE);
-     if (requestId != 0) {
-         DequeueRequest(requestId);
-         requestId = 0;
-         Must(!done()); // or we should not be called
-         ++pos; // advance after a failed inquiry
-         inquire();
-     }
- }
- const char*
- Mgr::Inquirer::status() const
- {
-     static MemBuf buf;
-     buf.reset();
-     buf.Printf(" [FD %d, requestId %u]", fd, requestId);
-     buf.terminate();
-     return buf.content();
+     return !writer && Ipc::Inquirer::doneAll();
  }
-     QueryParam::Pointer processesParam = cause.params.queryParams.get("processes");
-     QueryParam::Pointer workersParam = cause.params.queryParams.get("workers");
 +
 +Ipc::StrandCoords
 +Mgr::Inquirer::applyQueryParams(const Ipc::StrandCoords& aStrands, const QueryParams& aParams)
 +{
 +    Ipc::StrandCoords strands;
 +
-         // order by ascending kid IDs; useful for non-aggregatable stats
-         std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
++    Mgr::Request *cause = static_cast<Mgr::Request *>(request.getRaw());
++    QueryParam::Pointer processesParam = cause->params.queryParams.get("processes");
++    QueryParam::Pointer workersParam = cause->params.queryParams.get("workers");
 +
 +    if (processesParam == NULL || workersParam == NULL) {
 +        if (processesParam != NULL) {
 +            IntParam* param = dynamic_cast<IntParam*>(processesParam.getRaw());
 +            if (param != NULL && param->type == QueryParam::ptInt) {
 +                const std::vector<int>& processes = param->value();
 +                for (Ipc::StrandCoords::const_iterator iter = aStrands.begin();
 +                     iter != aStrands.end(); ++iter)
 +                {
 +                    if (std::find(processes.begin(), processes.end(), iter->kidId) != processes.end())
 +                        strands.push_back(*iter);
 +                }
 +            }
 +        } else if (workersParam != NULL) {
 +            IntParam* param = dynamic_cast<IntParam*>(workersParam.getRaw());
 +            if (param != NULL && param->type == QueryParam::ptInt) {
 +                const std::vector<int>& workers = param->value();
 +                for (size_t i = 0; i < aStrands.size(); ++i)
 +                {
 +                    if (std::find(workers.begin(), workers.end(), i + 1) != workers.end())
 +                        strands.push_back(aStrands[i]);
 +                }
 +            }
 +        } else {
 +            strands = aStrands;
 +        }
 +    }
 +
 +    debugs(0, 0, HERE << "strands kid IDs = ");
 +    for (Ipc::StrandCoords::const_iterator iter = strands.begin(); iter != strands.end(); ++iter) {
 +        debugs(0, 0, HERE << iter->kidId);
 +    }
 +
 +    return strands;
 +}
index 92cc0a1c3e88073c1bd59342d329d512d0e49440,03d7efeda1ed1c6919c35ee1f6c17b5bfe293fa5..c8e6cc8673fc57dc370ff041d24219b2b21ac099
@@@ -37,30 -28,18 +28,19 @@@ public
  protected:
      /* AsyncJob API */
      virtual void start();
-     virtual void swanSong();
      virtual bool doneAll() const;
-     virtual const char *status() const;
  
- private:
-     typedef UnaryMemFunT<Inquirer, Response, const Response&> HandleAckDialer;
+     /* Ipc::Inquirer API */
+     virtual void cleanup();
+     virtual void sendResponse();
+     virtual bool aggregate(Ipc::Response::Pointer aResponse);
  
-     void inquire();
+ private:
      void noteWroteHeader(const CommIoCbParams& params);
      void noteCommClosed(const CommCloseCbParams& params);
-     void handleRemoteAck(const Response& response);
-     static AsyncCall::Pointer DequeueRequest(unsigned int requestId);
-     static void RequestTimedOut(void* param);
-     void requestTimedOut();
-     void removeTimeoutEvent();
-     void close();
      void removeCloseHandler();
 -
 +    Ipc::StrandCoords applyQueryParams(const Ipc::StrandCoords& aStrands,
 +                                       const QueryParams& aParams);
  private:
      Action::Pointer aggrAction; //< action to aggregate
  
index 0000000000000000000000000000000000000000,6d748b0355f2b3ea84c37bd620b996befb848f8a..d3ab625e827ae875d3e2a0b76292f75392c01e6a
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,102 +1,103 @@@
+ /*
+  * $Id$
+  *
+  * DEBUG: section 49    SNMP Interface
+  *
+  */
+ #include "config.h"
+ #include "base/TextException.h"
+ #include "CommCalls.h"
+ #include "ipc/UdsOp.h"
+ #include "snmp_core.h"
+ #include "snmp/Inquirer.h"
+ #include "snmp/Response.h"
+ #include "snmp/Request.h"
+ CBDATA_NAMESPACED_CLASS_INIT(Snmp, Inquirer);
+ Snmp::Inquirer::Inquirer(const Request& aRequest, const Ipc::StrandCoords& coords):
+         Ipc::Inquirer(aRequest.clone(), coords, 2),
+         aggrPdu(aRequest.pdu),
+         fd(ImportFdIntoComm(aRequest.fd, SOCK_DGRAM, IPPROTO_UDP, Ipc::fdnInSnmpSocket))
+ {
+     debugs(49, 5, HERE);
++
+     closer = asyncCall(49, 5, "Snmp::Inquirer::noteCommClosed",
+                        CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
+     comm_add_close_handler(fd, closer);
+ }
+ /// closes our copy of the client connection socket
+ void
+ Snmp::Inquirer::cleanup()
+ {
+     if (fd >= 0) {
+         if (closer != NULL) {
+             comm_remove_close_handler(fd, closer);
+             closer = NULL;
+         }
+         comm_close(fd);
+         fd = -1;
+     }
+ }
+ void
+ Snmp::Inquirer::start()
+ {
+     debugs(49, 5, HERE);
+     Ipc::Inquirer::start();
+     Must(fd >= 0);
+     inquire();
+ }
+ void
+ Snmp::Inquirer::handleException(const std::exception& e)
+ {
+     aggrPdu.errstat = SNMP_ERR_GENERR;
+     Ipc::Inquirer::handleException(e);
+ }
+ bool
+ Snmp::Inquirer::aggregate(Response::Pointer aResponse)
+ {
+     Snmp::Response& response = static_cast<Snmp::Response&>(*aResponse);
+     bool error = response.pdu.errstat != SNMP_ERR_NOERROR;
+     if (error) {
+         aggrPdu = response.pdu;
+     } else {
+         aggrPdu.aggregate(response.pdu);
+     }
+     return !error;
+ }
+ /// called when the some external force closed our socket
+ void
+ Snmp::Inquirer::noteCommClosed(const CommCloseCbParams& params)
+ {
+     debugs(49, 5, HERE);
+     Must(fd < 0 || fd == params.fd);
+     fd = -1;
+     mustStop("commClosed");
+ }
+ bool
+ Snmp::Inquirer::doneAll() const
+ {
+     return !writer && Ipc::Inquirer::doneAll();
+ }
+ void
+ Snmp::Inquirer::sendResponse()
+ {
+     debugs(49, 5, HERE);
+     aggrPdu.fixAggregate();
+     aggrPdu.command = SNMP_PDU_RESPONSE;
+     u_char buffer[SNMP_REQUEST_SIZE];
+     int len = sizeof(buffer);
+     Snmp::Request& req = static_cast<Snmp::Request&>(*request);
+     snmp_build(&req.session, &aggrPdu, buffer, &len);
+     comm_udp_sendto(fd, req.address, buffer, len);
+ }