]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
4 * DEBUG: section 54 Interprocess Communication
9 #include "base/Subscription.h"
10 #include "base/TextException.h"
11 #include "CacheManager.h"
13 #include "comm/Connection.h"
14 #include "ipc/Coordinator.h"
15 #include "ipc/SharedListen.h"
16 #include "mgr/Inquirer.h"
17 #include "mgr/Request.h"
18 #include "mgr/Response.h"
21 #include "snmp/Inquirer.h"
22 #include "snmp/Request.h"
23 #include "snmp/Response.h"
// Expands the CBDATA_NAMESPACED_CLASS_INIT macro for the Coordinator class in
// the Ipc namespace (the macro itself is defined outside this file).
29 CBDATA_NAMESPACED_CLASS_INIT(Ipc
, Coordinator
);
// Static pointer to the Coordinator object. It starts out NULL and is assigned
// a newly allocated Coordinator inside Instance().
30 Ipc::Coordinator
* Ipc::Coordinator::TheInstance
= NULL
;
// Coordinator constructor.
// NOTE(review): the member initializer list and the body are not visible in
// this excerpt, so nothing is stated here about what gets initialized.
32 Ipc::Coordinator::Coordinator():
// Coordinator::start().
// NOTE(review): the body is not visible in this excerpt; consult the full file
// before relying on any assumption about what it does.
37 void Ipc::Coordinator::start()
// Looks through strands_ for an entry whose kidId equals the kidId argument.
// The declared return type is a StrandCoord pointer; registerStrand() treats a
// non-null result as "this kid is already known".
// NOTE(review): the return statements are not visible in this excerpt --
// presumably a pointer to the matching entry, or NULL when none matches.
42 Ipc::StrandCoord
* Ipc::Coordinator::findStrand(int kidId
)
44 typedef StrandCoords::iterator SI
;
// Linear scan over every registered strand.
45 for (SI iter
= strands_
.begin(); iter
!= strands_
.end(); ++iter
) {
46 if (iter
->kidId
== kidId
)
// Records the given strand in strands_ and then notifies every queued searcher
// whose tag equals the strand's tag.
//
// Visible steps:
//  1. Log the registration at debug section 54, level 3.
//  2. If findStrand() already knows strand.kidId, remember the old tag and,
//     when the old tag is non-empty while the new one is empty, put the old
//     tag back on the found entry.
//  3. strands_.push_back(strand).
//  4. Walk searchers; for each one whose tag matches, call notifySearcher()
//     and erase it from the list.
// NOTE(review): several source lines (58, 61, 63-64 and the loop tail) are not
// visible in this excerpt. Presumably the found entry is overwritten with
// strand before the tag is restored, and push_back runs only when no entry was
// found -- confirm against the full file.
52 void Ipc::Coordinator::registerStrand(const StrandCoord
& strand
)
54 debugs(54, 3, HERE
<< "registering kid" << strand
.kidId
<<
56 if (StrandCoord
* found
= findStrand(strand
.kidId
)) {
// Copy the previous tag before it can be lost.
57 const String oldTag
= found
->tag
;
// Only restore the old tag when the new registration carries no tag at all.
59 if (oldTag
.size() && !strand
.tag
.size())
60 found
->tag
= oldTag
; // keep more detailed info (XXX?)
62 strands_
.push_back(strand
);
65 // notify searchers waiting for this new strand, if any
66 typedef Searchers::iterator SRI
;
// The for-header has no increment: erase() supplies the next iterator for
// matched entries (the advance for non-matching entries is not visible here).
67 for (SRI i
= searchers
.begin(); i
!= searchers
.end();) {
68 if (i
->tag
== strand
.tag
) {
69 notifySearcher(*i
, strand
);
70 i
= searchers
.erase(i
);
// Dispatches an incoming IPC message to the matching handler based on
// message.type().
//
// Visible dispatch targets:
//   registration request    -> handleRegistrationRequest(HereIamMessage)
//   mtStrandSearchRequest   -> handleSearchRequest(StrandSearchRequest)
//   mtSharedListenRequest   -> handleSharedListenRequest(SharedListenRequest)
//   mtCacheMgrRequest       -> handleCacheMgrRequest(Mgr::Request)
//   mtCacheMgrResponse      -> handleCacheMgrResponse(Mgr::Response)
//   mtSnmpRequest           -> handleSnmpRequest(Snmp::Request)
//   mtSnmpResponse          -> handleSnmpResponse(Snmp::Response)
// Each branch logs at debug section 54, level 6, before dispatching. The
// final visible statement logs "Unhandled message type" at DBG_IMPORTANT.
// NOTE(review): the case label of the first branch, the break statements and
// the default label are not visible in this excerpt.
77 void Ipc::Coordinator::receive(const TypedMsgHdr
& message
)
79 switch (message
.type()) {
81 debugs(54, 6, HERE
<< "Registration request");
82 handleRegistrationRequest(HereIamMessage(message
));
85 case mtStrandSearchRequest
: {
// The typed request object is built from the raw message.
86 const StrandSearchRequest
sr(message
);
87 debugs(54, 6, HERE
<< "Strand search request: " << sr
.requestorId
<<
89 handleSearchRequest(sr
);
93 case mtSharedListenRequest
:
94 debugs(54, 6, HERE
<< "Shared listen request");
95 handleSharedListenRequest(SharedListenRequest(message
));
98 case mtCacheMgrRequest
: {
99 debugs(54, 6, HERE
<< "Cache manager request");
100 const Mgr::Request
req(message
);
101 handleCacheMgrRequest(req
);
105 case mtCacheMgrResponse
: {
106 debugs(54, 6, HERE
<< "Cache manager response");
107 const Mgr::Response
resp(message
);
108 handleCacheMgrResponse(resp
);
113 case mtSnmpRequest
: {
114 debugs(54, 6, HERE
<< "SNMP request");
115 const Snmp::Request
req(message
);
116 handleSnmpRequest(req
);
120 case mtSnmpResponse
: {
121 debugs(54, 6, HERE
<< "SNMP response");
122 const Snmp::Response
resp(message
);
123 handleSnmpResponse(resp
);
// Fallback: report message types that no branch above handles.
129 debugs(54, DBG_IMPORTANT
, HERE
<< "Unhandled message type: " << message
.type());
// Handles a "here I am" message from a strand: registers msg.strand via
// registerStrand() and then sends a message back to that strand's address,
// built with MakeAddr(strandAddrPfx, msg.strand.kidId).
// NOTE(review): the declaration and packing of the local `message` that is
// sent back are not visible in this excerpt.
134 void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage
& msg
)
136 registerStrand(msg
.strand
);
138 // send back an acknowledgement; TODO: remove as not needed?
141 SendMessage(MakeAddr(strandAddrPfx
, msg
.strand
.kidId
), message
);
// Answers a kid's request for a shared listening socket.
//
// Looks up request.params in the listeners cache. On a hit the cached
// connection is reused; otherwise openListenSocket(request, errNo) is called.
// The connection's fd, errNo and request.mapId are then packed into a
// SharedListenResponse and sent to the requesting kid's address.
// NOTE(review): the declarations of `errNo` and `message` are not visible in
// this excerpt. Also note that c->fd is read unconditionally, so this code
// relies on `c` never being a null pointer.
145 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest
& request
)
147 debugs(54, 4, HERE
<< "kid" << request
.requestorId
<<
148 " needs shared listen FD for " << request
.params
.addr
);
// Reuse a previously opened listener for identical parameters, if any.
149 Listeners::const_iterator i
= listeners
.find(request
.params
);
151 const Comm::ConnectionPointer c
= (i
!= listeners
.end()) ?
152 i
->second
: openListenSocket(request
, errNo
);
154 debugs(54, 3, HERE
<< "sending shared listen " << c
<< " for " <<
155 request
.params
.addr
<< " to kid" << request
.requestorId
<<
156 " mapId=" << request
.mapId
);
158 SharedListenResponse
response(c
->fd
, errNo
, request
.mapId
);
160 response
.pack(message
);
161 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
// Handles a cache manager request forwarded by a strand.
//
// Creates the requested action through
// CacheManager::GetInstance()->createRequestedAction() and starts an
// Mgr::Inquirer job with the action, the request and strands_. If a
// std::exception is caught, the failure is logged at DBG_IMPORTANT, the
// request connection's descriptor is closed and reset to -1, and the function
// returns without acknowledging. Otherwise an Mgr::Response carrying
// request.requestId is packed and sent to the requesting strand.
// NOTE(review): the `try {` opener and the declaration of `message` are not
// visible in this excerpt. request.conn is dereferenced in the catch branch
// without a visible null check -- confirm it cannot be null there.
165 Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request
& request
)
170 Mgr::Action::Pointer action
=
171 CacheManager::GetInstance()->createRequestedAction(request
.params
);
172 AsyncJob::Start(new Mgr::Inquirer(action
, request
, strands_
));
173 } catch (const std::exception
&ex
) {
174 debugs(54, DBG_IMPORTANT
, "BUG: cannot aggregate mgr:" <<
175 request
.params
.actionName
<< ": " << ex
.what());
176 // TODO: Avoid half-baked Connections or teach them how to close.
177 ::close(request
.conn
->fd
);
// Mark the descriptor as closed so it is not used again through this object.
178 request
.conn
->fd
= -1;
179 return; // the worker will timeout and close
182 // Let the strand know that we are now responsible for handling the request
183 Mgr::Response
response(request
.requestId
);
185 response
.pack(message
);
186 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
// Passes a cache manager response straight to
// Mgr::Inquirer::HandleRemoteAck(); no other processing is visible here.
191 Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response
& response
)
193 Mgr::Inquirer::HandleRemoteAck(response
);
// Handles a strand search request: scans strands_ for an entry whose tag
// equals request.tag. The visible code then contains a notifySearcher() call
// with the found strand, a push_back of the request onto searchers, and a
// level-3 log saying the answer is not yet known.
// NOTE(review): the assignment to `strand` inside the loop and the if/return
// structure between the notify and the push_back are not visible in this
// excerpt. Presumably the request is queued only when no strand matched --
// confirm against the full file.
197 Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest
&request
)
199 // do we know of a strand with the given search tag?
200 const StrandCoord
*strand
= NULL
;
201 typedef StrandCoords::const_iterator SCCI
;
// The `!strand` term ends the loop as soon as a match has been recorded.
202 for (SCCI i
= strands_
.begin(); !strand
&& i
!= strands_
.end(); ++i
) {
203 if (i
->tag
== request
.tag
)
208 notifySearcher(request
, *strand
);
212 searchers
.push_back(request
);
213 debugs(54, 3, HERE
<< "cannot yet tell kid" << request
.requestorId
<<
214 " who " << request
.tag
<< " is");
// Tells the kid that issued `request` which strand owns the tag it asked for:
// logs at level 3, builds a StrandSearchResponse from `strand`, packs it and
// sends it to the address derived from request.requestorId.
// NOTE(review): the declaration of `message` is not visible in this excerpt.
218 Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest
&request
,
219 const StrandCoord
& strand
)
221 debugs(54, 3, HERE
<< "tell kid" << request
.requestorId
<< " that " <<
222 request
.tag
<< " is kid" << strand
.kidId
);
223 const StrandSearchResponse
response(strand
);
225 response
.pack(message
);
226 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
// Handles an SNMP request from a strand: first sends an Snmp::Response
// carrying request.requestId back to the requesting strand, then starts an
// Snmp::Inquirer job with the request and strands_.
// NOTE(review): the declaration of `message` is not visible in this excerpt.
231 Ipc::Coordinator::handleSnmpRequest(const Snmp::Request
& request
)
235 Snmp::Response
response(request
.requestId
);
237 response
.pack(message
);
238 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
// The inquirer job is started after the response above has been sent.
240 AsyncJob::Start(new Snmp::Inquirer(request
, strands_
));
// Passes an SNMP response straight to Snmp::Inquirer::HandleRemoteAck(); no
// other processing is visible here.
244 Ipc::Coordinator::handleSnmpResponse(const Snmp::Response
& response
)
247 Snmp::Inquirer::HandleRemoteAck(response
);
// Opens a listening socket described by request.params.
//
// Allocates a new Comm::Connection, copies the address and flags from the
// parameters, and calls comm_open_listener(). errNo is set to 0 when the
// connection is open afterwards and to errno otherwise. Only an open
// connection is stored in the listeners cache, keyed by request.params.
// The declared return type is Comm::ConnectionPointer.
// NOTE(review): the second parameter's declaration (errNo) and the return
// statement are not visible in this excerpt; handleSharedListenRequest()
// passes errNo as the second argument and uses the returned pointer.
251 Comm::ConnectionPointer
252 Ipc::Coordinator::openListenSocket(const SharedListenRequest
& request
,
255 const OpenListenerParams
&p
= request
.params
;
257 debugs(54, 6, HERE
<< "opening listen FD at " << p
.addr
<< " for kid" <<
258 request
.requestorId
);
260 Comm::ConnectionPointer conn
= new Comm::Connection
;
261 conn
->local
= p
.addr
; // comm_open_listener may modify it
262 conn
->flags
= p
.flags
;
265 comm_open_listener(p
.sock_type
, p
.proto
, conn
, FdNote(p
.fdNote
));
// errno is read right after the open attempt, and only on failure.
266 errNo
= Comm::IsConnOpen(conn
) ? 0 : errno
;
269 debugs(54, 6, HERE
<< "tried listening on " << conn
<< " for kid" <<
270 request
.requestorId
);
272 // cache positive results
273 if (Comm::IsConnOpen(conn
))
274 listeners
[request
.params
] = conn
;
// Sends signal `sig` to every strand in strands_ by calling kill() with the
// strand's pid, logging each delivery at level 5 first.
// The return value of kill() is not checked in the visible code, so delivery
// failures go unreported here.
279 void Ipc::Coordinator::broadcastSignal(int sig
) const
281 typedef StrandCoords::const_iterator SCI
;
282 for (SCI iter
= strands_
.begin(); iter
!= strands_
.end(); ++iter
) {
283 debugs(54, 5, HERE
<< "signal " << sig
<< " to kid" << iter
->kidId
<<
284 ", PID=" << iter
->pid
);
285 kill(iter
->pid
, sig
);
// Accessor for the Coordinator object held in TheInstance. The visible code
// assigns a newly allocated Coordinator to TheInstance.
// NOTE(review): the guard condition around the allocation and the return
// statement are not visible in this excerpt -- presumably the object is
// created only on first use and TheInstance is returned; confirm against the
// full file.
289 Ipc::Coordinator
* Ipc::Coordinator::Instance()
292 TheInstance
= new Coordinator
;
293 // XXX: if the Coordinator job quits, this pointer will become invalid
294 // we could make Coordinator death fatal, except during exit, but since
295 // Strands do not re-register, even process death would be pointless.
299 const Ipc::StrandCoords
&
300 Ipc::Coordinator::strands() const