]>
Commit | Line | Data |
---|---|---|
10cefb7b | 1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 54 Interprocess Communication | |
5 | * | |
6 | */ | |
7 | ||
8 | ||
9 | #include "config.h" | |
e0d28505 | 10 | #include "base/Subscription.h" |
8822ebee AR |
11 | #include "base/TextException.h" |
12 | #include "CacheManager.h" | |
0d0bce6a | 13 | #include "comm.h" |
e0d28505 | 14 | #include "comm/Connection.h" |
10cefb7b | 15 | #include "ipc/Coordinator.h" |
0d0bce6a | 16 | #include "ipc/SharedListen.h" |
8822ebee AR |
17 | #include "mgr/Inquirer.h" |
18 | #include "mgr/Request.h" | |
19 | #include "mgr/Response.h" | |
f738d783 | 20 | #if SQUID_SNMP |
d6e3ad20 CT |
21 | #include "snmp/Inquirer.h" |
22 | #include "snmp/Request.h" | |
23 | #include "snmp/Response.h" | |
f738d783 | 24 | #endif |
10cefb7b | 25 | |
26 | CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator); | |
a2c48c98 | 27 | Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL; |
10cefb7b | 28 | |
29 | ||
30 | Ipc::Coordinator::Coordinator(): | |
5667a628 | 31 | Port(coordinatorAddr) |
10cefb7b | 32 | { |
33 | } | |
34 | ||
35 | void Ipc::Coordinator::start() | |
36 | { | |
ba568924 | 37 | Port::start(); |
10cefb7b | 38 | } |
39 | ||
1bac0258 | 40 | Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId) |
10cefb7b | 41 | { |
8822ebee AR |
42 | typedef StrandCoords::iterator SI; |
43 | for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) { | |
10cefb7b | 44 | if (iter->kidId == kidId) |
45 | return &(*iter); | |
46 | } | |
47 | return NULL; | |
48 | } | |
49 | ||
1bac0258 | 50 | void Ipc::Coordinator::registerStrand(const StrandCoord& strand) |
10cefb7b | 51 | { |
254912f3 AR |
52 | debugs(54, 3, HERE << "registering kid" << strand.kidId << |
53 | ' ' << strand.tag); | |
b04c601b AR |
54 | if (StrandCoord* found = findStrand(strand.kidId)) { |
55 | const String oldTag = found->tag; | |
10cefb7b | 56 | *found = strand; |
b04c601b AR |
57 | if (oldTag.size() && !strand.tag.size()) |
58 | found->tag = oldTag; // keep more detailed info (XXX?) | |
59 | } else { | |
8822ebee | 60 | strands_.push_back(strand); |
b04c601b | 61 | } |
254912f3 AR |
62 | |
63 | // notify searchers waiting for this new strand, if any | |
64 | typedef Searchers::iterator SRI; | |
65 | for (SRI i = searchers.begin(); i != searchers.end();) { | |
66 | if (i->tag == strand.tag) { | |
67 | notifySearcher(*i, strand); | |
68 | i = searchers.erase(i); | |
69 | } else { | |
70 | ++i; | |
71 | } | |
72 | } | |
10cefb7b | 73 | } |
74 | ||
1bac0258 | 75 | void Ipc::Coordinator::receive(const TypedMsgHdr& message) |
10cefb7b | 76 | { |
77 | switch (message.type()) { | |
ba568924 | 78 | case mtRegistration: |
10cefb7b | 79 | debugs(54, 6, HERE << "Registration request"); |
254912f3 | 80 | handleRegistrationRequest(HereIamMessage(message)); |
10cefb7b | 81 | break; |
82 | ||
9a51593d DK |
83 | case mtStrandSearchRequest: { |
84 | const StrandSearchRequest sr(message); | |
85 | debugs(54, 6, HERE << "Strand search request: " << sr.requestorId << | |
86 | " tag: " << sr.tag); | |
254912f3 AR |
87 | handleSearchRequest(sr); |
88 | break; | |
9a51593d | 89 | } |
254912f3 | 90 | |
0d0bce6a AR |
91 | case mtSharedListenRequest: |
92 | debugs(54, 6, HERE << "Shared listen request"); | |
93 | handleSharedListenRequest(SharedListenRequest(message)); | |
94 | break; | |
95 | ||
3595fd68 | 96 | case mtCacheMgrRequest: { |
8822ebee | 97 | debugs(54, 6, HERE << "Cache manager request"); |
3595fd68 CT |
98 | const Mgr::Request req(message); |
99 | handleCacheMgrRequest(req); | |
100 | } | |
101 | break; | |
8822ebee | 102 | |
3595fd68 | 103 | case mtCacheMgrResponse: { |
8822ebee | 104 | debugs(54, 6, HERE << "Cache manager response"); |
3595fd68 CT |
105 | const Mgr::Response resp(message); |
106 | handleCacheMgrResponse(resp); | |
107 | } | |
108 | break; | |
8822ebee | 109 | |
f738d783 | 110 | #if SQUID_SNMP |
3595fd68 | 111 | case mtSnmpRequest: { |
51ea0904 | 112 | debugs(54, 6, HERE << "SNMP request"); |
3595fd68 CT |
113 | const Snmp::Request req(message); |
114 | handleSnmpRequest(req); | |
115 | } | |
116 | break; | |
51ea0904 | 117 | |
3595fd68 | 118 | case mtSnmpResponse: { |
51ea0904 | 119 | debugs(54, 6, HERE << "SNMP response"); |
3595fd68 CT |
120 | const Snmp::Response resp(message); |
121 | handleSnmpResponse(resp); | |
122 | } | |
123 | break; | |
f738d783 | 124 | #endif |
8822ebee | 125 | |
10cefb7b | 126 | default: |
7230e9c7 | 127 | debugs(54, 1, HERE << "Unhandled message type: " << message.type()); |
10cefb7b | 128 | break; |
129 | } | |
130 | } | |
131 | ||
254912f3 | 132 | void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg) |
10cefb7b | 133 | { |
254912f3 | 134 | registerStrand(msg.strand); |
ba568924 AR |
135 | |
136 | // send back an acknowledgement; TODO: remove as not needed? | |
1bac0258 | 137 | TypedMsgHdr message; |
254912f3 AR |
138 | msg.pack(message); |
139 | SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message); | |
10cefb7b | 140 | } |
a2c48c98 | 141 | |
0d0bce6a AR |
142 | void |
143 | Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request) | |
144 | { | |
145 | debugs(54, 4, HERE << "kid" << request.requestorId << | |
5667a628 | 146 | " needs shared listen FD for " << request.params.addr); |
0d0bce6a AR |
147 | Listeners::const_iterator i = listeners.find(request.params); |
148 | int errNo = 0; | |
e0d28505 | 149 | const Comm::ConnectionPointer c = (i != listeners.end()) ? |
dc49061a | 150 | i->second : openListenSocket(request, errNo); |
0d0bce6a | 151 | |
e0d28505 | 152 | debugs(54, 3, HERE << "sending shared listen " << c << " for " << |
5667a628 AR |
153 | request.params.addr << " to kid" << request.requestorId << |
154 | " mapId=" << request.mapId); | |
0d0bce6a | 155 | |
b72fb55d | 156 | SharedListenResponse response(c->fd, errNo, request.mapId); |
0d0bce6a AR |
157 | TypedMsgHdr message; |
158 | response.pack(message); | |
159 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
160 | } | |
161 | ||
8822ebee AR |
162 | void |
163 | Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request) | |
164 | { | |
165 | debugs(54, 4, HERE); | |
166 | ||
167 | // Let the strand know that we are now responsible for handling the request | |
168 | Mgr::Response response(request.requestId); | |
169 | TypedMsgHdr message; | |
170 | response.pack(message); | |
171 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
172 | ||
173 | Mgr::Action::Pointer action = | |
174 | CacheManager::GetInstance()->createRequestedAction(request.params); | |
51ea0904 | 175 | AsyncJob::Start(new Mgr::Inquirer(action, request, strands_)); |
8822ebee AR |
176 | } |
177 | ||
178 | void | |
179 | Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response) | |
180 | { | |
181 | Mgr::Inquirer::HandleRemoteAck(response); | |
182 | } | |
183 | ||
254912f3 AR |
184 | void |
185 | Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request) | |
186 | { | |
187 | // do we know of a strand with the given search tag? | |
188 | const StrandCoord *strand = NULL; | |
189 | typedef StrandCoords::const_iterator SCCI; | |
190 | for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) { | |
191 | if (i->tag == request.tag) | |
192 | strand = &(*i); | |
193 | } | |
194 | ||
195 | if (strand) { | |
196 | notifySearcher(request, *strand); | |
197 | return; | |
198 | } | |
199 | ||
200 | searchers.push_back(request); | |
201 | debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId << | |
202 | " who " << request.tag << " is"); | |
203 | } | |
204 | ||
205 | void | |
206 | Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request, | |
207 | const StrandCoord& strand) | |
208 | { | |
209 | debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " << | |
210 | request.tag << " is kid" << strand.kidId); | |
b2aa0934 | 211 | const StrandSearchResponse response(strand); |
254912f3 | 212 | TypedMsgHdr message; |
9a51593d | 213 | response.pack(message); |
254912f3 AR |
214 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); |
215 | } | |
216 | ||
f738d783 | 217 | #if SQUID_SNMP |
51ea0904 CT |
218 | void |
219 | Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request) | |
220 | { | |
221 | debugs(54, 4, HERE); | |
222 | ||
223 | Snmp::Response response(request.requestId); | |
224 | TypedMsgHdr message; | |
225 | response.pack(message); | |
226 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
227 | ||
228 | AsyncJob::Start(new Snmp::Inquirer(request, strands_)); | |
229 | } | |
230 | ||
231 | void | |
232 | Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response) | |
233 | { | |
234 | debugs(54, 4, HERE); | |
235 | Snmp::Inquirer::HandleRemoteAck(response); | |
236 | } | |
f738d783 | 237 | #endif |
254912f3 | 238 | |
e0d28505 | 239 | Comm::ConnectionPointer |
0d0bce6a | 240 | Ipc::Coordinator::openListenSocket(const SharedListenRequest& request, |
5667a628 | 241 | int &errNo) |
0d0bce6a AR |
242 | { |
243 | const OpenListenerParams &p = request.params; | |
244 | ||
245 | debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" << | |
5667a628 | 246 | request.requestorId); |
0d0bce6a | 247 | |
e0d28505 AJ |
248 | Comm::ConnectionPointer conn = new Comm::Connection; |
249 | conn->local = p.addr; // comm_open_listener may modify it | |
250 | conn->flags = p.flags; | |
0d0bce6a AR |
251 | |
252 | enter_suid(); | |
e0d28505 AJ |
253 | comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote)); |
254 | errNo = Comm::IsConnOpen(conn) ? 0 : errno; | |
0d0bce6a AR |
255 | leave_suid(); |
256 | ||
e0d28505 AJ |
257 | debugs(54, 6, HERE << "tried listening on " << conn << " for kid" << |
258 | request.requestorId); | |
259 | ||
0d0bce6a | 260 | // cache positive results |
e0d28505 AJ |
261 | if (Comm::IsConnOpen(conn)) |
262 | listeners[request.params] = conn; | |
0d0bce6a | 263 | |
e0d28505 | 264 | return conn; |
0d0bce6a AR |
265 | } |
266 | ||
a2c48c98 AR |
267 | void Ipc::Coordinator::broadcastSignal(int sig) const |
268 | { | |
8822ebee AR |
269 | typedef StrandCoords::const_iterator SCI; |
270 | for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) { | |
a2c48c98 | 271 | debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId << |
5667a628 | 272 | ", PID=" << iter->pid); |
a2c48c98 AR |
273 | kill(iter->pid, sig); |
274 | } | |
275 | } | |
276 | ||
277 | Ipc::Coordinator* Ipc::Coordinator::Instance() | |
278 | { | |
279 | if (!TheInstance) | |
280 | TheInstance = new Coordinator; | |
281 | // XXX: if the Coordinator job quits, this pointer will become invalid | |
282 | // we could make Coordinator death fatal, except during exit, but since | |
283 | // Strands do not re-register, even process death would be pointless. | |
284 | return TheInstance; | |
285 | } | |
8822ebee AR |
286 | |
287 | const Ipc::StrandCoords& | |
288 | Ipc::Coordinator::strands() const | |
289 | { | |
290 | return strands_; | |
291 | } |