]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
importing smp-snmp patch
authorChristos Tsantilas <chtsanti@users.sourceforge.net>
Wed, 19 Jan 2011 10:12:54 +0000 (12:12 +0200)
committerChristos Tsantilas <chtsanti@users.sourceforge.net>
Wed, 19 Jan 2011 10:12:54 +0000 (12:12 +0200)
50 files changed:
configure.ac
include/Range.h
src/Makefile.am
src/client_side.cc
src/ipc/Coordinator.cc
src/ipc/Coordinator.h
src/ipc/Forwarder.cc [new file with mode: 0644]
src/ipc/Forwarder.h [new file with mode: 0644]
src/ipc/Inquirer.cc [new file with mode: 0644]
src/ipc/Inquirer.h [new file with mode: 0644]
src/ipc/Makefile.am
src/ipc/Messages.h
src/ipc/Request.cc [new file with mode: 0644]
src/ipc/Request.h [new file with mode: 0644]
src/ipc/Response.cc [new file with mode: 0644]
src/ipc/Response.h [new file with mode: 0644]
src/ipc/Strand.cc
src/ipc/Strand.h
src/ipc/UdsOp.cc
src/ipc/UdsOp.h
src/ipc/forward.h
src/mgr/Forwarder.cc
src/mgr/Forwarder.h
src/mgr/FunAction.cc
src/mgr/InfoAction.cc
src/mgr/Inquirer.cc
src/mgr/Inquirer.h
src/mgr/Request.cc
src/mgr/Request.h
src/mgr/Response.cc
src/mgr/Response.h
src/mgr/StoreToCommWriter.cc
src/mgr/StoreToCommWriter.h
src/snmp_core.cc
src/snmp_core.h [new file with mode: 0644]
src/snmpx/Forwarder.cc [new file with mode: 0644]
src/snmpx/Forwarder.h [new file with mode: 0644]
src/snmpx/Inquirer.cc [new file with mode: 0644]
src/snmpx/Inquirer.h [new file with mode: 0644]
src/snmpx/Pdu.cc [new file with mode: 0644]
src/snmpx/Pdu.h [new file with mode: 0644]
src/snmpx/Request.cc [new file with mode: 0644]
src/snmpx/Request.h [new file with mode: 0644]
src/snmpx/Response.cc [new file with mode: 0644]
src/snmpx/Response.h [new file with mode: 0644]
src/snmpx/Session.cc [new file with mode: 0644]
src/snmpx/Session.h [new file with mode: 0644]
src/snmpx/Var.cc [new file with mode: 0644]
src/snmpx/Var.h [new file with mode: 0644]
src/snmpx/forward.h [new file with mode: 0644]

index 085ddf4812600f20e9dd5909b923daa0a696df4f..ee58f1fa5c303681326814402a6e2d7eb8880726 100644 (file)
@@ -3370,6 +3370,7 @@ AC_CONFIG_FILES([\
        src/ipc/Makefile \
        src/ssl/Makefile \
        src/mgr/Makefile \
+       src/snmpx/Makefile \
        contrib/Makefile \
        snmplib/Makefile \
        icons/Makefile \
index 7229729677d28c7d36b765bc29f934a65f99ce12..29a5924347a97f3e66e25ad0ca9465c0fff13af0 100644 (file)
@@ -52,7 +52,7 @@ public:
     C start;
     C end;
     Range intersection (Range const &) const;
-    C size() const;
+    size_t size() const;
 };
 
 template <class C>
@@ -77,7 +77,7 @@ Range<C>::intersection (Range const &rhs) const
 }
 
 template<class C>
-C
+size_t
 Range<C>::size() const
 {
     return end > start ? end - start : 0;
index dc061644c354578e479828af8b316c2d4bc32842..068de325301c6c8309c601a7e61fa46e6cec949a 100644 (file)
@@ -25,6 +25,7 @@ SBUF_SOURCE= \
        MemBlob.cc
 
 SNMP_ALL_SOURCE = \
+       snmp_core.h \
        snmp_core.cc \
        snmp_agent.cc
 if USE_SNMP
@@ -50,6 +51,11 @@ else
 SSL_LOCAL_LIBS =
 endif
 
+if USE_SNMP
+SUBDIRS += snmpx
+SNMPXLIB = snmpx/libsnmpx.la
+endif
+
 if USE_ADAPTATION
 SUBDIRS += adaptation
 endif
@@ -547,6 +553,7 @@ squid_LDADD = \
        $(CRYPTLIB) \
        $(REGEXLIB) \
        $(SNMPLIB) \
+       $(SNMPXLIB) \
        ${ADAPTATION_LIBS} \
        $(ESI_LIBS) \
        $(SSL_LIBS) \
index d961801a5f8e8eaf8253b2f8f2a62fe51dae3f62..a5722a10e7cfe112e81a90ee53b1d30804b088d8 100644 (file)
@@ -1060,7 +1060,7 @@ ClientSocketContext::packRange(StoreIOBuffer const &source, MemBuf * mb)
              * intersection of "have" and "need" ranges must not be empty
              */
             assert(http->out.offset < i->currentSpec()->offset + i->currentSpec()->length);
-            assert(http->out.offset + available.size() > i->currentSpec()->offset);
+            assert(http->out.offset + available.size() > (uint64_t)i->currentSpec()->offset);
 
             /*
              * put boundary and headers at the beginning of a range in a
@@ -1118,7 +1118,7 @@ ClientSocketContext::packRange(StoreIOBuffer const &source, MemBuf * mb)
         /* adjust for not to be transmitted bytes */
         http->out.offset = nextOffset;
 
-        if (available.size() <= skip)
+        if (available.size() <= (uint64_t)skip)
             return;
 
         available.start += skip;
index d7f341949680a85164f5f784faf0b9cb3bba6b74..548d05146eb50586328b8843f710c250564ec4f4 100644 (file)
 #include "CacheManager.h"
 #include "comm.h"
 #include "ipc/Coordinator.h"
-#include "ipc/FdNotes.h"
 #include "ipc/SharedListen.h"
 #include "mgr/Inquirer.h"
 #include "mgr/Request.h"
 #include "mgr/Response.h"
-#include "mgr/StoreToCommWriter.h"
+#include "snmpx/Inquirer.h"
+#include "snmpx/Request.h"
+#include "snmpx/Response.h"
 
 
 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
@@ -74,6 +75,16 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message)
         handleCacheMgrResponse(Mgr::Response(message));
         break;
 
+    case mtSnmpRequest:
+        debugs(54, 6, HERE << "SNMP request");
+        handleSnmpRequest(Snmp::Request(message));
+        break;
+
+    case mtSnmpResponse:
+        debugs(54, 6, HERE << "SNMP response");
+        handleSnmpResponse(Snmp::Response(message));
+        break;
+
     default:
         debugs(54, 1, HERE << "Unhandled message type: " << message.type());
         break;
@@ -123,8 +134,7 @@ Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request)
 
     Mgr::Action::Pointer action =
         CacheManager::GetInstance()->createRequestedAction(request.params);
-    AsyncJob::Start(new Mgr::Inquirer(action,
-                                      Mgr::ImportHttpFdIntoComm(request.fd), request, strands_));
+    AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
 }
 
 void
@@ -133,6 +143,26 @@ Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
     Mgr::Inquirer::HandleRemoteAck(response);
 }
 
+void
+Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
+{
+    debugs(54, 4, HERE);
+
+    Snmp::Response response(request.requestId);
+    TypedMsgHdr message;
+    response.pack(message);
+    SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+
+    AsyncJob::Start(new Snmp::Inquirer(request, strands_));
+}
+
+void
+Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
+{
+    debugs(54, 4, HERE);
+    Snmp::Inquirer::HandleRemoteAck(response);
+}
+
 int
 Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
                                    int &errNo)
index 11856b1d3e2d0f7a0ca956668bab6d4c988e7682..6ef25cebf94b573229aa609447aebe76141e0b89 100644 (file)
@@ -15,6 +15,7 @@
 #include "ipc/SharedListen.h"
 #include "ipc/StrandCoords.h"
 #include "mgr/forward.h"
+#include "snmpx/forward.h"
 
 #include <map>
 
@@ -46,6 +47,8 @@ protected:
     void handleSharedListenRequest(const SharedListenRequest& request);
     void handleCacheMgrRequest(const Mgr::Request& request);
     void handleCacheMgrResponse(const Mgr::Response& response);
+    void handleSnmpRequest(const Snmp::Request& request);
+    void handleSnmpResponse(const Snmp::Response& response);
 
     /// calls comm_open_listener()
     int openListenSocket(const SharedListenRequest& request, int &errNo);
