]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
4 * DEBUG: section 54 Interprocess Communication
10 #include "base/TextException.h"
11 #include "CacheManager.h"
13 #include "ipc/Coordinator.h"
14 #include "ipc/FdNotes.h"
15 #include "ipc/SharedListen.h"
16 #include "mgr/Inquirer.h"
17 #include "mgr/Request.h"
18 #include "mgr/Response.h"
19 #include "mgr/StoreToCommWriter.h"
20 #include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: layering violation */
// Invokes the CBDATA_NAMESPACED_CLASS_INIT macro for the Coordinator class
// in the Ipc namespace.
23 CBDATA_NAMESPACED_CLASS_INIT(Ipc
, Coordinator
);
// Definition of the static instance pointer. It starts out as NULL and is
// assigned a newly allocated Coordinator inside Instance().
24 Ipc::Coordinator
* Ipc::Coordinator::TheInstance
= NULL
;
27 Ipc::Coordinator::Coordinator():
32 void Ipc::Coordinator::start()
// Searches strands_ for an entry whose kidId equals the kidId argument and
// yields a StrandCoord pointer.
// NOTE(review): the return statements are not present in this extract;
// presumably the matching entry is returned on a hit and NULL otherwise
// (registerStrand() tests the result for truth) -- confirm against the full file.
37 Ipc::StrandCoord
* Ipc::Coordinator::findStrand(int kidId
)
39 typedef StrandCoords::iterator SI
;
// Linear scan over every registered strand.
40 for (SI iter
= strands_
.begin(); iter
!= strands_
.end(); ++iter
) {
41 if (iter
->kidId
== kidId
)
// Records the given strand and then notifies every pending searcher whose
// tag equals the strand's tag.
//
// strand: coordinates of the strand being registered; its kidId and tag
//         members are read here.
47 void Ipc::Coordinator::registerStrand(const StrandCoord
& strand
)
49 debugs(54, 3, HERE
<< "registering kid" << strand
.kidId
<<
// Check whether a strand with the same kidId is already known.
// NOTE(review): the statement executed when findStrand() succeeds is not
// visible in this extract.
51 if (StrandCoord
* found
= findStrand(strand
.kidId
))
54 strands_
.push_back(strand
);
56 // notify searchers waiting for this new strand, if any
57 typedef Searchers::iterator SRI
;
// The loop header has no increment: for a matching searcher the iterator is
// replaced by the value returned from erase().
// NOTE(review): the advance for non-matching searchers is not visible in
// this extract.
58 for (SRI i
= searchers
.begin(); i
!= searchers
.end();) {
59 if (i
->tag
== strand
.tag
) {
60 notifySearcher(*i
, strand
);
61 i
= searchers
.erase(i
);
// Dispatches an incoming IPC message to the matching handler, selected by
// message.type(). Each branch logs at debug section 54 and converts the raw
// message into the typed object that the handler expects.
//
// message: the received typed message header.
68 void Ipc::Coordinator::receive(const TypedMsgHdr
& message
)
70 switch (message
.type()) {
// Registration: the message is wrapped in a HereIamMessage.
// NOTE(review): the case label for this branch is not visible in this extract.
72 debugs(54, 6, HERE
<< "Registration request");
73 handleRegistrationRequest(HereIamMessage(message
));
// Strand search: the request arrives as an IpcIoRequest and is copied
// field by field into a StrandSearchRequest before being handled.
76 case mtIpcIoRequest
: { // XXX: this should have been mtStrandSearchRequest
77 IpcIoRequest
io(message
);
78 StrandSearchRequest sr
;
79 sr
.requestorId
= io
.requestorId
;
80 sr
.requestId
= io
.requestId
;
// The search tag is initialized from the I/O request's buf and len.
81 sr
.tag
.limitInit(io
.buf
, io
.len
);
82 debugs(54, 6, HERE
<< "Strand search request: " << io
.requestorId
<< ' ' << io
.requestId
<< ' ' << io
.len
<< " cmd=" << io
.command
<< " tag: " << sr
.tag
);
83 handleSearchRequest(sr
);
// Shared listening socket request.
87 case mtSharedListenRequest
:
88 debugs(54, 6, HERE
<< "Shared listen request");
89 handleSharedListenRequest(SharedListenRequest(message
));
// Cache manager request.
92 case mtCacheMgrRequest
:
93 debugs(54, 6, HERE
<< "Cache manager request");
94 handleCacheMgrRequest(Mgr::Request(message
));
// Cache manager response.
97 case mtCacheMgrResponse
:
98 debugs(54, 6, HERE
<< "Cache manager response");
99 handleCacheMgrResponse(Mgr::Response(message
));
// Any other type is only reported, at debug level 1.
// NOTE(review): the label for this branch (presumably default) is not
// visible in this extract.
103 debugs(54, 1, HERE
<< "Unhandled message type: " << message
.type());
// Handles a strand registration: registers msg.strand and then sends a
// message back to the address built from strandAddrPfx and the strand's kidId.
//
// msg: the registration message; only its strand member is read here.
108 void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage
& msg
)
110 registerStrand(msg
.strand
);
112 // send back an acknowledgement; TODO: remove as not needed?
// NOTE(review): the declaration and packing of `message` are not visible in
// this extract.
115 SendMessage(MakeAddr(strandAddrPfx
, msg
.strand
.kidId
), message
);
// Answers a request for a shared listening descriptor: reuses the descriptor
// stored in `listeners` for request.params when one exists, otherwise calls
// openListenSocket(), and sends the result back to the requesting kid.
//
// request: the shared listen request; requestorId, params and mapId are read.
119 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest
& request
)
121 debugs(54, 4, HERE
<< "kid" << request
.requestorId
<<
122 " needs shared listen FD for " << request
.params
.addr
);
// Look for a previously stored descriptor keyed by the same parameters.
123 Listeners::const_iterator i
= listeners
.find(request
.params
);
// NOTE(review): the declaration of errNo is not visible in this extract.
125 const int sock
= (i
!= listeners
.end()) ?
126 i
->second
: openListenSocket(request
, errNo
);
128 debugs(54, 3, HERE
<< "sending shared listen FD " << sock
<< " for " <<
129 request
.params
.addr
<< " to kid" << request
.requestorId
<<
130 " mapId=" << request
.mapId
);
// The response carries the descriptor, the error number and the
// requestor's mapId.
132 SharedListenResponse
response(sock
, errNo
, request
.mapId
);
// NOTE(review): the declaration of `message` is not visible in this extract.
134 response
.pack(message
);
135 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
// Handles a cache manager request: first sends a Mgr::Response back to the
// requesting strand, then creates the requested action and starts a
// Mgr::Inquirer job for it.
//
// request: the cache manager request; requestId, requestorId, params and fd
//          are read here.
139 Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request
& request
)
143 // Let the strand know that we are now responsible for handling the request
144 Mgr::Response
response(request
.requestId
);
// NOTE(review): the declaration of `message` is not visible in this extract.
146 response
.pack(message
);
147 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
// Build the action described by request.params.
149 Mgr::Action::Pointer action
=
150 CacheManager::GetInstance()->createRequestedAction(request
.params
);
// The Inquirer receives the action, the result of
// Mgr::ImportHttpFdIntoComm(request.fd), the request itself and strands_.
151 AsyncJob::Start(new Mgr::Inquirer(action
,
152 Mgr::ImportHttpFdIntoComm(request
.fd
), request
, strands_
));
// Passes a cache manager response on to Mgr::Inquirer::HandleRemoteAck().
//
// response: the cache manager response, forwarded unchanged.
156 Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response
& response
)
158 Mgr::Inquirer::HandleRemoteAck(response
);
// Handles a search for a strand by tag: scans strands_ for an entry whose
// tag equals request.tag; the visible code both notifies the searcher via
// notifySearcher() and appends the request to `searchers` with a debug note.
// NOTE(review): the branching between "notify now" and "queue for later" is
// not visible in this extract -- confirm against the full file.
//
// request: the search request; tag and requestorId are read here.
162 Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest
&request
)
164 // do we know of a strand with the given search tag?
165 const StrandCoord
*strand
= NULL
;
166 typedef StrandCoords::const_iterator SCCI
;
// The !strand condition ends the scan as soon as `strand` becomes non-NULL.
// NOTE(review): the assignment to `strand` is not visible in this extract.
167 for (SCCI i
= strands_
.begin(); !strand
&& i
!= strands_
.end(); ++i
) {
168 if (i
->tag
== request
.tag
)
173 notifySearcher(request
, *strand
);
// Remember the request so that registerStrand() can answer it when a
// strand with a matching tag registers.
177 searchers
.push_back(request
);
178 debugs(54, 3, HERE
<< "cannot yet tell kid" << request
.requestorId
<<
179 " who " << request
.tag
<< " is");
// Tells the kid that issued a strand search which kid owns the requested
// tag, by sending a message to the address built from strandAddrPfx and
// request.requestorId.
//
// request: the original search request; requestorId, requestId and tag are read.
// strand:  the strand that matched; its kidId is reported.
183 Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest
&request
,
184 const StrandCoord
& strand
)
186 debugs(54, 3, HERE
<< "tell kid" << request
.requestorId
<< " that " <<
187 request
.tag
<< " is kid" << strand
.kidId
);
// NOTE(review): response0 is constructed here but is not referenced by any
// other line visible in this extract.
188 const StrandSearchResponse
response0(request
.requestId
, strand
);
189 // XXX: we should use StrandSearchResponse instead of converting it
// The answer is carried in `io`: diskId holds the strand's kidId, requestId
// echoes the searcher's request, and command is set to IpcIo::cmdOpen.
// NOTE(review): the declarations of `io` and `message` are not visible in
// this extract.
191 io
.diskId
= strand
.kidId
;
192 io
.requestId
= request
.requestId
;
193 io
.command
= IpcIo::cmdOpen
;
196 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
// Opens a listening socket for the given request via comm_open_listener(),
// reports the outcome through errNo, and stores the descriptor in
// `listeners` under request.params.
// NOTE(review): the rest of the parameter list, the condition guarding the
// cache store and the return statement are not visible in this extract.
//
// request: the shared listen request; params and requestorId are read here.
201 Ipc::Coordinator::openListenSocket(const SharedListenRequest
& request
,
204 const OpenListenerParams
&p
= request
.params
;
206 debugs(54, 6, HERE
<< "opening listen FD at " << p
.addr
<< " for kid" <<
207 request
.requestorId
);
209 Ip::Address addr
= p
.addr
; // comm_open_listener may modify it
// NOTE(review): the remaining arguments of this call are not visible in
// this extract.
212 const int sock
= comm_open_listener(p
.sock_type
, p
.proto
, addr
, p
.flags
,
// errNo is 0 when sock is non-negative; otherwise it takes the current errno.
214 errNo
= (sock
>= 0) ? 0 : errno
;
217 // cache positive results
219 listeners
[request
.params
] = sock
;
// Sends signal `sig` to the process of every strand in strands_, logging
// each delivery at debug section 54, level 5.
//
// sig: the signal number passed to kill() for each strand's pid.
224 void Ipc::Coordinator::broadcastSignal(int sig
) const
226 typedef StrandCoords::const_iterator SCI
;
227 for (SCI iter
= strands_
.begin(); iter
!= strands_
.end(); ++iter
) {
228 debugs(54, 5, HERE
<< "signal " << sig
<< " to kid" << iter
->kidId
<<
229 ", PID=" << iter
->pid
);
// The return value of kill() is not checked here.
230 kill(iter
->pid
, sig
);
// Accessor for the Coordinator instance pointer; the visible code assigns a
// newly allocated Coordinator to TheInstance.
// NOTE(review): the condition guarding the allocation and the return
// statement are not visible in this extract -- presumably the object is
// created only once and TheInstance is returned; confirm against the full file.
234 Ipc::Coordinator
* Ipc::Coordinator::Instance()
237 TheInstance
= new Coordinator
;
238 // XXX: if the Coordinator job quits, this pointer will become invalid
239 // we could make Coordinator death fatal, except during exit, but since
240 // Strands do not re-register, even process death would be pointless.
244 const Ipc::StrandCoords
&
245 Ipc::Coordinator::strands() const