]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
fc38a7d27373b9f7c2bb36054a57c63bc4828d49
4 * DEBUG: section 54 Interprocess Communication
10 #include "base/Subscription.h"
11 #include "base/TextException.h"
13 #include "comm/Connection.h"
14 #include "ipc/Coordinator.h"
15 #include "ipc/FdNotes.h"
16 #include "ipc/SharedListen.h"
17 #include "mgr/Inquirer.h"
18 #include "mgr/Request.h"
19 #include "mgr/Response.h"
20 #include "mgr/StoreToCommWriter.h"
23 CBDATA_NAMESPACED_CLASS_INIT(Ipc
, Coordinator
);
24 Ipc::Coordinator
* Ipc::Coordinator::TheInstance
= NULL
;
27 Ipc::Coordinator::Coordinator():
32 void Ipc::Coordinator::start()
37 Ipc::StrandCoord
* Ipc::Coordinator::findStrand(int kidId
)
39 typedef StrandCoords::iterator SI
;
40 for (SI iter
= strands_
.begin(); iter
!= strands_
.end(); ++iter
) {
41 if (iter
->kidId
== kidId
)
47 void Ipc::Coordinator::registerStrand(const StrandCoord
& strand
)
49 if (StrandCoord
* found
= findStrand(strand
.kidId
))
52 strands_
.push_back(strand
);
55 void Ipc::Coordinator::receive(const TypedMsgHdr
& message
)
57 switch (message
.type()) {
59 debugs(54, 6, HERE
<< "Registration request");
60 handleRegistrationRequest(StrandCoord(message
));
63 case mtSharedListenRequest
:
64 debugs(54, 6, HERE
<< "Shared listen request");
65 handleSharedListenRequest(SharedListenRequest(message
));
68 case mtCacheMgrRequest
:
69 debugs(54, 6, HERE
<< "Cache manager request");
70 handleCacheMgrRequest(Mgr::Request(message
));
73 case mtCacheMgrResponse
:
74 debugs(54, 6, HERE
<< "Cache manager response");
75 handleCacheMgrResponse(Mgr::Response(message
));
79 debugs(54, 1, HERE
<< "Unhandled message type: " << message
.type());
84 void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord
& strand
)
86 registerStrand(strand
);
88 // send back an acknowledgement; TODO: remove as not needed?
91 SendMessage(MakeAddr(strandAddrPfx
, strand
.kidId
), message
);
95 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest
& request
)
97 debugs(54, 4, HERE
<< "kid" << request
.requestorId
<<
98 " needs shared listen FD for " << request
.params
.addr
);
99 Listeners::const_iterator i
= listeners
.find(request
.params
);
101 const Comm::ConnectionPointer c
= (i
!= listeners
.end()) ?
102 i
->second
: openListenSocket(request
, errNo
);
104 debugs(54, 3, HERE
<< "sending shared listen " << c
<< " for " <<
105 request
.params
.addr
<< " to kid" << request
.requestorId
<<
106 " mapId=" << request
.mapId
);
108 SharedListenResponse
response(c
, errNo
, request
.mapId
);
110 response
.pack(message
);
111 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
115 Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request
& request
)
119 // Let the strand know that we are now responsible for handling the request
120 Mgr::Response
response(request
.requestId
);
122 response
.pack(message
);
123 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
125 Mgr::Action::Pointer action
=
126 CacheManager::GetInstance()->createRequestedAction(request
.params
);
127 AsyncJob::Start(new Mgr::Inquirer(action
,
128 Mgr::ImportHttpFdIntoComm(request
.fd
), request
, strands_
));
132 Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response
& response
)
134 Mgr::Inquirer::HandleRemoteAck(response
);
137 Comm::ConnectionPointer
138 Ipc::Coordinator::openListenSocket(const SharedListenRequest
& request
,
141 const OpenListenerParams
&p
= request
.params
;
143 debugs(54, 6, HERE
<< "opening listen FD at " << p
.addr
<< " for kid" <<
144 request
.requestorId
);
146 Comm::ConnectionPointer conn
= new Comm::Connection
;
147 conn
->local
= p
.addr
; // comm_open_listener may modify it
148 conn
->flags
= p
.flags
;
151 comm_open_listener(p
.sock_type
, p
.proto
, conn
, FdNote(p
.fdNote
));
152 errNo
= Comm::IsConnOpen(conn
) ? 0 : errno
;
155 debugs(54, 6, HERE
<< "tried listening on " << conn
<< " for kid" <<
156 request
.requestorId
);
158 // cache positive results
159 if (Comm::IsConnOpen(conn
))
160 listeners
[request
.params
] = conn
;
165 void Ipc::Coordinator::broadcastSignal(int sig
) const
167 typedef StrandCoords::const_iterator SCI
;
168 for (SCI iter
= strands_
.begin(); iter
!= strands_
.end(); ++iter
) {
169 debugs(54, 5, HERE
<< "signal " << sig
<< " to kid" << iter
->kidId
<<
170 ", PID=" << iter
->pid
);
171 kill(iter
->pid
, sig
);
175 Ipc::Coordinator
* Ipc::Coordinator::Instance()
178 TheInstance
= new Coordinator
;
179 // XXX: if the Coordinator job quits, this pointer will become invalid
180 // we could make Coordinator death fatal, except during exit, but since
181 // Strands do not re-register, even process death would be pointless.
185 const Ipc::StrandCoords
&
186 Ipc::Coordinator::strands() const