]>
Commit | Line | Data |
---|---|---|
10cefb7b | 1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 54 Interprocess Communication | |
5 | * | |
6 | */ | |
7 | ||
f7f3304a | 8 | #include "squid.h" |
e0d28505 | 9 | #include "base/Subscription.h" |
8822ebee AR |
10 | #include "base/TextException.h" |
11 | #include "CacheManager.h" | |
0d0bce6a | 12 | #include "comm.h" |
e0d28505 | 13 | #include "comm/Connection.h" |
10cefb7b | 14 | #include "ipc/Coordinator.h" |
0d0bce6a | 15 | #include "ipc/SharedListen.h" |
8822ebee AR |
16 | #include "mgr/Inquirer.h" |
17 | #include "mgr/Request.h" | |
18 | #include "mgr/Response.h" | |
582c2af2 | 19 | #include "protos.h" |
f738d783 | 20 | #if SQUID_SNMP |
d6e3ad20 CT |
21 | #include "snmp/Inquirer.h" |
22 | #include "snmp/Request.h" | |
23 | #include "snmp/Response.h" | |
f738d783 | 24 | #endif |
21d845b1 FC |
25 | #if HAVE_ERRNO_H |
26 | #include <errno.h> | |
27 | #endif | |
10cefb7b | 28 | |
29 | CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator); | |
a2c48c98 | 30 | Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL; |
10cefb7b | 31 | |
10cefb7b | 32 | Ipc::Coordinator::Coordinator(): |
5667a628 | 33 | Port(coordinatorAddr) |
10cefb7b | 34 | { |
35 | } | |
36 | ||
37 | void Ipc::Coordinator::start() | |
38 | { | |
ba568924 | 39 | Port::start(); |
10cefb7b | 40 | } |
41 | ||
1bac0258 | 42 | Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId) |
10cefb7b | 43 | { |
8822ebee AR |
44 | typedef StrandCoords::iterator SI; |
45 | for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) { | |
10cefb7b | 46 | if (iter->kidId == kidId) |
47 | return &(*iter); | |
48 | } | |
49 | return NULL; | |
50 | } | |
51 | ||
1bac0258 | 52 | void Ipc::Coordinator::registerStrand(const StrandCoord& strand) |
10cefb7b | 53 | { |
254912f3 AR |
54 | debugs(54, 3, HERE << "registering kid" << strand.kidId << |
55 | ' ' << strand.tag); | |
b04c601b AR |
56 | if (StrandCoord* found = findStrand(strand.kidId)) { |
57 | const String oldTag = found->tag; | |
10cefb7b | 58 | *found = strand; |
b04c601b AR |
59 | if (oldTag.size() && !strand.tag.size()) |
60 | found->tag = oldTag; // keep more detailed info (XXX?) | |
61 | } else { | |
8822ebee | 62 | strands_.push_back(strand); |
b04c601b | 63 | } |
254912f3 AR |
64 | |
65 | // notify searchers waiting for this new strand, if any | |
66 | typedef Searchers::iterator SRI; | |
67 | for (SRI i = searchers.begin(); i != searchers.end();) { | |
68 | if (i->tag == strand.tag) { | |
69 | notifySearcher(*i, strand); | |
70 | i = searchers.erase(i); | |
e67d0508 | 71 | } else { |
254912f3 | 72 | ++i; |
e67d0508 | 73 | } |
254912f3 | 74 | } |
10cefb7b | 75 | } |
76 | ||
1bac0258 | 77 | void Ipc::Coordinator::receive(const TypedMsgHdr& message) |
10cefb7b | 78 | { |
79 | switch (message.type()) { | |
ba568924 | 80 | case mtRegistration: |
10cefb7b | 81 | debugs(54, 6, HERE << "Registration request"); |
254912f3 | 82 | handleRegistrationRequest(HereIamMessage(message)); |
10cefb7b | 83 | break; |
84 | ||
9a51593d DK |
85 | case mtStrandSearchRequest: { |
86 | const StrandSearchRequest sr(message); | |
87 | debugs(54, 6, HERE << "Strand search request: " << sr.requestorId << | |
88 | " tag: " << sr.tag); | |
254912f3 AR |
89 | handleSearchRequest(sr); |
90 | break; | |
9a51593d | 91 | } |
254912f3 | 92 | |
0d0bce6a AR |
93 | case mtSharedListenRequest: |
94 | debugs(54, 6, HERE << "Shared listen request"); | |
95 | handleSharedListenRequest(SharedListenRequest(message)); | |
96 | break; | |
97 | ||
3595fd68 | 98 | case mtCacheMgrRequest: { |
8822ebee | 99 | debugs(54, 6, HERE << "Cache manager request"); |
3595fd68 CT |
100 | const Mgr::Request req(message); |
101 | handleCacheMgrRequest(req); | |
102 | } | |
103 | break; | |
8822ebee | 104 | |
3595fd68 | 105 | case mtCacheMgrResponse: { |
8822ebee | 106 | debugs(54, 6, HERE << "Cache manager response"); |
3595fd68 CT |
107 | const Mgr::Response resp(message); |
108 | handleCacheMgrResponse(resp); | |
109 | } | |
110 | break; | |
8822ebee | 111 | |
f738d783 | 112 | #if SQUID_SNMP |
3595fd68 | 113 | case mtSnmpRequest: { |
51ea0904 | 114 | debugs(54, 6, HERE << "SNMP request"); |
3595fd68 CT |
115 | const Snmp::Request req(message); |
116 | handleSnmpRequest(req); | |
117 | } | |
118 | break; | |
51ea0904 | 119 | |
3595fd68 | 120 | case mtSnmpResponse: { |
51ea0904 | 121 | debugs(54, 6, HERE << "SNMP response"); |
3595fd68 CT |
122 | const Snmp::Response resp(message); |
123 | handleSnmpResponse(resp); | |
124 | } | |
125 | break; | |
f738d783 | 126 | #endif |
8822ebee | 127 | |
10cefb7b | 128 | default: |
e0236918 | 129 | debugs(54, DBG_IMPORTANT, HERE << "Unhandled message type: " << message.type()); |
10cefb7b | 130 | break; |
131 | } | |
132 | } | |
133 | ||
254912f3 | 134 | void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg) |
10cefb7b | 135 | { |
254912f3 | 136 | registerStrand(msg.strand); |
ba568924 AR |
137 | |
138 | // send back an acknowledgement; TODO: remove as not needed? | |
1bac0258 | 139 | TypedMsgHdr message; |
254912f3 AR |
140 | msg.pack(message); |
141 | SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message); | |
10cefb7b | 142 | } |
a2c48c98 | 143 | |
0d0bce6a AR |
144 | void |
145 | Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request) | |
146 | { | |
147 | debugs(54, 4, HERE << "kid" << request.requestorId << | |
5667a628 | 148 | " needs shared listen FD for " << request.params.addr); |
0d0bce6a AR |
149 | Listeners::const_iterator i = listeners.find(request.params); |
150 | int errNo = 0; | |
e0d28505 | 151 | const Comm::ConnectionPointer c = (i != listeners.end()) ? |
dc49061a | 152 | i->second : openListenSocket(request, errNo); |
0d0bce6a | 153 | |
e0d28505 | 154 | debugs(54, 3, HERE << "sending shared listen " << c << " for " << |
5667a628 AR |
155 | request.params.addr << " to kid" << request.requestorId << |
156 | " mapId=" << request.mapId); | |
0d0bce6a | 157 | |
b72fb55d | 158 | SharedListenResponse response(c->fd, errNo, request.mapId); |
0d0bce6a AR |
159 | TypedMsgHdr message; |
160 | response.pack(message); | |
161 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
162 | } | |
163 | ||
8822ebee AR |
164 | void |
165 | Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request) | |
166 | { | |
167 | debugs(54, 4, HERE); | |
168 | ||
46a7dc66 AR |
169 | try { |
170 | Mgr::Action::Pointer action = | |
171 | CacheManager::GetInstance()->createRequestedAction(request.params); | |
172 | AsyncJob::Start(new Mgr::Inquirer(action, request, strands_)); | |
3077b2d4 | 173 | } catch (const std::exception &ex) { |
46a7dc66 AR |
174 | debugs(54, DBG_IMPORTANT, "BUG: cannot aggregate mgr:" << |
175 | request.params.actionName << ": " << ex.what()); | |
176 | // TODO: Avoid half-baked Connections or teach them how to close. | |
177 | ::close(request.conn->fd); | |
178 | request.conn->fd = -1; | |
179 | return; // the worker will timeout and close | |
180 | } | |
181 | ||
8822ebee AR |
182 | // Let the strand know that we are now responsible for handling the request |
183 | Mgr::Response response(request.requestId); | |
184 | TypedMsgHdr message; | |
185 | response.pack(message); | |
186 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
187 | ||
8822ebee AR |
188 | } |
189 | ||
190 | void | |
191 | Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response) | |
192 | { | |
193 | Mgr::Inquirer::HandleRemoteAck(response); | |
194 | } | |
195 | ||
254912f3 AR |
196 | void |
197 | Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request) | |
198 | { | |
199 | // do we know of a strand with the given search tag? | |
200 | const StrandCoord *strand = NULL; | |
201 | typedef StrandCoords::const_iterator SCCI; | |
202 | for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) { | |
203 | if (i->tag == request.tag) | |
204 | strand = &(*i); | |
205 | } | |
206 | ||
207 | if (strand) { | |
208 | notifySearcher(request, *strand); | |
209 | return; | |
e67d0508 | 210 | } |
254912f3 AR |
211 | |
212 | searchers.push_back(request); | |
213 | debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId << | |
9199139f | 214 | " who " << request.tag << " is"); |
254912f3 AR |
215 | } |
216 | ||
217 | void | |
218 | Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request, | |
219 | const StrandCoord& strand) | |
220 | { | |
221 | debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " << | |
9199139f | 222 | request.tag << " is kid" << strand.kidId); |
b2aa0934 | 223 | const StrandSearchResponse response(strand); |
254912f3 | 224 | TypedMsgHdr message; |
9a51593d | 225 | response.pack(message); |
254912f3 AR |
226 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); |
227 | } | |
228 | ||
f738d783 | 229 | #if SQUID_SNMP |
51ea0904 CT |
230 | void |
231 | Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request) | |
232 | { | |
233 | debugs(54, 4, HERE); | |
234 | ||
235 | Snmp::Response response(request.requestId); | |
236 | TypedMsgHdr message; | |
237 | response.pack(message); | |
238 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
239 | ||
240 | AsyncJob::Start(new Snmp::Inquirer(request, strands_)); | |
241 | } | |
242 | ||
243 | void | |
244 | Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response) | |
245 | { | |
246 | debugs(54, 4, HERE); | |
247 | Snmp::Inquirer::HandleRemoteAck(response); | |
248 | } | |
f738d783 | 249 | #endif |
254912f3 | 250 | |
e0d28505 | 251 | Comm::ConnectionPointer |
0d0bce6a | 252 | Ipc::Coordinator::openListenSocket(const SharedListenRequest& request, |
5667a628 | 253 | int &errNo) |
0d0bce6a AR |
254 | { |
255 | const OpenListenerParams &p = request.params; | |
256 | ||
257 | debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" << | |
5667a628 | 258 | request.requestorId); |
0d0bce6a | 259 | |
e0d28505 AJ |
260 | Comm::ConnectionPointer conn = new Comm::Connection; |
261 | conn->local = p.addr; // comm_open_listener may modify it | |
262 | conn->flags = p.flags; | |
0d0bce6a AR |
263 | |
264 | enter_suid(); | |
e0d28505 AJ |
265 | comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote)); |
266 | errNo = Comm::IsConnOpen(conn) ? 0 : errno; | |
0d0bce6a AR |
267 | leave_suid(); |
268 | ||
e0d28505 AJ |
269 | debugs(54, 6, HERE << "tried listening on " << conn << " for kid" << |
270 | request.requestorId); | |
271 | ||
0d0bce6a | 272 | // cache positive results |
e0d28505 AJ |
273 | if (Comm::IsConnOpen(conn)) |
274 | listeners[request.params] = conn; | |
0d0bce6a | 275 | |
e0d28505 | 276 | return conn; |
0d0bce6a AR |
277 | } |
278 | ||
a2c48c98 AR |
279 | void Ipc::Coordinator::broadcastSignal(int sig) const |
280 | { | |
8822ebee AR |
281 | typedef StrandCoords::const_iterator SCI; |
282 | for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) { | |
a2c48c98 | 283 | debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId << |
5667a628 | 284 | ", PID=" << iter->pid); |
a2c48c98 AR |
285 | kill(iter->pid, sig); |
286 | } | |
287 | } | |
288 | ||
289 | Ipc::Coordinator* Ipc::Coordinator::Instance() | |
290 | { | |
291 | if (!TheInstance) | |
292 | TheInstance = new Coordinator; | |
293 | // XXX: if the Coordinator job quits, this pointer will become invalid | |
294 | // we could make Coordinator death fatal, except during exit, but since | |
295 | // Strands do not re-register, even process death would be pointless. | |
296 | return TheInstance; | |
297 | } | |
8822ebee AR |
298 | |
299 | const Ipc::StrandCoords& | |
300 | Ipc::Coordinator::strands() const | |
301 | { | |
302 | return strands_; | |
303 | } |