diff --git a/src/ipc/Forwarder.cc b/src/ipc/Forwarder.cc
new file mode 100644 (file)
index 0000000..bfb3eb0
--- /dev/null
@@ -0,0 +1,177 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include "base/AsyncJobCalls.h"
+#include "base/TextException.h"
+#include "ipc/Forwarder.h"
+#include "ipc/Port.h"
+#include "ipc/TypedMsgHdr.h"
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Ipc, Forwarder);
+
+Ipc::Forwarder::RequestsMap Ipc::Forwarder::TheRequestsMap;
+unsigned int Ipc::Forwarder::LastRequestId = 0;
+
+Ipc::Forwarder::Forwarder(Request::Pointer aRequest, double aTimeout):
+    AsyncJob("Ipc::Forwarder"),
+    request(aRequest), timeout(aTimeout)
+{
+    debugs(54, 5, HERE);
+}
+
+Ipc::Forwarder::~Forwarder()
+{
+    debugs(54, 5, HERE);
+    Must(request->requestId == 0);
+    cleanup();
+}
+
+/// perform cleanup actions
+void
+Ipc::Forwarder::cleanup()
+{
+}
+
+void
+Ipc::Forwarder::start()
+{
+    debugs(54, 3, HERE);
+
+    typedef NullaryMemFunT<Forwarder> Dialer;
+    AsyncCall::Pointer callback = JobCallback(16, 5, Dialer, this, Forwarder::handleRemoteAck);
+    if (++LastRequestId == 0) // don't use zero value as request->requestId
+        ++LastRequestId;
+    request->requestId = LastRequestId;
+    TheRequestsMap[request->requestId] = callback;
+    TypedMsgHdr message;
+
+    try {
+        request->pack(message);
+    } catch (...) {
+        // assume the pack() call failed because the message did not fit
+        // TODO: add a more specific exception?
+        handleError();
+    }
+
+    SendMessage(coordinatorAddr, message);
+    eventAdd("Ipc::Forwarder::requestTimedOut", &Forwarder::RequestTimedOut,
+             this, timeout, 0, false);
+}
+
+void
+Ipc::Forwarder::swanSong()
+{
+    debugs(54, 5, HERE);
+    removeTimeoutEvent();
+    if (request->requestId > 0) {
+        DequeueRequest(request->requestId);
+        request->requestId = 0;
+    }
+    cleanup();
+}
+
+bool
+Ipc::Forwarder::doneAll() const
+{
+    debugs(54, 5, HERE);
+    return request->requestId == 0;
+}
+
+/// called when Coordinator starts processing the request
+void
+Ipc::Forwarder::handleRemoteAck()
+{
+    debugs(54, 3, HERE);
+    request->requestId = 0;
+}
+
+/// Ipc::Forwarder::requestTimedOut wrapper
+void
+Ipc::Forwarder::RequestTimedOut(void* param)
+{
+    debugs(54, 3, HERE);
+    Must(param != NULL);
+    Forwarder* fwdr = static_cast<Forwarder*>(param);
+    // use async call to enable job call protection that time events lack
+    CallJobHere(16, 5, fwdr, Forwarder, requestTimedOut);
+}
+
+/// called when Coordinator fails to start processing the request [in time]
+void
+Ipc::Forwarder::requestTimedOut()
+{
+    debugs(54, 3, HERE);
+    handleTimeout();
+}
+
+void
+Ipc::Forwarder::handleError()
+{
+    mustStop("error");
+}
+
+void
+Ipc::Forwarder::handleTimeout()
+{
+    mustStop("timeout");
+}
+
+/// terminate with an error
+void
+Ipc::Forwarder::handleException(const std::exception& e)
+{
+    debugs(16, 3, HERE << e.what());
+    mustStop("exception");
+}
+
+void
+Ipc::Forwarder::callException(const std::exception& e)
+{
+    try {
+        handleException(e);
+    } catch (const std::exception& ex) {
+        debugs(54, DBG_CRITICAL, HERE << ex.what());
+    }
+    AsyncJob::callException(e);
+}
+
+/// returns and forgets the right Forwarder callback for the request
+AsyncCall::Pointer
+Ipc::Forwarder::DequeueRequest(unsigned int requestId)
+{
+    debugs(54, 3, HERE);
+    Must(requestId != 0);
+    AsyncCall::Pointer call;
+    RequestsMap::iterator request = TheRequestsMap.find(requestId);
+    if (request != TheRequestsMap.end()) {
+        call = request->second;
+        Must(call != NULL);
+        TheRequestsMap.erase(request);
+    }
+    return call;
+}
+
+/// called when we are no longer waiting for Coordinator to respond
+void
+Ipc::Forwarder::removeTimeoutEvent()
+{
+    if (eventFind(&Forwarder::RequestTimedOut, this))
+        eventDelete(&Forwarder::RequestTimedOut, this);
+}
+
+void
+Ipc::Forwarder::HandleRemoteAck(unsigned int requestId)
+{
+    debugs(54, 3, HERE);
+    Must(requestId != 0);
+
+    AsyncCall::Pointer call = DequeueRequest(requestId);
+    if (call != NULL)
+        ScheduleCallHere(call);
+}
diff --git a/src/ipc/Forwarder.h b/src/ipc/Forwarder.h
new file mode 100644 (file)
index 0000000..25b115b
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_FORWARDER_H
+#define SQUID_IPC_FORWARDER_H
+
+#include "base/AsyncJob.h"
+#include "ipc/Request.h"
+#include <map>
+
+
+namespace Ipc
+{
+
+/** Forwards a worker request to coordinator.
+ * Waits for an ACK from Coordinator
+ * Send the data unit with an error response if forwarding fails.
+ */
+class Forwarder: public AsyncJob
+{
+public:
+    Forwarder(Request::Pointer aRequest, double aTimeout);
+    virtual ~Forwarder();
+
+    /// finds and calls the right Forwarder upon Coordinator's response
+    static void HandleRemoteAck(unsigned int requestId);
+
+    /* has-to-be-public AsyncJob API */
+    virtual void callException(const std::exception& e);
+
+protected:
+    /* AsyncJob API */
+    virtual void start();
+    virtual void swanSong();
+    virtual bool doneAll() const;
+
+    virtual void cleanup(); ///< perform cleanup actions
+    virtual void handleError();
+    virtual void handleTimeout();
+    virtual void handleException(const std::exception& e);
+    virtual void handleRemoteAck();
+
+private:
+    static void RequestTimedOut(void* param);
+    void requestTimedOut();
+    void removeTimeoutEvent();
+    static AsyncCall::Pointer DequeueRequest(unsigned int requestId);
+
+protected:
+    Request::Pointer request;
+    const double timeout; ///< response wait timeout in seconds
+
+    /// maps request->id to Forwarder::handleRemoteAck callback
+    typedef std::map<unsigned int, AsyncCall::Pointer> RequestsMap;
+    static RequestsMap TheRequestsMap; ///< pending Coordinator requests
+
+    static unsigned int LastRequestId; ///< last requestId used
+
+    CBDATA_CLASS2(Forwarder);
+};
+
+} // namespace Ipc
+
+#endif /* SQUID_IPC_FORWARDER_H */
diff --git a/src/ipc/Inquirer.cc b/src/ipc/Inquirer.cc
new file mode 100644 (file)
index 0000000..309232e
--- /dev/null
@@ -0,0 +1,206 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include "base/TextException.h"
+#include "comm/Write.h"
+#include "ipc/Inquirer.h"
+#include "ipc/Port.h"
+#include "ipc/TypedMsgHdr.h"
+#include "MemBuf.h"
+#include <algorithm>
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer);
+
+Ipc::Inquirer::RequestsMap Ipc::Inquirer::TheRequestsMap;
+unsigned int Ipc::Inquirer::LastRequestId = 0;
+
+/// compare Ipc::StrandCoord using kidId, for std::sort() below
+static bool
+LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
+{
+    return c1.kidId < c2.kidId;
+}
+
+Ipc::Inquirer::Inquirer(Request::Pointer aRequest, const StrandCoords& coords,
+                        double aTimeout):
+    AsyncJob("Ipc::Inquirer"),
+    request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout)
+{
+    debugs(54, 5, HERE);
+
+    // order by ascending kid IDs; useful for non-aggregatable stats
+    std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
+}
+
+Ipc::Inquirer::~Inquirer()
+{
+    debugs(54, 5, HERE);
+    cleanup();
+}
+
+void
+Ipc::Inquirer::cleanup()
+{
+}
+
+void
+Ipc::Inquirer::start()
+{
+    request->requestId = 0;
+}
+
+void
+Ipc::Inquirer::inquire()
+{
+    if (pos == strands.end()) {
+        Must(done());
+        return;
+    }
+
+    Must(request->requestId == 0);
+    AsyncCall::Pointer callback = asyncCall(16, 5, "Mgr::Inquirer::handleRemoteAck",
+                                            HandleAckDialer(this, &Inquirer::handleRemoteAck, NULL));
+    if (++LastRequestId == 0) // don't use zero value as request->requestId
+        ++LastRequestId;
+    request->requestId = LastRequestId;
+    const int kidId = pos->kidId;
+    debugs(54, 4, HERE << "inquire kid: " << kidId << status());
+    TheRequestsMap[request->requestId] = callback;
+    TypedMsgHdr message;
+    request->pack(message);
+    SendMessage(Port::MakeAddr(strandAddrPfx, kidId), message);
+    eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
+             this, timeout, 0, false);
+}
+
+/// called when a strand is done writing its output
+void
+Ipc::Inquirer::handleRemoteAck(Response::Pointer response)
+{
+    debugs(54, 4, HERE << status());
+    request->requestId = 0;
+    removeTimeoutEvent();
+    if (aggregate(response)) {
+        Must(!done()); // or we should not be called
+        ++pos; // advance after a successful inquiry
+        inquire();
+    } else {
+        mustStop("error");
+    }
+}
+
+void
+Ipc::Inquirer::swanSong()
+{
+    debugs(54, 5, HERE);
+    removeTimeoutEvent();
+    if (request->requestId > 0) {
+        DequeueRequest(request->requestId);
+        request->requestId = 0;
+    }
+    sendResponse();
+    cleanup();
+}
+
+bool
+Ipc::Inquirer::doneAll() const
+{
+    return pos == strands.end();
+}
+
+void
+Ipc::Inquirer::handleException(const std::exception& e)
+{
+    debugs(54, 3, HERE << e.what());
+    mustStop("exception");
+}
+
+void
+Ipc::Inquirer::callException(const std::exception& e)
+{
+    debugs(54, 3, HERE);
+    try {
+        handleException(e);
+    } catch (const std::exception& ex) {
+        debugs(54, DBG_CRITICAL, HERE << ex.what());
+    }
+    AsyncJob::callException(e);
+}
+
+/// returns and forgets the right Inquirer callback for strand request
+AsyncCall::Pointer
+Ipc::Inquirer::DequeueRequest(unsigned int requestId)
+{
+    debugs(54, 3, HERE << " requestId " << requestId);
+    Must(requestId != 0);
+    AsyncCall::Pointer call;
+    RequestsMap::iterator request = TheRequestsMap.find(requestId);
+    if (request != TheRequestsMap.end()) {
+        call = request->second;
+        Must(call != NULL);
+        TheRequestsMap.erase(request);
+    }
+    return call;
+}
+
+void
+Ipc::Inquirer::HandleRemoteAck(const Response& response)
+{
+    Must(response.requestId != 0);
+    AsyncCall::Pointer call = DequeueRequest(response.requestId);
+    if (call != NULL) {
+        HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
+        Must(dialer);
+        dialer->arg1 = response.clone();
+        ScheduleCallHere(call);
+    }
+}
+
+/// called when we are no longer waiting for the strand to respond
+void
+Ipc::Inquirer::removeTimeoutEvent()
+{
+    if (eventFind(&Inquirer::RequestTimedOut, this))
+        eventDelete(&Inquirer::RequestTimedOut, this);
+}
+
+/// Ipc::Inquirer::requestTimedOut wrapper
+void
+Ipc::Inquirer::RequestTimedOut(void* param)
+{
+    debugs(54, 3, HERE);
+    Must(param != NULL);
+    Inquirer* cmi = static_cast<Inquirer*>(param);
+    // use async call to enable job call protection that time events lack
+    CallJobHere(16, 5, cmi, Inquirer, requestTimedOut);
+}
+
+/// called when the strand failed to respond (or finish responding) in time
+void
+Ipc::Inquirer::requestTimedOut()
+{
+    debugs(54, 3, HERE);
+    if (request->requestId != 0) {
+        DequeueRequest(request->requestId);
+        request->requestId = 0;
+        Must(!done()); // or we should not be called
+        ++pos; // advance after a failed inquiry
+        inquire();
+    }
+}
+
+const char*
+Ipc::Inquirer::status() const
+{
+    static MemBuf buf;
+    buf.reset();
+    buf.Printf(" [request->requestId %u]", request->requestId);
+    buf.terminate();
+    return buf.content();
+}
diff --git a/src/ipc/Inquirer.h b/src/ipc/Inquirer.h
new file mode 100644 (file)
index 0000000..6341528
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_INQUIRER_H
+#define SQUID_IPC_INQUIRER_H
+
+#include "base/AsyncJobCalls.h"
+#include "base/AsyncJob.h"
+#include "ipc/forward.h"
+#include "ipc/Request.h"
+#include "ipc/Response.h"
+#include "ipc/StrandCoords.h"
+#include <map>
+
+
+namespace Ipc
+{
+
+/// Coordinator's job that sends a cache manage request to each strand,
+/// aggregating individual strand responses and dumping the result if needed
+class Inquirer: public AsyncJob
+{
+public:
+    Inquirer(Request::Pointer aRequest, const Ipc::StrandCoords& coords, double aTimeout);
+    virtual ~Inquirer();
+
+    /// finds and calls the right Inquirer upon strand's response
+    static void HandleRemoteAck(const Response& response);
+
+    /* has-to-be-public AsyncJob API */
+    virtual void callException(const std::exception& e);
+
+protected:
+    /* AsyncJob API */
+    virtual void start();
+    virtual void swanSong();
+    virtual bool doneAll() const;
+    virtual const char *status() const;
+
+    /// inquire the next strand
+    virtual void inquire();
+    /// perform cleanup actions on completion of job
+    virtual void cleanup();
+    /// do specific exception handling
+    virtual void handleException(const std::exception& e);
+    /// send response to client
+    virtual void sendResponse() = 0;
+    /// perform aggregating of responses and returns true if need to continue
+    virtual bool aggregate(Response::Pointer aResponse) = 0;
+
+private:
+    typedef UnaryMemFunT<Inquirer, Response::Pointer, Response::Pointer> HandleAckDialer;
+
+    void handleRemoteAck(Response::Pointer response);
+
+    static AsyncCall::Pointer DequeueRequest(unsigned int requestId);
+
+    static void RequestTimedOut(void* param);
+    void requestTimedOut();
+    void removeTimeoutEvent();
+
+protected:
+    Request::Pointer request; ///< cache manager request received from client
+
+    Ipc::StrandCoords strands; ///< all strands we want to query, in order
+    Ipc::StrandCoords::const_iterator pos; ///< strand we should query now
+
+    const double timeout; ///< number of seconds to wait for strand response
+
+    /// maps request->id to Inquirer::handleRemoteAck callback
+    typedef std::map<unsigned int, AsyncCall::Pointer> RequestsMap;
+    static RequestsMap TheRequestsMap; ///< pending strand requests
+
+    static unsigned int LastRequestId; ///< last requestId used
+
+    CBDATA_CLASS2(Inquirer);
+};
+
+} // namespace Ipc
+
+#endif /* SQUID_IPC_INQUIRER_H */
index e2d13504d9dc99754ad79ca8a6c9c9fea4a93635..0864394f394ff424accf1750372f286b323b57fc 100644 (file)
@@ -28,7 +28,13 @@ libipc_la_SOURCES = \
        Port.h \
        Strand.cc \
        Strand.h \
-       \
-       forward.h
+       forward.h \
+       Forwarder.cc \
+       Forwarder.h \
+       Inquirer.cc \
+       Inquirer.h \
+       Request.h \
+       Response.cc \
+       Response.h
 
 DEFS += -DDEFAULT_PREFIX=\"$(prefix)\"
index 57ae03632d05cc325c6048052a83f5f7fb8f0a25..59b2eb5db157b39f6377d80efe5e5abda95db32e 100644 (file)
@@ -19,7 +19,8 @@ namespace Ipc
 /// message class identifier
 typedef enum { mtNone = 0, mtRegistration,
                mtSharedListenRequest, mtSharedListenResponse,
-               mtCacheMgrRequest, mtCacheMgrResponse
+               mtCacheMgrRequest, mtCacheMgrResponse,
+               mtSnmpRequest, mtSnmpResponse
              } MessageType;
 
 } // namespace Ipc;
diff --git a/src/ipc/Request.cc b/src/ipc/Request.cc
new file mode 100644 (file)
index 0000000..2d8156e
--- /dev/null
@@ -0,0 +1,10 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include "ipc/Request.h"
+#include "ipc/TypedMsgHdr.h"
diff --git a/src/ipc/Request.h b/src/ipc/Request.h
new file mode 100644 (file)
index 0000000..1711ff2
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_REQUEST_H
+#define SQUID_IPC_REQUEST_H
+
+#include "ipc/forward.h"
+#include "RefCount.h"
+
+
+namespace Ipc
+{
+
+/// IPC request
+class Request: public RefCountable
+{
+public:
+    typedef RefCount<Request> Pointer;
+
+public:
+    Request(int aRequestorId, unsigned int aRequestId):
+        requestorId(aRequestorId), requestId(aRequestId) {}
+
+    virtual void pack(TypedMsgHdr& msg) const = 0; ///< prepare for sendmsg()
+    virtual Pointer clone() const = 0; ///< returns a copy of this
+
+private:
+    Request(const Request&); // not implemented
+    Request& operator= (const Request&); // not implemented
+
+public:
+    int requestorId; ///< kidId of the requestor; used for response destination
+    unsigned int requestId; ///< unique for sender; matches request w/ response
+};
+
+
+} // namespace Ipc
+
+#endif /* SQUID_IPC_REQUEST_H */
diff --git a/src/ipc/Response.cc b/src/ipc/Response.cc
new file mode 100644 (file)
index 0000000..5d1f39a
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include "ipc/Response.h"
+#include "ipc/TypedMsgHdr.h"
+
+
+std::ostream& Ipc::operator << (std::ostream &os, const Response& response)
+{
+    os << "[response.requestId %u]" << response.requestId << '}';
+    return os;
+}
diff --git a/src/ipc/Response.h b/src/ipc/Response.h
new file mode 100644 (file)
index 0000000..bd3fa9a
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_RESPONSE_H
+#define SQUID_IPC_RESPONSE_H
+
+#include "ipc/forward.h"
+#include "RefCount.h"
+
+
+namespace Ipc
+{
+
+/// A response to Ipc::Request.
+class Response: public RefCountable
+{
+public:
+    typedef RefCount<Response> Pointer;
+
+public:
+    explicit Response(unsigned int aRequestId):
+        requestId(aRequestId) {}
+
+    virtual void pack(TypedMsgHdr& msg) const = 0; ///< prepare for sendmsg()
+    virtual Pointer clone() const = 0; ///< returns a copy of this
+
+private:
+    Response(const Response&); // not implemented
+    Response& operator= (const Response&); // not implemented
+
+public:
+    unsigned int requestId; ///< ID of request we are responding to
+};
+
+extern std::ostream& operator <<(std::ostream &os, const Response &response);
+
+} // namespace Ipc
+
+#endif /* SQUID_IPC_RESPONSE_H */
index a1e9f59c5dc69b54b451de1fc031185c18a57cb7..6a4fd004f2722dc460918b8268a488908e566546 100644 (file)
@@ -16,6 +16,9 @@
 #include "mgr/Response.h"
 #include "mgr/Forwarder.h"
 #include "CacheManager.h"
+#include "snmpx/Forwarder.h"
+#include "snmpx/Request.h"
+#include "snmpx/Response.h"
 
 
 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
@@ -64,6 +67,14 @@ void Ipc::Strand::receive(const TypedMsgHdr &message)
         handleCacheMgrResponse(Mgr::Response(message));
         break;
 
+    case mtSnmpRequest:
+        handleSnmpRequest(Snmp::Request(message));
+        break;
+
+    case mtSnmpResponse:
+        handleSnmpResponse(Snmp::Response(message));
+        break;
+
     default:
         debugs(54, 1, HERE << "Unhandled message type: " << message.type());
         break;
@@ -95,6 +106,18 @@ void Ipc::Strand::handleCacheMgrResponse(const Mgr::Response& response)
     Mgr::Forwarder::HandleRemoteAck(response.requestId);
 }
 
