]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
4 * DEBUG: section 54 Interprocess Communication
10 #include "base/Subscription.h"
11 #include "base/TextException.h"
12 #include "CacheManager.h"
14 #include "comm/Connection.h"
15 #include "ipc/Coordinator.h"
16 #include "ipc/SharedListen.h"
17 #include "mgr/Inquirer.h"
18 #include "mgr/Request.h"
19 #include "mgr/Response.h"
22 #include "snmp/Inquirer.h"
23 #include "snmp/Request.h"
24 #include "snmp/Response.h"
27 CBDATA_NAMESPACED_CLASS_INIT(Ipc
, Coordinator
);
28 Ipc::Coordinator
* Ipc::Coordinator::TheInstance
= NULL
;
31 Ipc::Coordinator::Coordinator():
36 void Ipc::Coordinator::start()
41 Ipc::StrandCoord
* Ipc::Coordinator::findStrand(int kidId
)
43 typedef StrandCoords::iterator SI
;
44 for (SI iter
= strands_
.begin(); iter
!= strands_
.end(); ++iter
) {
45 if (iter
->kidId
== kidId
)
51 void Ipc::Coordinator::registerStrand(const StrandCoord
& strand
)
53 debugs(54, 3, HERE
<< "registering kid" << strand
.kidId
<<
55 if (StrandCoord
* found
= findStrand(strand
.kidId
)) {
56 const String oldTag
= found
->tag
;
58 if (oldTag
.size() && !strand
.tag
.size())
59 found
->tag
= oldTag
; // keep more detailed info (XXX?)
61 strands_
.push_back(strand
);
64 // notify searchers waiting for this new strand, if any
65 typedef Searchers::iterator SRI
;
66 for (SRI i
= searchers
.begin(); i
!= searchers
.end();) {
67 if (i
->tag
== strand
.tag
) {
68 notifySearcher(*i
, strand
);
69 i
= searchers
.erase(i
);
76 void Ipc::Coordinator::receive(const TypedMsgHdr
& message
)
78 switch (message
.type()) {
80 debugs(54, 6, HERE
<< "Registration request");
81 handleRegistrationRequest(HereIamMessage(message
));
84 case mtStrandSearchRequest
: {
85 const StrandSearchRequest
sr(message
);
86 debugs(54, 6, HERE
<< "Strand search request: " << sr
.requestorId
<<
88 handleSearchRequest(sr
);
92 case mtSharedListenRequest
:
93 debugs(54, 6, HERE
<< "Shared listen request");
94 handleSharedListenRequest(SharedListenRequest(message
));
97 case mtCacheMgrRequest
: {
98 debugs(54, 6, HERE
<< "Cache manager request");
99 const Mgr::Request
req(message
);
100 handleCacheMgrRequest(req
);
104 case mtCacheMgrResponse
: {
105 debugs(54, 6, HERE
<< "Cache manager response");
106 const Mgr::Response
resp(message
);
107 handleCacheMgrResponse(resp
);
112 case mtSnmpRequest
: {
113 debugs(54, 6, HERE
<< "SNMP request");
114 const Snmp::Request
req(message
);
115 handleSnmpRequest(req
);
119 case mtSnmpResponse
: {
120 debugs(54, 6, HERE
<< "SNMP response");
121 const Snmp::Response
resp(message
);
122 handleSnmpResponse(resp
);
128 debugs(54, DBG_IMPORTANT
, HERE
<< "Unhandled message type: " << message
.type());
133 void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage
& msg
)
135 registerStrand(msg
.strand
);
137 // send back an acknowledgement; TODO: remove as not needed?
140 SendMessage(MakeAddr(strandAddrPfx
, msg
.strand
.kidId
), message
);
144 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest
& request
)
146 debugs(54, 4, HERE
<< "kid" << request
.requestorId
<<
147 " needs shared listen FD for " << request
.params
.addr
);
148 Listeners::const_iterator i
= listeners
.find(request
.params
);
150 const Comm::ConnectionPointer c
= (i
!= listeners
.end()) ?
151 i
->second
: openListenSocket(request
, errNo
);
153 debugs(54, 3, HERE
<< "sending shared listen " << c
<< " for " <<
154 request
.params
.addr
<< " to kid" << request
.requestorId
<<
155 " mapId=" << request
.mapId
);
157 SharedListenResponse
response(c
->fd
, errNo
, request
.mapId
);
159 response
.pack(message
);
160 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
164 Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request
& request
)
169 Mgr::Action::Pointer action
=
170 CacheManager::GetInstance()->createRequestedAction(request
.params
);
171 AsyncJob::Start(new Mgr::Inquirer(action
, request
, strands_
));
172 } catch (const std::exception
&ex
) {
173 debugs(54, DBG_IMPORTANT
, "BUG: cannot aggregate mgr:" <<
174 request
.params
.actionName
<< ": " << ex
.what());
175 // TODO: Avoid half-baked Connections or teach them how to close.
176 ::close(request
.conn
->fd
);
177 request
.conn
->fd
= -1;
178 return; // the worker will timeout and close
181 // Let the strand know that we are now responsible for handling the request
182 Mgr::Response
response(request
.requestId
);
184 response
.pack(message
);
185 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
190 Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response
& response
)
192 Mgr::Inquirer::HandleRemoteAck(response
);
196 Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest
&request
)
198 // do we know of a strand with the given search tag?
199 const StrandCoord
*strand
= NULL
;
200 typedef StrandCoords::const_iterator SCCI
;
201 for (SCCI i
= strands_
.begin(); !strand
&& i
!= strands_
.end(); ++i
) {
202 if (i
->tag
== request
.tag
)
207 notifySearcher(request
, *strand
);
211 searchers
.push_back(request
);
212 debugs(54, 3, HERE
<< "cannot yet tell kid" << request
.requestorId
<<
213 " who " << request
.tag
<< " is");
217 Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest
&request
,
218 const StrandCoord
& strand
)
220 debugs(54, 3, HERE
<< "tell kid" << request
.requestorId
<< " that " <<
221 request
.tag
<< " is kid" << strand
.kidId
);
222 const StrandSearchResponse
response(strand
);
224 response
.pack(message
);
225 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
230 Ipc::Coordinator::handleSnmpRequest(const Snmp::Request
& request
)
234 Snmp::Response
response(request
.requestId
);
236 response
.pack(message
);
237 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
239 AsyncJob::Start(new Snmp::Inquirer(request
, strands_
));
243 Ipc::Coordinator::handleSnmpResponse(const Snmp::Response
& response
)
246 Snmp::Inquirer::HandleRemoteAck(response
);
250 Comm::ConnectionPointer
251 Ipc::Coordinator::openListenSocket(const SharedListenRequest
& request
,
254 const OpenListenerParams
&p
= request
.params
;
256 debugs(54, 6, HERE
<< "opening listen FD at " << p
.addr
<< " for kid" <<
257 request
.requestorId
);
259 Comm::ConnectionPointer conn
= new Comm::Connection
;
260 conn
->local
= p
.addr
; // comm_open_listener may modify it
261 conn
->flags
= p
.flags
;
264 comm_open_listener(p
.sock_type
, p
.proto
, conn
, FdNote(p
.fdNote
));
265 errNo
= Comm::IsConnOpen(conn
) ? 0 : errno
;
268 debugs(54, 6, HERE
<< "tried listening on " << conn
<< " for kid" <<
269 request
.requestorId
);
271 // cache positive results
272 if (Comm::IsConnOpen(conn
))
273 listeners
[request
.params
] = conn
;
278 void Ipc::Coordinator::broadcastSignal(int sig
) const
280 typedef StrandCoords::const_iterator SCI
;
281 for (SCI iter
= strands_
.begin(); iter
!= strands_
.end(); ++iter
) {
282 debugs(54, 5, HERE
<< "signal " << sig
<< " to kid" << iter
->kidId
<<
283 ", PID=" << iter
->pid
);
284 kill(iter
->pid
, sig
);
288 Ipc::Coordinator
* Ipc::Coordinator::Instance()
291 TheInstance
= new Coordinator
;
292 // XXX: if the Coordinator job quits, this pointer will become invalid
293 // we could make Coordinator death fatal, except during exit, but since
294 // Strands do not re-register, even process death would be pointless.
298 const Ipc::StrandCoords
&
299 Ipc::Coordinator::strands() const