]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Strand.cc
4 * DEBUG: section 54 Interprocess Communication
9 #include "base/Subscription.h"
10 #include "base/TextException.h"
11 #include "comm/Connection.h"
13 #include "ipc/Strand.h"
14 #include "ipc/StrandCoord.h"
15 #include "ipc/Messages.h"
16 #include "ipc/SharedListen.h"
17 #include "ipc/StrandSearch.h"
19 #include "mgr/Request.h"
20 #include "mgr/Response.h"
21 #include "mgr/Forwarder.h"
22 #include "SwapDir.h" /* XXX: scope boundary violation */
23 #include "CacheManager.h"
25 #include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */
28 #include "snmp/Forwarder.h"
29 #include "snmp/Request.h"
30 #include "snmp/Response.h"
33 CBDATA_NAMESPACED_CLASS_INIT(Ipc
, Strand
);
36 Ipc::Strand::Strand():
37 Port(MakeAddr(strandAddrPfx
, KidIdentifier
)),
42 void Ipc::Strand::start()
48 void Ipc::Strand::registerSelf()
53 HereIamMessage
ann(StrandCoord(KidIdentifier
, getpid()));
56 SendMessage(coordinatorAddr
, message
);
57 setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
60 void Ipc::Strand::receive(const TypedMsgHdr
&message
)
62 debugs(54, 6, HERE
<< message
.type());
63 switch (message
.type()) {
66 handleRegistrationResponse(HereIamMessage(message
));
69 case mtSharedListenResponse
:
70 SharedListenJoined(SharedListenResponse(message
));
74 case mtStrandSearchResponse
:
75 IpcIoFile::HandleOpenResponse(StrandSearchResponse(message
));
78 case mtIpcIoNotification
:
79 IpcIoFile::HandleNotification(message
);
81 #endif /* USE_DISKIO_IPCIO */
83 case mtCacheMgrRequest
: {
84 const Mgr::Request
req(message
);
85 handleCacheMgrRequest(req
);
89 case mtCacheMgrResponse
: {
90 const Mgr::Response
resp(message
);
91 handleCacheMgrResponse(resp
);
97 const Snmp::Request
req(message
);
98 handleSnmpRequest(req
);
102 case mtSnmpResponse
: {
103 const Snmp::Response
resp(message
);
104 handleSnmpResponse(resp
);
110 debugs(54, DBG_IMPORTANT
, HERE
<< "Unhandled message type: " << message
.type());
115 void Ipc::Strand::handleRegistrationResponse(const HereIamMessage
&msg
)
117 // handle registration response from the coordinator; it could be stale
118 if (msg
.strand
.kidId
== KidIdentifier
&& msg
.strand
.pid
== getpid()) {
119 debugs(54, 6, "kid" << KidIdentifier
<< " registered");
120 clearTimeout(); // we are done
122 // could be an ACK to the registration message of our dead predecessor
123 debugs(54, 6, "kid" << KidIdentifier
<< " is not yet registered");
124 // keep listening, with a timeout
128 void Ipc::Strand::handleCacheMgrRequest(const Mgr::Request
& request
)
130 Mgr::Action::Pointer action
=
131 CacheManager::GetInstance()->createRequestedAction(request
.params
);
132 action
->respond(request
);
135 void Ipc::Strand::handleCacheMgrResponse(const Mgr::Response
& response
)
137 Mgr::Forwarder::HandleRemoteAck(response
.requestId
);
141 void Ipc::Strand::handleSnmpRequest(const Snmp::Request
& request
)
144 Snmp::SendResponse(request
.requestId
, request
.pdu
);
147 void Ipc::Strand::handleSnmpResponse(const Snmp::Response
& response
)
150 Snmp::Forwarder::HandleRemoteAck(response
.requestId
);
154 void Ipc::Strand::timedout()
156 debugs(54, 6, HERE
<< isRegistered
);
158 fatalf("kid%d registration timed out", KidIdentifier
);