+void Ipc::Strand::handleSnmpRequest(const Snmp::Request& request)
+{
+    debugs(54, 6, HERE);
+    Snmp::SendResponse(request.requestId, request.pdu);
+}
+
+void Ipc::Strand::handleSnmpResponse(const Snmp::Response& response)
+{
+    debugs(54, 6, HERE);
+    Snmp::Forwarder::HandleRemoteAck(response.requestId);
+}
+
 void Ipc::Strand::timedout()
 {
     debugs(54, 6, HERE << isRegistered);
index d01cf2b7f29d7aab08a0c48f615637765e0dd179..1394c0992eae3be1068670732d7901df3939ae7e 100644 (file)
@@ -10,6 +10,7 @@
 
 #include "ipc/Port.h"
 #include "mgr/forward.h"
+#include "snmpx/forward.h"
 
 
 namespace Ipc
@@ -34,6 +35,8 @@ private:
     void handleRegistrationResponse(const StrandCoord &strand);
     void handleCacheMgrRequest(const Mgr::Request& request);
     void handleCacheMgrResponse(const Mgr::Response& response);
+    void handleSnmpRequest(const Snmp::Request& request);
+    void handleSnmpResponse(const Snmp::Response& response);
 
 private:
     bool isRegistered; ///< whether Coordinator ACKed registration (unused)
index 873450574a8b0b667851cba9e1d675b2254379d2..a4da136956172d9f103dc7ba02339b40871fdfbc 100644 (file)
@@ -132,3 +132,23 @@ void Ipc::SendMessage(const String& toAddress, const TypedMsgHdr &message)
 {
     AsyncJob::Start(new UdsSender(toAddress, message));
 }
+
+int Ipc::ImportFdIntoComm(int fd, int socktype, int protocol, Ipc::FdNoteId noteId)
+{
+    struct sockaddr_in addr;
+    socklen_t len = sizeof(addr);
+    if (getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) == 0) {
+        Ip::Address ipAddr(addr);
+        struct addrinfo* addr_info = NULL;
+        ipAddr.GetAddrInfo(addr_info);
+        addr_info->ai_socktype = socktype;
+        addr_info->ai_protocol = protocol;
+        comm_import_opened(fd, ipAddr, COMM_NONBLOCKING, Ipc::FdNote(noteId), addr_info);
+        ipAddr.FreeAddrInfo(addr_info);
+    } else {
+        debugs(16, DBG_CRITICAL, HERE << "ERROR: FD " << fd << ' ' << xstrerror());
+        ::close(fd);
+        fd = -1;
+    }
+    return fd;
+}
index 0187a4e3ee33404be3de9f373f7a9bb10135854c..bea34d3a6e6015f65edab9d495b8ba6ce6aabd5c 100644 (file)
@@ -12,6 +12,7 @@
 #include "SquidString.h"
 #include "base/AsyncJob.h"
 #include "ipc/TypedMsgHdr.h"
+#include "ipc/FdNotes.h"
 
 class CommTimeoutCbParams;
 class CommIoCbParams;
@@ -90,6 +91,8 @@ private:
 
 
 void SendMessage(const String& toAddress, const TypedMsgHdr& message);
+/// import socket fd from another strand into our Comm state
+int ImportFdIntoComm(int fd, int socktype, int protocol, FdNoteId noteId);
 
 
 }
index 7325abb6bc68c91982783669ce19fa581ebb5464..f4ddf4cb0caeec111b66084ccfcab05b4bda875c 100644 (file)
@@ -13,6 +13,10 @@ namespace Ipc
 
 class TypedMsgHdr;
 class StrandCoord;
+class Forwarder;
+class Inquirer;
+class Request;
+class Response;
 
 } // namespace Ipc
 
index 2d908978cf8c9c4f20efc49539a31fdaf33f5d4b..4ea3f8d34ea9b1f31738d9ec12c1c97c2e3c3ab7 100644 (file)
 
 CBDATA_NAMESPACED_CLASS_INIT(Mgr, Forwarder);
 
-Mgr::Forwarder::RequestsMap Mgr::Forwarder::TheRequestsMap;
-unsigned int Mgr::Forwarder::LastRequestId = 0;
 
 Mgr::Forwarder::Forwarder(int aFd, const ActionParams &aParams,
                           HttpRequest* aRequest, StoreEntry* anEntry):
