$(BUILT_SOURCES)
squid_LDADD = \
- $(COMMON_LIBS) \
+ $(AUTH_ACL_LIBS) \
+ ident/libident.la \
+ acl/libacls.la \
+ eui/libeui.la \
+ acl/libstate.la \
+ $(AUTH_LIBS) \
+ $(DISK_LIBS) \
+ acl/libapi.la \
+ base/libbase.la \
+ libsquid.la \
+ ip/libip.la \
+ fs/libfs.la \
+ ipc/libipc.la \
+ mgr/libmgr.la \
+ anyp/libanyp.la \
comm/libcomm.la \
eui/libeui.la \
icmp/libicmp.la icmp/libicmp-core.la \
StoreMetaURL.cc \
StoreMetaVary.cc \
StoreSwapLogData.cc \
- $(TEST_CALL_SOURCES) \
tools.cc \
tunnel.cc \
- SwapDir.cc \
+ SwapDir.cc MemStore.cc MemStoreMap.cc \
url.cc \
URLScheme.cc \
urn.cc \
StoreMetaURL.cc \
StoreMetaVary.cc \
StoreSwapLogData.cc \
- $(TEST_CALL_SOURCES) \
+ String.cc \
+ SwapDir.cc \
+ tests/CapturingStoreEntry.h \
+ tests/testEvent.cc \
+ tests/testEvent.h \
+ tests/testMain.cc \
+ tests/stub_main_cc.cc \
+ tests/stub_ipc_Forwarder.cc \
+ time.cc \
tools.cc \
tunnel.cc \
+ SwapDir.cc MemStore.cc MemStoreMap.cc \
url.cc \
URLScheme.cc \
urn.cc \
StoreMetaURL.cc \
StoreMetaVary.cc \
StoreSwapLogData.cc \
- $(TEST_CALL_SOURCES) \
+ String.cc \
+ SwapDir.cc \
+ tests/testEventLoop.cc \
+ tests/testEventLoop.h \
+ tests/testMain.cc \
+ tests/stub_main_cc.cc \
+ tests/stub_ipc_Forwarder.cc \
+ time.cc \
tools.cc \
tunnel.cc \
+ SwapDir.cc MemStore.cc MemStoreMap.cc \
url.cc \
URLScheme.cc \
urn.cc \
StoreMetaURL.cc \
StoreMetaVary.cc \
StoreSwapLogData.cc \
- $(TEST_CALL_SOURCES) \
+ event.cc \
tools.cc \
tunnel.cc \
- SwapDir.cc \
+ SwapDir.cc MemStore.cc MemStoreMap.cc \
url.cc \
URLScheme.cc \
urn.cc \
StoreMetaURL.cc \
StoreMetaVary.cc \
StoreSwapLogData.cc \
- $(TEST_CALL_SOURCES) \
+ event.cc \
tools.cc \
tunnel.cc \
- SwapDir.cc \
+ SwapDir.cc MemStore.cc MemStoreMap.cc \
urn.cc \
wccp2.cc \
whois.cc \
Mgr::Inquirer::HandleRemoteAck(response);
}
+void
+Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
+{
+ // do we know of a strand with the given search tag?
+ const StrandCoord *strand = NULL;
+ typedef StrandCoords::const_iterator SCCI;
+ for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
+ if (i->tag == request.tag)
+ strand = &(*i);
+ }
+
+ if (strand) {
+ notifySearcher(request, *strand);
+ return;
+ }
+
+ searchers.push_back(request);
+ debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
+ " who " << request.tag << " is");
+}
+
+void
+Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
+ const StrandCoord& strand)
+{
+ debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
+ request.tag << " is kid" << strand.kidId);
+ const StrandSearchResponse response(strand);
+ TypedMsgHdr message;
+ response.pack(message);
+ SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+}
+
+ #if SQUID_SNMP
+ void
+ Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request)
+ {
+ debugs(54, 4, HERE);
+
+ Snmp::Response response(request.requestId);
+ TypedMsgHdr message;
+ response.pack(message);
+ SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+
+ AsyncJob::Start(new Snmp::Inquirer(request, strands_));
+ }
+
+ void
+ Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response)
+ {
+ debugs(54, 4, HERE);
+ Snmp::Inquirer::HandleRemoteAck(response);
+ }
+ #endif
int
Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
#include "ipc/Port.h"
#include "ipc/SharedListen.h"
#include "ipc/StrandCoords.h"
+#include "ipc/StrandSearch.h"
#include "mgr/forward.h"
+ #if SQUID_SNMP
+ #include "snmp/forward.h"
+ #endif
+
+#include <list>
#include <map>
++
namespace Ipc
{
--- /dev/null
+ /*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+ #include "config.h"
+ #include "base/AsyncJobCalls.h"
+ #include "base/TextException.h"
+ #include "ipc/Forwarder.h"
+ #include "ipc/Port.h"
+ #include "ipc/TypedMsgHdr.h"
+
+
+ CBDATA_NAMESPACED_CLASS_INIT(Ipc, Forwarder);
+
+ Ipc::Forwarder::RequestsMap Ipc::Forwarder::TheRequestsMap;
+ unsigned int Ipc::Forwarder::LastRequestId = 0;
+
+ Ipc::Forwarder::Forwarder(Request::Pointer aRequest, double aTimeout):
+ AsyncJob("Ipc::Forwarder"),
+ request(aRequest), timeout(aTimeout)
+ {
+ debugs(54, 5, HERE);
+ }
+
+ Ipc::Forwarder::~Forwarder()
+ {
+ debugs(54, 5, HERE);
+ Must(request->requestId == 0);
+ cleanup();
+ }
+
+ /// perform cleanup actions
+ void
+ Ipc::Forwarder::cleanup()
+ {
+ }
+
+ void
+ Ipc::Forwarder::start()
+ {
+ debugs(54, 3, HERE);
+
+ typedef NullaryMemFunT<Forwarder> Dialer;
+ AsyncCall::Pointer callback = JobCallback(54, 5, Dialer, this, Forwarder::handleRemoteAck);
+ if (++LastRequestId == 0) // don't use zero value as request->requestId
+ ++LastRequestId;
+ request->requestId = LastRequestId;
+ TheRequestsMap[request->requestId] = callback;
+ TypedMsgHdr message;
+
+ try {
+ request->pack(message);
+ } catch (...) {
+ // assume the pack() call failed because the message did not fit
+ // TODO: add a more specific exception?
+ handleError();
+ }
+
+ SendMessage(coordinatorAddr, message);
+ eventAdd("Ipc::Forwarder::requestTimedOut", &Forwarder::RequestTimedOut,
+ this, timeout, 0, false);
+ }
+
+ void
+ Ipc::Forwarder::swanSong()
+ {
+ debugs(54, 5, HERE);
+ removeTimeoutEvent();
+ if (request->requestId > 0) {
+ DequeueRequest(request->requestId);
+ request->requestId = 0;
+ }
+ cleanup();
+ }
+
+ bool
+ Ipc::Forwarder::doneAll() const
+ {
+ debugs(54, 5, HERE);
+ return request->requestId == 0;
+ }
+
+ /// called when Coordinator starts processing the request
+ void
+ Ipc::Forwarder::handleRemoteAck()
+ {
+ debugs(54, 3, HERE);
+ request->requestId = 0;
++ // Do not clear ENTRY_FWD_HDR_WAIT or do entry->complete() because
++ // it will trigger our client side processing. Let job cleanup close.
+ }
+
+ /// Ipc::Forwarder::requestTimedOut wrapper
+ void
+ Ipc::Forwarder::RequestTimedOut(void* param)
+ {
+ debugs(54, 3, HERE);
+ Must(param != NULL);
+ Forwarder* fwdr = static_cast<Forwarder*>(param);
+ // use async call to enable job call protection that time events lack
+ CallJobHere(54, 5, fwdr, Forwarder, requestTimedOut);
+ }
+
+ /// called when Coordinator fails to start processing the request [in time]
+ void
+ Ipc::Forwarder::requestTimedOut()
+ {
+ debugs(54, 3, HERE);
+ handleTimeout();
+ }
+
+ void
+ Ipc::Forwarder::handleError()
+ {
+ mustStop("error");
+ }
+
+ void
+ Ipc::Forwarder::handleTimeout()
+ {
+ mustStop("timeout");
+ }
+
+ /// terminate with an error
+ void
+ Ipc::Forwarder::handleException(const std::exception& e)
+ {
+ debugs(54, 3, HERE << e.what());
+ mustStop("exception");
+ }
+
+ void
+ Ipc::Forwarder::callException(const std::exception& e)
+ {
+ try {
+ handleException(e);
+ } catch (const std::exception& ex) {
+ debugs(54, DBG_CRITICAL, HERE << ex.what());
+ }
+ AsyncJob::callException(e);
+ }
+
+ /// returns and forgets the right Forwarder callback for the request
+ AsyncCall::Pointer
+ Ipc::Forwarder::DequeueRequest(unsigned int requestId)
+ {
+ debugs(54, 3, HERE);
+ Must(requestId != 0);
+ AsyncCall::Pointer call;
+ RequestsMap::iterator request = TheRequestsMap.find(requestId);
+ if (request != TheRequestsMap.end()) {
+ call = request->second;
+ Must(call != NULL);
+ TheRequestsMap.erase(request);
+ }
+ return call;
+ }
+
+ /// called when we are no longer waiting for Coordinator to respond
+ void
+ Ipc::Forwarder::removeTimeoutEvent()
+ {
+ if (eventFind(&Forwarder::RequestTimedOut, this))
+ eventDelete(&Forwarder::RequestTimedOut, this);
+ }
+
+ void
+ Ipc::Forwarder::HandleRemoteAck(unsigned int requestId)
+ {
+ debugs(54, 3, HERE);
+ Must(requestId != 0);
+
+ AsyncCall::Pointer call = DequeueRequest(requestId);
+ if (call != NULL)
+ ScheduleCallHere(call);
+ }
Port.h \
Strand.cc \
Strand.h \
- \
forward.h \
- Response.h
+ Forwarder.cc \
+ Forwarder.h \
+ Inquirer.cc \
+ Inquirer.h \
+ Request.h \
++ Response.h \
+ \
+ mem/Page.cc \
+ mem/Page.h \
+ mem/PagePool.cc \
+ mem/PagePool.h \
+ mem/Pages.cc \
+ mem/Pages.h \
+ mem/PageStack.cc \
+ mem/PageStack.h \
+ mem/Segment.cc \
+ mem/Segment.h
DEFS += -DDEFAULT_PREFIX=\"$(prefix)\"
/// message class identifier
typedef enum { mtNone = 0, mtRegistration,
+ mtStrandSearchRequest, mtStrandSearchResponse,
mtSharedListenRequest, mtSharedListenResponse,
+ mtIpcIoNotification,
mtCacheMgrRequest, mtCacheMgrResponse
+ #if SQUID_SNMP
+ ,
+ mtSnmpRequest, mtSnmpResponse
+ #endif
} MessageType;
} // namespace Ipc;
#include "mgr/Request.h"
#include "mgr/Response.h"
#include "mgr/Forwarder.h"
+#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */
+#include "SwapDir.h" /* XXX: scope boundary violation */
#include "CacheManager.h"
-
+ #if SQUID_SNMP
+ #include "snmp/Forwarder.h"
+ #include "snmp/Request.h"
+ #include "snmp/Response.h"
+ #endif
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
SharedListenJoined(SharedListenResponse(message));
break;
- case mtCacheMgrRequest:
- handleCacheMgrRequest(Mgr::Request(message));
- break;
+ case mtStrandSearchResponse:
+ IpcIoFile::HandleOpenResponse(StrandSearchResponse(message));
+ break;
+
+ case mtIpcIoNotification:
+ IpcIoFile::HandleNotification(message);
+ break;
+
+ case mtCacheMgrRequest: {
+ const Mgr::Request req(message);
+ handleCacheMgrRequest(req);
+ }
+ break;
- case mtCacheMgrResponse:
- handleCacheMgrResponse(Mgr::Response(message));
- break;
+ case mtCacheMgrResponse: {
+ const Mgr::Response resp(message);
+ handleCacheMgrResponse(resp);
+ }
+ break;
+
+ #if SQUID_SNMP
+ case mtSnmpRequest: {
+ const Snmp::Request req(message);
+ handleSnmpRequest(req);
+ }
+ break;
+
+ case mtSnmpResponse: {
+ const Snmp::Response resp(message);
+ handleSnmpResponse(resp);
+ }
+ break;
+ #endif
default:
debugs(54, 1, HERE << "Unhandled message type: " << message.type());
#ifndef SQUID_IPC_STRAND_H
#define SQUID_IPC_STRAND_H
+#include "ipc/forward.h"
#include "ipc/Port.h"
#include "mgr/forward.h"
-
+ #if SQUID_SNMP
+ #include "snmp/forward.h"
+ #endif
namespace Ipc
{
private:
void registerSelf(); /// let Coordinator know this strand exists
- void handleRegistrationResponse(const StrandCoord &strand);
+ void handleRegistrationResponse(const HereIamMessage &msg);
void handleCacheMgrRequest(const Mgr::Request& request);
void handleCacheMgrResponse(const Mgr::Response& response);
+ #if SQUID_SNMP
+ void handleSnmpRequest(const Snmp::Request& request);
+ void handleSnmpResponse(const Snmp::Response& response);
+ #endif
private:
bool isRegistered; ///< whether Coordinator ACKed registration (unused)
class TypedMsgHdr;
class StrandCoord;
+class HereIamMessage;
+class StrandSearchResponse;
+ class Forwarder;
+ class Inquirer;
+ class Request;
+ class Response;
} // namespace Ipc
#if ICAP_CLIENT
#include "adaptation/icap/icap_log.h"
#endif
+ #if USE_AUTH
#include "auth/Gadgets.h"
+ #endif
+#include "base/RunnersRegistry.h"
#include "base/TextException.h"
#if USE_DELAY_POOLS
#include "ClientDelayConfig.h"
kid->getPid(), kid->exitStatus());
} else if (kid->signaled()) {
syslog(LOG_NOTICE,
- "Squid Parent: child process %d exited due to signal %d with status %d",
+ "Squid Parent: %s process %d exited due to signal %d with status %d",
+ kid->name().termedBuf(),
kid->getPid(), kid->termSignal(), kid->exitStatus());
} else {
- syslog(LOG_NOTICE, "Squid Parent: child process %d exited", kid->getPid());
+ syslog(LOG_NOTICE, "Squid Parent: %s process %d exited",
+ kid->name().termedBuf(), kid->getPid());
}
- syslog(LOG_NOTICE, "Squid Parent: child process %d will not"
+ if (kid->hopeless()) {
- kid->getPid());
++ syslog(LOG_NOTICE, "Squid Parent: %s process %d will not"
+ " be restarted due to repeated, frequent failures",
++ kid->name().termedBuf(), kid->getPid());
+ }
} else {
syslog(LOG_NOTICE, "Squid Parent: unknown child process %d exited", pid);
}