]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
4 * DEBUG: section 54 Interprocess Communication
10 #include "base/Subscription.h"
11 #include "base/TextException.h"
12 #include "CacheManager.h"
14 #include "comm/Connection.h"
15 #include "ipc/Coordinator.h"
16 #include "ipc/SharedListen.h"
17 #include "mgr/Inquirer.h"
18 #include "mgr/Request.h"
19 #include "mgr/Response.h"
21 #include "snmp/Inquirer.h"
22 #include "snmp/Request.h"
23 #include "snmp/Response.h"
26 CBDATA_NAMESPACED_CLASS_INIT(Ipc
, Coordinator
);
27 Ipc::Coordinator
* Ipc::Coordinator::TheInstance
= NULL
;
30 Ipc::Coordinator::Coordinator():
35 void Ipc::Coordinator::start()
40 Ipc::StrandCoord
* Ipc::Coordinator::findStrand(int kidId
)
42 typedef StrandCoords::iterator SI
;
43 for (SI iter
= strands_
.begin(); iter
!= strands_
.end(); ++iter
) {
44 if (iter
->kidId
== kidId
)
50 void Ipc::Coordinator::registerStrand(const StrandCoord
& strand
)
52 debugs(54, 3, HERE
<< "registering kid" << strand
.kidId
<<
54 if (StrandCoord
* found
= findStrand(strand
.kidId
)) {
55 const String oldTag
= found
->tag
;
57 if (oldTag
.size() && !strand
.tag
.size())
58 found
->tag
= oldTag
; // keep more detailed info (XXX?)
60 strands_
.push_back(strand
);
63 // notify searchers waiting for this new strand, if any
64 typedef Searchers::iterator SRI
;
65 for (SRI i
= searchers
.begin(); i
!= searchers
.end();) {
66 if (i
->tag
== strand
.tag
) {
67 notifySearcher(*i
, strand
);
68 i
= searchers
.erase(i
);
75 void Ipc::Coordinator::receive(const TypedMsgHdr
& message
)
77 switch (message
.type()) {
79 debugs(54, 6, HERE
<< "Registration request");
80 handleRegistrationRequest(HereIamMessage(message
));
83 case mtStrandSearchRequest
: {
84 const StrandSearchRequest
sr(message
);
85 debugs(54, 6, HERE
<< "Strand search request: " << sr
.requestorId
<<
87 handleSearchRequest(sr
);
91 case mtSharedListenRequest
:
92 debugs(54, 6, HERE
<< "Shared listen request");
93 handleSharedListenRequest(SharedListenRequest(message
));
96 case mtCacheMgrRequest
: {
97 debugs(54, 6, HERE
<< "Cache manager request");
98 const Mgr::Request
req(message
);
99 handleCacheMgrRequest(req
);
103 case mtCacheMgrResponse
: {
104 debugs(54, 6, HERE
<< "Cache manager response");
105 const Mgr::Response
resp(message
);
106 handleCacheMgrResponse(resp
);
111 case mtSnmpRequest
: {
112 debugs(54, 6, HERE
<< "SNMP request");
113 const Snmp::Request
req(message
);
114 handleSnmpRequest(req
);
118 case mtSnmpResponse
: {
119 debugs(54, 6, HERE
<< "SNMP response");
120 const Snmp::Response
resp(message
);
121 handleSnmpResponse(resp
);
127 debugs(54, 1, HERE
<< "Unhandled message type: " << message
.type());
132 void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage
& msg
)
134 registerStrand(msg
.strand
);
136 // send back an acknowledgement; TODO: remove as not needed?
139 SendMessage(MakeAddr(strandAddrPfx
, msg
.strand
.kidId
), message
);
143 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest
& request
)
145 debugs(54, 4, HERE
<< "kid" << request
.requestorId
<<
146 " needs shared listen FD for " << request
.params
.addr
);
147 Listeners::const_iterator i
= listeners
.find(request
.params
);
149 const Comm::ConnectionPointer c
= (i
!= listeners
.end()) ?
150 i
->second
: openListenSocket(request
, errNo
);
152 debugs(54, 3, HERE
<< "sending shared listen " << c
<< " for " <<
153 request
.params
.addr
<< " to kid" << request
.requestorId
<<
154 " mapId=" << request
.mapId
);
156 SharedListenResponse
response(c
->fd
, errNo
, request
.mapId
);
158 response
.pack(message
);
159 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
163 Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request
& request
)
168 Mgr::Action::Pointer action
=
169 CacheManager::GetInstance()->createRequestedAction(request
.params
);
170 AsyncJob::Start(new Mgr::Inquirer(action
, request
, strands_
));
171 } catch (const std::exception
&ex
) {
172 debugs(54, DBG_IMPORTANT
, "BUG: cannot aggregate mgr:" <<
173 request
.params
.actionName
<< ": " << ex
.what());
174 // TODO: Avoid half-baked Connections or teach them how to close.
175 ::close(request
.conn
->fd
);
176 request
.conn
->fd
= -1;
177 return; // the worker will timeout and close
180 // Let the strand know that we are now responsible for handling the request
181 Mgr::Response
response(request
.requestId
);
183 response
.pack(message
);
184 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
189 Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response
& response
)
191 Mgr::Inquirer::HandleRemoteAck(response
);
195 Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest
&request
)
197 // do we know of a strand with the given search tag?
198 const StrandCoord
*strand
= NULL
;
199 typedef StrandCoords::const_iterator SCCI
;
200 for (SCCI i
= strands_
.begin(); !strand
&& i
!= strands_
.end(); ++i
) {
201 if (i
->tag
== request
.tag
)
206 notifySearcher(request
, *strand
);
210 searchers
.push_back(request
);
211 debugs(54, 3, HERE
<< "cannot yet tell kid" << request
.requestorId
<<
212 " who " << request
.tag
<< " is");
216 Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest
&request
,
217 const StrandCoord
& strand
)
219 debugs(54, 3, HERE
<< "tell kid" << request
.requestorId
<< " that " <<
220 request
.tag
<< " is kid" << strand
.kidId
);
221 const StrandSearchResponse
response(strand
);
223 response
.pack(message
);
224 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
229 Ipc::Coordinator::handleSnmpRequest(const Snmp::Request
& request
)
233 Snmp::Response
response(request
.requestId
);
235 response
.pack(message
);
236 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
238 AsyncJob::Start(new Snmp::Inquirer(request
, strands_
));
242 Ipc::Coordinator::handleSnmpResponse(const Snmp::Response
& response
)
245 Snmp::Inquirer::HandleRemoteAck(response
);
249 Comm::ConnectionPointer
250 Ipc::Coordinator::openListenSocket(const SharedListenRequest
& request
,
253 const OpenListenerParams
&p
= request
.params
;
255 debugs(54, 6, HERE
<< "opening listen FD at " << p
.addr
<< " for kid" <<
256 request
.requestorId
);
258 Comm::ConnectionPointer conn
= new Comm::Connection
;
259 conn
->local
= p
.addr
; // comm_open_listener may modify it
260 conn
->flags
= p
.flags
;
263 comm_open_listener(p
.sock_type
, p
.proto
, conn
, FdNote(p
.fdNote
));
264 errNo
= Comm::IsConnOpen(conn
) ? 0 : errno
;
267 debugs(54, 6, HERE
<< "tried listening on " << conn
<< " for kid" <<
268 request
.requestorId
);
270 // cache positive results
271 if (Comm::IsConnOpen(conn
))
272 listeners
[request
.params
] = conn
;
277 void Ipc::Coordinator::broadcastSignal(int sig
) const
279 typedef StrandCoords::const_iterator SCI
;
280 for (SCI iter
= strands_
.begin(); iter
!= strands_
.end(); ++iter
) {
281 debugs(54, 5, HERE
<< "signal " << sig
<< " to kid" << iter
->kidId
<<
282 ", PID=" << iter
->pid
);
283 kill(iter
->pid
, sig
);
287 Ipc::Coordinator
* Ipc::Coordinator::Instance()
290 TheInstance
= new Coordinator
;
291 // XXX: if the Coordinator job quits, this pointer will become invalid
292 // we could make Coordinator death fatal, except during exit, but since
293 // Strands do not re-register, even process death would be pointless.
297 const Ipc::StrandCoords
&
298 Ipc::Coordinator::strands() const