-        AsyncJob("Mgr::Forwarder"),
-        params(aParams),
-        request(aRequest), entry(anEntry), fd(aFd), requestId(0), closer(NULL)
+        Ipc::Forwarder(new Request(KidIdentifier, 0, aFd, aParams), 10),
+        httpRequest(aRequest), entry(anEntry), fd(aFd)
 {
-    debugs(16, 5, HERE << "FD " << aFd);
+    debugs(16, 5, HERE << "FD " << fd);
     Must(fd >= 0);
-    Must(request != NULL);
+    Must(httpRequest != NULL);
     Must(entry != NULL);
 
-    HTTPMSGLOCK(request);
+    HTTPMSGLOCK(httpRequest);
     entry->lock();
     EBIT_SET(entry->flags, ENTRY_FWD_HDR_WAIT);
 
@@ -47,19 +44,18 @@ Mgr::Forwarder::Forwarder(int aFd, const ActionParams &aParams,
 Mgr::Forwarder::~Forwarder()
 {
     debugs(16, 5, HERE);
-    Must(request != NULL);
+    Must(httpRequest != NULL);
     Must(entry != NULL);
-    Must(requestId == 0);
 
-    HTTPMSGUNLOCK(request);
+    HTTPMSGUNLOCK(httpRequest);
     entry->unregisterAbort();
     entry->unlock();
-    close();
+    cleanup();
 }
 
 /// closes our copy of the client HTTP connection socket
 void
-Mgr::Forwarder::close()
+Mgr::Forwarder::cleanup()
 {
     if (fd >= 0) {
         if (closer != NULL) {
@@ -72,61 +68,34 @@ Mgr::Forwarder::close()
 }
 
 void
-Mgr::Forwarder::start()
+Mgr::Forwarder::handleError()
 {
-    debugs(16, 3, HERE);
-    entry->registerAbort(&Forwarder::Abort, this);
-
-    typedef NullaryMemFunT<Mgr::Forwarder> Dialer;
-    AsyncCall::Pointer callback = JobCallback(16, 5, Dialer, this,
-                                  Forwarder::handleRemoteAck);
-    if (++LastRequestId == 0) // don't use zero value as requestId
-        ++LastRequestId;
-    requestId = LastRequestId;
-    TheRequestsMap[requestId] = callback;
-    Request mgrRequest(KidIdentifier, requestId, fd, params);
-    Ipc::TypedMsgHdr message;
-
-    try {
-        mgrRequest.pack(message);
-    } catch (...) {
-        // assume the pack() call failed because the message did not fit
-        // TODO: add a more specific exception?
-        debugs(16, DBG_CRITICAL, "ERROR: uri " << entry->url() << " exceeds buffer size");
-        quitOnError("long URI", errorCon(ERR_INVALID_URL, HTTP_REQUEST_URI_TOO_LARGE, request));
-    }
-
-    Ipc::SendMessage(Ipc::coordinatorAddr, message);
-    const double timeout = 10; // in seconds
-    eventAdd("Mgr::Forwarder::requestTimedOut", &Forwarder::RequestTimedOut,
-             this, timeout, 0, false);
+    debugs(16, DBG_CRITICAL, "ERROR: uri " << entry->url() << " exceeds buffer size");
+    sendError(errorCon(ERR_INVALID_URL, HTTP_REQUEST_URI_TOO_LARGE, httpRequest));
+    mustStop("long URI");
 }
 
 void
-Mgr::Forwarder::swanSong()
+Mgr::Forwarder::handleTimeout()
 {
-    debugs(16, 5, HERE);
-    removeTimeoutEvent();
-    if (requestId > 0) {
-        DequeueRequest(requestId);
-        requestId = 0;
-    }
-    close();
+    sendError(errorCon(ERR_LIFETIME_EXP, HTTP_REQUEST_TIMEOUT, httpRequest));
+    Ipc::Forwarder::handleTimeout();
 }
 
-bool
-Mgr::Forwarder::doneAll() const
+void
+Mgr::Forwarder::handleException(const std::exception& e)
 {
-    debugs(16, 5, HERE);
-    return requestId == 0;
+    if (entry != NULL && httpRequest != NULL && fd >= 0)
+        sendError(errorCon(ERR_INVALID_RESP, HTTP_INTERNAL_SERVER_ERROR, httpRequest));
+    Ipc::Forwarder::handleException(e);
 }
 
 /// called when the client socket gets closed by some external force
 void
-Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &io)
+Mgr::Forwarder::noteCommClosed(const CommCloseCbParams& params)
 {
     debugs(16, 5, HERE);
-    Must(fd == io.fd);
+    Must(fd == params.fd);
     fd = -1;
     mustStop("commClosed");
 }
@@ -135,42 +104,21 @@ Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &io)
 void
 Mgr::Forwarder::handleRemoteAck()
 {
-    debugs(16, 3, HERE);
-    Must(entry != NULL);
+    Ipc::Forwarder::handleRemoteAck();
 
-    requestId = 0;
+    Must(entry != NULL);
     EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
     entry->complete();
 }
 
-/// Mgr::Forwarder::requestTimedOut wrapper
+/// send error page
 void
-Mgr::Forwarder::RequestTimedOut(void* param)
+Mgr::Forwarder::sendError(ErrorState *error)
 {
     debugs(16, 3, HERE);
-    Must(param != NULL);
-    Forwarder* mgrFwdr = static_cast<Forwarder*>(param);
-    // use async call to enable job call protection that time events lack
-    CallJobHere(16, 5, mgrFwdr, Mgr::Forwarder, requestTimedOut);
-}
-
-/// called when Coordinator fails to start processing the request [in time]
-void
-Mgr::Forwarder::requestTimedOut()
-{
-    debugs(16, 3, HERE);
-    quitOnError("timeout", errorCon(ERR_LIFETIME_EXP, HTTP_REQUEST_TIMEOUT, request));
-}
-
-/// terminate with an error
-void
-Mgr::Forwarder::quitOnError(const char *reason, ErrorState *error)
-{
-    debugs(16, 3, HERE);
-    Must(reason != NULL);
     Must(error != NULL);
     Must(entry != NULL);
-    Must(request != NULL);
+    Must(httpRequest != NULL);
 
     EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
     entry->buffer();
@@ -179,62 +127,4 @@ Mgr::Forwarder::quitOnError(const char *reason, ErrorState *error)
     errorStateFree(error);
     entry->flush();
     entry->complete();
-
-    mustStop(reason);
-}
-
-void
-Mgr::Forwarder::callException(const std::exception& e)
-{
-    try {
-        if (entry != NULL && request != NULL && fd >= 0)
-            quitOnError("exception", errorCon(ERR_INVALID_RESP, HTTP_INTERNAL_SERVER_ERROR, request));
-    } catch (const std::exception& ex) {
-        debugs(16, DBG_CRITICAL, HERE << ex.what());
-    }
-    AsyncJob::callException(e);
-}
-
-/// returns and forgets the right Forwarder callback for the request
-AsyncCall::Pointer
-Mgr::Forwarder::DequeueRequest(unsigned int requestId)
-{
-    debugs(16, 3, HERE);
-    Must(requestId != 0);
-    AsyncCall::Pointer call;
-    RequestsMap::iterator request = TheRequestsMap.find(requestId);
-    if (request != TheRequestsMap.end()) {
-        call = request->second;
-        Must(call != NULL);
-        TheRequestsMap.erase(request);
-    }
-    return call;
-}
-
-/// called when we are no longer waiting for Coordinator to respond
-void
-Mgr::Forwarder::removeTimeoutEvent()
-{
-    if (eventFind(&Forwarder::RequestTimedOut, this))
-        eventDelete(&Forwarder::RequestTimedOut, this);
-}
-
-void
-Mgr::Forwarder::HandleRemoteAck(unsigned int requestId)
-{
-    debugs(16, 3, HERE);
-    Must(requestId != 0);
-
-    AsyncCall::Pointer call = DequeueRequest(requestId);
-    if (call != NULL)
-        ScheduleCallHere(call);
-}
-
-/// called when something goes wrong with the Store entry
-void
-Mgr::Forwarder::Abort(void* param)
-{
-    Forwarder* mgrFwdr = static_cast<Forwarder*>(param);
-    if (mgrFwdr->fd >= 0)
-        comm_close(mgrFwdr->fd);
 }
index 470a1a3fc7f42423a99e0b60385bba297051d7e4..1c8b9e84f556eda7d46f68188e86194b29b334e9 100644 (file)
@@ -8,9 +8,8 @@
 #ifndef SQUID_MGR_FORWARDER_H
 #define SQUID_MGR_FORWARDER_H
 
-#include "base/AsyncJob.h"
+#include "ipc/Forwarder.h"
 #include "mgr/ActionParams.h"
-#include <map>
 
 
 class CommCloseCbParams;
@@ -25,50 +24,31 @@ namespace Mgr
  * Waits for an ACK from Coordinator while holding the Store entry.
  * Fills the store entry with an error response if forwarding fails.
  */
-class Forwarder: public AsyncJob
+class Forwarder: public Ipc::Forwarder
 {
 public:
     Forwarder(int aFd, const ActionParams &aParams, HttpRequest* aRequest,
               StoreEntry* anEntry);
     virtual ~Forwarder();
 
-    /// finds and calls the right Forwarder upon Coordinator's response
-    static void HandleRemoteAck(unsigned int requestId);
-
-    /* has-to-be-public AsyncJob API */
-    virtual void callException(const std::exception& e);
-
 protected:
-    /* AsyncJob API */
-    virtual void start();
-    virtual void swanSong();
-    virtual bool doneAll() const;
+    /* Ipc::Forwarder API */
+    virtual void cleanup(); ///< perform cleanup actions
+    virtual void handleError();
+    virtual void handleTimeout();
+    virtual void handleException(const std::exception& e);
+    virtual void handleRemoteAck();
 
 private:
-    void handleRemoteAck();
-    static void RequestTimedOut(void* param);
-    void requestTimedOut();
-    void quitOnError(const char *reason, ErrorState *error);
     void noteCommClosed(const CommCloseCbParams& params);
-    void removeTimeoutEvent();
-    static AsyncCall::Pointer DequeueRequest(unsigned int requestId);
-    static void Abort(void* param);
-    void close();
+    void sendError(ErrorState* error);
 
 private:
-    ActionParams params; ///< action parameters to pass to the other side
-    HttpRequest* request; ///< HTTP client request for detailing errors
+    HttpRequest* httpRequest; ///< HTTP client request for detailing errors
     StoreEntry* entry; ///< Store entry expecting the response
     int fd; ///< HTTP client connection descriptor
-    unsigned int requestId; ///< request id
     AsyncCall::Pointer closer; ///< comm_close handler for the HTTP connection
 
-    /// maps requestId to Forwarder::handleRemoteAck callback
-    typedef std::map<unsigned int, AsyncCall::Pointer> RequestsMap;
-    static RequestsMap TheRequestsMap; ///< pending Coordinator requests
-
-    static unsigned int LastRequestId; ///< last requestId used
-
     CBDATA_CLASS2(Forwarder);
 };
 
index 4e7bc4927048114ce0ab4a5893bd5527076e1b65..6fd0f97c1a4140c747f49aff832b08f886de0501 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "config.h"
 #include "base/TextException.h"
+#include "ipc/UdsOp.h"
 #include "mgr/Command.h"
 #include "mgr/Filler.h"
 #include "mgr/FunAction.h"
@@ -31,7 +32,7 @@ void
 Mgr::FunAction::respond(const Request& request)
 {
     debugs(16, 5, HERE);
-    const int fd = ImportHttpFdIntoComm(request.fd);
+    const int fd = Ipc::ImportFdIntoComm(request.fd, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket);
     Must(fd >= 0);
     Must(request.requestId != 0);
     AsyncJob::Start(new Mgr::Filler(this, fd, request.requestId));
index 27adf22ec872ce02a3fb6f78e9ab2df706c24c54..eb5760ba0b86236bc867a0d2a74444a2e15c35ac 100644 (file)
@@ -9,6 +9,7 @@
 #include "base/TextException.h"
 #include "HttpReply.h"
 #include "ipc/Messages.h"
+#include "ipc/UdsOp.h"
 #include "ipc/TypedMsgHdr.h"
 #include "mgr/Filler.h"
 #include "mgr/InfoAction.h"
@@ -155,7 +156,7 @@ void
 Mgr::InfoAction::respond(const Request& request)
 {
     debugs(16, 5, HERE);
-    int fd = ImportHttpFdIntoComm(request.fd);
+    int fd = Ipc::ImportFdIntoComm(request.fd, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket);
     Must(fd >= 0);
     Must(request.requestId != 0);
     AsyncJob::Start(new Mgr::Filler(this, fd, request.requestId));
index d249c3a9136948cf1987ab7ad4dd416fc39667c2..411904519361e515b64c3dec96288862c9e80eb2 100644 (file)
 #include "comm/Write.h"
 #include "CommCalls.h"
 #include "HttpReply.h"
-#include "ipc/Coordinator.h"
+#include "ipc/UdsOp.h"
 #include "mgr/ActionWriter.h"
-#include "mgr/Command.h"
 #include "mgr/Inquirer.h"
 #include "mgr/Request.h"
 #include "mgr/Response.h"
 #include "SquidTime.h"
 #include <memory>
-#include <algorithm>
 
 
 CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer);
 
-Mgr::Inquirer::RequestsMap Mgr::Inquirer::TheRequestsMap;
-unsigned int Mgr::Inquirer::LastRequestId = 0;
 
-/// compare Ipc::StrandCoord using kidId, for std::sort() below
-static bool
-LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
-{
-    return c1.kidId < c2.kidId;
-}
-
-Mgr::Inquirer::Inquirer(Action::Pointer anAction, int aFd,
+Mgr::Inquirer::Inquirer(Action::Pointer anAction,
                         const Request &aCause, const Ipc::StrandCoords &coords):
-        AsyncJob("Mgr::Inquirer"),
+        Ipc::Inquirer(aCause.clone(), coords, anAction->atomic() ? 10 : 100),
         aggrAction(anAction),
-        cause(aCause),
-        fd(aFd),
-        strands(coords), pos(strands.begin()),
-        requestId(0), closer(NULL), timeout(aggrAction->atomic() ? 10 : 100)
+        fd(Ipc::ImportFdIntoComm(aCause.fd, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket))
 {
-    debugs(16, 5, HERE << "FD " << aFd << " action: " << aggrAction);
-
-    // order by ascending kid IDs; useful for non-aggregatable stats
-    std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
+    debugs(16, 5, HERE << "FD " << fd << " action: " << aggrAction);
 
     closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
                        CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
     comm_add_close_handler(fd, closer);
 }
 
-Mgr::Inquirer::~Inquirer()
-{
-    debugs(16, 5, HERE);
-    close();
-}
-
 /// closes our copy of the client HTTP connection socket
 void
-Mgr::Inquirer::close()
+Mgr::Inquirer::cleanup()
 {
     if (fd >= 0) {
         removeCloseHandler();
@@ -82,6 +59,7 @@ void
 Mgr::Inquirer::start()
 {
     debugs(16, 5, HERE);
+    Ipc::Inquirer::start();
     Must(fd >= 0);
     Must(aggrAction != NULL);
 
@@ -107,46 +85,6 @@ Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params)
     inquire();
 }
 
-void
-Mgr::Inquirer::inquire()
-{
-    if (pos == strands.end()) {
-        Must(done());
-        return;
-    }
-
-    Must(requestId == 0);
-    AsyncCall::Pointer callback = asyncCall(16, 5, "Mgr::Inquirer::handleRemoteAck",
-                                            HandleAckDialer(this, &Inquirer::handleRemoteAck, Response()));
-    if (++LastRequestId == 0) // don't use zero value as requestId
-        ++LastRequestId;
-    requestId = LastRequestId;
-    const int kidId = pos->kidId;
-    debugs(16, 4, HERE << "inquire kid: " << kidId << status());
-    TheRequestsMap[requestId] = callback;
-    Request mgrRequest(KidIdentifier, requestId, fd,
-                       aggrAction->command().params);
-    Ipc::TypedMsgHdr message;
-    mgrRequest.pack(message);
-    Ipc::SendMessage(Ipc::Port::MakeAddr(Ipc::strandAddrPfx, kidId), message);
-    eventAdd("Mgr::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
-             this, timeout, 0, false);
-}
-
-/// called when a strand is done writing its output
-void
-Mgr::Inquirer::handleRemoteAck(const Response& response)
-{
-    debugs(16, 4, HERE << status());
-    requestId = 0;
-    removeTimeoutEvent();
-    if (response.hasAction())
-        aggrAction->add(response.getAction());
-    Must(!done()); // or we should not be called
-    ++pos; // advance after a successful inquiry
-    inquire();
-}
-
 /// called when the HTTP client or some external force closed our socket
 void
 Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
@@ -157,97 +95,27 @@ Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
     mustStop("commClosed");
 }
 
+bool
+Mgr::Inquirer::aggregate(Ipc::Response::Pointer aResponse)
+{
+    Mgr::Response& response = static_cast<Response&>(*aResponse);
+    if (response.hasAction())
+        aggrAction->add(response.getAction());
+    return true;
+}
+
 void
-Mgr::Inquirer::swanSong()
+Mgr::Inquirer::sendResponse()
 {
-    debugs(16, 5, HERE);
-    removeTimeoutEvent();
-    if (requestId > 0) {
-        DequeueRequest(requestId);
-        requestId = 0;
-    }
     if (aggrAction->aggregatable()) {
         removeCloseHandler();
         AsyncJob::Start(new ActionWriter(aggrAction, fd));
         fd = -1; // should not close fd because we passed it to ActionWriter
     }
-    close();
 }
 
 bool
 Mgr::Inquirer::doneAll() const
 {
-    return !writer && pos == strands.end();
-}
-
-/// returns and forgets the right Inquirer callback for strand request
-AsyncCall::Pointer
-Mgr::Inquirer::DequeueRequest(unsigned int requestId)
-{
-    debugs(16, 3, HERE << " requestId " << requestId);
-    Must(requestId != 0);
-    AsyncCall::Pointer call;
-    RequestsMap::iterator request = TheRequestsMap.find(requestId);
-    if (request != TheRequestsMap.end()) {
-        call = request->second;
-        Must(call != NULL);
-        TheRequestsMap.erase(request);
-    }
-    return call;
-}
-
-void
-Mgr::Inquirer::HandleRemoteAck(const Mgr::Response& response)
-{
-    Must(response.requestId != 0);
-    AsyncCall::Pointer call = DequeueRequest(response.requestId);
-    if (call != NULL) {
-        HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
-        Must(dialer);
-        dialer->arg1 = response;
-        ScheduleCallHere(call);
-    }
-}
-
-/// called when we are no longer waiting for the strand to respond
-void
-Mgr::Inquirer::removeTimeoutEvent()
-{
-    if (eventFind(&Inquirer::RequestTimedOut, this))
-        eventDelete(&Inquirer::RequestTimedOut, this);
-}
-
-/// Mgr::Inquirer::requestTimedOut wrapper
-void
-Mgr::Inquirer::RequestTimedOut(void* param)
-{
-    debugs(16, 3, HERE);
-    Must(param != NULL);
-    Inquirer* cmi = static_cast<Inquirer*>(param);
-    // use async call to enable job call protection that time events lack
-    CallJobHere(16, 5, cmi, Mgr::Inquirer, requestTimedOut);
-}
-
-/// called when the strand failed to respond (or finish responding) in time
-void
-Mgr::Inquirer::requestTimedOut()
-{
-    debugs(16, 3, HERE);
-    if (requestId != 0) {
-        DequeueRequest(requestId);
-        requestId = 0;
-        Must(!done()); // or we should not be called
-        ++pos; // advance after a failed inquiry
-        inquire();
-    }
-}
-
-const char*
-Mgr::Inquirer::status() const
-{
-    static MemBuf buf;
-    buf.reset();
-    buf.Printf(" [FD %d, requestId %u]", fd, requestId);
-    buf.terminate();
-    return buf.content();
+    return !writer && Ipc::Inquirer::doneAll();
 }
index 7059daa2b3945a2c72e12e2b13dc9fc63f40a9c2..03d7efeda1ed1c6919c35ee1f6c17b5bfe293fa5 100644 (file)
@@ -8,13 +8,8 @@
 #ifndef SQUID_MGR_INQUIRER_H
 #define SQUID_MGR_INQUIRER_H
 
-#include "base/AsyncJobCalls.h"
-#include "base/AsyncJob.h"
-#include "ipc/StrandCoords.h"
-#include "MemBuf.h"
+#include "ipc/Inquirer.h"
 #include "mgr/Action.h"
-#include "mgr/Request.h"
-#include <map>
 
 class CommIoCbParams;
 class CommCloseCbParams;
@@ -24,60 +19,34 @@ namespace Mgr
 
 /// Coordinator's job that sends a cache manage request to each strand,
 /// aggregating individual strand responses and dumping the result if needed
-class Inquirer: public AsyncJob
+class Inquirer: public Ipc::Inquirer
 {
 public:
-    Inquirer(Action::Pointer anAction, int aFd, const Request &aCause,
+    Inquirer(Action::Pointer anAction, const Request &aCause,
              const Ipc::StrandCoords &coords);
-    virtual ~Inquirer();
-
-    /// finds and calls the right Inquirer upon strand's response
-    static void HandleRemoteAck(const Mgr::Response& response);
 
 protected:
     /* AsyncJob API */
     virtual void start();
-    virtual void swanSong();
     virtual bool doneAll() const;
-    virtual const char *status() const;
 
-private:
-    typedef UnaryMemFunT<Inquirer, Response, const Response&> HandleAckDialer;
+    /* Ipc::Inquirer API */
+    virtual void cleanup();
+    virtual void sendResponse();
+    virtual bool aggregate(Ipc::Response::Pointer aResponse);
 
-    void inquire();
+private:
     void noteWroteHeader(const CommIoCbParams& params);
     void noteCommClosed(const CommCloseCbParams& params);
-
-    void handleRemoteAck(const Response& response);
-
-    static AsyncCall::Pointer DequeueRequest(unsigned int requestId);
-
-    static void RequestTimedOut(void* param);
-    void requestTimedOut();
-    void removeTimeoutEvent();
-
-    void close();
     void removeCloseHandler();
 
 private:
     Action::Pointer aggrAction; //< action to aggregate
 
-    Request cause; ///< cache manager request received from HTTP client
     int fd; ///< HTTP client socket descriptor
 
-    Ipc::StrandCoords strands; ///< all strands we want to query, in order
-    Ipc::StrandCoords::const_iterator pos; ///< strand we should query now
-
-    unsigned int requestId; ///< ID of our outstanding request to strand
     AsyncCall::Pointer writer; ///< comm_write callback
     AsyncCall::Pointer closer; ///< comm_close handler
-    const double timeout; ///< number of seconds to wait for strand response
-
-    /// maps requestId to Inquirer::handleRemoteAck callback
-    typedef std::map<unsigned int, AsyncCall::Pointer> RequestsMap;
-    static RequestsMap TheRequestsMap; ///< pending strand requests
-
-    static unsigned int LastRequestId; ///< last requestId used
 
     CBDATA_CLASS2(Inquirer);
 };
index a0f4014cdd19b5f40040a56ed239624048cbb76b..15d8bd066e88f162882156304ae68dc4ea94cbbe 100644 (file)
@@ -8,20 +8,27 @@
 #include "config.h"
 #include "base/TextException.h"
 #include "ipc/Messages.h"
+#include "ipc/TypedMsgHdr.h"
 #include "mgr/ActionParams.h"
 #include "mgr/Request.h"
 
 
 Mgr::Request::Request(int aRequestorId, unsigned int aRequestId, int aFd,
                       const ActionParams &aParams):
-        requestorId(aRequestorId), requestId(aRequestId),
+        Ipc::Request(aRequestorId, aRequestId),
         fd(aFd), params(aParams)
 {
     Must(requestorId > 0);
-    Must(requestId != 0);
 }
 
-Mgr::Request::Request(const Ipc::TypedMsgHdr& msg)
+Mgr::Request::Request(const Request& request):
+    Ipc::Request(request.requestorId, request.requestId),
+    fd(request.fd), params(request.params)
+{
+}
+
+Mgr::Request::Request(const Ipc::TypedMsgHdr& msg):
+    Ipc::Request(0, 0)
 {
     msg.checkType(Ipc::mtCacheMgrRequest);
     msg.getPod(requestorId);
@@ -41,3 +48,9 @@ Mgr::Request::pack(Ipc::TypedMsgHdr& msg) const
 
     msg.putFd(fd);
 }
+
+Ipc::Request::Pointer
+Mgr::Request::clone() const
+{
+    return new Request(*this);
+}
index 89924f683713d666a4de190957ab4de0341d95ee..50b29d7016c13688ade20886a8e809b75b57dbcf 100644 (file)
@@ -8,7 +8,8 @@
 #ifndef SQUID_MGR_REQUEST_H
 #define SQUID_MGR_REQUEST_H
 
-#include "ipc/TypedMsgHdr.h"
+#include "ipc/forward.h"
+#include "ipc/Request.h"
 #include "mgr/ActionParams.h"
 
 
@@ -16,18 +17,21 @@ namespace Mgr
 {
 
 /// cache manager request
-class Request
+class Request: public Ipc::Request
 {
 public:
     Request(int aRequestorId, unsigned int aRequestId, int aFd,
             const ActionParams &aParams);
 
     explicit Request(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
-    void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+    /* Ipc::Request API */
+    virtual void pack(Ipc::TypedMsgHdr& msg) const;
+    virtual Pointer clone() const;
+
+private:
+    Request(const Request& request);
 
 public:
-    int requestorId; ///< kidId of the requestor; used for response destination
-    unsigned int requestId; ///< unique for sender; matches request w/ response
     int fd; ///< HTTP client connection descriptor
 
     ActionParams params; ///< action name and parameters
index e3204f814f15bb3048c5e6158ea7adce9b6c756e..e7a5866705beafc0d67f3a5ce4ee802dca867779 100644 (file)
 #include "mgr/Response.h"
 
 
-std::ostream& Mgr::operator << (std::ostream &os, const Response& response)
+Mgr::Response::Response(unsigned int aRequestId, Action::Pointer anAction):
+        Ipc::Response(aRequestId), action(anAction)
 {
-    os << "response: {requestId: " << response.requestId << '}';
-    return os;
+    Must(!action || action->name()); // if there is an action, it must be named
 }
 
-Mgr::Response::Response(unsigned int aRequestId, Action::Pointer anAction):
-        requestId(aRequestId), action(anAction)
+Mgr::Response::Response(const Response& response):
+    Ipc::Response(response.requestId), action(response.action)
 {
-    Must(!action || action->name()); // if there is an action, it must be named
 }
 
-Mgr::Response::Response(const Ipc::TypedMsgHdr& msg)
+Mgr::Response::Response(const Ipc::TypedMsgHdr& msg):
+    Ipc::Response(0)
 {
     msg.checkType(Ipc::mtCacheMgrResponse);
     msg.getPod(requestId);
@@ -54,6 +54,12 @@ Mgr::Response::pack(Ipc::TypedMsgHdr& msg) const
     }
 }
 
+Ipc::Response::Pointer
+Mgr::Response::clone() const
+{
+    return new Response(*this);
+}
+
 bool
 Mgr::Response::hasAction() const
 {
index 7b759926eb7e35b33082ea854bbd0634767dab95..38f5f08e7ee2349fde084280946cf081dd915178 100644 (file)
@@ -8,6 +8,8 @@
 #ifndef SQUID_MGR_RESPONSE_H
 #define SQUID_MGR_RESPONSE_H
 
+#include "ipc/forward.h"
+#include "ipc/Response.h"
 #include "mgr/Action.h"
 
 
@@ -16,23 +18,27 @@ namespace Mgr
 
 /// A response to Mgr::Request.
 /// May carry strand action data to be aggregated with data from other strands.
-class Response
+class Response: public Ipc::Response
 {
 public:
-    Response(unsigned int aRequestId = 0, Action::Pointer anAction = NULL);
+    Response(unsigned int aRequestId, Action::Pointer anAction = NULL);
 
     explicit Response(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
-    void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+
+    /* Ipc::Response API */
+    virtual void pack(Ipc::TypedMsgHdr& msg) const;
+    virtual Ipc::Response::Pointer clone() const;
+
     bool hasAction() const; ///< whether response contain action object
     const Action& getAction() const; ///< returns action object
 
+private:
+    Response(const Response& response);
+
 public:
-    unsigned int requestId; ///< ID of request we are responding to
     Action::Pointer action; ///< action relating to response
 };
 
-extern std::ostream& operator <<(std::ostream &os, const Response &response);
-
 } // namespace Mgr
 
 #endif /* SQUID_MGR_RESPONSE_H */
index 12273337687ad01d0f444cf740792beda67e4aed..52aed325d8e738043bfdf5346de658611704dbec 100644 (file)
@@ -164,25 +164,3 @@ Mgr::StoreToCommWriter::Abort(void* param)
     if (mgrWriter->fd >= 0)
         comm_close(mgrWriter->fd);
 }
-
-
-int
-Mgr::ImportHttpFdIntoComm(int fd)
-{
-    struct sockaddr_in addr;
-    socklen_t len = sizeof(addr);
-    if (getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) == 0) {
-        Ip::Address ipAddr(addr);
-        struct addrinfo* addr_info = NULL;
-        ipAddr.GetAddrInfo(addr_info);
-        addr_info->ai_socktype = SOCK_STREAM;
-        addr_info->ai_protocol = IPPROTO_TCP;
-        comm_import_opened(fd, ipAddr, COMM_NONBLOCKING, Ipc::FdNote(Ipc::fdnHttpSocket), addr_info);
-        ipAddr.FreeAddrInfo(addr_info);
-    } else {
-        debugs(16, DBG_CRITICAL, HERE << "ERROR: FD " << fd << ' ' << xstrerror());
-        ::close(fd);
-        fd = -1;
-    }
-    return fd;
-}
index 116ffdba51dceeec9daacbb32f89ffa3606ff03b..43678a3207cf24ff043d920ba9e14a39ec912f83 100644 (file)
@@ -65,9 +65,6 @@ protected:
     CBDATA_CLASS2(StoreToCommWriter);
 };
 
-/// import HTTP socket fd from another strand into our Comm state
-extern int ImportHttpFdIntoComm(int fd);
-
 } // namespace Mgr
 
 #endif /* SQUID_MGR_STORE_TO_COMM_WRITER_H */
index 555ee882ac5b578ea249e90614b6f8ccb427b3cf..b9d65f57010d148628dcbd9a128254e72161dfab 100644 (file)
  */
 #include "squid.h"
 #include "acl/FilledChecklist.h"
-#include "cache_snmp.h"
+#include "base/CbcPointer.h"
 #include "comm.h"
 #include "comm/Loops.h"
 #include "ipc/StartListening.h"
 #include "ip/Address.h"
 #include "ip/tools.h"
+#include "snmp_core.h"
+#include "snmpx/Forwarder.h"
 
-#define SNMP_REQUEST_SIZE 4096
-#define MAX_PROTOSTAT 5
 
 /// dials snmpConnectionOpened call
 class SnmpListeningStartedDialer: public CallDialer,
@@ -61,29 +61,14 @@ public:
 
 Ip::Address theOutSNMPAddr;
 
-typedef struct _mib_tree_entry mib_tree_entry;
-typedef oid *(instance_Fn) (oid * name, snint * len, mib_tree_entry * current, oid_ParseFn ** Fn);
-
-struct _mib_tree_entry {
-    oid *name;
-    int len;
-    oid_ParseFn *parsefunction;
-    instance_Fn *instancefunction;
-    int children;
-
-    struct _mib_tree_entry **leaves;
-
-    struct _mib_tree_entry *parent;
-};
-
 mib_tree_entry *mib_tree_head;
 mib_tree_entry *mib_tree_last;
 
 static void snmpIncomingConnectionOpened(int fd, int errNo);
 static void snmpOutgoingConnectionOpened(int fd, int errNo);
 
-static mib_tree_entry * snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instance_Fn * instancefunction);
-static mib_tree_entry *snmpAddNode(oid * name, int len, oid_ParseFn * parsefunction, instance_Fn * instancefunction, int children,...);
+static mib_tree_entry * snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instance_Fn * instancefunction, AggrType aggrType = atNone);
+static mib_tree_entry *snmpAddNode(oid * name, int len, oid_ParseFn * parsefunction, instance_Fn * instancefunction, AggrType aggrType, int children,...);
 static oid *snmpCreateOid(int length,...);
 mib_tree_entry * snmpLookupNodeStr(mib_tree_entry *entry, const char *str);
 int snmpCreateOidFromStr(const char *str, oid **name, int *nl);
@@ -95,7 +80,6 @@ static oid *client_Inst(oid * name, snint * len, mib_tree_entry * current, oid_P
 static void snmpDecodePacket(snmp_request_t * rq);
 static void snmpConstructReponse(snmp_request_t * rq);
 
-static struct snmp_pdu *snmpAgentResponse(struct snmp_pdu *PDU);
 static oid_ParseFn *snmpTreeNext(oid * Current, snint CurrentLen, oid ** Next, snint * NextLen);
 static oid_ParseFn *snmpTreeGet(oid * Current, snint CurrentLen);
 static mib_tree_entry *snmpTreeEntry(oid entry, snint len, mib_tree_entry * current);
@@ -125,7 +109,7 @@ snmpInit(void)
      * without having a "search" function. A search function should be written
      * to make this and the other code much less evil.
      */
-    mib_tree_head = snmpAddNode(snmpCreateOid(1, 1), 1, NULL, NULL, 0);
+    mib_tree_head = snmpAddNode(snmpCreateOid(1, 1), 1, NULL, NULL, atNone, 0);
 
     assert(mib_tree_head);
     debugs(49, 5, "snmpInit: root is " << mib_tree_head);
@@ -144,9 +128,9 @@ snmpInit(void)
 
     /* SQ_SYS - 1.3.6.1.4.1.3495.1.1 */
     snmpAddNodeStr("1.3.6.1.4.1.3495.1", 1, NULL, NULL);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.1", SYSVMSIZ, snmp_sysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.1", SYSSTOR, snmp_sysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.1", SYS_UPTIME, snmp_sysFn, static_Inst);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.1", SYSVMSIZ, snmp_sysFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.1", SYSSTOR, snmp_sysFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.1", SYS_UPTIME, snmp_sysFn, static_Inst, atMax);
 
     /* SQ_CONF - 1.3.6.1.4.1.3495.1.2 */
     snmpAddNodeStr("1.3.6.1.4.1.3495.1", 2, NULL, NULL);
@@ -157,10 +141,10 @@ snmpInit(void)
 
     /* SQ_CONF + CONF_STORAGE - 1.3.6.1.4.1.3495.1.5 */
     snmpAddNodeStr("1.3.6.1.4.1.3495.1.2", CONF_STORAGE, NULL, NULL);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_MMAXSZ, snmp_confFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_SWMAXSZ, snmp_confFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_SWHIWM, snmp_confFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_SWLOWM, snmp_confFn, static_Inst);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_MMAXSZ, snmp_confFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_SWMAXSZ, snmp_confFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_SWHIWM, snmp_confFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_SWLOWM, snmp_confFn, static_Inst, atSum);
 
     snmpAddNodeStr("1.3.6.1.4.1.3495.1.2", CONF_UNIQNAME, snmp_confFn, static_Inst);
 
