]>
Commit | Line | Data |
---|---|---|
10cefb7b | 1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 54 Interprocess Communication | |
5 | * | |
6 | */ | |
7 | ||
8 | ||
f7f3304a | 9 | #include "squid.h" |
e0d28505 | 10 | #include "base/Subscription.h" |
8822ebee AR |
11 | #include "base/TextException.h" |
12 | #include "CacheManager.h" | |
0d0bce6a | 13 | #include "comm.h" |
e0d28505 | 14 | #include "comm/Connection.h" |
10cefb7b | 15 | #include "ipc/Coordinator.h" |
0d0bce6a | 16 | #include "ipc/SharedListen.h" |
8822ebee AR |
17 | #include "mgr/Inquirer.h" |
18 | #include "mgr/Request.h" | |
19 | #include "mgr/Response.h" | |
582c2af2 | 20 | #include "protos.h" |
f738d783 | 21 | #if SQUID_SNMP |
d6e3ad20 CT |
22 | #include "snmp/Inquirer.h" |
23 | #include "snmp/Request.h" | |
24 | #include "snmp/Response.h" | |
f738d783 | 25 | #endif |
10cefb7b | 26 | |
// Class-level CBDATA initialization for Ipc::Coordinator.
// NOTE(review): the macro is defined outside this file; presumably it emits
// the cbdata bookkeeping definitions for the class -- confirm against cbdata.h.
27 | CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator); | |
// The Coordinator object returned by Instance(); stays NULL until the first
// Instance() call allocates it.
a2c48c98 | 28 | Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL; |
10cefb7b | 29 | |
30 | ||
31 | Ipc::Coordinator::Coordinator(): | |
5667a628 | 32 | Port(coordinatorAddr) |
10cefb7b | 33 | { |
34 | } | |
35 | ||
36 | void Ipc::Coordinator::start() | |
37 | { | |
ba568924 | 38 | Port::start(); |
10cefb7b | 39 | } |
40 | ||
1bac0258 | 41 | Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId) |
10cefb7b | 42 | { |
8822ebee AR |
43 | typedef StrandCoords::iterator SI; |
44 | for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) { | |
10cefb7b | 45 | if (iter->kidId == kidId) |
46 | return &(*iter); | |
47 | } | |
48 | return NULL; | |
49 | } | |
50 | ||
1bac0258 | 51 | void Ipc::Coordinator::registerStrand(const StrandCoord& strand) |
10cefb7b | 52 | { |
254912f3 AR |
53 | debugs(54, 3, HERE << "registering kid" << strand.kidId << |
54 | ' ' << strand.tag); | |
b04c601b AR |
55 | if (StrandCoord* found = findStrand(strand.kidId)) { |
56 | const String oldTag = found->tag; | |
10cefb7b | 57 | *found = strand; |
b04c601b AR |
58 | if (oldTag.size() && !strand.tag.size()) |
59 | found->tag = oldTag; // keep more detailed info (XXX?) | |
60 | } else { | |
8822ebee | 61 | strands_.push_back(strand); |
b04c601b | 62 | } |
254912f3 AR |
63 | |
64 | // notify searchers waiting for this new strand, if any | |
65 | typedef Searchers::iterator SRI; | |
66 | for (SRI i = searchers.begin(); i != searchers.end();) { | |
67 | if (i->tag == strand.tag) { | |
68 | notifySearcher(*i, strand); | |
69 | i = searchers.erase(i); | |
e67d0508 | 70 | } else { |
254912f3 | 71 | ++i; |
e67d0508 | 72 | } |
254912f3 | 73 | } |
10cefb7b | 74 | } |
75 | ||
1bac0258 | 76 | void Ipc::Coordinator::receive(const TypedMsgHdr& message) |
10cefb7b | 77 | { |
78 | switch (message.type()) { | |
ba568924 | 79 | case mtRegistration: |
10cefb7b | 80 | debugs(54, 6, HERE << "Registration request"); |
254912f3 | 81 | handleRegistrationRequest(HereIamMessage(message)); |
10cefb7b | 82 | break; |
83 | ||
9a51593d DK |
84 | case mtStrandSearchRequest: { |
85 | const StrandSearchRequest sr(message); | |
86 | debugs(54, 6, HERE << "Strand search request: " << sr.requestorId << | |
87 | " tag: " << sr.tag); | |
254912f3 AR |
88 | handleSearchRequest(sr); |
89 | break; | |
9a51593d | 90 | } |
254912f3 | 91 | |
0d0bce6a AR |
92 | case mtSharedListenRequest: |
93 | debugs(54, 6, HERE << "Shared listen request"); | |
94 | handleSharedListenRequest(SharedListenRequest(message)); | |
95 | break; | |
96 | ||
3595fd68 | 97 | case mtCacheMgrRequest: { |
8822ebee | 98 | debugs(54, 6, HERE << "Cache manager request"); |
3595fd68 CT |
99 | const Mgr::Request req(message); |
100 | handleCacheMgrRequest(req); | |
101 | } | |
102 | break; | |
8822ebee | 103 | |
3595fd68 | 104 | case mtCacheMgrResponse: { |
8822ebee | 105 | debugs(54, 6, HERE << "Cache manager response"); |
3595fd68 CT |
106 | const Mgr::Response resp(message); |
107 | handleCacheMgrResponse(resp); | |
108 | } | |
109 | break; | |
8822ebee | 110 | |
f738d783 | 111 | #if SQUID_SNMP |
3595fd68 | 112 | case mtSnmpRequest: { |
51ea0904 | 113 | debugs(54, 6, HERE << "SNMP request"); |
3595fd68 CT |
114 | const Snmp::Request req(message); |
115 | handleSnmpRequest(req); | |
116 | } | |
117 | break; | |
51ea0904 | 118 | |
3595fd68 | 119 | case mtSnmpResponse: { |
51ea0904 | 120 | debugs(54, 6, HERE << "SNMP response"); |
3595fd68 CT |
121 | const Snmp::Response resp(message); |
122 | handleSnmpResponse(resp); | |
123 | } | |
124 | break; | |
f738d783 | 125 | #endif |
8822ebee | 126 | |
10cefb7b | 127 | default: |
e0236918 | 128 | debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type()); |
10cefb7b | 129 | break; |
130 | } | |
131 | } | |
132 | ||
254912f3 | 133 | void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg) |
10cefb7b | 134 | { |
254912f3 | 135 | registerStrand(msg.strand); |
ba568924 AR |
136 | |
137 | // send back an acknowledgement; TODO: remove as not needed? | |
1bac0258 | 138 | TypedMsgHdr message; |
254912f3 AR |
139 | msg.pack(message); |
140 | SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message); | |
10cefb7b | 141 | } |
a2c48c98 | 142 | |
0d0bce6a AR |
143 | void |
144 | Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request) | |
145 | { | |
146 | debugs(54, 4, HERE << "kid" << request.requestorId << | |
5667a628 | 147 | " needs shared listen FD for " << request.params.addr); |
0d0bce6a AR |
148 | Listeners::const_iterator i = listeners.find(request.params); |
149 | int errNo = 0; | |
e0d28505 | 150 | const Comm::ConnectionPointer c = (i != listeners.end()) ? |
dc49061a | 151 | i->second : openListenSocket(request, errNo); |
0d0bce6a | 152 | |
e0d28505 | 153 | debugs(54, 3, HERE << "sending shared listen " << c << " for " << |
5667a628 AR |
154 | request.params.addr << " to kid" << request.requestorId << |
155 | " mapId=" << request.mapId); | |
0d0bce6a | 156 | |
b72fb55d | 157 | SharedListenResponse response(c->fd, errNo, request.mapId); |
0d0bce6a AR |
158 | TypedMsgHdr message; |
159 | response.pack(message); | |
160 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
161 | } | |
162 | ||
8822ebee AR |
163 | void |
164 | Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request) | |
165 | { | |
166 | debugs(54, 4, HERE); | |
167 | ||
46a7dc66 AR |
168 | try { |
169 | Mgr::Action::Pointer action = | |
170 | CacheManager::GetInstance()->createRequestedAction(request.params); | |
171 | AsyncJob::Start(new Mgr::Inquirer(action, request, strands_)); | |
3077b2d4 | 172 | } catch (const std::exception &ex) { |
46a7dc66 AR |
173 | debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" << |
174 | request.params.actionName << ": " << ex.what()); | |
175 | // TODO: Avoid half-baked Connections or teach them how to close. | |
176 | ::close(request.conn->fd); | |
177 | request.conn->fd = -1; | |
178 | return; // the worker will timeout and close | |
179 | } | |
180 | ||
8822ebee AR |
181 | // Let the strand know that we are now responsible for handling the request |
182 | Mgr::Response response(request.requestId); | |
183 | TypedMsgHdr message; | |
184 | response.pack(message); | |
185 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
186 | ||
8822ebee AR |
187 | } |
188 | ||
189 | void | |
190 | Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response) | |
191 | { | |
192 | Mgr::Inquirer::HandleRemoteAck(response); | |
193 | } | |
194 | ||
254912f3 AR |
195 | void |
196 | Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request) | |
197 | { | |
198 | // do we know of a strand with the given search tag? | |
199 | const StrandCoord *strand = NULL; | |
200 | typedef StrandCoords::const_iterator SCCI; | |
201 | for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) { | |
202 | if (i->tag == request.tag) | |
203 | strand = &(*i); | |
204 | } | |
205 | ||
206 | if (strand) { | |
207 | notifySearcher(request, *strand); | |
208 | return; | |
e67d0508 | 209 | } |
254912f3 AR |
210 | |
211 | searchers.push_back(request); | |
212 | debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId << | |
9199139f | 213 | " who " << request.tag << " is"); |
254912f3 AR |
214 | } |
215 | ||
216 | void | |
217 | Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request, | |
218 | const StrandCoord& strand) | |
219 | { | |
220 | debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " << | |
9199139f | 221 | request.tag << " is kid" << strand.kidId); |
b2aa0934 | 222 | const StrandSearchResponse response(strand); |
254912f3 | 223 | TypedMsgHdr message; |
9a51593d | 224 | response.pack(message); |
254912f3 AR |
225 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); |
226 | } | |
227 | ||
f738d783 | 228 | #if SQUID_SNMP |
51ea0904 CT |
229 | void |
230 | Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request) | |
231 | { | |
232 | debugs(54, 4, HERE); | |
233 | ||
234 | Snmp::Response response(request.requestId); | |
235 | TypedMsgHdr message; | |
236 | response.pack(message); | |
237 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
238 | ||
239 | AsyncJob::Start(new Snmp::Inquirer(request, strands_)); | |
240 | } | |
241 | ||
242 | void | |
243 | Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response) | |
244 | { | |
245 | debugs(54, 4, HERE); | |
246 | Snmp::Inquirer::HandleRemoteAck(response); | |
247 | } | |
f738d783 | 248 | #endif |
254912f3 | 249 | |
e0d28505 | 250 | Comm::ConnectionPointer |
0d0bce6a | 251 | Ipc::Coordinator::openListenSocket(const SharedListenRequest& request, |
5667a628 | 252 | int &errNo) |
0d0bce6a AR |
253 | { |
254 | const OpenListenerParams &p = request.params; | |
255 | ||
256 | debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" << | |
5667a628 | 257 | request.requestorId); |
0d0bce6a | 258 | |
e0d28505 AJ |
259 | Comm::ConnectionPointer conn = new Comm::Connection; |
260 | conn->local = p.addr; // comm_open_listener may modify it | |
261 | conn->flags = p.flags; | |
0d0bce6a AR |
262 | |
263 | enter_suid(); | |
e0d28505 AJ |
264 | comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote)); |
265 | errNo = Comm::IsConnOpen(conn) ? 0 : errno; | |
0d0bce6a AR |
266 | leave_suid(); |
267 | ||
e0d28505 AJ |
268 | debugs(54, 6, HERE << "tried listening on " << conn << " for kid" << |
269 | request.requestorId); | |
270 | ||
0d0bce6a | 271 | // cache positive results |
e0d28505 AJ |
272 | if (Comm::IsConnOpen(conn)) |
273 | listeners[request.params] = conn; | |
0d0bce6a | 274 | |
e0d28505 | 275 | return conn; |
0d0bce6a AR |
276 | } |
277 | ||
a2c48c98 AR |
278 | void Ipc::Coordinator::broadcastSignal(int sig) const |
279 | { | |
8822ebee AR |
280 | typedef StrandCoords::const_iterator SCI; |
281 | for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) { | |
a2c48c98 | 282 | debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId << |
5667a628 | 283 | ", PID=" << iter->pid); |
a2c48c98 AR |
284 | kill(iter->pid, sig); |
285 | } | |
286 | } | |
287 | ||
288 | Ipc::Coordinator* Ipc::Coordinator::Instance() | |
289 | { | |
290 | if (!TheInstance) | |
291 | TheInstance = new Coordinator; | |
292 | // XXX: if the Coordinator job quits, this pointer will become invalid | |
293 | // we could make Coordinator death fatal, except during exit, but since | |
294 | // Strands do not re-register, even process death would be pointless. | |
295 | return TheInstance; | |
296 | } | |
8822ebee AR |
297 | |
298 | const Ipc::StrandCoords& | |
299 | Ipc::Coordinator::strands() const | |
300 | { | |
301 | return strands_; | |
302 | } |