From 51ea0904875d045eb83a16bdd96cc6c169b1b8b4 Mon Sep 17 00:00:00 2001 From: Christos Tsantilas Date: Wed, 19 Jan 2011 12:12:54 +0200 Subject: [PATCH] importing smp-snmp patch --- configure.ac | 1 + include/Range.h | 4 +- src/Makefile.am | 7 + src/client_side.cc | 4 +- src/ipc/Coordinator.cc | 38 +++- src/ipc/Coordinator.h | 3 + src/ipc/Forwarder.cc | 177 +++++++++++++++++ src/ipc/Forwarder.h | 68 +++++++ src/ipc/Inquirer.cc | 206 +++++++++++++++++++ src/ipc/Inquirer.h | 85 ++++++++ src/ipc/Makefile.am | 10 +- src/ipc/Messages.h | 3 +- src/ipc/Request.cc | 10 + src/ipc/Request.h | 43 ++++ src/ipc/Response.cc | 17 ++ src/ipc/Response.h | 43 ++++ src/ipc/Strand.cc | 23 +++ src/ipc/Strand.h | 3 + src/ipc/UdsOp.cc | 20 ++ src/ipc/UdsOp.h | 3 + src/ipc/forward.h | 4 + src/mgr/Forwarder.cc | 166 +++------------- src/mgr/Forwarder.h | 40 +--- src/mgr/FunAction.cc | 3 +- src/mgr/InfoAction.cc | 3 +- src/mgr/Inquirer.cc | 168 ++-------------- src/mgr/Inquirer.h | 47 +---- src/mgr/Request.cc | 19 +- src/mgr/Request.h | 14 +- src/mgr/Response.cc | 20 +- src/mgr/Response.h | 18 +- src/mgr/StoreToCommWriter.cc | 22 --- src/mgr/StoreToCommWriter.h | 3 - src/snmp_core.cc | 200 ++++++++++--------- src/snmp_core.h | 38 ++++ src/snmpx/Forwarder.cc | 107 ++++++++++ src/snmpx/Forwarder.h | 52 +++++ src/snmpx/Inquirer.cc | 101 ++++++++++ src/snmpx/Inquirer.h | 54 +++++ src/snmpx/Pdu.cc | 221 +++++++++++++++++++++ src/snmpx/Pdu.h | 48 +++++ src/snmpx/Request.cc | 59 ++++++ src/snmpx/Request.h | 46 +++++ src/snmpx/Response.cc | 51 +++++ src/snmpx/Response.h | 40 ++++ src/snmpx/Session.cc | 112 +++++++++++ src/snmpx/Session.h | 40 ++++ src/snmpx/Var.cc | 374 +++++++++++++++++++++++++++++++++++ src/snmpx/Var.h | 74 +++++++ src/snmpx/forward.h | 24 +++ 50 files changed, 2428 insertions(+), 508 deletions(-) create mode 100644 src/ipc/Forwarder.cc create mode 100644 src/ipc/Forwarder.h create mode 100644 src/ipc/Inquirer.cc create mode 100644 src/ipc/Inquirer.h create mode 100644 src/ipc/Request.cc create mode 100644 src/ipc/Request.h create mode 100644 src/ipc/Response.cc create mode 100644 src/ipc/Response.h create mode 100644 src/snmp_core.h create mode 100644 src/snmpx/Forwarder.cc create mode 100644 src/snmpx/Forwarder.h create mode 100644 src/snmpx/Inquirer.cc create mode 100644 src/snmpx/Inquirer.h create mode 100644 src/snmpx/Pdu.cc create mode 100644 src/snmpx/Pdu.h create mode 100644 src/snmpx/Request.cc create mode 100644 src/snmpx/Request.h create mode 100644 src/snmpx/Response.cc create mode 100644 src/snmpx/Response.h create mode 100644 src/snmpx/Session.cc create mode 100644 src/snmpx/Session.h create mode 100644 src/snmpx/Var.cc create mode 100644 src/snmpx/Var.h create mode 100644 src/snmpx/forward.h diff --git a/configure.ac b/configure.ac index 085ddf4812..ee58f1fa5c 100644 --- a/configure.ac +++ b/configure.ac @@ -3370,6 +3370,7 @@ AC_CONFIG_FILES([\ src/ipc/Makefile \ src/ssl/Makefile \ src/mgr/Makefile \ + src/snmpx/Makefile \ contrib/Makefile \ snmplib/Makefile \ icons/Makefile \ diff --git a/include/Range.h b/include/Range.h index 7229729677..29a5924347 100644 --- a/include/Range.h +++ b/include/Range.h @@ -52,7 +52,7 @@ public: C start; C end; Range intersection (Range const &) const; - C size() const; + size_t size() const; }; template @@ -77,7 +77,7 @@ Range::intersection (Range const &rhs) const } template -C +size_t Range::size() const { return end > start ? end - start : 0; diff --git a/src/Makefile.am b/src/Makefile.am index dc061644c3..068de32530 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -25,6 +25,7 @@ SBUF_SOURCE= \ MemBlob.cc SNMP_ALL_SOURCE = \ + snmp_core.h \ snmp_core.cc \ snmp_agent.cc if USE_SNMP @@ -50,6 +51,11 @@ else SSL_LOCAL_LIBS = endif +if USE_SNMP +SUBDIRS += snmpx +SNMPXLIB = snmpx/libsnmpx.la +endif + if USE_ADAPTATION SUBDIRS += adaptation endif @@ -547,6 +553,7 @@ squid_LDADD = \ $(CRYPTLIB) \ $(REGEXLIB) \ $(SNMPLIB) \ + $(SNMPXLIB) \ ${ADAPTATION_LIBS} \ $(ESI_LIBS) \ $(SSL_LIBS) \ diff --git a/src/client_side.cc b/src/client_side.cc index d961801a5f..a5722a10e7 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -1060,7 +1060,7 @@ ClientSocketContext::packRange(StoreIOBuffer const &source, MemBuf * mb) * intersection of "have" and "need" ranges must not be empty */ assert(http->out.offset < i->currentSpec()->offset + i->currentSpec()->length); - assert(http->out.offset + available.size() > i->currentSpec()->offset); + assert(http->out.offset + available.size() > (uint64_t)i->currentSpec()->offset); /* * put boundary and headers at the beginning of a range in a @@ -1118,7 +1118,7 @@ ClientSocketContext::packRange(StoreIOBuffer const &source, MemBuf * mb) /* adjust for not to be transmitted bytes */ http->out.offset = nextOffset; - if (available.size() <= skip) + if (available.size() <= (uint64_t)skip) return; available.start += skip; diff --git a/src/ipc/Coordinator.cc b/src/ipc/Coordinator.cc index d7f3419496..548d05146e 100644 --- a/src/ipc/Coordinator.cc +++ b/src/ipc/Coordinator.cc @@ -11,12 +11,13 @@ #include "CacheManager.h" #include "comm.h" #include "ipc/Coordinator.h" -#include "ipc/FdNotes.h" #include "ipc/SharedListen.h" #include "mgr/Inquirer.h" #include "mgr/Request.h" #include "mgr/Response.h" -#include "mgr/StoreToCommWriter.h" +#include "snmpx/Inquirer.h" +#include "snmpx/Request.h" +#include "snmpx/Response.h" CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator); @@ -74,6 +75,16 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message) handleCacheMgrResponse(Mgr::Response(message)); break; + case mtSnmpRequest: + debugs(54, 6, HERE << "SNMP request"); + handleSnmpRequest(Snmp::Request(message)); + break; + + case mtSnmpResponse: + debugs(54, 6, HERE << "SNMP response"); + handleSnmpResponse(Snmp::Response(message)); + break; + default: debugs(54, 1, HERE << "Unhandled message type: " << message.type()); break; @@ -123,8 +134,7 @@ Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request) Mgr::Action::Pointer action = CacheManager::GetInstance()->createRequestedAction(request.params); - AsyncJob::Start(new Mgr::Inquirer(action, - Mgr::ImportHttpFdIntoComm(request.fd), request, strands_)); + AsyncJob::Start(new Mgr::Inquirer(action, request, strands_)); } void @@ -133,6 +143,26 @@ Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response) Mgr::Inquirer::HandleRemoteAck(response); } +void +Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request) +{ + debugs(54, 4, HERE); + + Snmp::Response response(request.requestId); + TypedMsgHdr message; + response.pack(message); + SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); + + AsyncJob::Start(new Snmp::Inquirer(request, strands_)); +} + +void +Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response) +{ + debugs(54, 4, HERE); + Snmp::Inquirer::HandleRemoteAck(response); +} + int Ipc::Coordinator::openListenSocket(const SharedListenRequest& request, int &errNo) diff --git a/src/ipc/Coordinator.h b/src/ipc/Coordinator.h index 11856b1d3e..6ef25cebf9 100644 --- a/src/ipc/Coordinator.h +++ b/src/ipc/Coordinator.h @@ -15,6 +15,7 @@ #include "ipc/SharedListen.h" #include "ipc/StrandCoords.h" #include "mgr/forward.h" +#include "snmpx/forward.h" #include @@ -46,6 +47,8 @@ protected: void handleSharedListenRequest(const SharedListenRequest& request); void handleCacheMgrRequest(const Mgr::Request& request); void handleCacheMgrResponse(const Mgr::Response& response); + void handleSnmpRequest(const Snmp::Request& request); + void handleSnmpResponse(const Snmp::Response& response); /// calls comm_open_listener() int openListenSocket(const SharedListenRequest& request, int &errNo); diff --git a/src/ipc/Forwarder.cc b/src/ipc/Forwarder.cc new file mode 100644 index 0000000000..bfb3eb02e8 --- /dev/null +++ b/src/ipc/Forwarder.cc @@ -0,0 +1,177 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include "base/AsyncJobCalls.h" +#include "base/TextException.h" +#include "ipc/Forwarder.h" +#include "ipc/Port.h" +#include "ipc/TypedMsgHdr.h" + + +CBDATA_NAMESPACED_CLASS_INIT(Ipc, Forwarder); + +Ipc::Forwarder::RequestsMap Ipc::Forwarder::TheRequestsMap; +unsigned int Ipc::Forwarder::LastRequestId = 0; + +Ipc::Forwarder::Forwarder(Request::Pointer aRequest, double aTimeout): + AsyncJob("Ipc::Forwarder"), + request(aRequest), timeout(aTimeout) +{ + debugs(54, 5, HERE); +} + +Ipc::Forwarder::~Forwarder() +{ + debugs(54, 5, HERE); + Must(request->requestId == 0); + cleanup(); +} + +/// perform cleanup actions +void +Ipc::Forwarder::cleanup() +{ +} + +void +Ipc::Forwarder::start() +{ + debugs(54, 3, HERE); + + typedef NullaryMemFunT Dialer; + AsyncCall::Pointer callback = JobCallback(16, 5, Dialer, this, Forwarder::handleRemoteAck); + if (++LastRequestId == 0) // don't use zero value as request->requestId + ++LastRequestId; + request->requestId = LastRequestId; + TheRequestsMap[request->requestId] = callback; + TypedMsgHdr message; + + try { + request->pack(message); + } catch (...) { + // assume the pack() call failed because the message did not fit + // TODO: add a more specific exception? + handleError(); + } + + SendMessage(coordinatorAddr, message); + eventAdd("Ipc::Forwarder::requestTimedOut", &Forwarder::RequestTimedOut, + this, timeout, 0, false); +} + +void +Ipc::Forwarder::swanSong() +{ + debugs(54, 5, HERE); + removeTimeoutEvent(); + if (request->requestId > 0) { + DequeueRequest(request->requestId); + request->requestId = 0; + } + cleanup(); +} + +bool +Ipc::Forwarder::doneAll() const +{ + debugs(54, 5, HERE); + return request->requestId == 0; +} + +/// called when Coordinator starts processing the request +void +Ipc::Forwarder::handleRemoteAck() +{ + debugs(54, 3, HERE); + request->requestId = 0; +} + +/// Ipc::Forwarder::requestTimedOut wrapper +void +Ipc::Forwarder::RequestTimedOut(void* param) +{ + debugs(54, 3, HERE); + Must(param != NULL); + Forwarder* fwdr = static_cast(param); + // use async call to enable job call protection that time events lack + CallJobHere(16, 5, fwdr, Forwarder, requestTimedOut); +} + +/// called when Coordinator fails to start processing the request [in time] +void +Ipc::Forwarder::requestTimedOut() +{ + debugs(54, 3, HERE); + handleTimeout(); +} + +void +Ipc::Forwarder::handleError() +{ + mustStop("error"); +} + +void +Ipc::Forwarder::handleTimeout() +{ + mustStop("timeout"); +} + +/// terminate with an error +void +Ipc::Forwarder::handleException(const std::exception& e) +{ + debugs(16, 3, HERE << e.what()); + mustStop("exception"); +} + +void +Ipc::Forwarder::callException(const std::exception& e) +{ + try { + handleException(e); + } catch (const std::exception& ex) { + debugs(54, DBG_CRITICAL, HERE << ex.what()); + } + AsyncJob::callException(e); +} + +/// returns and forgets the right Forwarder callback for the request +AsyncCall::Pointer +Ipc::Forwarder::DequeueRequest(unsigned int requestId) +{ + debugs(54, 3, HERE); + Must(requestId != 0); + AsyncCall::Pointer call; + RequestsMap::iterator request = TheRequestsMap.find(requestId); + if (request != TheRequestsMap.end()) { + call = request->second; + Must(call != NULL); + TheRequestsMap.erase(request); + } + return call; +} + +/// called when we are no longer waiting for Coordinator to respond +void +Ipc::Forwarder::removeTimeoutEvent() +{ + if (eventFind(&Forwarder::RequestTimedOut, this)) + eventDelete(&Forwarder::RequestTimedOut, this); +} + +void +Ipc::Forwarder::HandleRemoteAck(unsigned int requestId) +{ + debugs(54, 3, HERE); + Must(requestId != 0); + + AsyncCall::Pointer call = DequeueRequest(requestId); + if (call != NULL) + ScheduleCallHere(call); +} diff --git a/src/ipc/Forwarder.h b/src/ipc/Forwarder.h new file mode 100644 index 0000000000..25b115b020 --- /dev/null +++ b/src/ipc/Forwarder.h @@ -0,0 +1,68 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#ifndef SQUID_IPC_FORWARDER_H +#define SQUID_IPC_FORWARDER_H + +#include "base/AsyncJob.h" +#include "ipc/Request.h" +#include + + +namespace Ipc +{ + +/** Forwards a worker request to coordinator. + * Waits for an ACK from Coordinator + * Send the data unit with an error response if forwarding fails. + */ +class Forwarder: public AsyncJob +{ +public: + Forwarder(Request::Pointer aRequest, double aTimeout); + virtual ~Forwarder(); + + /// finds and calls the right Forwarder upon Coordinator's response + static void HandleRemoteAck(unsigned int requestId); + + /* has-to-be-public AsyncJob API */ + virtual void callException(const std::exception& e); + +protected: + /* AsyncJob API */ + virtual void start(); + virtual void swanSong(); + virtual bool doneAll() const; + + virtual void cleanup(); ///< perform cleanup actions + virtual void handleError(); + virtual void handleTimeout(); + virtual void handleException(const std::exception& e); + virtual void handleRemoteAck(); + +private: + static void RequestTimedOut(void* param); + void requestTimedOut(); + void removeTimeoutEvent(); + static AsyncCall::Pointer DequeueRequest(unsigned int requestId); + +protected: + Request::Pointer request; + const double timeout; ///< response wait timeout in seconds + + /// maps request->id to Forwarder::handleRemoteAck callback + typedef std::map RequestsMap; + static RequestsMap TheRequestsMap; ///< pending Coordinator requests + + static unsigned int LastRequestId; ///< last requestId used + + CBDATA_CLASS2(Forwarder); +}; + +} // namespace Ipc + +#endif /* SQUID_IPC_FORWARDER_H */ diff --git a/src/ipc/Inquirer.cc b/src/ipc/Inquirer.cc new file mode 100644 index 0000000000..309232e03d --- /dev/null +++ b/src/ipc/Inquirer.cc @@ -0,0 +1,206 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include "base/TextException.h" +#include "comm/Write.h" +#include "ipc/Inquirer.h" +#include "ipc/Port.h" +#include "ipc/TypedMsgHdr.h" +#include "MemBuf.h" +#include + + +CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer); + +Ipc::Inquirer::RequestsMap Ipc::Inquirer::TheRequestsMap; +unsigned int Ipc::Inquirer::LastRequestId = 0; + +/// compare Ipc::StrandCoord using kidId, for std::sort() below +static bool +LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2) +{ + return c1.kidId < c2.kidId; +} + +Ipc::Inquirer::Inquirer(Request::Pointer aRequest, const StrandCoords& coords, + double aTimeout): + AsyncJob("Ipc::Inquirer"), + request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout) +{ + debugs(54, 5, HERE); + + // order by ascending kid IDs; useful for non-aggregatable stats + std::sort(strands.begin(), strands.end(), LesserStrandByKidId); +} + +Ipc::Inquirer::~Inquirer() +{ + debugs(54, 5, HERE); + cleanup(); +} + +void +Ipc::Inquirer::cleanup() +{ +} + +void +Ipc::Inquirer::start() +{ + request->requestId = 0; +} + +void +Ipc::Inquirer::inquire() +{ + if (pos == strands.end()) { + Must(done()); + return; + } + + Must(request->requestId == 0); + AsyncCall::Pointer callback = asyncCall(16, 5, "Mgr::Inquirer::handleRemoteAck", + HandleAckDialer(this, &Inquirer::handleRemoteAck, NULL)); + if (++LastRequestId == 0) // don't use zero value as request->requestId + ++LastRequestId; + request->requestId = LastRequestId; + const int kidId = pos->kidId; + debugs(54, 4, HERE << "inquire kid: " << kidId << status()); + TheRequestsMap[request->requestId] = callback; + TypedMsgHdr message; + request->pack(message); + SendMessage(Port::MakeAddr(strandAddrPfx, kidId), message); + eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut, + this, timeout, 0, false); +} + +/// called when a strand is done writing its output +void +Ipc::Inquirer::handleRemoteAck(Response::Pointer response) +{ + debugs(54, 4, HERE << status()); + request->requestId = 0; + removeTimeoutEvent(); + if (aggregate(response)) { + Must(!done()); // or we should not be called + ++pos; // advance after a successful inquiry + inquire(); + } else { + mustStop("error"); + } +} + +void +Ipc::Inquirer::swanSong() +{ + debugs(54, 5, HERE); + removeTimeoutEvent(); + if (request->requestId > 0) { + DequeueRequest(request->requestId); + request->requestId = 0; + } + sendResponse(); + cleanup(); +} + +bool +Ipc::Inquirer::doneAll() const +{ + return pos == strands.end(); +} + +void +Ipc::Inquirer::handleException(const std::exception& e) +{ + debugs(54, 3, HERE << e.what()); + mustStop("exception"); +} + +void +Ipc::Inquirer::callException(const std::exception& e) +{ + debugs(54, 3, HERE); + try { + handleException(e); + } catch (const std::exception& ex) { + debugs(54, DBG_CRITICAL, HERE << ex.what()); + } + AsyncJob::callException(e); +} + +/// returns and forgets the right Inquirer callback for strand request +AsyncCall::Pointer +Ipc::Inquirer::DequeueRequest(unsigned int requestId) +{ + debugs(54, 3, HERE << " requestId " << requestId); + Must(requestId != 0); + AsyncCall::Pointer call; + RequestsMap::iterator request = TheRequestsMap.find(requestId); + if (request != TheRequestsMap.end()) { + call = request->second; + Must(call != NULL); + TheRequestsMap.erase(request); + } + return call; +} + +void +Ipc::Inquirer::HandleRemoteAck(const Response& response) +{ + Must(response.requestId != 0); + AsyncCall::Pointer call = DequeueRequest(response.requestId); + if (call != NULL) { + HandleAckDialer* dialer = dynamic_cast(call->getDialer()); + Must(dialer); + dialer->arg1 = response.clone(); + ScheduleCallHere(call); + } +} + +/// called when we are no longer waiting for the strand to respond +void +Ipc::Inquirer::removeTimeoutEvent() +{ + if (eventFind(&Inquirer::RequestTimedOut, this)) + eventDelete(&Inquirer::RequestTimedOut, this); +} + +/// Ipc::Inquirer::requestTimedOut wrapper +void +Ipc::Inquirer::RequestTimedOut(void* param) +{ + debugs(54, 3, HERE); + Must(param != NULL); + Inquirer* cmi = static_cast(param); + // use async call to enable job call protection that time events lack + CallJobHere(16, 5, cmi, Inquirer, requestTimedOut); +} + +/// called when the strand failed to respond (or finish responding) in time +void +Ipc::Inquirer::requestTimedOut() +{ + debugs(54, 3, HERE); + if (request->requestId != 0) { + DequeueRequest(request->requestId); + request->requestId = 0; + Must(!done()); // or we should not be called + ++pos; // advance after a failed inquiry + inquire(); + } +} + +const char* +Ipc::Inquirer::status() const +{ + static MemBuf buf; + buf.reset(); + buf.Printf(" [request->requestId %u]", request->requestId); + buf.terminate(); + return buf.content(); +} diff --git a/src/ipc/Inquirer.h b/src/ipc/Inquirer.h new file mode 100644 index 0000000000..6341528938 --- /dev/null +++ b/src/ipc/Inquirer.h @@ -0,0 +1,85 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#ifndef SQUID_IPC_INQUIRER_H +#define SQUID_IPC_INQUIRER_H + +#include "base/AsyncJobCalls.h" +#include "base/AsyncJob.h" +#include "ipc/forward.h" +#include "ipc/Request.h" +#include "ipc/Response.h" +#include "ipc/StrandCoords.h" +#include + + +namespace Ipc +{ + +/// Coordinator's job that sends a cache manage request to each strand, +/// aggregating individual strand responses and dumping the result if needed +class Inquirer: public AsyncJob +{ +public: + Inquirer(Request::Pointer aRequest, const Ipc::StrandCoords& coords, double aTimeout); + virtual ~Inquirer(); + + /// finds and calls the right Inquirer upon strand's response + static void HandleRemoteAck(const Response& response); + + /* has-to-be-public AsyncJob API */ + virtual void callException(const std::exception& e); + +protected: + /* AsyncJob API */ + virtual void start(); + virtual void swanSong(); + virtual bool doneAll() const; + virtual const char *status() const; + + /// inquire the next strand + virtual void inquire(); + /// perform cleanup actions on completion of job + virtual void cleanup(); + /// do specific exception handling + virtual void handleException(const std::exception& e); + /// send response to client + virtual void sendResponse() = 0; + /// perform aggregating of responses and returns true if need to continue + virtual bool aggregate(Response::Pointer aResponse) = 0; + +private: + typedef UnaryMemFunT HandleAckDialer; + + void handleRemoteAck(Response::Pointer response); + + static AsyncCall::Pointer DequeueRequest(unsigned int requestId); + + static void RequestTimedOut(void* param); + void requestTimedOut(); + void removeTimeoutEvent(); + +protected: + Request::Pointer request; ///< cache manager request received from client + + Ipc::StrandCoords strands; ///< all strands we want to query, in order + Ipc::StrandCoords::const_iterator pos; ///< strand we should query now + + const double timeout; ///< number of seconds to wait for strand response + + /// maps request->id to Inquirer::handleRemoteAck callback + typedef std::map RequestsMap; + static RequestsMap TheRequestsMap; ///< pending strand requests + + static unsigned int LastRequestId; ///< last requestId used + + CBDATA_CLASS2(Inquirer); +}; + +} // namespace Ipc + +#endif /* SQUID_IPC_INQUIRER_H */ diff --git a/src/ipc/Makefile.am b/src/ipc/Makefile.am index e2d13504d9..0864394f39 100644 --- a/src/ipc/Makefile.am +++ b/src/ipc/Makefile.am @@ -28,7 +28,13 @@ libipc_la_SOURCES = \ Port.h \ Strand.cc \ Strand.h \ - \ - forward.h + forward.h \ + Forwarder.cc \ + Forwarder.h \ + Inquirer.cc \ + Inquirer.h \ + Request.h \ + Response.cc \ + Response.h DEFS += -DDEFAULT_PREFIX=\"$(prefix)\" diff --git a/src/ipc/Messages.h b/src/ipc/Messages.h index 57ae03632d..59b2eb5db1 100644 --- a/src/ipc/Messages.h +++ b/src/ipc/Messages.h @@ -19,7 +19,8 @@ namespace Ipc /// message class identifier typedef enum { mtNone = 0, mtRegistration, mtSharedListenRequest, mtSharedListenResponse, - mtCacheMgrRequest, mtCacheMgrResponse + mtCacheMgrRequest, mtCacheMgrResponse, + mtSnmpRequest, mtSnmpResponse } MessageType; } // namespace Ipc; diff --git a/src/ipc/Request.cc b/src/ipc/Request.cc new file mode 100644 index 0000000000..2d8156ef0d --- /dev/null +++ b/src/ipc/Request.cc @@ -0,0 +1,10 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include "ipc/Request.h" +#include "ipc/TypedMsgHdr.h" diff --git a/src/ipc/Request.h b/src/ipc/Request.h new file mode 100644 index 0000000000..1711ff2164 --- /dev/null +++ b/src/ipc/Request.h @@ -0,0 +1,43 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#ifndef SQUID_IPC_REQUEST_H +#define SQUID_IPC_REQUEST_H + +#include "ipc/forward.h" +#include "RefCount.h" + + +namespace Ipc +{ + +/// IPC request +class Request: public RefCountable +{ +public: + typedef RefCount Pointer; + +public: + Request(int aRequestorId, unsigned int aRequestId): + requestorId(aRequestorId), requestId(aRequestId) {} + + virtual void pack(TypedMsgHdr& msg) const = 0; ///< prepare for sendmsg() + virtual Pointer clone() const = 0; ///< returns a copy of this + +private: + Request(const Request&); // not implemented + Request& operator= (const Request&); // not implemented + +public: + int requestorId; ///< kidId of the requestor; used for response destination + unsigned int requestId; ///< unique for sender; matches request w/ response +}; + + +} // namespace Ipc + +#endif /* SQUID_IPC_REQUEST_H */ diff --git a/src/ipc/Response.cc b/src/ipc/Response.cc new file mode 100644 index 0000000000..5d1f39a9a0 --- /dev/null +++ b/src/ipc/Response.cc @@ -0,0 +1,17 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include "ipc/Response.h" +#include "ipc/TypedMsgHdr.h" + + +std::ostream& Ipc::operator << (std::ostream &os, const Response& response) +{ + os << "[response.requestId %u]" << response.requestId << '}'; + return os; +} diff --git a/src/ipc/Response.h b/src/ipc/Response.h new file mode 100644 index 0000000000..bd3fa9afff --- /dev/null +++ b/src/ipc/Response.h @@ -0,0 +1,43 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#ifndef SQUID_IPC_RESPONSE_H +#define SQUID_IPC_RESPONSE_H + +#include "ipc/forward.h" +#include "RefCount.h" + + +namespace Ipc +{ + +/// A response to Ipc::Request. +class Response: public RefCountable +{ +public: + typedef RefCount Pointer; + +public: + explicit Response(unsigned int aRequestId): + requestId(aRequestId) {} + + virtual void pack(TypedMsgHdr& msg) const = 0; ///< prepare for sendmsg() + virtual Pointer clone() const = 0; ///< returns a copy of this + +private: + Response(const Response&); // not implemented + Response& operator= (const Response&); // not implemented + +public: + unsigned int requestId; ///< ID of request we are responding to +}; + +extern std::ostream& operator <<(std::ostream &os, const Response &response); + +} // namespace Ipc + +#endif /* SQUID_IPC_RESPONSE_H */ diff --git a/src/ipc/Strand.cc b/src/ipc/Strand.cc index a1e9f59c5d..6a4fd004f2 100644 --- a/src/ipc/Strand.cc +++ b/src/ipc/Strand.cc @@ -16,6 +16,9 @@ #include "mgr/Response.h" #include "mgr/Forwarder.h" #include "CacheManager.h" +#include "snmpx/Forwarder.h" +#include "snmpx/Request.h" +#include "snmpx/Response.h" CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand); @@ -64,6 +67,14 @@ void Ipc::Strand::receive(const TypedMsgHdr &message) handleCacheMgrResponse(Mgr::Response(message)); break; + case mtSnmpRequest: + handleSnmpRequest(Snmp::Request(message)); + break; + + case mtSnmpResponse: + handleSnmpResponse(Snmp::Response(message)); + break; + default: debugs(54, 1, HERE << "Unhandled message type: " << message.type()); break; @@ -95,6 +106,18 @@ void Ipc::Strand::handleCacheMgrResponse(const Mgr::Response& response) Mgr::Forwarder::HandleRemoteAck(response.requestId); } +void Ipc::Strand::handleSnmpRequest(const Snmp::Request& request) +{ + debugs(54, 6, HERE); + Snmp::SendResponse(request.requestId, request.pdu); +} + +void Ipc::Strand::handleSnmpResponse(const Snmp::Response& response) +{ + debugs(54, 6, HERE); + Snmp::Forwarder::HandleRemoteAck(response.requestId); +} + void Ipc::Strand::timedout() { debugs(54, 6, HERE << isRegistered); diff --git a/src/ipc/Strand.h b/src/ipc/Strand.h index d01cf2b7f2..1394c0992e 100644 --- a/src/ipc/Strand.h +++ b/src/ipc/Strand.h @@ -10,6 +10,7 @@ #include "ipc/Port.h" #include "mgr/forward.h" +#include "snmpx/forward.h" namespace Ipc @@ -34,6 +35,8 @@ private: void handleRegistrationResponse(const StrandCoord &strand); void handleCacheMgrRequest(const Mgr::Request& request); void handleCacheMgrResponse(const Mgr::Response& response); + void handleSnmpRequest(const Snmp::Request& request); + void handleSnmpResponse(const Snmp::Response& response); private: bool isRegistered; ///< whether Coordinator ACKed registration (unused) diff --git a/src/ipc/UdsOp.cc b/src/ipc/UdsOp.cc index 873450574a..a4da136956 100644 --- a/src/ipc/UdsOp.cc +++ b/src/ipc/UdsOp.cc @@ -132,3 +132,23 @@ void Ipc::SendMessage(const String& toAddress, const TypedMsgHdr &message) { AsyncJob::Start(new UdsSender(toAddress, message)); } + +int Ipc::ImportFdIntoComm(int fd, int socktype, int protocol, Ipc::FdNoteId noteId) +{ + struct sockaddr_in addr; + socklen_t len = sizeof(addr); + if (getsockname(fd, reinterpret_cast(&addr), &len) == 0) { + Ip::Address ipAddr(addr); + struct addrinfo* addr_info = NULL; + ipAddr.GetAddrInfo(addr_info); + addr_info->ai_socktype = socktype; + addr_info->ai_protocol = protocol; + comm_import_opened(fd, ipAddr, COMM_NONBLOCKING, Ipc::FdNote(noteId), addr_info); + ipAddr.FreeAddrInfo(addr_info); + } else { + debugs(16, DBG_CRITICAL, HERE << "ERROR: FD " << fd << ' ' << xstrerror()); + ::close(fd); + fd = -1; + } + return fd; +} diff --git a/src/ipc/UdsOp.h b/src/ipc/UdsOp.h index 0187a4e3ee..bea34d3a6e 100644 --- a/src/ipc/UdsOp.h +++ b/src/ipc/UdsOp.h @@ -12,6 +12,7 @@ #include "SquidString.h" #include "base/AsyncJob.h" #include "ipc/TypedMsgHdr.h" +#include "ipc/FdNotes.h" class CommTimeoutCbParams; class CommIoCbParams; @@ -90,6 +91,8 @@ private: void SendMessage(const String& toAddress, const TypedMsgHdr& message); +/// import socket fd from another strand into our Comm state +int ImportFdIntoComm(int fd, int socktype, int protocol, FdNoteId noteId); } diff --git a/src/ipc/forward.h b/src/ipc/forward.h index 7325abb6bc..f4ddf4cb0c 100644 --- a/src/ipc/forward.h +++ b/src/ipc/forward.h @@ -13,6 +13,10 @@ namespace Ipc class TypedMsgHdr; class StrandCoord; +class Forwarder; +class Inquirer; +class Request; +class Response; } // namespace Ipc diff --git a/src/mgr/Forwarder.cc b/src/mgr/Forwarder.cc index 2d908978cf..4ea3f8d34e 100644 --- a/src/mgr/Forwarder.cc +++ b/src/mgr/Forwarder.cc @@ -21,21 +21,18 @@ CBDATA_NAMESPACED_CLASS_INIT(Mgr, Forwarder); -Mgr::Forwarder::RequestsMap Mgr::Forwarder::TheRequestsMap; -unsigned int Mgr::Forwarder::LastRequestId = 0; Mgr::Forwarder::Forwarder(int aFd, const ActionParams &aParams, HttpRequest* aRequest, StoreEntry* anEntry): - AsyncJob("Mgr::Forwarder"), - params(aParams), - request(aRequest), entry(anEntry), fd(aFd), requestId(0), closer(NULL) + Ipc::Forwarder(new Request(KidIdentifier, 0, aFd, aParams), 10), + httpRequest(aRequest), entry(anEntry), fd(aFd) { - debugs(16, 5, HERE << "FD " << aFd); + debugs(16, 5, HERE << "FD " << fd); Must(fd >= 0); - Must(request != NULL); + Must(httpRequest != NULL); Must(entry != NULL); - HTTPMSGLOCK(request); + HTTPMSGLOCK(httpRequest); entry->lock(); EBIT_SET(entry->flags, ENTRY_FWD_HDR_WAIT); @@ -47,19 +44,18 @@ Mgr::Forwarder::Forwarder(int aFd, const ActionParams &aParams, Mgr::Forwarder::~Forwarder() { debugs(16, 5, HERE); - Must(request != NULL); + Must(httpRequest != NULL); Must(entry != NULL); - Must(requestId == 0); - HTTPMSGUNLOCK(request); + HTTPMSGUNLOCK(httpRequest); entry->unregisterAbort(); entry->unlock(); - close(); + cleanup(); } /// closes our copy of the client HTTP connection socket void -Mgr::Forwarder::close() +Mgr::Forwarder::cleanup() { if (fd >= 0) { if (closer != NULL) { @@ -72,61 +68,34 @@ Mgr::Forwarder::close() } void -Mgr::Forwarder::start() +Mgr::Forwarder::handleError() { - debugs(16, 3, HERE); - entry->registerAbort(&Forwarder::Abort, this); - - typedef NullaryMemFunT Dialer; - AsyncCall::Pointer callback = JobCallback(16, 5, Dialer, this, - Forwarder::handleRemoteAck); - if (++LastRequestId == 0) // don't use zero value as requestId - ++LastRequestId; - requestId = LastRequestId; - TheRequestsMap[requestId] = callback; - Request mgrRequest(KidIdentifier, requestId, fd, params); - Ipc::TypedMsgHdr message; - - try { - mgrRequest.pack(message); - } catch (...) { - // assume the pack() call failed because the message did not fit - // TODO: add a more specific exception? - debugs(16, DBG_CRITICAL, "ERROR: uri " << entry->url() << " exceeds buffer size"); - quitOnError("long URI", errorCon(ERR_INVALID_URL, HTTP_REQUEST_URI_TOO_LARGE, request)); - } - - Ipc::SendMessage(Ipc::coordinatorAddr, message); - const double timeout = 10; // in seconds - eventAdd("Mgr::Forwarder::requestTimedOut", &Forwarder::RequestTimedOut, - this, timeout, 0, false); + debugs(16, DBG_CRITICAL, "ERROR: uri " << entry->url() << " exceeds buffer size"); + sendError(errorCon(ERR_INVALID_URL, HTTP_REQUEST_URI_TOO_LARGE, httpRequest)); + mustStop("long URI"); } void -Mgr::Forwarder::swanSong() +Mgr::Forwarder::handleTimeout() { - debugs(16, 5, HERE); - removeTimeoutEvent(); - if (requestId > 0) { - DequeueRequest(requestId); - requestId = 0; - } - close(); + sendError(errorCon(ERR_LIFETIME_EXP, HTTP_REQUEST_TIMEOUT, httpRequest)); + Ipc::Forwarder::handleTimeout(); } -bool -Mgr::Forwarder::doneAll() const +void +Mgr::Forwarder::handleException(const std::exception& e) { - debugs(16, 5, HERE); - return requestId == 0; + if (entry != NULL && httpRequest != NULL && fd >= 0) + sendError(errorCon(ERR_INVALID_RESP, HTTP_INTERNAL_SERVER_ERROR, httpRequest)); + Ipc::Forwarder::handleException(e); } /// called when the client socket gets closed by some external force void -Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &io) +Mgr::Forwarder::noteCommClosed(const CommCloseCbParams& params) { debugs(16, 5, HERE); - Must(fd == io.fd); + Must(fd == params.fd); fd = -1; mustStop("commClosed"); } @@ -135,42 +104,21 @@ Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &io) void Mgr::Forwarder::handleRemoteAck() { - debugs(16, 3, HERE); - Must(entry != NULL); + Ipc::Forwarder::handleRemoteAck(); - requestId = 0; + Must(entry != NULL); EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT); entry->complete(); } -/// Mgr::Forwarder::requestTimedOut wrapper +/// send error page void -Mgr::Forwarder::RequestTimedOut(void* param) +Mgr::Forwarder::sendError(ErrorState *error) { debugs(16, 3, HERE); - Must(param != NULL); - Forwarder* mgrFwdr = static_cast(param); - // use async call to enable job call protection that time events lack - CallJobHere(16, 5, mgrFwdr, Mgr::Forwarder, requestTimedOut); -} - -/// called when Coordinator fails to start processing the request [in time] -void -Mgr::Forwarder::requestTimedOut() -{ - debugs(16, 3, HERE); - quitOnError("timeout", errorCon(ERR_LIFETIME_EXP, HTTP_REQUEST_TIMEOUT, request)); -} - -/// terminate with an error -void -Mgr::Forwarder::quitOnError(const char *reason, ErrorState *error) -{ - debugs(16, 3, HERE); - Must(reason != NULL); Must(error != NULL); Must(entry != NULL); - Must(request != NULL); + Must(httpRequest != NULL); EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT); entry->buffer(); @@ -179,62 +127,4 @@ Mgr::Forwarder::quitOnError(const char *reason, ErrorState *error) errorStateFree(error); entry->flush(); entry->complete(); - - mustStop(reason); -} - -void -Mgr::Forwarder::callException(const std::exception& e) -{ - try { - if (entry != NULL && request != NULL && fd >= 0) - quitOnError("exception", errorCon(ERR_INVALID_RESP, HTTP_INTERNAL_SERVER_ERROR, request)); - } catch (const std::exception& ex) { - debugs(16, DBG_CRITICAL, HERE << ex.what()); - } - AsyncJob::callException(e); -} - -/// returns and forgets the right Forwarder callback for the request -AsyncCall::Pointer -Mgr::Forwarder::DequeueRequest(unsigned int requestId) -{ - debugs(16, 3, HERE); - Must(requestId != 0); - AsyncCall::Pointer call; - RequestsMap::iterator request = TheRequestsMap.find(requestId); - if (request != TheRequestsMap.end()) { - call = request->second; - Must(call != NULL); - TheRequestsMap.erase(request); - } - return call; -} - -/// called when we are no longer waiting for Coordinator to respond -void -Mgr::Forwarder::removeTimeoutEvent() -{ - if (eventFind(&Forwarder::RequestTimedOut, this)) - eventDelete(&Forwarder::RequestTimedOut, this); -} - -void -Mgr::Forwarder::HandleRemoteAck(unsigned int requestId) -{ - debugs(16, 3, HERE); - Must(requestId != 0); - - AsyncCall::Pointer call = DequeueRequest(requestId); - if (call != NULL) - ScheduleCallHere(call); -} - -/// called when something goes wrong with the Store entry -void -Mgr::Forwarder::Abort(void* param) -{ - Forwarder* mgrFwdr = static_cast(param); - if (mgrFwdr->fd >= 0) - comm_close(mgrFwdr->fd); } diff --git a/src/mgr/Forwarder.h b/src/mgr/Forwarder.h index 470a1a3fc7..1c8b9e84f5 100644 --- a/src/mgr/Forwarder.h +++ b/src/mgr/Forwarder.h @@ -8,9 +8,8 @@ #ifndef SQUID_MGR_FORWARDER_H #define SQUID_MGR_FORWARDER_H -#include "base/AsyncJob.h" +#include "ipc/Forwarder.h" #include "mgr/ActionParams.h" -#include class CommCloseCbParams; @@ -25,50 +24,31 @@ namespace Mgr * Waits for an ACK from Coordinator while holding the Store entry. * Fills the store entry with an error response if forwarding fails. */ -class Forwarder: public AsyncJob +class Forwarder: public Ipc::Forwarder { public: Forwarder(int aFd, const ActionParams &aParams, HttpRequest* aRequest, StoreEntry* anEntry); virtual ~Forwarder(); - /// finds and calls the right Forwarder upon Coordinator's response - static void HandleRemoteAck(unsigned int requestId); - - /* has-to-be-public AsyncJob API */ - virtual void callException(const std::exception& e); - protected: - /* AsyncJob API */ - virtual void start(); - virtual void swanSong(); - virtual bool doneAll() const; + /* Ipc::Forwarder API */ + virtual void cleanup(); ///< perform cleanup actions + virtual void handleError(); + virtual void handleTimeout(); + virtual void handleException(const std::exception& e); + virtual void handleRemoteAck(); private: - void handleRemoteAck(); - static void RequestTimedOut(void* param); - void requestTimedOut(); - void quitOnError(const char *reason, ErrorState *error); void noteCommClosed(const CommCloseCbParams& params); - void removeTimeoutEvent(); - static AsyncCall::Pointer DequeueRequest(unsigned int requestId); - static void Abort(void* param); - void close(); + void sendError(ErrorState* error); private: - ActionParams params; ///< action parameters to pass to the other side - HttpRequest* request; ///< HTTP client request for detailing errors + HttpRequest* httpRequest; ///< HTTP client request for detailing errors StoreEntry* entry; ///< Store entry expecting the response int fd; ///< HTTP client connection descriptor - unsigned int requestId; ///< request id AsyncCall::Pointer closer; ///< comm_close handler for the HTTP connection - /// maps requestId to Forwarder::handleRemoteAck callback - typedef std::map RequestsMap; - static RequestsMap TheRequestsMap; ///< pending Coordinator requests - - static unsigned int LastRequestId; ///< last requestId used - CBDATA_CLASS2(Forwarder); }; diff --git a/src/mgr/FunAction.cc b/src/mgr/FunAction.cc index 4e7bc49270..6fd0f97c1a 100644 --- a/src/mgr/FunAction.cc +++ b/src/mgr/FunAction.cc @@ -7,6 +7,7 @@ #include "config.h" #include "base/TextException.h" +#include "ipc/UdsOp.h" #include "mgr/Command.h" #include "mgr/Filler.h" #include "mgr/FunAction.h" @@ -31,7 +32,7 @@ void Mgr::FunAction::respond(const Request& request) { debugs(16, 5, HERE); - const int fd = ImportHttpFdIntoComm(request.fd); + const int fd = Ipc::ImportFdIntoComm(request.fd, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket); Must(fd >= 0); Must(request.requestId != 0); AsyncJob::Start(new Mgr::Filler(this, fd, request.requestId)); diff --git a/src/mgr/InfoAction.cc b/src/mgr/InfoAction.cc index 27adf22ec8..eb5760ba0b 100644 --- a/src/mgr/InfoAction.cc +++ b/src/mgr/InfoAction.cc @@ -9,6 +9,7 @@ #include "base/TextException.h" #include "HttpReply.h" #include "ipc/Messages.h" +#include "ipc/UdsOp.h" #include "ipc/TypedMsgHdr.h" #include "mgr/Filler.h" #include "mgr/InfoAction.h" @@ -155,7 +156,7 @@ void Mgr::InfoAction::respond(const Request& request) { debugs(16, 5, HERE); - int fd = ImportHttpFdIntoComm(request.fd); + int fd = Ipc::ImportFdIntoComm(request.fd, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket); Must(fd >= 0); Must(request.requestId != 0); AsyncJob::Start(new Mgr::Filler(this, fd, request.requestId)); diff --git a/src/mgr/Inquirer.cc b/src/mgr/Inquirer.cc index d249c3a913..4119045193 100644 --- a/src/mgr/Inquirer.cc +++ b/src/mgr/Inquirer.cc @@ -10,57 +10,34 @@ #include "comm/Write.h" #include "CommCalls.h" #include "HttpReply.h" -#include "ipc/Coordinator.h" +#include "ipc/UdsOp.h" #include "mgr/ActionWriter.h" -#include "mgr/Command.h" #include "mgr/Inquirer.h" #include "mgr/Request.h" #include "mgr/Response.h" #include "SquidTime.h" #include -#include CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer); -Mgr::Inquirer::RequestsMap Mgr::Inquirer::TheRequestsMap; -unsigned int Mgr::Inquirer::LastRequestId = 0; -/// compare Ipc::StrandCoord using kidId, for std::sort() below -static bool -LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2) -{ - return c1.kidId < c2.kidId; -} - -Mgr::Inquirer::Inquirer(Action::Pointer anAction, int aFd, +Mgr::Inquirer::Inquirer(Action::Pointer anAction, const Request &aCause, const Ipc::StrandCoords &coords): - AsyncJob("Mgr::Inquirer"), + Ipc::Inquirer(aCause.clone(), coords, anAction->atomic() ? 10 : 100), aggrAction(anAction), - cause(aCause), - fd(aFd), - strands(coords), pos(strands.begin()), - requestId(0), closer(NULL), timeout(aggrAction->atomic() ? 10 : 100) + fd(Ipc::ImportFdIntoComm(aCause.fd, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket)) { - debugs(16, 5, HERE << "FD " << aFd << " action: " << aggrAction); - - // order by ascending kid IDs; useful for non-aggregatable stats - std::sort(strands.begin(), strands.end(), LesserStrandByKidId); + debugs(16, 5, HERE << "FD " << fd << " action: " << aggrAction); closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed", CommCbMemFunT(this, &Inquirer::noteCommClosed)); comm_add_close_handler(fd, closer); } -Mgr::Inquirer::~Inquirer() -{ - debugs(16, 5, HERE); - close(); -} - /// closes our copy of the client HTTP connection socket void -Mgr::Inquirer::close() +Mgr::Inquirer::cleanup() { if (fd >= 0) { removeCloseHandler(); @@ -82,6 +59,7 @@ void Mgr::Inquirer::start() { debugs(16, 5, HERE); + Ipc::Inquirer::start(); Must(fd >= 0); Must(aggrAction != NULL); @@ -107,46 +85,6 @@ Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params) inquire(); } -void -Mgr::Inquirer::inquire() -{ - if (pos == strands.end()) { - Must(done()); - return; - } - - Must(requestId == 0); - AsyncCall::Pointer callback = asyncCall(16, 5, "Mgr::Inquirer::handleRemoteAck", - HandleAckDialer(this, &Inquirer::handleRemoteAck, Response())); - if (++LastRequestId == 0) // don't use zero value as requestId - ++LastRequestId; - requestId = LastRequestId; - const int kidId = pos->kidId; - debugs(16, 4, HERE << "inquire kid: " << kidId << status()); - TheRequestsMap[requestId] = callback; - Request mgrRequest(KidIdentifier, requestId, fd, - aggrAction->command().params); - Ipc::TypedMsgHdr message; - mgrRequest.pack(message); - Ipc::SendMessage(Ipc::Port::MakeAddr(Ipc::strandAddrPfx, kidId), message); - eventAdd("Mgr::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut, - this, timeout, 0, false); -} - -/// called when a strand is done writing its output -void -Mgr::Inquirer::handleRemoteAck(const Response& response) -{ - debugs(16, 4, HERE << status()); - requestId = 0; - removeTimeoutEvent(); - if (response.hasAction()) - aggrAction->add(response.getAction()); - Must(!done()); // or we should not be called - ++pos; // advance after a successful inquiry - inquire(); -} - /// called when the HTTP client or some external force closed our socket void Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params) @@ -157,97 +95,27 @@ Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params) mustStop("commClosed"); } +bool +Mgr::Inquirer::aggregate(Ipc::Response::Pointer aResponse) +{ + Mgr::Response& response = static_cast(*aResponse); + if (response.hasAction()) + aggrAction->add(response.getAction()); + return true; +} + void -Mgr::Inquirer::swanSong() +Mgr::Inquirer::sendResponse() { - debugs(16, 5, HERE); - removeTimeoutEvent(); - if (requestId > 0) { - DequeueRequest(requestId); - requestId = 0; - } if (aggrAction->aggregatable()) { removeCloseHandler(); AsyncJob::Start(new ActionWriter(aggrAction, fd)); fd = -1; // should not close fd because we passed it to ActionWriter } - close(); } bool Mgr::Inquirer::doneAll() const { - return !writer && pos == strands.end(); -} - -/// returns and forgets the right Inquirer callback for strand request -AsyncCall::Pointer -Mgr::Inquirer::DequeueRequest(unsigned int requestId) -{ - debugs(16, 3, HERE << " requestId " << requestId); - Must(requestId != 0); - AsyncCall::Pointer call; - RequestsMap::iterator request = TheRequestsMap.find(requestId); - if (request != TheRequestsMap.end()) { - call = request->second; - Must(call != NULL); - TheRequestsMap.erase(request); - } - return call; -} - -void -Mgr::Inquirer::HandleRemoteAck(const Mgr::Response& response) -{ - Must(response.requestId != 0); - AsyncCall::Pointer call = DequeueRequest(response.requestId); - if (call != NULL) { - HandleAckDialer* dialer = dynamic_cast(call->getDialer()); - Must(dialer); - dialer->arg1 = response; - ScheduleCallHere(call); - } -} - -/// called when we are no longer waiting for the strand to respond -void -Mgr::Inquirer::removeTimeoutEvent() -{ - if (eventFind(&Inquirer::RequestTimedOut, this)) - eventDelete(&Inquirer::RequestTimedOut, this); -} - -/// Mgr::Inquirer::requestTimedOut wrapper -void -Mgr::Inquirer::RequestTimedOut(void* param) -{ - debugs(16, 3, HERE); - Must(param != NULL); - Inquirer* cmi = static_cast(param); - // use async call to enable job call protection that time events lack - CallJobHere(16, 5, cmi, Mgr::Inquirer, requestTimedOut); -} - -/// called when the strand failed to respond (or finish responding) in time -void -Mgr::Inquirer::requestTimedOut() -{ - debugs(16, 3, HERE); - if (requestId != 0) { - DequeueRequest(requestId); - requestId = 0; - Must(!done()); // or we should not be called - ++pos; // advance after a failed inquiry - inquire(); - } -} - -const char* -Mgr::Inquirer::status() const -{ - static MemBuf buf; - buf.reset(); - buf.Printf(" [FD %d, requestId %u]", fd, requestId); - buf.terminate(); - return buf.content(); + return !writer && Ipc::Inquirer::doneAll(); } diff --git a/src/mgr/Inquirer.h b/src/mgr/Inquirer.h index 7059daa2b3..03d7efeda1 100644 --- a/src/mgr/Inquirer.h +++ b/src/mgr/Inquirer.h @@ -8,13 +8,8 @@ #ifndef SQUID_MGR_INQUIRER_H #define SQUID_MGR_INQUIRER_H -#include "base/AsyncJobCalls.h" -#include "base/AsyncJob.h" -#include "ipc/StrandCoords.h" -#include "MemBuf.h" +#include "ipc/Inquirer.h" #include "mgr/Action.h" -#include "mgr/Request.h" -#include class CommIoCbParams; class CommCloseCbParams; @@ -24,60 +19,34 @@ namespace Mgr /// Coordinator's job that sends a cache manage request to each strand, /// aggregating individual strand responses and dumping the result if needed -class Inquirer: public AsyncJob +class Inquirer: public Ipc::Inquirer { public: - Inquirer(Action::Pointer anAction, int aFd, const Request &aCause, + Inquirer(Action::Pointer anAction, const Request &aCause, const Ipc::StrandCoords &coords); - virtual ~Inquirer(); - - /// finds and calls the right Inquirer upon strand's response - static void HandleRemoteAck(const Mgr::Response& response); protected: /* AsyncJob API */ virtual void start(); - virtual void swanSong(); virtual bool doneAll() const; - virtual const char *status() const; -private: - typedef UnaryMemFunT HandleAckDialer; + /* Ipc::Inquirer API */ + virtual void cleanup(); + virtual void sendResponse(); + virtual bool aggregate(Ipc::Response::Pointer aResponse); - void inquire(); +private: void noteWroteHeader(const CommIoCbParams& params); void noteCommClosed(const CommCloseCbParams& params); - - void handleRemoteAck(const Response& response); - - static AsyncCall::Pointer DequeueRequest(unsigned int requestId); - - static void RequestTimedOut(void* param); - void requestTimedOut(); - void removeTimeoutEvent(); - - void close(); void removeCloseHandler(); private: Action::Pointer aggrAction; //< action to aggregate - Request cause; ///< cache manager request received from HTTP client int fd; ///< HTTP client socket descriptor - Ipc::StrandCoords strands; ///< all strands we want to query, in order - Ipc::StrandCoords::const_iterator pos; ///< strand we should query now - - unsigned int requestId; ///< ID of our outstanding request to strand AsyncCall::Pointer writer; ///< comm_write callback AsyncCall::Pointer closer; ///< comm_close handler - const double timeout; ///< number of seconds to wait for strand response - - /// maps requestId to Inquirer::handleRemoteAck callback - typedef std::map RequestsMap; - static RequestsMap TheRequestsMap; ///< pending strand requests - - static unsigned int LastRequestId; ///< last requestId used CBDATA_CLASS2(Inquirer); }; diff --git a/src/mgr/Request.cc b/src/mgr/Request.cc index a0f4014cdd..15d8bd066e 100644 --- a/src/mgr/Request.cc +++ b/src/mgr/Request.cc @@ -8,20 +8,27 @@ #include "config.h" #include "base/TextException.h" #include "ipc/Messages.h" +#include "ipc/TypedMsgHdr.h" #include "mgr/ActionParams.h" #include "mgr/Request.h" Mgr::Request::Request(int aRequestorId, unsigned int aRequestId, int aFd, const ActionParams &aParams): - requestorId(aRequestorId), requestId(aRequestId), + Ipc::Request(aRequestorId, aRequestId), fd(aFd), params(aParams) { Must(requestorId > 0); - Must(requestId != 0); } -Mgr::Request::Request(const Ipc::TypedMsgHdr& msg) +Mgr::Request::Request(const Request& request): + Ipc::Request(request.requestorId, request.requestId), + fd(request.fd), params(request.params) +{ +} + +Mgr::Request::Request(const Ipc::TypedMsgHdr& msg): + Ipc::Request(0, 0) { msg.checkType(Ipc::mtCacheMgrRequest); msg.getPod(requestorId); @@ -41,3 +48,9 @@ Mgr::Request::pack(Ipc::TypedMsgHdr& msg) const msg.putFd(fd); } + +Ipc::Request::Pointer +Mgr::Request::clone() const +{ + return new Request(*this); +} diff --git a/src/mgr/Request.h b/src/mgr/Request.h index 89924f6837..50b29d7016 100644 --- a/src/mgr/Request.h +++ b/src/mgr/Request.h @@ -8,7 +8,8 @@ #ifndef SQUID_MGR_REQUEST_H #define SQUID_MGR_REQUEST_H -#include "ipc/TypedMsgHdr.h" +#include "ipc/forward.h" +#include "ipc/Request.h" #include "mgr/ActionParams.h" @@ -16,18 +17,21 @@ namespace Mgr { /// cache manager request -class Request +class Request: public Ipc::Request { public: Request(int aRequestorId, unsigned int aRequestId, int aFd, const ActionParams &aParams); explicit Request(const Ipc::TypedMsgHdr& msg); ///< from recvmsg() - void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg() + /* Ipc::Request API */ + virtual void pack(Ipc::TypedMsgHdr& msg) const; + virtual Pointer clone() const; + +private: + Request(const Request& request); public: - int requestorId; ///< kidId of the requestor; used for response destination - unsigned int requestId; ///< unique for sender; matches request w/ response int fd; ///< HTTP client connection descriptor ActionParams params; ///< action name and parameters diff --git a/src/mgr/Response.cc b/src/mgr/Response.cc index e3204f814f..e7a5866705 100644 --- a/src/mgr/Response.cc +++ b/src/mgr/Response.cc @@ -15,19 +15,19 @@ #include "mgr/Response.h" -std::ostream& Mgr::operator << (std::ostream &os, const Response& response) +Mgr::Response::Response(unsigned int aRequestId, Action::Pointer anAction): + Ipc::Response(aRequestId), action(anAction) { - os << "response: {requestId: " << response.requestId << '}'; - return os; + Must(!action || action->name()); // if there is an action, it must be named } -Mgr::Response::Response(unsigned int aRequestId, Action::Pointer anAction): - requestId(aRequestId), action(anAction) +Mgr::Response::Response(const Response& response): + Ipc::Response(response.requestId), action(response.action) { - Must(!action || action->name()); // if there is an action, it must be named } -Mgr::Response::Response(const Ipc::TypedMsgHdr& msg) +Mgr::Response::Response(const Ipc::TypedMsgHdr& msg): + Ipc::Response(0) { msg.checkType(Ipc::mtCacheMgrResponse); msg.getPod(requestId); @@ -54,6 +54,12 @@ Mgr::Response::pack(Ipc::TypedMsgHdr& msg) const } } +Ipc::Response::Pointer +Mgr::Response::clone() const +{ + return new Response(*this); +} + bool Mgr::Response::hasAction() const { diff --git a/src/mgr/Response.h b/src/mgr/Response.h index 7b759926eb..38f5f08e7e 100644 --- a/src/mgr/Response.h +++ b/src/mgr/Response.h @@ -8,6 +8,8 @@ #ifndef SQUID_MGR_RESPONSE_H #define SQUID_MGR_RESPONSE_H +#include "ipc/forward.h" +#include "ipc/Response.h" #include "mgr/Action.h" @@ -16,23 +18,27 @@ namespace Mgr /// A response to Mgr::Request. /// May carry strand action data to be aggregated with data from other strands. -class Response +class Response: public Ipc::Response { public: - Response(unsigned int aRequestId = 0, Action::Pointer anAction = NULL); + Response(unsigned int aRequestId, Action::Pointer anAction = NULL); explicit Response(const Ipc::TypedMsgHdr& msg); ///< from recvmsg() - void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg() + + /* Ipc::Response API */ + virtual void pack(Ipc::TypedMsgHdr& msg) const; + virtual Ipc::Response::Pointer clone() const; + bool hasAction() const; ///< whether response contain action object const Action& getAction() const; ///< returns action object +private: + Response(const Response& response); + public: - unsigned int requestId; ///< ID of request we are responding to Action::Pointer action; ///< action relating to response }; -extern std::ostream& operator <<(std::ostream &os, const Response &response); - } // namespace Mgr #endif /* SQUID_MGR_RESPONSE_H */ diff --git a/src/mgr/StoreToCommWriter.cc b/src/mgr/StoreToCommWriter.cc index 1227333768..52aed325d8 100644 --- a/src/mgr/StoreToCommWriter.cc +++ b/src/mgr/StoreToCommWriter.cc @@ -164,25 +164,3 @@ Mgr::StoreToCommWriter::Abort(void* param) if (mgrWriter->fd >= 0) comm_close(mgrWriter->fd); } - - -int -Mgr::ImportHttpFdIntoComm(int fd) -{ - struct sockaddr_in addr; - socklen_t len = sizeof(addr); - if (getsockname(fd, reinterpret_cast(&addr), &len) == 0) { - Ip::Address ipAddr(addr); - struct addrinfo* addr_info = NULL; - ipAddr.GetAddrInfo(addr_info); - addr_info->ai_socktype = SOCK_STREAM; - addr_info->ai_protocol = IPPROTO_TCP; - comm_import_opened(fd, ipAddr, COMM_NONBLOCKING, Ipc::FdNote(Ipc::fdnHttpSocket), addr_info); - ipAddr.FreeAddrInfo(addr_info); - } else { - debugs(16, DBG_CRITICAL, HERE << "ERROR: FD " << fd << ' ' << xstrerror()); - ::close(fd); - fd = -1; - } - return fd; -} diff --git a/src/mgr/StoreToCommWriter.h b/src/mgr/StoreToCommWriter.h index 116ffdba51..43678a3207 100644 --- a/src/mgr/StoreToCommWriter.h +++ b/src/mgr/StoreToCommWriter.h @@ -65,9 +65,6 @@ protected: CBDATA_CLASS2(StoreToCommWriter); }; -/// import HTTP socket fd from another strand into our Comm state -extern int ImportHttpFdIntoComm(int fd); - } // namespace Mgr #endif /* SQUID_MGR_STORE_TO_COMM_WRITER_H */ diff --git a/src/snmp_core.cc b/src/snmp_core.cc index 555ee882ac..b9d65f5701 100644 --- a/src/snmp_core.cc +++ b/src/snmp_core.cc @@ -31,15 +31,15 @@ */ #include "squid.h" #include "acl/FilledChecklist.h" -#include "cache_snmp.h" +#include "base/CbcPointer.h" #include "comm.h" #include "comm/Loops.h" #include "ipc/StartListening.h" #include "ip/Address.h" #include "ip/tools.h" +#include "snmp_core.h" +#include "snmpx/Forwarder.h" -#define SNMP_REQUEST_SIZE 4096 -#define MAX_PROTOSTAT 5 /// dials snmpConnectionOpened call class SnmpListeningStartedDialer: public CallDialer, @@ -61,29 +61,14 @@ public: Ip::Address theOutSNMPAddr; -typedef struct _mib_tree_entry mib_tree_entry; -typedef oid *(instance_Fn) (oid * name, snint * len, mib_tree_entry * current, oid_ParseFn ** Fn); - -struct _mib_tree_entry { - oid *name; - int len; - oid_ParseFn *parsefunction; - instance_Fn *instancefunction; - int children; - - struct _mib_tree_entry **leaves; - - struct _mib_tree_entry *parent; -}; - mib_tree_entry *mib_tree_head; mib_tree_entry *mib_tree_last; static void snmpIncomingConnectionOpened(int fd, int errNo); static void snmpOutgoingConnectionOpened(int fd, int errNo); -static mib_tree_entry * snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instance_Fn * instancefunction); -static mib_tree_entry *snmpAddNode(oid * name, int len, oid_ParseFn * parsefunction, instance_Fn * instancefunction, int children,...); +static mib_tree_entry * snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instance_Fn * instancefunction, AggrType aggrType = atNone); +static mib_tree_entry *snmpAddNode(oid * name, int len, oid_ParseFn * parsefunction, instance_Fn * instancefunction, AggrType aggrType, int children,...); static oid *snmpCreateOid(int length,...); mib_tree_entry * snmpLookupNodeStr(mib_tree_entry *entry, const char *str); int snmpCreateOidFromStr(const char *str, oid **name, int *nl); @@ -95,7 +80,6 @@ static oid *client_Inst(oid * name, snint * len, mib_tree_entry * current, oid_P static void snmpDecodePacket(snmp_request_t * rq); static void snmpConstructReponse(snmp_request_t * rq); -static struct snmp_pdu *snmpAgentResponse(struct snmp_pdu *PDU); static oid_ParseFn *snmpTreeNext(oid * Current, snint CurrentLen, oid ** Next, snint * NextLen); static oid_ParseFn *snmpTreeGet(oid * Current, snint CurrentLen); static mib_tree_entry *snmpTreeEntry(oid entry, snint len, mib_tree_entry * current); @@ -125,7 +109,7 @@ snmpInit(void) * without having a "search" function. A search function should be written * to make this and the other code much less evil. */ - mib_tree_head = snmpAddNode(snmpCreateOid(1, 1), 1, NULL, NULL, 0); + mib_tree_head = snmpAddNode(snmpCreateOid(1, 1), 1, NULL, NULL, atNone, 0); assert(mib_tree_head); debugs(49, 5, "snmpInit: root is " << mib_tree_head); @@ -144,9 +128,9 @@ snmpInit(void) /* SQ_SYS - 1.3.6.1.4.1.3495.1.1 */ snmpAddNodeStr("1.3.6.1.4.1.3495.1", 1, NULL, NULL); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.1", SYSVMSIZ, snmp_sysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.1", SYSSTOR, snmp_sysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.1", SYS_UPTIME, snmp_sysFn, static_Inst); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.1", SYSVMSIZ, snmp_sysFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.1", SYSSTOR, snmp_sysFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.1", SYS_UPTIME, snmp_sysFn, static_Inst, atMax); /* SQ_CONF - 1.3.6.1.4.1.3495.1.2 */ snmpAddNodeStr("1.3.6.1.4.1.3495.1", 2, NULL, NULL); @@ -157,10 +141,10 @@ snmpInit(void) /* SQ_CONF + CONF_STORAGE - 1.3.6.1.4.1.3495.1.5 */ snmpAddNodeStr("1.3.6.1.4.1.3495.1.2", CONF_STORAGE, NULL, NULL); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_MMAXSZ, snmp_confFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_SWMAXSZ, snmp_confFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_SWHIWM, snmp_confFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_SWLOWM, snmp_confFn, static_Inst); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_MMAXSZ, snmp_confFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_SWMAXSZ, snmp_confFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_SWHIWM, snmp_confFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.2.5", CONF_ST_SWLOWM, snmp_confFn, static_Inst, atSum); snmpAddNodeStr("1.3.6.1.4.1.3495.1.2", CONF_UNIQNAME, snmp_confFn, static_Inst); @@ -169,38 +153,38 @@ snmpInit(void) /* PERF_SYS - 1.3.6.1.4.1.3495.1.3.1 */ snmpAddNodeStr("1.3.6.1.4.1.3495.1.3", PERF_SYS, NULL, NULL); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_PF, snmp_prfSysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_NUMR, snmp_prfSysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_MEMUSAGE, snmp_prfSysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CPUTIME, snmp_prfSysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CPUUSAGE, snmp_prfSysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_MAXRESSZ, snmp_prfSysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_NUMOBJCNT, snmp_prfSysFn, static_Inst); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_PF, snmp_prfSysFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_NUMR, snmp_prfSysFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_MEMUSAGE, snmp_prfSysFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CPUTIME, snmp_prfSysFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CPUUSAGE, snmp_prfSysFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_MAXRESSZ, snmp_prfSysFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_NUMOBJCNT, snmp_prfSysFn, static_Inst, atSum); snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURLRUEXP, snmp_prfSysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURUNLREQ, snmp_prfSysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURUNUSED_FD, snmp_prfSysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURRESERVED_FD, snmp_prfSysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURUSED_FD, snmp_prfSysFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURMAX_FD, snmp_prfSysFn, static_Inst); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURUNLREQ, snmp_prfSysFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURUNUSED_FD, snmp_prfSysFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURRESERVED_FD, snmp_prfSysFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURUSED_FD, snmp_prfSysFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.1", PERF_SYS_CURMAX_FD, snmp_prfSysFn, static_Inst, atMax); /* PERF_PROTO - 1.3.6.1.4.1.3495.1.3.2 */ snmpAddNodeStr("1.3.6.1.4.1.3495.1.3", PERF_PROTO, NULL, NULL); snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2", PERF_PROTOSTAT_AGGR, NULL, NULL); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_REQ, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_HITS, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_ERRORS, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_KBYTES_IN, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_KBYTES_OUT, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_S, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_R, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_SKB, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_RKB, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_REQ, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ERRORS, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_KBYTES_IN, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_KBYTES_OUT, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_CURSWAP, snmp_prfProtoFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_CLIENTS, snmp_prfProtoFn, static_Inst); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_REQ, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_HITS, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_ERRORS, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_KBYTES_IN, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_HTTP_KBYTES_OUT, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_S, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_R, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_SKB, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ICP_RKB, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_REQ, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_ERRORS, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_KBYTES_IN, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_KBYTES_OUT, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_CURSWAP, snmp_prfProtoFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.1", PERF_PROTOSTAT_AGGR_CLIENTS, snmp_prfProtoFn, static_Inst, atSum); /* Note this is time-series rather than 'static' */ /* cacheMedianSvcTable */ @@ -208,49 +192,49 @@ snmpInit(void) /* cacheMedianSvcEntry */ snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2", 1, NULL, NULL); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_TIME, snmp_prfProtoFn, time_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_ALL, snmp_prfProtoFn, time_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_MISS, snmp_prfProtoFn, time_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_NM, snmp_prfProtoFn, time_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_HIT, snmp_prfProtoFn, time_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_ICP_QUERY, snmp_prfProtoFn, time_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_ICP_REPLY, snmp_prfProtoFn, time_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_DNS, snmp_prfProtoFn, time_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_RHR, snmp_prfProtoFn, time_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_BHR, snmp_prfProtoFn, time_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_NH, snmp_prfProtoFn, time_Inst); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_TIME, snmp_prfProtoFn, time_Inst, atAverage); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_ALL, snmp_prfProtoFn, time_Inst, atAverage); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_MISS, snmp_prfProtoFn, time_Inst, atAverage); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_NM, snmp_prfProtoFn, time_Inst, atAverage); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_HIT, snmp_prfProtoFn, time_Inst, atAverage); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_ICP_QUERY, snmp_prfProtoFn, time_Inst, atAverage); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_ICP_REPLY, snmp_prfProtoFn, time_Inst, atAverage); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_DNS, snmp_prfProtoFn, time_Inst, atAverage); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_RHR, snmp_prfProtoFn, time_Inst, atAverage); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_BHR, snmp_prfProtoFn, time_Inst, atAverage); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.3.2.2.1", PERF_MEDIAN_HTTP_NH, snmp_prfProtoFn, time_Inst, atAverage); /* SQ_NET - 1.3.6.1.4.1.3495.1.4 */ snmpAddNodeStr("1.3.6.1.4.1.3495.1", 4, NULL, NULL); snmpAddNodeStr("1.3.6.1.4.1.3495.1.4", NET_IP_CACHE, NULL, NULL); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_ENT, snmp_netIpFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_REQ, snmp_netIpFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_HITS, snmp_netIpFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_PENDHIT, snmp_netIpFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_NEGHIT, snmp_netIpFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_MISS, snmp_netIpFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_GHBN, snmp_netIpFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_LOC, snmp_netIpFn, static_Inst); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_ENT, snmp_netIpFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_REQ, snmp_netIpFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_HITS, snmp_netIpFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_PENDHIT, snmp_netIpFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_NEGHIT, snmp_netIpFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_MISS, snmp_netIpFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_GHBN, snmp_netIpFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.1", IP_LOC, snmp_netIpFn, static_Inst, atSum); snmpAddNodeStr("1.3.6.1.4.1.3495.1.4", NET_FQDN_CACHE, NULL, NULL); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_ENT, snmp_netFqdnFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_REQ, snmp_netFqdnFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_HITS, snmp_netFqdnFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_PENDHIT, snmp_netFqdnFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_NEGHIT, snmp_netFqdnFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_MISS, snmp_netFqdnFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_GHBN, snmp_netFqdnFn, static_Inst); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_ENT, snmp_netFqdnFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_REQ, snmp_netFqdnFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_HITS, snmp_netFqdnFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_PENDHIT, snmp_netFqdnFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_NEGHIT, snmp_netFqdnFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_MISS, snmp_netFqdnFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.2", FQDN_GHBN, snmp_netFqdnFn, static_Inst, atSum); snmpAddNodeStr("1.3.6.1.4.1.3495.1.4", NET_DNS_CACHE, NULL, NULL); #if USE_DNSSERVERS - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REQ, snmp_netDnsFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REP, snmp_netDnsFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_SERVERS, snmp_netDnsFn, static_Inst); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REQ, snmp_netDnsFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REP, snmp_netDnsFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_SERVERS, snmp_netDnsFn, static_Inst, atSum); #else - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REQ, snmp_netIdnsFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REP, snmp_netIdnsFn, static_Inst); - snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_SERVERS, snmp_netIdnsFn, static_Inst); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REQ, snmp_netIdnsFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_REP, snmp_netIdnsFn, static_Inst, atSum); + snmpAddNodeStr("1.3.6.1.4.1.3495.1.4.3", DNS_SERVERS, snmp_netIdnsFn, static_Inst, atSum); #endif /* SQ_MESH - 1.3.6.1.4.1.3495.1.5 */ @@ -537,6 +521,14 @@ snmpConstructReponse(snmp_request_t * rq) struct snmp_pdu *RespPDU; debugs(49, 5, "snmpConstructReponse: Called."); + + if (UsingSmp() && IamWorkerProcess()) { + AsyncJob::Start(new Snmp::Forwarder(static_cast(*rq->PDU), + static_cast(rq->session), rq->sock, rq->from)); + snmp_free_pdu(rq->PDU); + return; + } + RespPDU = snmpAgentResponse(rq->PDU); snmp_free_pdu(rq->PDU); @@ -552,7 +544,7 @@ snmpConstructReponse(snmp_request_t * rq) * return the response to the requester. */ -static struct snmp_pdu * +struct snmp_pdu * snmpAgentResponse(struct snmp_pdu *PDU) { struct snmp_pdu *Answer = NULL; @@ -665,6 +657,29 @@ snmpTreeGet(oid * Current, snint CurrentLen) return (Fn); } +AggrType +snmpAggrType(oid* Current, snint CurrentLen) +{ + debugs(49, 5, HERE); + + mib_tree_entry* mibTreeEntry = mib_tree_head; + AggrType type = atNone; + int count = 0; + + if (Current[count] == mibTreeEntry->name[count]) { + count++; + + while (mibTreeEntry != NULL && count < CurrentLen) { + mibTreeEntry = snmpTreeEntry(Current[count], count, mibTreeEntry); + if (mibTreeEntry != NULL) + type = mibTreeEntry->aggrType; + count++; + } + } + + return type; +} + static oid_ParseFn * snmpTreeNext(oid * Current, snint CurrentLen, oid ** Next, snint * NextLen) { @@ -1032,7 +1047,7 @@ snmpCreateOidFromStr(const char *str, oid **name, int *nl) * on failure. */ static mib_tree_entry * -snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instance_Fn * instancefunction) +snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instance_Fn * instancefunction, AggrType aggrType) { mib_tree_entry *m, *b; oid *n; @@ -1051,7 +1066,7 @@ snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instanc return NULL; /* Create a node */ - m = snmpAddNode(n, nl, parsefunction, instancefunction, 0); + m = snmpAddNode(n, nl, parsefunction, instancefunction, aggrType, 0); /* Link it into the existing tree */ snmpAddNodeChild(b, m); @@ -1065,7 +1080,7 @@ snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instanc * Adds a node to the MIB tree structure and adds the appropriate children */ static mib_tree_entry * -snmpAddNode(oid * name, int len, oid_ParseFn * parsefunction, instance_Fn * instancefunction, int children,...) +snmpAddNode(oid * name, int len, oid_ParseFn * parsefunction, instance_Fn * instancefunction, AggrType aggrType, int children,...) { va_list args; int loop; @@ -1083,6 +1098,7 @@ snmpAddNode(oid * name, int len, oid_ParseFn * parsefunction, instance_Fn * inst entry->instancefunction = instancefunction; entry->children = children; entry->leaves = NULL; + entry->aggrType = aggrType; if (children > 0) { entry->leaves = (mib_tree_entry **)xmalloc(sizeof(mib_tree_entry *) * children); diff --git a/src/snmp_core.h b/src/snmp_core.h new file mode 100644 index 0000000000..284902e5eb --- /dev/null +++ b/src/snmp_core.h @@ -0,0 +1,38 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#ifndef SQUID_SNMP_CORE_H +#define SQUID_SNMP_CORE_H + +#include "config.h" +#include "cache_snmp.h" + +#define SNMP_REQUEST_SIZE 4096 +#define MAX_PROTOSTAT 5 + + +typedef struct _mib_tree_entry mib_tree_entry; +typedef oid *(instance_Fn) (oid * name, snint * len, mib_tree_entry * current, oid_ParseFn ** Fn); +typedef enum {atNone = 0, atSum, atAverage, atMax, atMin} AggrType; + +struct _mib_tree_entry { + oid *name; + int len; + oid_ParseFn *parsefunction; + instance_Fn *instancefunction; + int children; + + struct _mib_tree_entry **leaves; + + struct _mib_tree_entry *parent; + AggrType aggrType; +}; + +extern struct snmp_pdu* snmpAgentResponse(struct snmp_pdu* PDU); +extern AggrType snmpAggrType(oid* Current, snint CurrentLen); + +#endif /* SQUID_SNMP_CORE_H */ diff --git a/src/snmpx/Forwarder.cc b/src/snmpx/Forwarder.cc new file mode 100644 index 0000000000..8cba9b7f8e --- /dev/null +++ b/src/snmpx/Forwarder.cc @@ -0,0 +1,107 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#include "config.h" +#include "base/TextException.h" +#include "CommCalls.h" +#include "ipc/Port.h" +#include "snmp_core.h" +#include "snmpx/Forwarder.h" +#include "snmpx/Request.h" +#include "snmpx/Response.h" + + +CBDATA_NAMESPACED_CLASS_INIT(Snmp, Forwarder); + + +Snmp::Forwarder::Forwarder(const Pdu& aPdu, const Session& aSession, int aFd, + const Ip::Address& anAddress): + Ipc::Forwarder(new Request(KidIdentifier, 0, aPdu, aSession, aFd, anAddress), 2), + fd(aFd) +{ + debugs(49, 5, HERE << "FD " << aFd); + Must(fd >= 0); + closer = asyncCall(16, 5, "Snmp::Forwarder::noteCommClosed", + CommCbMemFunT(this, &Forwarder::noteCommClosed)); + comm_add_close_handler(fd, closer); +} + +/// removes our cleanup handler of the client connection socket +void +Snmp::Forwarder::cleanup() +{ + if (fd >= 0) { + if (closer != NULL) { + comm_remove_close_handler(fd, closer); + closer = NULL; + } + fd = -1; + } +} + +/// called when the client socket gets closed by some external force +void +Snmp::Forwarder::noteCommClosed(const CommCloseCbParams& params) +{ + debugs(49, 5, HERE); + Must(fd == params.fd); + fd = -1; + mustStop("commClosed"); +} + +void +Snmp::Forwarder::handleTimeout() +{ + sendError(SNMP_ERR_RESOURCEUNAVAILABLE); + Ipc::Forwarder::handleTimeout(); +} + +void +Snmp::Forwarder::handleException(const std::exception& e) +{ + debugs(49, 3, HERE << e.what()); + if (fd >= 0) + sendError(SNMP_ERR_GENERR); + Ipc::Forwarder::handleException(e); +} + +/// send error SNMP response +void +Snmp::Forwarder::sendError(int error) +{ + debugs(49, 3, HERE); + Snmp::Request& req = static_cast(*request); + req.pdu.command = SNMP_PDU_RESPONSE; + req.pdu.errstat = error; + u_char buffer[SNMP_REQUEST_SIZE]; + int len = sizeof(buffer); + snmp_build(&req.session, &req.pdu, buffer, &len); + comm_udp_sendto(fd, req.address, buffer, len); +} + +void +Snmp::SendResponse(unsigned int requestId, const Pdu& pdu) +{ + debugs(49, 5, HERE); + // snmpAgentResponse() can modify arg + Pdu tmp = pdu; + Snmp::Response response(requestId); + snmp_pdu* response_pdu = NULL; + try { + response_pdu = snmpAgentResponse(&tmp); + Must(response_pdu != NULL); + response.pdu = static_cast(*response_pdu); + snmp_free_pdu(response_pdu); + } catch (const std::exception& e) { + debugs(49, DBG_CRITICAL, HERE << e.what()); + response.pdu.command = SNMP_PDU_RESPONSE; + response.pdu.errstat = SNMP_ERR_GENERR; + } + Ipc::TypedMsgHdr message; + response.pack(message); + Ipc::SendMessage(Ipc::coordinatorAddr, message); +} diff --git a/src/snmpx/Forwarder.h b/src/snmpx/Forwarder.h new file mode 100644 index 0000000000..bdef2d6c80 --- /dev/null +++ b/src/snmpx/Forwarder.h @@ -0,0 +1,52 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#ifndef SQUID_SNMPX_FORWARDER_H +#define SQUID_SNMPX_FORWARDER_H + +#include "ipc/Forwarder.h" +#include "snmpx/Pdu.h" +#include "snmpx/Session.h" + + +class CommCloseCbParams; + +namespace Snmp +{ + +/** Forwards a single client SNMP request to Coordinator. + * Waits for an ACK from Coordinator + * Send the data unit with an error response if forwarding fails. + */ +class Forwarder: public Ipc::Forwarder +{ +public: + Forwarder(const Pdu& aPdu, const Session& aSession, int aFd, + const Ip::Address& anAddress); + +protected: + /* Ipc::Forwarder API */ + virtual void cleanup(); ///< perform cleanup actions + virtual void handleTimeout(); + virtual void handleException(const std::exception& e); + +private: + void noteCommClosed(const CommCloseCbParams& params); + void sendError(int error); + +private: + int fd; ///< client connection descriptor + AsyncCall::Pointer closer; ///< comm_close handler for the connection + + CBDATA_CLASS2(Forwarder); +}; + +extern void SendResponse(unsigned int requestId, const Pdu& pdu); + +} // namespace Snmp + +#endif /* SQUID_SNMPX_FORWARDER_H */ diff --git a/src/snmpx/Inquirer.cc b/src/snmpx/Inquirer.cc new file mode 100644 index 0000000000..b7e4a59ed6 --- /dev/null +++ b/src/snmpx/Inquirer.cc @@ -0,0 +1,101 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#include "config.h" +#include "base/TextException.h" +#include "CommCalls.h" +#include "ipc/UdsOp.h" +#include "snmp_core.h" +#include "snmpx/Inquirer.h" +#include "snmpx/Response.h" +#include "snmpx/Request.h" + + +CBDATA_NAMESPACED_CLASS_INIT(Snmp, Inquirer); + + +Snmp::Inquirer::Inquirer(const Request& aRequest, const Ipc::StrandCoords& coords): + Ipc::Inquirer(aRequest.clone(), coords, 2), + aggrPdu(aRequest.pdu), + fd(ImportFdIntoComm(aRequest.fd, SOCK_DGRAM, IPPROTO_UDP, Ipc::fdnInSnmpSocket)) +{ + debugs(49, 5, HERE); + closer = asyncCall(16, 5, "Snmp::Inquirer::noteCommClosed", + CommCbMemFunT(this, &Inquirer::noteCommClosed)); + comm_add_close_handler(fd, closer); +} + +/// closes our copy of the client connection socket +void +Snmp::Inquirer::cleanup() +{ + if (fd >= 0) { + if (closer != NULL) { + comm_remove_close_handler(fd, closer); + closer = NULL; + } + comm_close(fd); + fd = -1; + } +} + +void +Snmp::Inquirer::start() +{ + debugs(49, 5, HERE); + Ipc::Inquirer::start(); + Must(fd >= 0); + inquire(); +} + +void +Snmp::Inquirer::handleException(const std::exception& e) +{ + aggrPdu.errstat = SNMP_ERR_GENERR; + Ipc::Inquirer::handleException(e); +} + +bool +Snmp::Inquirer::aggregate(Response::Pointer aResponse) +{ + Snmp::Response& response = static_cast(*aResponse); + bool error = response.pdu.errstat != SNMP_ERR_NOERROR; + if (error) { + aggrPdu = response.pdu; + } else { + aggrPdu.aggregate(response.pdu); + } + return error; +} + +/// called when the some external force closed our socket +void +Snmp::Inquirer::noteCommClosed(const CommCloseCbParams& params) +{ + debugs(49, 5, HERE); + Must(fd < 0 || fd == params.fd); + fd = -1; + mustStop("commClosed"); +} + +bool +Snmp::Inquirer::doneAll() const +{ + return !writer && Ipc::Inquirer::doneAll(); +} + +void +Snmp::Inquirer::sendResponse() +{ + debugs(49, 5, HERE); + aggrPdu.command = SNMP_PDU_RESPONSE; + u_char buffer[SNMP_REQUEST_SIZE]; + int len = sizeof(buffer); + Snmp::Request& req = static_cast(*request); + snmp_build(&req.session, &aggrPdu, buffer, &len); + comm_udp_sendto(fd, req.address, buffer, len); +} diff --git a/src/snmpx/Inquirer.h b/src/snmpx/Inquirer.h new file mode 100644 index 0000000000..5f53ff17b8 --- /dev/null +++ b/src/snmpx/Inquirer.h @@ -0,0 +1,54 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#ifndef SQUID_SNMPX_INQUIRER_H +#define SQUID_SNMPX_INQUIRER_H + +#include "ipc/Inquirer.h" +#include "snmpx/forward.h" +#include "snmpx/Pdu.h" + + +class CommCloseCbParams; + +namespace Snmp +{ + +/// Coordinator's job that sends a PDU request to each strand, +/// aggregates strand responses and send back the result to client +class Inquirer: public Ipc::Inquirer +{ +public: + Inquirer(const Request& aRequest, const Ipc::StrandCoords& coords); + +protected: + /* AsyncJob API */ + virtual void start(); + virtual bool doneAll() const; + + /* Ipc::Inquirer API */ + virtual void cleanup(); + virtual void handleException(const std::exception& e); + virtual void sendResponse(); + virtual bool aggregate(Ipc::Response::Pointer aResponse); + +private: + void noteCommClosed(const CommCloseCbParams& params); + +private: + Pdu aggrPdu; ///< aggregated pdu + int fd; ///< client connection descriptor + + AsyncCall::Pointer writer; ///< comm_write callback + AsyncCall::Pointer closer; ///< comm_close handler + + CBDATA_CLASS2(Inquirer); +}; + +} // namespace Snmp + +#endif /* SQUID_SNMPX_INQUIRER_H */ diff --git a/src/snmpx/Pdu.cc b/src/snmpx/Pdu.cc new file mode 100644 index 0000000000..d27c92ba8b --- /dev/null +++ b/src/snmpx/Pdu.cc @@ -0,0 +1,221 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#include "config.h" +#include "base/TextException.h" +#include "ipc/TypedMsgHdr.h" +#include "protos.h" +#include "snmp_core.h" +#include "snmpx/Pdu.h" +#include "snmpx/Var.h" + + +Snmp::Pdu::Pdu() +{ + init(); +} + +Snmp::Pdu::Pdu(const Pdu& pdu) +{ + init(); + assign(pdu); +} + +Snmp::Pdu::~Pdu() +{ + clear(); +} + +Snmp::Pdu& +Snmp::Pdu::operator = (const Pdu& pdu) +{ + clear(); + assign(pdu); + return *this; +} + +void +Snmp::Pdu::init() +{ + xmemset(this, 0, sizeof(*this)); + errstat = SNMP_DEFAULT_ERRSTAT; + errindex = SNMP_DEFAULT_ERRINDEX; +} + +void +Snmp::Pdu::aggregate(const Pdu& pdu) +{ + Must(varCount() == pdu.varCount()); + for (variable_list* p_aggr = variables, *p_var = pdu.variables; p_var != NULL; + p_aggr = p_aggr->next_variable, p_var = p_var->next_variable) + { + Must(p_aggr != NULL); + Var& aggr = static_cast(*p_aggr); + Var& var = static_cast(*p_var); + if (aggr.isNull()) { + aggr.setName(var.getName()); + aggr.copyValue(var); + } else { + switch(snmpAggrType(aggr.name, aggr.name_length)) + { + case atSum: + case atAverage: + aggr += var; + break; + case atMax: + if (var > aggr) + aggr.copyValue(var); + break; + case atMin: + if (var < aggr) + aggr.copyValue(var); + break; + default: + break; + } + } + } +} + +void +Snmp::Pdu::clear() +{ + clearSystemOid(); + clearVars(); + init(); +} + +void +Snmp::Pdu::assign(const Pdu& pdu) +{ + command = pdu.command; + address.sin_addr.s_addr = pdu.address.sin_addr.s_addr; + reqid = pdu.reqid; + errstat = pdu.errstat; + errindex = pdu.errindex; + non_repeaters = pdu.non_repeaters; + max_repetitions = pdu.max_repetitions; + agent_addr.sin_addr.s_addr = pdu.agent_addr.sin_addr.s_addr; + trap_type = pdu.trap_type; + specific_type = pdu.specific_type; + time = pdu.time; + setSystemOid(pdu.getSystemOid()); + setVars(pdu.variables); +} + +void +Snmp::Pdu::clearVars() +{ + variable_list* var = variables; + while (var != NULL) { + variable_list* tmp = var; + var = var->next_variable; + snmp_var_free(tmp); + } + variables = NULL; +} + +void +Snmp::Pdu::setVars(variable_list* vars) +{ + clearVars(); + for (variable_list** p_var = &variables; vars != NULL; + vars = vars->next_variable, p_var = &(*p_var)->next_variable) + { + *p_var = new Var(static_cast(*vars)); + } +} + +void +Snmp::Pdu::clearSystemOid() +{ + if (enterprise != NULL) { + xfree(enterprise); + enterprise = NULL; + } + enterprise_length = 0; +} + +Range +Snmp::Pdu::getSystemOid() const +{ + return Range(enterprise, enterprise + enterprise_length); +} + +void +Snmp::Pdu::setSystemOid(const Range& systemOid) +{ + clearSystemOid(); + if (systemOid.start != NULL && systemOid.size() != 0) { + enterprise_length = systemOid.size(); + enterprise = static_cast(xmalloc(enterprise_length * sizeof(oid))); + std::copy(systemOid.start, systemOid.end, enterprise); + } +} + +void +Snmp::Pdu::pack(Ipc::TypedMsgHdr& msg) const +{ + msg.putPod(command); + msg.putPod(address); + msg.putPod(reqid); + msg.putPod(errstat); + msg.putPod(errindex); + msg.putPod(non_repeaters); + msg.putPod(max_repetitions); + msg.putInt(enterprise_length); + if (enterprise_length > 0) { + Must(enterprise != NULL); + msg.putFixed(enterprise, enterprise_length * sizeof(oid)); + } + msg.putPod(agent_addr); + msg.putPod(trap_type); + msg.putPod(specific_type); + msg.putPod(time); + msg.putInt(varCount()); + for (variable_list* var = variables; var != NULL; var = var->next_variable) + static_cast(var)->pack(msg); +} + +void +Snmp::Pdu::unpack(const Ipc::TypedMsgHdr& msg) +{ + clear(); + msg.getPod(command); + msg.getPod(address); + msg.getPod(reqid); + msg.getPod(errstat); + msg.getPod(errindex); + msg.getPod(non_repeaters); + msg.getPod(max_repetitions); + enterprise_length = msg.getInt(); + if (enterprise_length > 0) { + enterprise = static_cast(xmalloc(enterprise_length * sizeof(oid))); + msg.getFixed(enterprise, enterprise_length * sizeof(oid)); + } + msg.getPod(agent_addr); + msg.getPod(trap_type); + msg.getPod(specific_type); + msg.getPod(time); + int count = msg.getInt(); + for (variable_list** p_var = &variables; count > 0; + p_var = &(*p_var)->next_variable, --count) + { + Var* var = new Var(); + var->unpack(msg); + *p_var = var; + } +} + +int +Snmp::Pdu::varCount() const +{ + int count = 0; + for (variable_list* var = variables; var != NULL; var = var->next_variable) + ++count; + return count; +} diff --git a/src/snmpx/Pdu.h b/src/snmpx/Pdu.h new file mode 100644 index 0000000000..2084894b57 --- /dev/null +++ b/src/snmpx/Pdu.h @@ -0,0 +1,48 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#ifndef SQUID_SNMPX_PDU_H +#define SQUID_SNMPX_PDU_H + +#include "config.h" +#include "ipc/forward.h" +#include "Range.h" +#include "snmp.h" + + +namespace Snmp +{ + +/// snmp_pdu wrapper introduce the feature +/// to aggregate variables and to pack/unpack message +class Pdu: public snmp_pdu +{ +public: + Pdu(); + Pdu(const Pdu& pdu); + Pdu& operator = (const Pdu& pdu); + ~Pdu(); + + void aggregate(const Pdu& pdu); + void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg() + void unpack(const Ipc::TypedMsgHdr& msg); ///< restore struct from the message + int varCount() const; ///< size of variables list + void clear(); ///< clear all internal members + void setVars(variable_list* vars); ///< perform assignment of variables list + void clearVars(); ///< clear variables list + Range getSystemOid() const; + void setSystemOid(const Range& systemOid); + void clearSystemOid(); + +private: + void init(); ///< initialize members + void assign(const Pdu& pdu); ///< perform full assignment +}; + +} // namespace Snmp + +#endif /* SQUID_SNMPX_PDU_H */ diff --git a/src/snmpx/Request.cc b/src/snmpx/Request.cc new file mode 100644 index 0000000000..a0d5f0927e --- /dev/null +++ b/src/snmpx/Request.cc @@ -0,0 +1,59 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#include "config.h" +#include "ipc/Messages.h" +#include "ipc/TypedMsgHdr.h" +#include "snmpx/Request.h" + + +Snmp::Request::Request(int aRequestorId, unsigned int aRequestId, + const Pdu& aPdu, const Session& aSession, + int aFd, const Ip::Address& anAddress): + Ipc::Request(aRequestorId, aRequestId), + pdu(aPdu), session(aSession), fd(aFd), address(anAddress) +{ +} + +Snmp::Request::Request(const Request& request): + Ipc::Request(request.requestorId, request.requestId), + pdu(request.pdu), session(request.session), + fd(request.fd), address(request.address) +{ +} + +Snmp::Request::Request(const Ipc::TypedMsgHdr& msg): + Ipc::Request(0, 0) +{ + msg.checkType(Ipc::mtSnmpRequest); + msg.getPod(requestorId); + msg.getPod(requestId); + pdu.unpack(msg); + session.unpack(msg); + msg.getPod(address); + + fd = msg.getFd(); +} + +void +Snmp::Request::pack(Ipc::TypedMsgHdr& msg) const +{ + msg.setType(Ipc::mtSnmpRequest); + msg.putPod(requestorId); + msg.putPod(requestId); + pdu.pack(msg); + session.pack(msg); + msg.putPod(address); + + msg.putFd(fd); +} + +Ipc::Request::Pointer +Snmp::Request::clone() const +{ + return new Request(*this); +} diff --git a/src/snmpx/Request.h b/src/snmpx/Request.h new file mode 100644 index 0000000000..aab4f1a397 --- /dev/null +++ b/src/snmpx/Request.h @@ -0,0 +1,46 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#ifndef SQUID_SNMPX_REQUEST_H +#define SQUID_SNMPX_REQUEST_H + +#include "ip/Address.h" +#include "ipc/forward.h" +#include "ipc/Request.h" +#include "snmpx/Pdu.h" +#include "snmpx/Session.h" + + +namespace Snmp +{ + +/// SNMP request +class Request: public Ipc::Request +{ +public: + Request(int aRequestorId, unsigned int aRequestId, const Pdu& aPdu, + const Session& aSession, int aFd, const Ip::Address& anAddress); + + explicit Request(const Ipc::TypedMsgHdr& msg); ///< from recvmsg() + /* Ipc::Request API */ + virtual void pack(Ipc::TypedMsgHdr& msg) const; + virtual Pointer clone() const; + +private: + Request(const Request& request); + +public: + Pdu pdu; ///< SNMP protocol data unit + Session session; ///< SNMP session + int fd; ///< client connection descriptor + Ip::Address address; ///< client address +}; + + +} // namespace Snmp + +#endif /* SQUID_SNMPX_REQUEST_H */ diff --git a/src/snmpx/Response.cc b/src/snmpx/Response.cc new file mode 100644 index 0000000000..f32c355650 --- /dev/null +++ b/src/snmpx/Response.cc @@ -0,0 +1,51 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#include "config.h" +#include "base/TextException.h" +#include "ipc/Messages.h" +#include "ipc/TypedMsgHdr.h" +#include "snmpx/Response.h" + + +std::ostream& Snmp::operator << (std::ostream& os, const Response& response) +{ + os << "response: {requestId: " << response.requestId << '}'; + return os; +} + +Snmp::Response::Response(unsigned int aRequestId): + Ipc::Response(aRequestId), pdu() +{ +} + +Snmp::Response::Response(const Response& response): + Ipc::Response(response.requestId), pdu(response.pdu) +{ +} + +Snmp::Response::Response(const Ipc::TypedMsgHdr& msg): + Ipc::Response(0) +{ + msg.checkType(Ipc::mtSnmpResponse); + msg.getPod(requestId); + pdu.unpack(msg); +} + +void +Snmp::Response::pack(Ipc::TypedMsgHdr& msg) const +{ + msg.setType(Ipc::mtSnmpResponse); + msg.putPod(requestId); + pdu.pack(msg); +} + +Ipc::Response::Pointer +Snmp::Response::clone() const +{ + return new Response(*this); +} diff --git a/src/snmpx/Response.h b/src/snmpx/Response.h new file mode 100644 index 0000000000..99d9846f20 --- /dev/null +++ b/src/snmpx/Response.h @@ -0,0 +1,40 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#ifndef SQUID_SNMPX_RESPONSE_H +#define SQUID_SNMPX_RESPONSE_H + +#include "ipc/forward.h" +#include "ipc/Response.h" +#include "snmpx/Pdu.h" +#include + +namespace Snmp +{ + +/// +class Response: public Ipc::Response +{ +public: + Response(unsigned int aRequestId); + explicit Response(const Ipc::TypedMsgHdr& msg); ///< from recvmsg() + /* Ipc::Response API */ + virtual void pack(Ipc::TypedMsgHdr& msg) const; + virtual Ipc::Response::Pointer clone() const; + +private: + Response(const Response& response); + +public: + Pdu pdu; ///< SNMP protocol data unit +}; + +extern std::ostream& operator << (std::ostream& os, const Response& response); + +} // namespace Snmp + +#endif /* SQUID_SNMPX_RESPONSE_H */ diff --git a/src/snmpx/Session.cc b/src/snmpx/Session.cc new file mode 100644 index 0000000000..fb94ca4e55 --- /dev/null +++ b/src/snmpx/Session.cc @@ -0,0 +1,112 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#include "config.h" +#include "base/TextException.h" +#include "ipc/TypedMsgHdr.h" +#include "protos.h" +#include "snmpx/Session.h" + + +Snmp::Session::Session() +{ + clear(); +} + +Snmp::Session::Session(const Session& session) +{ + assign(session); +} + +Snmp::Session::~Session() +{ + free(); +} + +Snmp::Session& +Snmp::Session::operator = (const Session& session) +{ + free(); + assign(session); + return *this; +} + +void +Snmp::Session::clear() +{ + xmemset(this, 0, sizeof(*this)); +} + +void +Snmp::Session::free() +{ + if (community_len > 0) { + Must(community != NULL); + xfree(community); + } + if (peername != NULL) + xfree(peername); + clear(); +} + +void +Snmp::Session::assign(const Session& session) +{ + memcpy(this, &session, sizeof(*this)); + if (session.community != NULL) { + community = (u_char*)xstrdup((char*)session.community); + Must(community != NULL); + } + if (session.peername != NULL) { + peername = xstrdup(session.peername); + Must(peername != NULL); + } +} + +void +Snmp::Session::pack(Ipc::TypedMsgHdr& msg) const +{ + msg.putPod(Version); + msg.putInt(community_len); + if (community_len > 0) { + Must(community != NULL); + msg.putFixed(community, community_len); + } + msg.putPod(retries); + msg.putPod(timeout); + int len = peername != NULL ? strlen(peername) : 0; + msg.putInt(len); + if (len > 0) + msg.putFixed(peername, len); + msg.putPod(remote_port); + msg.putPod(local_port); +} + +void +Snmp::Session::unpack(const Ipc::TypedMsgHdr& msg) +{ + free(); + msg.getPod(Version); + community_len = msg.getInt(); + if (community_len > 0) { + community = static_cast(xmalloc(community_len + 1)); + Must(community != NULL); + msg.getFixed(community, community_len); + community[community_len] = 0; + } + msg.getPod(retries); + msg.getPod(timeout); + int len = msg.getInt(); + if (len > 0) { + peername = static_cast(xmalloc(len + 1)); + Must(peername != NULL); + msg.getFixed(peername, len); + peername[len] = 0; + } + msg.getPod(remote_port); + msg.getPod(local_port); +} diff --git a/src/snmpx/Session.h b/src/snmpx/Session.h new file mode 100644 index 0000000000..6e93f81ebf --- /dev/null +++ b/src/snmpx/Session.h @@ -0,0 +1,40 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#ifndef SQUID_SNMPX_SESSION_H +#define SQUID_SNMPX_SESSION_H + +#include "config.h" +#include "ipc/forward.h" +#include "snmp.h" +#include "snmp_session.h" + + +namespace Snmp +{ + +/// snmp_session wrapper add pack/unpack feature +class Session: public snmp_session +{ +public: + Session(); + Session(const Session& session); + Session& operator = (const Session& session); + ~Session(); + + void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg() + void unpack(const Ipc::TypedMsgHdr& msg); ///< restore struct from the message + void clear(); ///< clear internal members + +private: + void free(); ///< free internal members + void assign(const Session& session); ///< perform full assignment +}; + +} // namespace Snmp + +#endif /* SQUID_SNMPX_SESSION_H */ diff --git a/src/snmpx/Var.cc b/src/snmpx/Var.cc new file mode 100644 index 0000000000..889344026a --- /dev/null +++ b/src/snmpx/Var.cc @@ -0,0 +1,374 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#include "config.h" +#include "base/TextException.h" +#include "ipc/TypedMsgHdr.h" +#include "protos.h" +#include "snmpx/Var.h" + + +Snmp::Var::Var() +{ + init(); +} + +Snmp::Var::Var(const Var& var) +{ + init(); + assign(var); +} + +Snmp::Var::~Var() +{ + clear(); +} + +Snmp::Var& +Snmp::Var::operator = (const Var& var) +{ + clear(); + assign(var); + return *this; +} + +void +Snmp::Var::init() +{ + xmemset(this, 0, sizeof(*this)); +} + +Snmp::Var& +Snmp::Var::operator += (const Var& var) +{ + switch(type) + { + case SMI_INTEGER: + setInt(asInt() + var.asInt()); + break; + case SMI_GAUGE32: + setGauge(asGauge() + var.asGauge()); + break; + case SMI_COUNTER32: + setCounter(asCounter() + var.asCounter()); + break; + case SMI_COUNTER64: + setCounter64(asCounter64() + var.asCounter64()); + break; + case SMI_TIMETICKS: + setTimeTicks(asTimeTicks() + var.asTimeTicks()); + break; + default: + debugs(49, DBG_CRITICAL, HERE << "Unsupported type: " << type); + throw TexcHere("Unsupported type"); + break; + } + return *this; +} + +Snmp::Var& +Snmp::Var::operator /= (int num) +{ + Must(num != 0); + switch(type) + { + case SMI_INTEGER: + setInt(asInt() / num); + break; + case SMI_GAUGE32: + setGauge(asGauge() / num); + break; + case SMI_COUNTER32: + setCounter(asCounter() / num); + break; + case SMI_COUNTER64: + setCounter64(asCounter64() / num); + break; + case SMI_TIMETICKS: + setTimeTicks(asTimeTicks() / num); + break; + default: + debugs(49, DBG_CRITICAL, HERE << "Unsupported type: " << type); + throw TexcHere("Unsupported type"); + break; + } + return *this; +} + +bool +Snmp::Var::operator < (const Var& var) const +{ + switch(type) + { + case SMI_INTEGER: + return asInt() < var.asInt(); + case SMI_GAUGE32: + return asGauge() < var.asGauge(); + case SMI_COUNTER32: + return asCounter() < var.asCounter(); + case SMI_COUNTER64: + return asCounter64() < var.asCounter64(); + case SMI_TIMETICKS: + return asTimeTicks() < var.asTimeTicks(); + default: + debugs(49, DBG_CRITICAL, HERE << "Unsupported type: " << type); + throw TexcHere("Unsupported type"); + break; + } + return false; // unreachable +} + +bool +Snmp::Var::operator > (const Var& var) const +{ + switch(type) + { + case SMI_INTEGER: + return asInt() > var.asInt(); + case SMI_GAUGE32: + return asGauge() > var.asGauge(); + case SMI_COUNTER32: + return asCounter() > var.asCounter(); + case SMI_COUNTER64: + return asCounter64() > var.asCounter64(); + case SMI_TIMETICKS: + return asTimeTicks() > var.asTimeTicks(); + default: + debugs(49, DBG_CRITICAL, HERE << "Unsupported type: " << type); + throw TexcHere("Unsupported type"); + break; + } + return false; // unreachable +} + +void +Snmp::Var::assign(const Var& var) +{ + setName(var.getName()); + copyValue(var); +} + +void +Snmp::Var::clearName() +{ + if (name != NULL) { + xfree(name); + name = NULL; + } + name_length = 0; +} + +Range +Snmp::Var::getName() const +{ + return Range(name, name + name_length); +} + +void +Snmp::Var::setName(const Range& aName) +{ + clearName(); + if (aName.start != NULL && aName.size() != 0) { + name_length = aName.size(); + name = static_cast(xmalloc(name_length * sizeof(oid))); + std::copy(aName.start, aName.end, name); + } +} + +void +Snmp::Var::clearValue() +{ + if (val.string != NULL) { + xfree(val.string); + val.string = NULL; + } + val_len = 0; + type = 0; +} + +bool +Snmp::Var::isNull() const +{ + return type == SMI_NULLOBJ; +} + +int +Snmp::Var::asInt() const +{ + Must(type == SMI_INTEGER); + Must(val.integer != NULL && val_len == sizeof(int)); + return *val.integer; +} + +unsigned int +Snmp::Var::asGauge() const +{ + Must(type == SMI_GAUGE32); + Must(val.integer != NULL && val_len == 4); + return *reinterpret_cast(val.integer); +} + +int +Snmp::Var::asCounter() const +{ + Must(type == SMI_COUNTER32); + Must(val.integer != NULL && val_len == 4); + return *reinterpret_cast(val.integer); +} + +long long int +Snmp::Var::asCounter64() const +{ + Must(type == SMI_COUNTER64); + Must(val.integer != NULL && val_len == 8); + return *reinterpret_cast(val.integer); +} + +time_t +Snmp::Var::asTimeTicks() const +{ + Must(type == SMI_TIMETICKS); + Must(val.integer != NULL && val_len == sizeof(time_t)); + return *reinterpret_cast(val.integer); +} + +Range +Snmp::Var::asObject() const +{ + Must(type == SMI_OBJID); + Must(val_len % sizeof(oid) == 0); + int length = val_len / sizeof(oid); + Must(val.objid != NULL && length > 0); + return Range(val.objid, val.objid + length); +} + +Range +Snmp::Var::asString() const +{ + Must(type == SMI_STRING); + Must(val.string != NULL && val_len > 0); + return Range(val.string, val.string + val_len); +} + +ipaddr +Snmp::Var::asIpAddress() const +{ + Must(type == SMI_IPADDRESS); + Must(val.string != NULL && val_len == sizeof(ipaddr)); + return *reinterpret_cast(val.string); +} + +void +Snmp::Var::setInt(int value) +{ + setValue(&value, sizeof(value), SMI_INTEGER); +} + +void +Snmp::Var::setCounter(int value) +{ + setValue(&value, sizeof(value), SMI_COUNTER32); +} + +void +Snmp::Var::setGauge(unsigned int value) +{ + setValue(&value, sizeof(value), SMI_GAUGE32); +} + +void +Snmp::Var::setString(const Range& string) +{ + setValue(string.start, string.size(), SMI_STRING); +} + +void +Snmp::Var::setObject(const Range& object) +{ + setValue(object.start, object.size() * sizeof(oid), SMI_OBJID); +} + +void +Snmp::Var::setCounter64(long long int counter) +{ + setValue(&counter, sizeof(counter), SMI_COUNTER64); +} + +void +Snmp::Var::setTimeTicks(time_t ticks) +{ + setValue(&ticks, sizeof(ticks), SMI_TIMETICKS); +} + +void +Snmp::Var::setIpAddress(ipaddr addr) +{ + setValue(&addr, sizeof(addr), SMI_IPADDRESS); +} + +void +Snmp::Var::copyValue(const Var& var) +{ + setValue(var.val.string, var.val_len, var.type); +} + +void +Snmp::Var::setValue(const void* value, int length, int aType) +{ + clearValue(); + if (value != NULL) { + Must(length > 0 && aType > 0); + val.string = static_cast(xmalloc(length)); + memcpy(val.string, value, length); + } + val_len = length; + type = aType; +} + +void +Snmp::Var::clear() +{ + clearName(); + clearValue(); + init(); +} + +void +Snmp::Var::pack(Ipc::TypedMsgHdr& msg) const +{ + msg.putInt(name_length); + if (name_length > 0) { + Must(name != NULL); + msg.putFixed(name, name_length * sizeof(oid)); + } + msg.putPod(type); + msg.putPod(val_len); + if (val_len > 0) { + Must(val.string != NULL); + msg.putFixed(val.string, val_len); + } +} + +void +Snmp::Var::unpack(const Ipc::TypedMsgHdr& msg) +{ + clearName(); + clearValue(); + name_length = msg.getInt(); + Must(name_length >= 0); + if (name_length > 0) { + name = static_cast(xmalloc(name_length * sizeof(oid))); + msg.getFixed(name, name_length * sizeof(oid)); + } + msg.getPod(type); + val_len = msg.getInt(); + Must(val_len >= 0); + if (val_len > 0) { + val.string = static_cast(xmalloc(val_len)); + msg.getFixed(val.string, val_len); + } +} diff --git a/src/snmpx/Var.h b/src/snmpx/Var.h new file mode 100644 index 0000000000..86d05326bc --- /dev/null +++ b/src/snmpx/Var.h @@ -0,0 +1,74 @@ +/* + * $Id$ + * + * DEBUG: section 49 SNMP Interface + * + */ + +#ifndef SQUID_SNMPX_VAR_H +#define SQUID_SNMPX_VAR_H + +#include "config.h" +#include "ipc/forward.h" +#include "Range.h" +#include "snmp_vars.h" + + +namespace Snmp +{ + +/// variable_list wrapper implement the feature to change +/// the name/value of variable and to pack/unpack message +class Var: public variable_list +{ +public: + Var(); + Var(const Var& var); + Var& operator = (const Var& var); + ~Var(); + + Var& operator += (const Var& var); + Var& operator /= (int num); + bool operator < (const Var& var) const; + bool operator > (const Var& var) const; + + void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg() + void unpack(const Ipc::TypedMsgHdr& msg); ///< restore struct from the message + + Range getName() const; ///< returns variable name + void setName(const Range& aName); ///< set new variable name + void clearName(); ///< clear variable name + + bool isNull() const; + + int asInt() const; ///< returns variable value as integer + unsigned int asGauge() const; ///< returns variable value as unsigned int + int asCounter() const; ///< returns variable value as Counter32 + long long int asCounter64() const; ///< returns variable value as Counter64 + time_t asTimeTicks() const; ///< returns variable value as time ticks + Range asObject() const; ///< returns variable value as object oid + Range asString() const; ///< returns variable value as chars string + ipaddr asIpAddress() const; ///< returns variable value as ip address + + void setInt(int value); ///< assign int value to variable + void setCounter(int value); ///< assign Counter32 value to variable + void setGauge(unsigned int value); ///< assign unsigned int value to variable + void setString(const Range& string); ///< assign string to variable + void setObject(const Range& object); ///< assign object oid to variable + void setTimeTicks(time_t ticks); ///