@@ -169,38 +153,38 @@ snmpInit(void)
 
     /* PERF_SYS - 1.3.6.1.4.1.3495.1.3.1 */
     snmpAddNodeStr("1.3.6.1.4.1.3495.1.3", PERF_SYS, NULL, NULL);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_PF, snmp_prfSysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_NUMR, snmp_prfSysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_MEMUSAGE, snmp_prfSysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CPUTIME, snmp_prfSysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CPUUSAGE, snmp_prfSysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_MAXRESSZ, snmp_prfSysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_NUMOBJCNT, snmp_prfSysFn, static_Inst);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_PF, snmp_prfSysFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_NUMR, snmp_prfSysFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_MEMUSAGE, snmp_prfSysFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CPUTIME, snmp_prfSysFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CPUUSAGE, snmp_prfSysFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_MAXRESSZ, snmp_prfSysFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_NUMOBJCNT, snmp_prfSysFn, static_Inst, atSum);
     snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURLRUEXP, snmp_prfSysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURUNLREQ, snmp_prfSysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURUNUSED_FD, snmp_prfSysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURRESERVED_FD, snmp_prfSysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURUSED_FD, snmp_prfSysFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURMAX_FD, snmp_prfSysFn, static_Inst);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURUNLREQ, snmp_prfSysFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURUNUSED_FD, snmp_prfSysFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURRESERVED_FD, snmp_prfSysFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURUSED_FD, snmp_prfSysFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURMAX_FD, snmp_prfSysFn, static_Inst, atMax);
 
     /* PERF_PROTO - 1.3.6.1.4.1.3495.1.3.2 */
     snmpAddNodeStr("1.3.6.1.4.1.3495.1.3", PERF_PROTO, NULL, NULL);
     snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2", PERF_PROTOSTAT_AGGR, NULL, NULL);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_REQ, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_HITS, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_ERRORS, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_KBYTES_IN, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_KBYTES_OUT, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_S, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_R, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_SKB, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_RKB, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_REQ, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ERRORS, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_KBYTES_IN, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_KBYTES_OUT, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_CURSWAP, snmp_prfProtoFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_CLIENTS, snmp_prfProtoFn, static_Inst);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_REQ, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_HITS, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_ERRORS, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_KBYTES_IN, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_KBYTES_OUT, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_S, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_R, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_SKB, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_RKB, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_REQ, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ERRORS, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_KBYTES_IN, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_KBYTES_OUT, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_CURSWAP, snmp_prfProtoFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_CLIENTS, snmp_prfProtoFn, static_Inst, atSum);
 
     /* Note this is time-series rather than 'static' */
     /* cacheMedianSvcTable */
@@ -208,49 +192,49 @@ snmpInit(void)
 
     /* cacheMedianSvcEntry */
     snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2", 1, NULL, NULL);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_TIME, snmp_prfProtoFn, time_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_ALL, snmp_prfProtoFn, time_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_MISS, snmp_prfProtoFn, time_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_NM, snmp_prfProtoFn, time_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_HIT, snmp_prfProtoFn, time_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_ICP_QUERY, snmp_prfProtoFn, time_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_ICP_REPLY, snmp_prfProtoFn, time_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_DNS, snmp_prfProtoFn, time_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_RHR, snmp_prfProtoFn, time_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_BHR, snmp_prfProtoFn, time_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_NH, snmp_prfProtoFn, time_Inst);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_TIME, snmp_prfProtoFn, time_Inst, atAverage);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_ALL, snmp_prfProtoFn, time_Inst, atAverage);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_MISS, snmp_prfProtoFn, time_Inst, atAverage);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_NM, snmp_prfProtoFn, time_Inst, atAverage);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_HIT, snmp_prfProtoFn, time_Inst, atAverage);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_ICP_QUERY, snmp_prfProtoFn, time_Inst, atAverage);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_ICP_REPLY, snmp_prfProtoFn, time_Inst, atAverage);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_DNS, snmp_prfProtoFn, time_Inst, atAverage);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_RHR, snmp_prfProtoFn, time_Inst, atAverage);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_BHR, snmp_prfProtoFn, time_Inst, atAverage);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_NH, snmp_prfProtoFn, time_Inst, atAverage);
 
     /* SQ_NET - 1.3.6.1.4.1.3495.1.4 */
     snmpAddNodeStr("1.3.6.1.4.1.3495.1", 4, NULL, NULL);
 
     snmpAddNodeStr("1.3.6.1.4.1.3495.1.4", NET_IP_CACHE, NULL, NULL);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_ENT, snmp_netIpFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_REQ, snmp_netIpFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_HITS, snmp_netIpFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_PENDHIT, snmp_netIpFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_NEGHIT, snmp_netIpFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_MISS, snmp_netIpFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_GHBN, snmp_netIpFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_LOC, snmp_netIpFn, static_Inst);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_ENT, snmp_netIpFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_REQ, snmp_netIpFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_HITS, snmp_netIpFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_PENDHIT, snmp_netIpFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_NEGHIT, snmp_netIpFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_MISS, snmp_netIpFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_GHBN, snmp_netIpFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_LOC, snmp_netIpFn, static_Inst, atSum);
 
     snmpAddNodeStr("1.3.6.1.4.1.3495.1.4", NET_FQDN_CACHE, NULL, NULL);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_ENT, snmp_netFqdnFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_REQ, snmp_netFqdnFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_HITS, snmp_netFqdnFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_PENDHIT, snmp_netFqdnFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_NEGHIT, snmp_netFqdnFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_MISS, snmp_netFqdnFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_GHBN, snmp_netFqdnFn, static_Inst);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_ENT, snmp_netFqdnFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_REQ, snmp_netFqdnFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_HITS, snmp_netFqdnFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_PENDHIT, snmp_netFqdnFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_NEGHIT, snmp_netFqdnFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_MISS, snmp_netFqdnFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_GHBN, snmp_netFqdnFn, static_Inst, atSum);
 
     snmpAddNodeStr("1.3.6.1.4.1.3495.1.4", NET_DNS_CACHE, NULL, NULL);
 #if USE_DNSSERVERS
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REQ, snmp_netDnsFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REP, snmp_netDnsFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_SERVERS, snmp_netDnsFn, static_Inst);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REQ, snmp_netDnsFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REP, snmp_netDnsFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_SERVERS, snmp_netDnsFn, static_Inst, atSum);
 #else
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REQ, snmp_netIdnsFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REP, snmp_netIdnsFn, static_Inst);
-    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_SERVERS, snmp_netIdnsFn, static_Inst);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REQ, snmp_netIdnsFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REP, snmp_netIdnsFn, static_Inst, atSum);
+    snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_SERVERS, snmp_netIdnsFn, static_Inst, atSum);
 #endif
 
     /* SQ_MESH - 1.3.6.1.4.1.3495.1.5 */
