2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 54 Interprocess Communication */
12 #include "base/Subscription.h"
13 #include "base/TextException.h"
14 #include "CacheManager.h"
15 #include "CollapsedForwarding.h"
16 #include "comm/Connection.h"
19 #include "ipc/Messages.h"
20 #include "ipc/SharedListen.h"
21 #include "ipc/Strand.h"
22 #include "ipc/StrandCoord.h"
23 #include "ipc/StrandSearch.h"
24 #include "mgr/Forwarder.h"
25 #include "mgr/Request.h"
26 #include "mgr/Response.h"
27 #include "SwapDir.h" /* XXX: scope boundary violation */
28 #if HAVE_DISKIO_MODULE_IPCIO
29 #include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */
32 #include "snmp/Forwarder.h"
33 #include "snmp/Request.h"
34 #include "snmp/Response.h"
/// Invokes the CBDATA_NAMESPACED_CLASS_INIT macro for class Strand in
/// namespace Ipc.
/// NOTE(review): presumably this emits the cbdata support definitions that
/// the class declaration expects -- confirm against the macro definition.
37 CBDATA_NAMESPACED_CLASS_INIT(Ipc
, Strand
);
/// Constructor: initializes Port with the address that MakeAddr() builds
/// from strandAddrLabel and this kid's KidIdentifier.
/// NOTE(review): the initializer list ends with a comma in this view, so
/// further initializers and the constructor body are not visible here.
39 Ipc::Strand::Strand():
40 Port(MakeAddr(strandAddrLabel
, KidIdentifier
)),
/// Definition of Ipc::Strand::start(); takes no arguments, returns nothing.
/// NOTE(review): the body is not visible in this view -- confirm what it
/// starts before relying on any description of its behavior.
45 void Ipc::Strand::start()
/// Announces this strand to the coordinator: builds a HereIamMessage from
/// StrandCoord(KidIdentifier, getpid()), sends `message` to
/// Port::CoordinatorAddr(), and then arms a timeout.
/// NOTE(review): the lines that declare and fill `message` are not visible
/// here; presumably `ann` is packed into it -- confirm.
51 void Ipc::Strand::registerSelf()
// the announcement identifies us by kid ID and current process ID
56 HereIamMessage
ann(StrandCoord(KidIdentifier
, getpid()));
59 SendMessage(Port::CoordinatorAddr(), message
);
// NOTE(review): 6 is presumably in seconds -- confirm the setTimeout() units
60 setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
/// Dispatches an incoming IPC message to the matching handler, selected by
/// message.type().
/// \param message the received typed message header; each case converts it
///        to the message class that its handler takes
/// NOTE(review): the statements that end each case (e.g., break), the first
/// case label, and the label guarding the final "Unhandled message type"
/// report are not visible in this view -- confirm before editing the switch.
63 void Ipc::Strand::receive(const TypedMsgHdr
&message
)
// log the raw message type (debug section 54, level 6)
65 debugs(54, 6, HERE
<< message
.type());
66 switch (message
.type()) {
// registration reply; NOTE(review): its case label is not visible here
69 handleRegistrationResponse(HereIamMessage(message
));
// reply to a shared listen request
72 case mtSharedListenResponse
:
73 SharedListenJoined(SharedListenResponse(message
));
// the IpcIo cases exist only when the IpcIo disk I/O module is built
76 #if HAVE_DISKIO_MODULE_IPCIO
77 case mtStrandSearchResponse
:
78 IpcIoFile::HandleOpenResponse(StrandSearchResponse(message
));
81 case mtIpcIoNotification
:
82 IpcIoFile::HandleNotification(message
);
84 #endif /* HAVE_DISKIO_MODULE_IPCIO */
// cache manager request: unpack it and pass it to our handler
86 case mtCacheMgrRequest
: {
87 const Mgr::Request
req(message
);
88 handleCacheMgrRequest(req
);
// cache manager response: unpack it and pass it to our handler
92 case mtCacheMgrResponse
: {
93 const Mgr::Response
resp(message
);
94 handleCacheMgrResponse(resp
);
// collapsed forwarding notifications take the raw message
98 case mtCollapsedForwardingNotification
:
99 CollapsedForwarding::HandleNotification(message
);
// SNMP request: unpack it and pass it to our handler
103 case mtSnmpRequest
: {
104 const Snmp::Request
req(message
);
105 handleSnmpRequest(req
);
// SNMP response: unpack it and pass it to our handler
109 case mtSnmpResponse
: {
110 const Snmp::Response
resp(message
);
111 handleSnmpResponse(resp
);
// report the message type at DBG_IMPORTANT level
117 debugs(54, DBG_IMPORTANT
, HERE
<< "Unhandled message type: " << message
.type());
/// Processes a registration reply: when msg.strand carries both our
/// KidIdentifier and our current process ID, logs success and clears the
/// pending timeout.
/// \param msg the reply whose strand.kidId and strand.pid are checked
/// NOTE(review): the branch separator before the "is not yet registered"
/// log is not visible in this view; presumably that log is the mismatch
/// (else) path -- confirm.
122 void Ipc::Strand::handleRegistrationResponse(const HereIamMessage
&msg
)
124 // handle registration response from the coordinator; it could be stale
// both the kid ID and the process ID must match ours
125 if (msg
.strand
.kidId
== KidIdentifier
&& msg
.strand
.pid
== getpid()) {
126 debugs(54, 6, "kid" << KidIdentifier
<< " registered");
127 clearTimeout(); // we are done
129 // could be an ACK to the registration message of our dead predecessor
130 debugs(54, 6, "kid" << KidIdentifier
<< " is not yet registered");
131 // keep listening, with a timeout
/// Serves a cache manager request: asks the CacheManager instance to create
/// the action described by request.params and has that action respond to
/// the request.
/// \param request the cache manager request to answer
135 void Ipc::Strand::handleCacheMgrRequest(const Mgr::Request
& request
)
// the action is selected by the request parameters
137 Mgr::Action::Pointer action
=
138 CacheManager::GetInstance()->createRequestedAction(request
.params
);
139 action
->respond(request
);
/// Passes the requestId of a cache manager response to
/// Mgr::Forwarder::HandleRemoteAck().
/// \param response the response whose requestId is passed on
142 void Ipc::Strand::handleCacheMgrResponse(const Mgr::Response
& response
)
144 Mgr::Forwarder::HandleRemoteAck(response
.requestId
);
/// Answers an SNMP request by calling Snmp::SendResponse() with the
/// request's requestId and pdu.
/// \param request the SNMP request to answer
/// NOTE(review): statements may precede the call outside this view.
148 void Ipc::Strand::handleSnmpRequest(const Snmp::Request
& request
)
151 Snmp::SendResponse(request
.requestId
, request
.pdu
);
/// Passes the requestId of an SNMP response to
/// Snmp::Forwarder::HandleRemoteAck().
/// \param response the response whose requestId is passed on
/// NOTE(review): statements may precede the call outside this view.
154 void Ipc::Strand::handleSnmpResponse(const Snmp::Response
& response
)
157 Snmp::Forwarder::HandleRemoteAck(response
.requestId
);
/// Timeout handler: logs the isRegistered flag and reports a fatal
/// "registration timed out" error that names this kid.
/// NOTE(review): a guard between the two statements may be missing from
/// this view -- confirm whether fatalf() runs unconditionally.
161 void Ipc::Strand::timedout()
// record the registration state at the moment the timeout fires
163 debugs(54, 6, HERE
<< isRegistered
);
165 fatalf("kid%d registration timed out", KidIdentifier
);