2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 54 Interprocess Communication */
12 #include "base/Subscription.h"
13 #include "base/TextException.h"
14 #include "CacheManager.h"
16 #include "comm/Connection.h"
17 #include "ipc/Coordinator.h"
18 #include "ipc/SharedListen.h"
19 #include "mgr/Inquirer.h"
20 #include "mgr/Request.h"
21 #include "mgr/Response.h"
24 #include "snmp/Inquirer.h"
25 #include "snmp/Request.h"
26 #include "snmp/Response.h"
// Invokes the CBDATA_NAMESPACED_CLASS_INIT macro for Ipc::Coordinator
// (presumably cbdata bookkeeping for the class -- confirm against the macro)
// and defines the static TheInstance pointer, which starts out as NULL.
31 CBDATA_NAMESPACED_CLASS_INIT(Ipc
, Coordinator
);
32 Ipc::Coordinator
* Ipc::Coordinator::TheInstance
= NULL
;
// Constructor: initializes Port with the address returned by
// Ipc::Port::CoordinatorAddr().
34 Ipc::Coordinator::Coordinator():
35 Port(Ipc::Port::CoordinatorAddr())
// Takes no arguments and returns nothing.
// NOTE(review): presumably delegates to the Port start-up logic -- confirm
// against the full definition.
39 void Ipc::Coordinator::start()
// Walks strands_ from begin() to end(), comparing each entry's kidId with
// the requested kidId.
// Declared to return an Ipc::StrandCoord pointer; registerStrand() tests the
// result in an if-condition, so a null result presumably means "no such
// strand" -- TODO confirm against the return statements.
44 Ipc::StrandCoord
* Ipc::Coordinator::findStrand(int kidId
)
46 typedef StrandCoords::iterator SI
;
47 for (SI iter
= strands_
.begin(); iter
!= strands_
.end(); ++iter
) {
48 if (iter
->kidId
== kidId
)
// Records a strand and wakes up any searchers that were waiting for it.
//
// Steps visible in this function:
//  - logs the registration at debug section 54, level 3;
//  - looks up an existing entry for strand.kidId via findStrand();
//  - for an existing entry, remembers its old tag and restores that tag when
//    the old tag is non-empty and the incoming strand's tag is empty;
//  - appends the strand to strands_ (presumably only when no existing entry
//    was found -- confirm the branch structure);
//  - walks searchers, calling notifySearcher() for each one whose tag equals
//    strand.tag and erasing it; erase() supplies the next iterator, which is
//    why the for-statement has no increment expression.
54 void Ipc::Coordinator::registerStrand(const StrandCoord
& strand
)
56 debugs(54, 3, HERE
<< "registering kid" << strand
.kidId
<<
58 if (StrandCoord
* found
= findStrand(strand
.kidId
)) {
59 const String oldTag
= found
->tag
;
61 if (oldTag
.size() && !strand
.tag
.size())
62 found
->tag
= oldTag
; // keep more detailed info (XXX?)
64 strands_
.push_back(strand
);
67 // notify searchers waiting for this new strand, if any
68 typedef Searchers::iterator SRI
;
69 for (SRI i
= searchers
.begin(); i
!= searchers
.end();) {
70 if (i
->tag
== strand
.tag
) {
71 notifySearcher(*i
, strand
);
72 i
= searchers
.erase(i
);
// Dispatches an incoming IPC message according to message.type().
//
// Each visible branch logs a level-6 debug line in section 54, builds the
// typed request/response object from the raw TypedMsgHdr, and hands it to
// the matching handle*() method:
//   registration          -> handleRegistrationRequest(HereIamMessage)
//   mtStrandSearchRequest -> handleSearchRequest(StrandSearchRequest)
//   mtSharedListenRequest -> handleSharedListenRequest(SharedListenRequest)
//   mtCacheMgrRequest     -> handleCacheMgrRequest(Mgr::Request)
//   mtCacheMgrResponse    -> handleCacheMgrResponse(Mgr::Response)
//   mtSnmpRequest         -> handleSnmpRequest(Snmp::Request)
//   mtSnmpResponse        -> handleSnmpResponse(Snmp::Response)
// The final debugs() call reports the message type at DBG_IMPORTANT as
// "Unhandled message type" (presumably the default branch -- confirm).
79 void Ipc::Coordinator::receive(const TypedMsgHdr
& message
)
81 switch (message
.type()) {
83 debugs(54, 6, HERE
<< "Registration request");
84 handleRegistrationRequest(HereIamMessage(message
));
87 case mtStrandSearchRequest
: {
88 const StrandSearchRequest
sr(message
);
89 debugs(54, 6, HERE
<< "Strand search request: " << sr
.requestorId
<<
91 handleSearchRequest(sr
);
95 case mtSharedListenRequest
:
96 debugs(54, 6, HERE
<< "Shared listen request");
97 handleSharedListenRequest(SharedListenRequest(message
));
100 case mtCacheMgrRequest
: {
101 debugs(54, 6, HERE
<< "Cache manager request");
102 const Mgr::Request
req(message
);
103 handleCacheMgrRequest(req
);
107 case mtCacheMgrResponse
: {
108 debugs(54, 6, HERE
<< "Cache manager response");
109 const Mgr::Response
resp(message
);
110 handleCacheMgrResponse(resp
);
115 case mtSnmpRequest
: {
116 debugs(54, 6, HERE
<< "SNMP request");
117 const Snmp::Request
req(message
);
118 handleSnmpRequest(req
);
122 case mtSnmpResponse
: {
123 debugs(54, 6, HERE
<< "SNMP response");
124 const Snmp::Response
resp(message
);
125 handleSnmpResponse(resp
);
131 debugs(54, DBG_IMPORTANT
, HERE
<< "Unhandled message type: " << message
.type());
// Handles a strand's registration message: passes msg.strand to
// registerStrand(), then sends a message to the address built by MakeAddr()
// from strandAddrLabel and msg.strand.kidId.
// NOTE(review): the construction/packing of `message` is not visible here.
136 void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage
& msg
)
138 registerStrand(msg
.strand
);
140 // send back an acknowledgement; TODO: remove as not needed?
143 SendMessage(MakeAddr(strandAddrLabel
, msg
.strand
.kidId
), message
);
// Answers a kid's request for a shared listening descriptor.
//
// Looks up request.params in the listeners cache. On a hit the cached
// connection is reused; on a miss openListenSocket() is called with the
// request and errNo. A SharedListenResponse built from c->fd, errNo and
// request.mapId is then packed into `message` and sent to the address built
// by MakeAddr() from strandAddrLabel and request.requestorId.
// NOTE(review): the declarations of `errNo` and `message` are not visible
// here; errNo is presumably initialized to 0 so that cache hits report
// success -- confirm.
147 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest
& request
)
149 debugs(54, 4, HERE
<< "kid" << request
.requestorId
<<
150 " needs shared listen FD for " << request
.params
.addr
);
151 Listeners::const_iterator i
= listeners
.find(request
.params
);
153 const Comm::ConnectionPointer c
= (i
!= listeners
.end()) ?
154 i
->second
: openListenSocket(request
, errNo
);
156 debugs(54, 3, HERE
<< "sending shared listen " << c
<< " for " <<
157 request
.params
.addr
<< " to kid" << request
.requestorId
<<
158 " mapId=" << request
.mapId
);
160 SharedListenResponse
response(c
->fd
, errNo
, request
.mapId
);
162 response
.pack(message
);
163 SendMessage(MakeAddr(strandAddrLabel
, request
.requestorId
), message
);
// Handles a cache manager request forwarded by a strand.
//
// Asks CacheManager::GetInstance() to create the action described by
// request.params and starts a Mgr::Inquirer job with that action, the
// request and strands_.
// If a std::exception is caught, the failure is logged at DBG_IMPORTANT,
// request.conn->fd is closed with ::close() and set to -1, and the function
// returns without sending a reply.
// Otherwise a Mgr::Response built from request.requestId is packed and sent
// to the address built by MakeAddr() from strandAddrLabel and
// request.requestorId.
// NOTE(review): the opening `try` and the declaration of `message` are not
// visible here.
167 Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request
& request
)
172 Mgr::Action::Pointer action
=
173 CacheManager::GetInstance()->createRequestedAction(request
.params
);
174 AsyncJob::Start(new Mgr::Inquirer(action
, request
, strands_
));
175 } catch (const std::exception
&ex
) {
176 debugs(54, DBG_IMPORTANT
, "BUG: cannot aggregate mgr:" <<
177 request
.params
.actionName
<< ": " << ex
.what());
178 // TODO: Avoid half-baked Connections or teach them how to close.
179 ::close(request
.conn
->fd
);
180 request
.conn
->fd
= -1;
181 return; // the worker will timeout and close
184 // Let the strand know that we are now responsible for handling the request
185 Mgr::Response
response(request
.requestId
);
187 response
.pack(message
);
188 SendMessage(MakeAddr(strandAddrLabel
, request
.requestorId
), message
);
// Forwards a cache manager response to Mgr::Inquirer::HandleRemoteAck().
193 Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response
& response
)
195 Mgr::Inquirer::HandleRemoteAck(response
);
// Handles a kid's request to find the strand that carries request.tag.
//
// Scans strands_ comparing each entry's tag with request.tag; the loop
// condition stops the scan as soon as `strand` is non-null.
// The visible code then calls notifySearcher() with the request and *strand,
// appends the request to searchers, and logs at level 3 that the answer is
// not yet known.
// NOTE(review): presumably notifySearcher() runs only when a strand matched
// and the push_back/log only when none did -- confirm; the assignment to
// `strand` and the branch structure are not visible here.
199 Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest
&request
)
201 // do we know of a strand with the given search tag?
202 const StrandCoord
*strand
= NULL
;
203 typedef StrandCoords::const_iterator SCCI
;
204 for (SCCI i
= strands_
.begin(); !strand
&& i
!= strands_
.end(); ++i
) {
205 if (i
->tag
== request
.tag
)
210 notifySearcher(request
, *strand
);
214 searchers
.push_back(request
);
215 debugs(54, 3, HERE
<< "cannot yet tell kid" << request
.requestorId
<<
216 " who " << request
.tag
<< " is");
// Tells the kid identified by request.requestorId which strand carries
// request.tag.
//
// Logs the pairing at level 3, builds a StrandSearchResponse from `strand`,
// packs it into `message` and sends it to the address built by MakeAddr()
// from strandAddrLabel and request.requestorId.
// NOTE(review): the declaration of `message` is not visible here.
220 Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest
&request
,
221 const StrandCoord
& strand
)
223 debugs(54, 3, HERE
<< "tell kid" << request
.requestorId
<< " that " <<
224 request
.tag
<< " is kid" << strand
.kidId
);
225 const StrandSearchResponse
response(strand
);
227 response
.pack(message
);
228 SendMessage(MakeAddr(strandAddrLabel
, request
.requestorId
), message
);
// Handles an SNMP request forwarded by a strand.
//
// Builds a Snmp::Response from request.requestId, packs it and sends it to
// the address built by MakeAddr() from strandAddrLabel and
// request.requestorId. Only after that does it start a Snmp::Inquirer job
// with the request and strands_.
// NOTE(review): the declaration of `message` is not visible here.
233 Ipc::Coordinator::handleSnmpRequest(const Snmp::Request
& request
)
237 Snmp::Response
response(request
.requestId
);
239 response
.pack(message
);
240 SendMessage(MakeAddr(strandAddrLabel
, request
.requestorId
), message
);
242 AsyncJob::Start(new Snmp::Inquirer(request
, strands_
));
// Forwards an SNMP response to Snmp::Inquirer::HandleRemoteAck().
246 Ipc::Coordinator::handleSnmpResponse(const Snmp::Response
& response
)
249 Snmp::Inquirer::HandleRemoteAck(response
);
// Opens a listening connection described by request.params.
//
// Creates a new Comm::Connection, copies p.addr into its `local` field and
// p.flags into its `flags` field, and calls comm_open_listener() with
// p.sock_type, p.proto, the connection and an FdNote for p.fdNote.
// errNo is set to 0 when Comm::IsConnOpen() reports the connection open, and
// to errno otherwise. Only open connections are stored in listeners, keyed
// by request.params, so failed attempts are not cached.
// Declared to return a Comm::ConnectionPointer.
// NOTE(review): presumably returns newConn and receives errNo as an output
// parameter -- confirm; the second parameter's declaration and the return
// statement are not visible here.
253 Comm::ConnectionPointer
254 Ipc::Coordinator::openListenSocket(const SharedListenRequest
& request
,
257 const OpenListenerParams
&p
= request
.params
;
259 debugs(54, 6, HERE
<< "opening listen FD at " << p
.addr
<< " for kid" <<
260 request
.requestorId
);
262 Comm::ConnectionPointer newConn
= new Comm::Connection
;
263 newConn
->local
= p
.addr
; // comm_open_listener may modify it
264 newConn
->flags
= p
.flags
;
267 comm_open_listener(p
.sock_type
, p
.proto
, newConn
, FdNote(p
.fdNote
));
268 errNo
= Comm::IsConnOpen(newConn
) ? 0 : errno
;
271 debugs(54, 6, HERE
<< "tried listening on " << newConn
<< " for kid" <<
272 request
.requestorId
);
274 // cache positive results
275 if (Comm::IsConnOpen(newConn
))
276 listeners
[request
.params
] = newConn
;
// Sends signal `sig` to every strand in strands_.
//
// For each entry, logs the kid id and PID at level 5 and then calls kill()
// on the entry's pid. The return value of kill() is not checked, so delivery
// failures go unreported here.
281 void Ipc::Coordinator::broadcastSignal(int sig
) const
283 typedef StrandCoords::const_iterator SCI
;
284 for (SCI iter
= strands_
.begin(); iter
!= strands_
.end(); ++iter
) {
285 debugs(54, 5, HERE
<< "signal " << sig
<< " to kid" << iter
->kidId
<<
286 ", PID=" << iter
->pid
);
287 kill(iter
->pid
, sig
);
// Accessor for the Coordinator singleton stored in TheInstance.
//
// The visible code assigns a newly allocated Coordinator to TheInstance.
// Declared to return an Ipc::Coordinator pointer.
// NOTE(review): presumably the allocation happens only while TheInstance is
// still NULL and TheInstance is returned -- confirm; the guard and the
// return statement are not visible here.
291 Ipc::Coordinator
* Ipc::Coordinator::Instance()
294 TheInstance
= new Coordinator
;
295 // XXX: if the Coordinator job quits, this pointer will become invalid
296 // we could make Coordinator death fatal, except during exit, but since
297 // Strands do not re-register, even process death would be pointless.
// Const accessor declared to return a const reference to Ipc::StrandCoords.
// NOTE(review): presumably returns strands_ -- confirm against the body.
301 const Ipc::StrandCoords
&
302 Ipc::Coordinator::strands() const