@@ -537,6 +521,14 @@ snmpConstructReponse(snmp_request_t * rq)
     struct snmp_pdu *RespPDU;
 
     debugs(49, 5, "snmpConstructReponse: Called.");
+
+    if (UsingSmp() && IamWorkerProcess()) {
+        AsyncJob::Start(new Snmp::Forwarder(static_cast<Snmp::Pdu&>(*rq->PDU),
+            static_cast<Snmp::Session&>(rq->session), rq->sock, rq->from));
+        snmp_free_pdu(rq->PDU);
+        return;
+    }
+
     RespPDU = snmpAgentResponse(rq->PDU);
     snmp_free_pdu(rq->PDU);
 
@@ -552,7 +544,7 @@ snmpConstructReponse(snmp_request_t * rq)
  * return the response to the requester.
  */
 
-static struct snmp_pdu *
+struct snmp_pdu *
 snmpAgentResponse(struct snmp_pdu *PDU) {
 
     struct snmp_pdu *Answer = NULL;
@@ -665,6 +657,29 @@ snmpTreeGet(oid * Current, snint CurrentLen)
     return (Fn);
 }
 
+AggrType
+snmpAggrType(oid* Current, snint CurrentLen)
+{
+    debugs(49, 5, HERE);
+
+    mib_tree_entry* mibTreeEntry = mib_tree_head;
+    AggrType type = atNone;
+    int count = 0;
+
+    if (Current[count] == mibTreeEntry->name[count]) {
+        count++;
+
+        while (mibTreeEntry != NULL && count < CurrentLen) {
+            mibTreeEntry = snmpTreeEntry(Current[count], count, mibTreeEntry);
+            if (mibTreeEntry != NULL)
+                type = mibTreeEntry->aggrType;
+            count++;
+        }
+    }
+
+    return type;
+}
+
 static oid_ParseFn *
 snmpTreeNext(oid * Current, snint CurrentLen, oid ** Next, snint * NextLen)
 {
@@ -1032,7 +1047,7 @@ snmpCreateOidFromStr(const char *str, oid **name, int *nl)
  * on failure.
  */
 static mib_tree_entry *
-snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instance_Fn * instancefunction)
+snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instance_Fn * instancefunction, AggrType aggrType)
 {
     mib_tree_entry *m, *b;
     oid *n;
@@ -1051,7 +1066,7 @@ snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instanc
         return NULL;
 
     /* Create a node */
-    m = snmpAddNode(n, nl, parsefunction, instancefunction, 0);
+    m = snmpAddNode(n, nl, parsefunction, instancefunction, aggrType, 0);
 
     /* Link it into the existing tree */
     snmpAddNodeChild(b, m);
@@ -1065,7 +1080,7 @@ snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instanc
  * Adds a node to the MIB tree structure and adds the appropriate children
  */
 static mib_tree_entry *
-snmpAddNode(oid * name, int len, oid_ParseFn * parsefunction, instance_Fn * instancefunction, int children,...)
+snmpAddNode(oid * name, int len, oid_ParseFn * parsefunction, instance_Fn * instancefunction, AggrType aggrType, int children,...)
 {
     va_list args;
     int loop;
@@ -1083,6 +1098,7 @@ snmpAddNode(oid * name, int len, oid_ParseFn * parsefunction, instance_Fn * inst
     entry->instancefunction = instancefunction;
     entry->children = children;
     entry->leaves = NULL;
+    entry->aggrType = aggrType;
 
     if (children > 0) {
         entry->leaves = (mib_tree_entry **)xmalloc(sizeof(mib_tree_entry *) * children);
diff --git a/src/snmp_core.h b/src/snmp_core.h
new file mode 100644 (file)
index 0000000..284902e
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#ifndef SQUID_SNMP_CORE_H
+#define SQUID_SNMP_CORE_H
+
+#include "config.h"
+#include "cache_snmp.h"
+
+#define SNMP_REQUEST_SIZE 4096
+#define MAX_PROTOSTAT 5
+
+
+typedef struct _mib_tree_entry mib_tree_entry;
+typedef oid *(instance_Fn) (oid * name, snint * len, mib_tree_entry * current, oid_ParseFn ** Fn);
+typedef enum {atNone = 0, atSum, atAverage, atMax, atMin} AggrType;
+
+struct _mib_tree_entry {
+    oid *name;
+    int len;
+    oid_ParseFn *parsefunction;
+    instance_Fn *instancefunction;
+    int children;
+
+    struct _mib_tree_entry **leaves;
+
+    struct _mib_tree_entry *parent;
+    AggrType aggrType;
+};
+
+extern struct snmp_pdu* snmpAgentResponse(struct snmp_pdu* PDU);
+extern AggrType snmpAggrType(oid* Current, snint CurrentLen);
+
+#endif /* SQUID_SNMP_CORE_H */
diff --git a/src/snmpx/Forwarder.cc b/src/snmpx/Forwarder.cc
new file mode 100644 (file)
index 0000000..8cba9b7
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#include "config.h"
+#include "base/TextException.h"
+#include "CommCalls.h"
+#include "ipc/Port.h"
+#include "snmp_core.h"
+#include "snmpx/Forwarder.h"
+#include "snmpx/Request.h"
+#include "snmpx/Response.h"
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Snmp, Forwarder);
+
+
+Snmp::Forwarder::Forwarder(const Pdu& aPdu, const Session& aSession, int aFd,
+                           const Ip::Address& anAddress):
+        Ipc::Forwarder(new Request(KidIdentifier, 0, aPdu, aSession, aFd, anAddress), 2),
+        fd(aFd)
+{
+    debugs(49, 5, HERE << "FD " << aFd);
+    Must(fd >= 0);
+    closer = asyncCall(16, 5, "Snmp::Forwarder::noteCommClosed",
+                       CommCbMemFunT<Forwarder, CommCloseCbParams>(this, &Forwarder::noteCommClosed));
+    comm_add_close_handler(fd, closer);
+}
+
+/// removes our cleanup handler of the client connection socket
+void
+Snmp::Forwarder::cleanup()
+{
+    if (fd >= 0) {
+        if (closer != NULL) {
+            comm_remove_close_handler(fd, closer);
+            closer = NULL;
+        }
+        fd = -1;
+    }
+}
+
+/// called when the client socket gets closed by some external force
+void
+Snmp::Forwarder::noteCommClosed(const CommCloseCbParams& params)
+{
+    debugs(49, 5, HERE);
+    Must(fd == params.fd);
+    fd = -1;
+    mustStop("commClosed");
+}
+
+void
+Snmp::Forwarder::handleTimeout()
+{
+    sendError(SNMP_ERR_RESOURCEUNAVAILABLE);
+    Ipc::Forwarder::handleTimeout();
+}
+
+void
+Snmp::Forwarder::handleException(const std::exception& e)
+{
+    debugs(49, 3, HERE << e.what());
+    if (fd >= 0)
+        sendError(SNMP_ERR_GENERR);
+    Ipc::Forwarder::handleException(e);
+}
+
+/// send error SNMP response
+void
+Snmp::Forwarder::sendError(int error)
+{
+    debugs(49, 3, HERE);
+    Snmp::Request& req = static_cast<Snmp::Request&>(*request);
+    req.pdu.command = SNMP_PDU_RESPONSE;
+    req.pdu.errstat = error;
+    u_char buffer[SNMP_REQUEST_SIZE];
+    int len = sizeof(buffer);
+    snmp_build(&req.session, &req.pdu, buffer, &len);
+    comm_udp_sendto(fd, req.address, buffer, len);
+}
+
+void
+Snmp::SendResponse(unsigned int requestId, const Pdu& pdu)
+{
+    debugs(49, 5, HERE);
+    // snmpAgentResponse() can modify arg
+    Pdu tmp = pdu;
+    Snmp::Response response(requestId);
+    snmp_pdu* response_pdu = NULL;
+    try {
+        response_pdu = snmpAgentResponse(&tmp);
+        Must(response_pdu != NULL);
+        response.pdu = static_cast<Pdu&>(*response_pdu);
+        snmp_free_pdu(response_pdu);
+    } catch (const std::exception& e) {
+        debugs(49, DBG_CRITICAL, HERE << e.what());
+        response.pdu.command = SNMP_PDU_RESPONSE;
+        response.pdu.errstat = SNMP_ERR_GENERR;
+    }
+    Ipc::TypedMsgHdr message;
+    response.pack(message);
+    Ipc::SendMessage(Ipc::coordinatorAddr, message);
+}
diff --git a/src/snmpx/Forwarder.h b/src/snmpx/Forwarder.h
new file mode 100644 (file)
index 0000000..bdef2d6
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#ifndef SQUID_SNMPX_FORWARDER_H
+#define SQUID_SNMPX_FORWARDER_H
+
+#include "ipc/Forwarder.h"
+#include "snmpx/Pdu.h"
+#include "snmpx/Session.h"
+
+
+class CommCloseCbParams;
+
+namespace Snmp
+{
+
+/** Forwards a single client SNMP request to Coordinator.
+ * Waits for an ACK from Coordinator
+ * Send the data unit with an error response if forwarding fails.
+ */
+class Forwarder: public Ipc::Forwarder
+{
+public:
+    Forwarder(const Pdu& aPdu, const Session& aSession, int aFd,
+              const Ip::Address& anAddress);
+
+protected:
+    /* Ipc::Forwarder API */
+    virtual void cleanup(); ///< perform cleanup actions
+    virtual void handleTimeout();
+    virtual void handleException(const std::exception& e);
+
+private:
+    void noteCommClosed(const CommCloseCbParams& params);
+    void sendError(int error);
+
+private:
+    int fd; ///< client connection descriptor
+    AsyncCall::Pointer closer; ///< comm_close handler for the connection
+
+    CBDATA_CLASS2(Forwarder);
+};
+
+extern void SendResponse(unsigned int requestId, const Pdu& pdu);
+
+} // namespace Snmp
+
+#endif /* SQUID_SNMPX_FORWARDER_H */
diff --git a/src/snmpx/Inquirer.cc b/src/snmpx/Inquirer.cc
new file mode 100644 (file)
index 0000000..b7e4a59
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#include "config.h"
+#include "base/TextException.h"
+#include "CommCalls.h"
+#include "ipc/UdsOp.h"
+#include "snmp_core.h"
+#include "snmpx/Inquirer.h"
+#include "snmpx/Response.h"
+#include "snmpx/Request.h"
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Snmp, Inquirer);
+
+
+Snmp::Inquirer::Inquirer(const Request& aRequest, const Ipc::StrandCoords& coords):
+        Ipc::Inquirer(aRequest.clone(), coords, 2),
+        aggrPdu(aRequest.pdu),
+        fd(ImportFdIntoComm(aRequest.fd, SOCK_DGRAM, IPPROTO_UDP, Ipc::fdnInSnmpSocket))
+{
+    debugs(49, 5, HERE);
+    closer = asyncCall(16, 5, "Snmp::Inquirer::noteCommClosed",
+                       CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
+    comm_add_close_handler(fd, closer);
+}
+
+/// closes our copy of the client connection socket
+void
+Snmp::Inquirer::cleanup()
+{
+    if (fd >= 0) {
+        if (closer != NULL) {
+            comm_remove_close_handler(fd, closer);
+            closer = NULL;
+        }
+        comm_close(fd);
+        fd = -1;
+    }
+}
+
+void
+Snmp::Inquirer::start()
+{
+    debugs(49, 5, HERE);
+    Ipc::Inquirer::start();
+    Must(fd >= 0);
+    inquire();
+}
+
+void
+Snmp::Inquirer::handleException(const std::exception& e)
+{
+    aggrPdu.errstat = SNMP_ERR_GENERR;
+    Ipc::Inquirer::handleException(e);
+}
+
+bool
+Snmp::Inquirer::aggregate(Response::Pointer aResponse)
+{
+    Snmp::Response& response = static_cast<Snmp::Response&>(*aResponse);
+    bool error = response.pdu.errstat != SNMP_ERR_NOERROR;
+    if (error) {
+        aggrPdu = response.pdu;
+    } else {
+        aggrPdu.aggregate(response.pdu);
+    }
+    return error;
+}
+
+/// called when the some external force closed our socket
+void
+Snmp::Inquirer::noteCommClosed(const CommCloseCbParams& params)
+{
+    debugs(49, 5, HERE);
+    Must(fd < 0 || fd == params.fd);
+    fd = -1;
+    mustStop("commClosed");
+}
+
+bool
+Snmp::Inquirer::doneAll() const
+{
+    return !writer && Ipc::Inquirer::doneAll();
+}
+
+void
+Snmp::Inquirer::sendResponse()
+{
+    debugs(49, 5, HERE);
+    aggrPdu.command = SNMP_PDU_RESPONSE;
+    u_char buffer[SNMP_REQUEST_SIZE];
+    int len = sizeof(buffer);
+    Snmp::Request& req = static_cast<Snmp::Request&>(*request);
+    snmp_build(&req.session, &aggrPdu, buffer, &len);
+    comm_udp_sendto(fd, req.address, buffer, len);
+}
diff --git a/src/snmpx/Inquirer.h b/src/snmpx/Inquirer.h
new file mode 100644 (file)
index 0000000..5f53ff1
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#ifndef SQUID_SNMPX_INQUIRER_H
+#define SQUID_SNMPX_INQUIRER_H
+
+#include "ipc/Inquirer.h"
+#include "snmpx/forward.h"
+#include "snmpx/Pdu.h"
+
+
+class CommCloseCbParams;
+
+namespace Snmp
+{
+
+/// Coordinator's job that sends a PDU request to each strand,
+/// aggregates strand responses and send back the result to client
+class Inquirer: public Ipc::Inquirer
+{
+public:
+    Inquirer(const Request& aRequest, const Ipc::StrandCoords& coords);
+
+protected:
+    /* AsyncJob API */
+    virtual void start();
+    virtual bool doneAll() const;
+
+    /* Ipc::Inquirer API */
+    virtual void cleanup();
+    virtual void handleException(const std::exception& e);
+    virtual void sendResponse();
+    virtual bool aggregate(Ipc::Response::Pointer aResponse);
+
+private:
+    void noteCommClosed(const CommCloseCbParams& params);
+
+private:
+    Pdu aggrPdu; ///< aggregated pdu
+    int fd; ///< client connection descriptor
+
+    AsyncCall::Pointer writer; ///< comm_write callback
+    AsyncCall::Pointer closer; ///< comm_close handler
+
+    CBDATA_CLASS2(Inquirer);
+};
+
+} // namespace Snmp
+
+#endif /* SQUID_SNMPX_INQUIRER_H */
diff --git a/src/snmpx/Pdu.cc b/src/snmpx/Pdu.cc
new file mode 100644 (file)
index 0000000..d27c92b
--- /dev/null
@@ -0,0 +1,221 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#include "config.h"
+#include "base/TextException.h"
+#include "ipc/TypedMsgHdr.h"
+#include "protos.h"
+#include "snmp_core.h"
+#include "snmpx/Pdu.h"
+#include "snmpx/Var.h"
+
+
+Snmp::Pdu::Pdu()
+{
+    init();
+}
+
+Snmp::Pdu::Pdu(const Pdu& pdu)
+{
+    init();
+    assign(pdu);
+}
+
+Snmp::Pdu::~Pdu()
+{
+    clear();
+}
+
+Snmp::Pdu&
+Snmp::Pdu::operator = (const Pdu& pdu)
+{
+    clear();
+    assign(pdu);
+    return *this;
+}
+
+void
+Snmp::Pdu::init()
+{
+    xmemset(this, 0, sizeof(*this));
+    errstat = SNMP_DEFAULT_ERRSTAT;
+    errindex = SNMP_DEFAULT_ERRINDEX;
+}
+
+void
+Snmp::Pdu::aggregate(const Pdu& pdu)
+{
+    Must(varCount() == pdu.varCount());
+    for (variable_list* p_aggr = variables, *p_var = pdu.variables; p_var != NULL;
+         p_aggr = p_aggr->next_variable, p_var = p_var->next_variable)
+    {
+        Must(p_aggr != NULL);
+        Var& aggr = static_cast<Var&>(*p_aggr);
+        Var& var = static_cast<Var&>(*p_var);
+        if (aggr.isNull()) {
+            aggr.setName(var.getName());
+            aggr.copyValue(var);
+        } else {
+            switch(snmpAggrType(aggr.name, aggr.name_length))
+            {
+            case atSum:
+            case atAverage:
+                aggr += var;
+                break;
+            case atMax:
+                if (var > aggr)
+                    aggr.copyValue(var);
+                break;
+            case atMin:
+                if (var < aggr)
+                    aggr.copyValue(var);
+                break;
+            default:
+                break;
+            }
+        }
+    }
+}
+
+void
+Snmp::Pdu::clear()
+{
+    clearSystemOid();
+    clearVars();
+    init();
+}
+
+void
+Snmp::Pdu::assign(const Pdu& pdu)
+{
+    command = pdu.command;
+    address.sin_addr.s_addr = pdu.address.sin_addr.s_addr;
+    reqid = pdu.reqid;
+    errstat = pdu.errstat;
+    errindex = pdu.errindex;
+    non_repeaters = pdu.non_repeaters;
+    max_repetitions = pdu.max_repetitions;
+    agent_addr.sin_addr.s_addr = pdu.agent_addr.sin_addr.s_addr;
+    trap_type = pdu.trap_type;
+    specific_type = pdu.specific_type;
+    time = pdu.time;
+    setSystemOid(pdu.getSystemOid());
+    setVars(pdu.variables);
+}
+
+void
+Snmp::Pdu::clearVars()
+{
+    variable_list* var = variables;
+    while (var != NULL) {
+        variable_list* tmp = var;
+        var = var->next_variable;
+        snmp_var_free(tmp);
+    }
+    variables = NULL;
+}
+
+void
+Snmp::Pdu::setVars(variable_list* vars)
+{
+    clearVars();
+    for (variable_list** p_var = &variables; vars != NULL;
+         vars = vars->next_variable, p_var = &(*p_var)->next_variable)
+    {
+        *p_var = new Var(static_cast<Var&>(*vars));
+    }
+}
+
+void
+Snmp::Pdu::clearSystemOid()
+{
+    if (enterprise != NULL) {
+        xfree(enterprise);
+        enterprise = NULL;
+    }
+    enterprise_length = 0;
+}
+
+Range<const oid*>
+Snmp::Pdu::getSystemOid() const
+{
+    return Range<const oid*>(enterprise, enterprise + enterprise_length);
+}
+
+void
+Snmp::Pdu::setSystemOid(const Range<const oid*>& systemOid)
+{
+    clearSystemOid();
+    if (systemOid.start != NULL && systemOid.size() != 0) {
+        enterprise_length = systemOid.size();
+        enterprise = static_cast<oid*>(xmalloc(enterprise_length * sizeof(oid)));
+        std::copy(systemOid.start, systemOid.end, enterprise);
+    }
+}
+
+void
+Snmp::Pdu::pack(Ipc::TypedMsgHdr& msg) const
+{
+    msg.putPod(command);
+    msg.putPod(address);
+    msg.putPod(reqid);
+    msg.putPod(errstat);
+    msg.putPod(errindex);
+    msg.putPod(non_repeaters);
+    msg.putPod(max_repetitions);
+    msg.putInt(enterprise_length);
+    if (enterprise_length > 0) {
+        Must(enterprise != NULL);
+        msg.putFixed(enterprise, enterprise_length * sizeof(oid));
+    }
+    msg.putPod(agent_addr);
+    msg.putPod(trap_type);
+    msg.putPod(specific_type);
+    msg.putPod(time);
+    msg.putInt(varCount());
+    for (variable_list* var = variables; var != NULL; var = var->next_variable)
+        static_cast<Var*>(var)->pack(msg);
+}
+
+void
+Snmp::Pdu::unpack(const Ipc::TypedMsgHdr& msg)
+{
+    clear();
+    msg.getPod(command);
+    msg.getPod(address);
+    msg.getPod(reqid);
+    msg.getPod(errstat);
+    msg.getPod(errindex);
+    msg.getPod(non_repeaters);
+    msg.getPod(max_repetitions);
+    enterprise_length = msg.getInt();
+    if (enterprise_length > 0) {
+        enterprise = static_cast<oid*>(xmalloc(enterprise_length * sizeof(oid)));
+        msg.getFixed(enterprise, enterprise_length * sizeof(oid));
+    }
+    msg.getPod(agent_addr);
+    msg.getPod(trap_type);
+    msg.getPod(specific_type);
+    msg.getPod(time);
+    int count = msg.getInt();
+    for (variable_list** p_var = &variables; count > 0;
+         p_var = &(*p_var)->next_variable, --count)
+    {
+        Var* var = new Var();
+        var->unpack(msg);
+        *p_var = var;
+    }
+}
+
+int
+Snmp::Pdu::varCount() const
+{
+    int count = 0;
+    for (variable_list* var = variables; var != NULL; var = var->next_variable)
+        ++count;
+    return count;
+}
diff --git a/src/snmpx/Pdu.h b/src/snmpx/Pdu.h
new file mode 100644 (file)
index 0000000..2084894
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#ifndef SQUID_SNMPX_PDU_H
+#define SQUID_SNMPX_PDU_H
+
+#include "config.h"
+#include "ipc/forward.h"
+#include "Range.h"
+#include "snmp.h"
+
+
+namespace Snmp
+{
+
+/// snmp_pdu wrapper introduce the feature
+/// to aggregate variables and to pack/unpack message
+class Pdu: public snmp_pdu
+{
+public:
+    Pdu();
+    Pdu(const Pdu& pdu);
+    Pdu& operator = (const Pdu& pdu);
+    ~Pdu();
+
+    void aggregate(const Pdu& pdu);
+    void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+    void unpack(const Ipc::TypedMsgHdr& msg); ///< restore struct from the message
+    int  varCount() const; ///< size of variables list
+    void clear();  ///< clear all internal members
+    void setVars(variable_list* vars); ///< perform assignment of variables list
+    void clearVars(); ///< clear variables list
+    Range<const oid*> getSystemOid() const;
+    void setSystemOid(const Range<const oid*>& systemOid);
+    void clearSystemOid(); 
+
+private:
+    void init(); ///< initialize members
+    void assign(const Pdu& pdu); ///< perform full assignment
+};
+
+} // namespace Snmp
+
+#endif /* SQUID_SNMPX_PDU_H */
diff --git a/src/snmpx/Request.cc b/src/snmpx/Request.cc
new file mode 100644 (file)
index 0000000..a0d5f09
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#include "config.h"
+#include "ipc/Messages.h"
+#include "ipc/TypedMsgHdr.h"
+#include "snmpx/Request.h"
+
+
+Snmp::Request::Request(int aRequestorId, unsigned int aRequestId,
+                       const Pdu& aPdu, const Session& aSession,
+                       int aFd, const Ip::Address& anAddress):
+    Ipc::Request(aRequestorId, aRequestId),
+    pdu(aPdu), session(aSession), fd(aFd), address(anAddress)
+{
+}
+
+Snmp::Request::Request(const Request& request):
+    Ipc::Request(request.requestorId, request.requestId),
+    pdu(request.pdu), session(request.session),
+    fd(request.fd), address(request.address)
+{
+}
+
+Snmp::Request::Request(const Ipc::TypedMsgHdr& msg):
+    Ipc::Request(0, 0)
+{
+    msg.checkType(Ipc::mtSnmpRequest);
+    msg.getPod(requestorId);
+    msg.getPod(requestId);
+    pdu.unpack(msg);
+    session.unpack(msg);
+    msg.getPod(address);
+
+    fd = msg.getFd();
+}
+
+void
+Snmp::Request::pack(Ipc::TypedMsgHdr& msg) const
+{
+    msg.setType(Ipc::mtSnmpRequest);
+    msg.putPod(requestorId);
+    msg.putPod(requestId);
+    pdu.pack(msg);
+    session.pack(msg);
+    msg.putPod(address);
+
+    msg.putFd(fd);
+}
+
+Ipc::Request::Pointer
+Snmp::Request::clone() const
+{
+    return new Request(*this);
+}
diff --git a/src/snmpx/Request.h b/src/snmpx/Request.h
new file mode 100644 (file)
index 0000000..aab4f1a
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#ifndef SQUID_SNMPX_REQUEST_H
+#define SQUID_SNMPX_REQUEST_H
+
+#include "ip/Address.h"
+#include "ipc/forward.h"
+#include "ipc/Request.h"
+#include "snmpx/Pdu.h"
+#include "snmpx/Session.h"
+
+
+namespace Snmp
+{
+
+/// SNMP request
+class Request: public Ipc::Request
+{
+public:
+    Request(int aRequestorId, unsigned int aRequestId, const Pdu& aPdu,
+        const Session& aSession, int aFd, const Ip::Address& anAddress);
+
+    explicit Request(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
+    /* Ipc::Request API */
+    virtual void pack(Ipc::TypedMsgHdr& msg) const;
+    virtual Pointer clone() const;
+
+private:
+    Request(const Request& request);
+
+public:
+    Pdu pdu; ///< SNMP protocol data unit
+    Session session; ///< SNMP session 
+    int fd; ///< client connection descriptor
+    Ip::Address address; ///< client address
+};
+
+
+} // namespace Snmp
+
+#endif /* SQUID_SNMPX_REQUEST_H */
diff --git a/src/snmpx/Response.cc b/src/snmpx/Response.cc
new file mode 100644 (file)
index 0000000..f32c355
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#include "config.h"
+#include "base/TextException.h"
+#include "ipc/Messages.h"
+#include "ipc/TypedMsgHdr.h"
+#include "snmpx/Response.h"
+
+
+std::ostream& Snmp::operator << (std::ostream& os, const Response& response)
+{
+    os << "response: {requestId: " << response.requestId << '}';
+    return os;
+}
+
+Snmp::Response::Response(unsigned int aRequestId):
+    Ipc::Response(aRequestId), pdu()
+{
+}
+
+Snmp::Response::Response(const Response& response):
+    Ipc::Response(response.requestId), pdu(response.pdu)
+{
+}
+
+Snmp::Response::Response(const Ipc::TypedMsgHdr& msg):
+    Ipc::Response(0)
+{
+    msg.checkType(Ipc::mtSnmpResponse);
+    msg.getPod(requestId);
+    pdu.unpack(msg);
+}
+
+void
+Snmp::Response::pack(Ipc::TypedMsgHdr& msg) const
+{
+    msg.setType(Ipc::mtSnmpResponse);
+    msg.putPod(requestId);
+    pdu.pack(msg);
+}
+
+Ipc::Response::Pointer
+Snmp::Response::clone() const
+{
+    return new Response(*this);
+}
diff --git a/src/snmpx/Response.h b/src/snmpx/Response.h
new file mode 100644 (file)
index 0000000..99d9846
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#ifndef SQUID_SNMPX_RESPONSE_H
+#define SQUID_SNMPX_RESPONSE_H
+
+#include "ipc/forward.h"
+#include "ipc/Response.h"
+#include "snmpx/Pdu.h"
+#include <ostream>
+
+namespace Snmp
+{
+
+/// 
+class Response: public Ipc::Response
+{
+public:
+    Response(unsigned int aRequestId);
+    explicit Response(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
+    /* Ipc::Response API */
+    virtual void pack(Ipc::TypedMsgHdr& msg) const;
+    virtual Ipc::Response::Pointer clone() const;
+
+private:
+    Response(const Response& response);
+
+public:
+    Pdu pdu; ///< SNMP protocol data unit
+};
+
+extern std::ostream& operator << (std::ostream& os, const Response& response);
+
+} // namespace Snmp
+
+#endif /* SQUID_SNMPX_RESPONSE_H */
diff --git a/src/snmpx/Session.cc b/src/snmpx/Session.cc
new file mode 100644 (file)
index 0000000..fb94ca4
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#include "config.h"
+#include "base/TextException.h"
+#include "ipc/TypedMsgHdr.h"
+#include "protos.h"
+#include "snmpx/Session.h"
+
+
+Snmp::Session::Session()
+{
+    clear();
+}
+
+Snmp::Session::Session(const Session& session)
+{
+    assign(session);
+}
+
+Snmp::Session::~Session()
+{
+    free();
+}
+
+Snmp::Session&
+Snmp::Session::operator = (const Session& session)
+{
+    free();
+    assign(session);
+    return *this;
+}
+
+void
+Snmp::Session::clear()
+{
+    xmemset(this, 0, sizeof(*this));
+}
+
+void
+Snmp::Session::free()
+{
+    if (community_len > 0) {
+        Must(community != NULL);
+        xfree(community);
+    }
+    if (peername != NULL)
+        xfree(peername);
+    clear();
+}
+
+void
+Snmp::Session::assign(const Session& session)
+{
+    memcpy(this, &session, sizeof(*this));
+    if (session.community != NULL) {
+        community = (u_char*)xstrdup((char*)session.community);
+        Must(community != NULL);
+    }
+    if (session.peername != NULL) {
+        peername = xstrdup(session.peername);
+        Must(peername != NULL);
+    }
+}
+
+void
+Snmp::Session::pack(Ipc::TypedMsgHdr& msg) const
+{
+    msg.putPod(Version);
+    msg.putInt(community_len);
+    if (community_len > 0) {
+        Must(community != NULL);
+        msg.putFixed(community, community_len);
+    }
+    msg.putPod(retries);
+    msg.putPod(timeout);
+    int len = peername != NULL ? strlen(peername) : 0;
+    msg.putInt(len);
+    if (len > 0)
+        msg.putFixed(peername, len);
+    msg.putPod(remote_port);
+    msg.putPod(local_port);
+}
+
+void
+Snmp::Session::unpack(const Ipc::TypedMsgHdr& msg)
+{
+    free();
+    msg.getPod(Version);
+    community_len = msg.getInt();
+    if (community_len > 0) {
+        community = static_cast<u_char*>(xmalloc(community_len + 1));
+        Must(community != NULL);
+        msg.getFixed(community, community_len);
+        community[community_len] = 0;
+    }
+    msg.getPod(retries);
+    msg.getPod(timeout);
+    int len = msg.getInt();
+    if (len > 0) {
+        peername = static_cast<char*>(xmalloc(len + 1));
+        Must(peername != NULL);
+        msg.getFixed(peername, len);
+        peername[len] = 0;
+    }
+    msg.getPod(remote_port);
+    msg.getPod(local_port);
+}
diff --git a/src/snmpx/Session.h b/src/snmpx/Session.h
new file mode 100644 (file)
index 0000000..6e93f81
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#ifndef SQUID_SNMPX_SESSION_H
+#define SQUID_SNMPX_SESSION_H
+
+#include "config.h"
+#include "ipc/forward.h"
+#include "snmp.h"
+#include "snmp_session.h"
+
+
+namespace Snmp
+{
+
+/// snmp_session wrapper add pack/unpack feature
+class Session: public snmp_session
+{
+public:
+    Session();
+    Session(const Session& session);
+    Session& operator = (const Session& session);
+    ~Session();
+
+    void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+    void unpack(const Ipc::TypedMsgHdr& msg); ///< restore struct from the message
+    void clear(); ///< clear internal members
+
+private:
+    void free();  ///< free internal members
+    void assign(const Session& session); ///< perform full assignment
+};
+
+} // namespace Snmp
+
+#endif /* SQUID_SNMPX_SESSION_H */
diff --git a/src/snmpx/Var.cc b/src/snmpx/Var.cc
new file mode 100644 (file)
index 0000000..8893440
--- /dev/null
@@ -0,0 +1,374 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#include "config.h"
+#include "base/TextException.h"
+#include "ipc/TypedMsgHdr.h"
+#include "protos.h"
+#include "snmpx/Var.h"
+
+
+Snmp::Var::Var()
+{
+    init();
+}
+
+Snmp::Var::Var(const Var& var)
+{
+    init();
+    assign(var);
+}
+
+Snmp::Var::~Var()
+{
+    clear();
+}
+
+Snmp::Var&
+Snmp::Var::operator = (const Var& var)
+{
+    clear();
+    assign(var);
+    return *this;
+}
+
+void
+Snmp::Var::init()
+{
+    xmemset(this, 0, sizeof(*this));
+}
+
+Snmp::Var&
+Snmp::Var::operator += (const Var& var)
+{
+    switch(type)
+    {
+    case SMI_INTEGER:
+        setInt(asInt() + var.asInt());
+        break;
+    case SMI_GAUGE32:
+        setGauge(asGauge() + var.asGauge());
+        break;
+    case SMI_COUNTER32:
+        setCounter(asCounter() + var.asCounter());
+        break;
+    case SMI_COUNTER64:
+        setCounter64(asCounter64() + var.asCounter64());
+        break;
+    case SMI_TIMETICKS:
+        setTimeTicks(asTimeTicks() + var.asTimeTicks());
+        break;
+    default:
+        debugs(49, DBG_CRITICAL, HERE << "Unsupported type: " << type);
+        throw TexcHere("Unsupported type");
+        break;
+    }
+    return *this;
+}
+
+Snmp::Var&
+Snmp::Var::operator /= (int num)
+{
+    Must(num != 0);
+    switch(type)
+    {
+    case SMI_INTEGER:
+        setInt(asInt() / num);
+        break;
+    case SMI_GAUGE32:
+        setGauge(asGauge() / num);
+        break;
+    case SMI_COUNTER32:
+        setCounter(asCounter() / num);
+        break;
+    case SMI_COUNTER64:
+        setCounter64(asCounter64() / num);
+        break;
+    case SMI_TIMETICKS:
+        setTimeTicks(asTimeTicks() / num);
+        break;
+    default:
+        debugs(49, DBG_CRITICAL, HERE << "Unsupported type: " << type);
+        throw TexcHere("Unsupported type");
+        break;
+    }
+    return *this;
+}
+
+bool
+Snmp::Var::operator < (const Var& var) const
+{
+    switch(type)
+    {
+    case SMI_INTEGER:
+        return asInt() < var.asInt();
+    case SMI_GAUGE32:
+        return asGauge() < var.asGauge();
+    case SMI_COUNTER32:
+        return asCounter() < var.asCounter();
+    case SMI_COUNTER64:
+        return asCounter64() < var.asCounter64();
+    case SMI_TIMETICKS:
+        return asTimeTicks() < var.asTimeTicks();
+    default:
+        debugs(49, DBG_CRITICAL, HERE << "Unsupported type: " << type);
+        throw TexcHere("Unsupported type");
+        break;
+    }
+    return false; // unreachable
+}
+
+bool
+Snmp::Var::operator > (const Var& var) const
+{
+    switch(type)
+    {
+    case SMI_INTEGER:
+        return asInt() > var.asInt();
+    case SMI_GAUGE32:
+        return asGauge() > var.asGauge();
+    case SMI_COUNTER32:
+        return asCounter() > var.asCounter();
+    case SMI_COUNTER64:
+        return asCounter64() > var.asCounter64();
+    case SMI_TIMETICKS:
+        return asTimeTicks() > var.asTimeTicks();
+    default:
+        debugs(49, DBG_CRITICAL, HERE << "Unsupported type: " << type);
+        throw TexcHere("Unsupported type");
+        break;
+    }
+    return false; // unreachable
+}
+
+void
+Snmp::Var::assign(const Var& var)
+{
+    setName(var.getName());
+    copyValue(var);
+}
+
+void
+Snmp::Var::clearName()
+{
+    if (name != NULL) {
+        xfree(name);
+        name = NULL;
+    }
+    name_length = 0;
+}
+
+Range<const oid*>
+Snmp::Var::getName() const
+{
+    return Range<const oid*>(name, name + name_length);
+}
+
+void
+Snmp::Var::setName(const Range<const oid*>& aName)
+{
+    clearName();
+    if (aName.start != NULL && aName.size() != 0) {
+        name_length = aName.size();
+        name = static_cast<oid*>(xmalloc(name_length * sizeof(oid)));
+        std::copy(aName.start, aName.end, name);
+    }
+}
+
+void
+Snmp::Var::clearValue()
+{
+    if (val.string != NULL) {
+        xfree(val.string);
+        val.string = NULL;
+    }
+    val_len = 0;
+    type = 0;
+}
+
+bool
+Snmp::Var::isNull() const
+{
+    return type == SMI_NULLOBJ;
+}
+
+int
+Snmp::Var::asInt() const
+{
+    Must(type == SMI_INTEGER);
+    Must(val.integer != NULL && val_len == sizeof(int));
+    return *val.integer;
+}
+
+unsigned int
+Snmp::Var::asGauge() const
+{
+    Must(type == SMI_GAUGE32);
+    Must(val.integer != NULL && val_len == 4);
+    return *reinterpret_cast<unsigned int*>(val.integer);
+}
+
+int
+Snmp::Var::asCounter() const
+{
+    Must(type == SMI_COUNTER32);
+    Must(val.integer != NULL && val_len == 4);
+    return *reinterpret_cast<int*>(val.integer);
+}
+
+long long int
+Snmp::Var::asCounter64() const
+{
+    Must(type == SMI_COUNTER64);
+    Must(val.integer != NULL && val_len == 8);
+    return *reinterpret_cast<long long int*>(val.integer);
+}
+
+time_t
+Snmp::Var::asTimeTicks() const
+{
+    Must(type == SMI_TIMETICKS);
+    Must(val.integer != NULL && val_len == sizeof(time_t));
+    return *reinterpret_cast<time_t*>(val.integer);
+}
+
+Range<const oid*>
+Snmp::Var::asObject() const
+{
+    Must(type == SMI_OBJID);
+    Must(val_len % sizeof(oid) == 0);
+    int length = val_len / sizeof(oid);
+    Must(val.objid != NULL && length > 0);
+    return Range<const oid*>(val.objid, val.objid + length);
+}
+
+Range<const u_char*>
+Snmp::Var::asString() const
+{
+    Must(type == SMI_STRING);
+    Must(val.string != NULL && val_len > 0);
+    return Range<const u_char*>(val.string, val.string + val_len);
+}
+
+ipaddr
+Snmp::Var::asIpAddress() const
+{
+    Must(type == SMI_IPADDRESS);
+    Must(val.string != NULL && val_len == sizeof(ipaddr));
+    return *reinterpret_cast<ipaddr*>(val.string);
+}
+
+void
+Snmp::Var::setInt(int value)
+{
+    setValue(&value, sizeof(value), SMI_INTEGER);
+}
+
+void
+Snmp::Var::setCounter(int value)
+{
+    setValue(&value, sizeof(value), SMI_COUNTER32);
+}
+
+void
+Snmp::Var::setGauge(unsigned int value)
+{
+    setValue(&value, sizeof(value), SMI_GAUGE32);
+}
+
+void
+Snmp::Var::setString(const Range<const u_char*>& string)
+{
+    setValue(string.start, string.size(), SMI_STRING);
+}
+
+void
+Snmp::Var::setObject(const Range<const oid*>& object)
+{
+    setValue(object.start, object.size() * sizeof(oid), SMI_OBJID);
+}
+
+void
+Snmp::Var::setCounter64(long long int counter)
+{
+    setValue(&counter, sizeof(counter), SMI_COUNTER64);
+}
+
+void
+Snmp::Var::setTimeTicks(time_t ticks)
+{
+    setValue(&ticks, sizeof(ticks), SMI_TIMETICKS);
+}
+
+void
+Snmp::Var::setIpAddress(ipaddr addr)
+{
+    setValue(&addr, sizeof(addr), SMI_IPADDRESS);
+}
+
+void
+Snmp::Var::copyValue(const Var& var)
+{
+    setValue(var.val.string, var.val_len, var.type);
+}
+
+void
+Snmp::Var::setValue(const void* value, int length, int aType)
+{
+    clearValue();
+    if (value != NULL) {
+        Must(length > 0 && aType > 0);
+        val.string = static_cast<u_char*>(xmalloc(length));
+        memcpy(val.string, value, length);
+    }
+    val_len = length;
+    type = aType;
+}
+
+void
+Snmp::Var::clear()
+{
+    clearName();
+    clearValue();
+    init();
+}
+
+void
+Snmp::Var::pack(Ipc::TypedMsgHdr& msg) const
+{
+    msg.putInt(name_length);
+    if (name_length > 0) {
+        Must(name != NULL);
+        msg.putFixed(name, name_length * sizeof(oid));
+    }
+    msg.putPod(type);
+    msg.putPod(val_len);
+    if (val_len > 0) {
+        Must(val.string != NULL);
+        msg.putFixed(val.string, val_len);
+    }
+}
+
+void
+Snmp::Var::unpack(const Ipc::TypedMsgHdr& msg)
+{
+    clearName();
+    clearValue();
+    name_length = msg.getInt();
+    Must(name_length >= 0);
+    if (name_length > 0) {
+        name = static_cast<oid*>(xmalloc(name_length * sizeof(oid)));
+        msg.getFixed(name, name_length * sizeof(oid));
+    }
+    msg.getPod(type);
+    val_len = msg.getInt();
+    Must(val_len >= 0);
+    if (val_len > 0) {
+        val.string = static_cast<u_char*>(xmalloc(val_len));
+        msg.getFixed(val.string, val_len);
+    }
+}
diff --git a/src/snmpx/Var.h b/src/snmpx/Var.h
new file mode 100644 (file)
index 0000000..86d0532
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#ifndef SQUID_SNMPX_VAR_H
+#define SQUID_SNMPX_VAR_H
+
+#include "config.h"
+#include "ipc/forward.h"
+#include "Range.h"
+#include "snmp_vars.h"
+
+
+namespace Snmp
+{
+
+/// variable_list wrapper implement the feature to change
+/// the name/value of variable and to pack/unpack message
+class Var: public variable_list
+{
+public:
+    Var();
+    Var(const Var& var);
+    Var& operator = (const Var& var);
+    ~Var();
+
+    Var& operator += (const Var& var);
+    Var& operator /= (int num);
+    bool operator < (const Var& var) const;
+    bool operator > (const Var& var) const;
+    
+    void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+    void unpack(const Ipc::TypedMsgHdr& msg); ///< restore struct from the message
+
+    Range<const oid*> getName() const; ///< returns variable name
+    void setName(const Range<const oid*>& aName); ///< set new variable name
+    void clearName(); ///< clear variable name
+
+    bool isNull() const;
+
+    int asInt() const; ///< returns variable value as integer
+    unsigned int asGauge() const; ///< returns variable value as unsigned int
+    int asCounter() const; ///< returns variable value as Counter32
+    long long int asCounter64() const; ///< returns variable value as Counter64
+    time_t asTimeTicks() const; ///< returns variable value as time ticks
+    Range<const oid*> asObject() const; ///< returns variable value as object oid
+    Range<const u_char*> asString() const; ///< returns variable value as chars string
+    ipaddr asIpAddress() const; ///< returns variable value as ip address
+
+    void setInt(int value); ///< assign int value to variable
+    void setCounter(int value); ///< assign Counter32 value to variable
+    void setGauge(unsigned int value); ///< assign unsigned int value to variable
+    void setString(const Range<const u_char*>& string); ///< assign string to variable
+    void setObject(const Range<const oid*>& object); ///< assign object oid to variable
+    void setTimeTicks(time_t ticks); ///<assign time_t value to variable
+    void setIpAddress(ipaddr addr); ///< assign sockaddr_in to variable
+    void setCounter64(long long int counter); ///< assign Counter64 value to variable
+
+    void copyValue(const Var& var); ///< copy variable from another one
+    void clearValue(); ///< clear .val member
+    void clear();  ///< clear all internal members
+
+private:
+    void init(); ///< initialize members
+    void assign(const Var& var); ///< perform full assignment
+    void setValue(const void* value, int length, int aType); ///< set new variable value
+};
+
+} // namespace Snmp
+
+#endif /* SQUID_SNMPX_VAR_H */
diff --git a/src/snmpx/forward.h b/src/snmpx/forward.h
new file mode 100644 (file)
index 0000000..5f02316
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 49    SNMP Interface
+ *
+ */
+
+#ifndef SQUID_SNMPX_FORWARD_H
+#define SQUID_SNMPX_FORWARD_H
+
+#include "config.h"
+
+namespace Snmp
+{
+
+class Pdu;
+class Request;
+class Response;
+class Session;
+class Var;
+
+} // namespace Snmp
+
+#endif /* SQUID_SNMPX_FORWARD_H */