]>
Commit | Line | Data |
---|---|---|
10cefb7b | 1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 54 Interprocess Communication | |
5 | * | |
6 | */ | |
7 | ||
8 | ||
9 | #include "config.h" | |
8822ebee AR |
10 | #include "base/TextException.h" |
11 | #include "CacheManager.h" | |
0d0bce6a | 12 | #include "comm.h" |
10cefb7b | 13 | #include "ipc/Coordinator.h" |
0d0bce6a | 14 | #include "ipc/SharedListen.h" |
8822ebee AR |
15 | #include "mgr/Inquirer.h" |
16 | #include "mgr/Request.h" | |
17 | #include "mgr/Response.h" | |
f738d783 | 18 | #if SQUID_SNMP |
51ea0904 CT |
19 | #include "snmpx/Inquirer.h" |
20 | #include "snmpx/Request.h" | |
21 | #include "snmpx/Response.h" | |
f738d783 | 22 | #endif |
10cefb7b | 23 | |
24 | CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator); | |
a2c48c98 | 25 | Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL; |
10cefb7b | 26 | |
27 | ||
28 | Ipc::Coordinator::Coordinator(): | |
5667a628 | 29 | Port(coordinatorAddr) |
10cefb7b | 30 | { |
31 | } | |
32 | ||
33 | void Ipc::Coordinator::start() | |
34 | { | |
ba568924 | 35 | Port::start(); |
10cefb7b | 36 | } |
37 | ||
1bac0258 | 38 | Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId) |
10cefb7b | 39 | { |
8822ebee AR |
40 | typedef StrandCoords::iterator SI; |
41 | for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) { | |
10cefb7b | 42 | if (iter->kidId == kidId) |
43 | return &(*iter); | |
44 | } | |
45 | return NULL; | |
46 | } | |
47 | ||
1bac0258 | 48 | void Ipc::Coordinator::registerStrand(const StrandCoord& strand) |
10cefb7b | 49 | { |
1bac0258 | 50 | if (StrandCoord* found = findStrand(strand.kidId)) |
10cefb7b | 51 | *found = strand; |
52 | else | |
8822ebee | 53 | strands_.push_back(strand); |
10cefb7b | 54 | } |
55 | ||
1bac0258 | 56 | void Ipc::Coordinator::receive(const TypedMsgHdr& message) |
10cefb7b | 57 | { |
58 | switch (message.type()) { | |
ba568924 | 59 | case mtRegistration: |
10cefb7b | 60 | debugs(54, 6, HERE << "Registration request"); |
1bac0258 | 61 | handleRegistrationRequest(StrandCoord(message)); |
10cefb7b | 62 | break; |
63 | ||
0d0bce6a AR |
64 | case mtSharedListenRequest: |
65 | debugs(54, 6, HERE << "Shared listen request"); | |
66 | handleSharedListenRequest(SharedListenRequest(message)); | |
67 | break; | |
68 | ||
8822ebee AR |
69 | case mtCacheMgrRequest: |
70 | debugs(54, 6, HERE << "Cache manager request"); | |
71 | handleCacheMgrRequest(Mgr::Request(message)); | |
72 | break; | |
73 | ||
74 | case mtCacheMgrResponse: | |
75 | debugs(54, 6, HERE << "Cache manager response"); | |
76 | handleCacheMgrResponse(Mgr::Response(message)); | |
77 | break; | |
78 | ||
f738d783 | 79 | #if SQUID_SNMP |
51ea0904 CT |
80 | case mtSnmpRequest: |
81 | debugs(54, 6, HERE << "SNMP request"); | |
82 | handleSnmpRequest(Snmp::Request(message)); | |
83 | break; | |
84 | ||
85 | case mtSnmpResponse: | |
86 | debugs(54, 6, HERE << "SNMP response"); | |
87 | handleSnmpResponse(Snmp::Response(message)); | |
88 | break; | |
f738d783 | 89 | #endif |
51ea0904 | 90 | |
10cefb7b | 91 | default: |
7230e9c7 | 92 | debugs(54, 1, HERE << "Unhandled message type: " << message.type()); |
10cefb7b | 93 | break; |
94 | } | |
95 | } | |
96 | ||
1bac0258 | 97 | void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand) |
10cefb7b | 98 | { |
ba568924 AR |
99 | registerStrand(strand); |
100 | ||
101 | // send back an acknowledgement; TODO: remove as not needed? | |
1bac0258 AR |
102 | TypedMsgHdr message; |
103 | strand.pack(message); | |
104 | SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message); | |
10cefb7b | 105 | } |
a2c48c98 | 106 | |
0d0bce6a AR |
107 | void |
108 | Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request) | |
109 | { | |
110 | debugs(54, 4, HERE << "kid" << request.requestorId << | |
5667a628 | 111 | " needs shared listen FD for " << request.params.addr); |
0d0bce6a AR |
112 | Listeners::const_iterator i = listeners.find(request.params); |
113 | int errNo = 0; | |
114 | const int sock = (i != listeners.end()) ? | |
5667a628 | 115 | i->second : openListenSocket(request, errNo); |
0d0bce6a AR |
116 | |
117 | debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " << | |
5667a628 AR |
118 | request.params.addr << " to kid" << request.requestorId << |
119 | " mapId=" << request.mapId); | |
0d0bce6a AR |
120 | |
121 | SharedListenResponse response(sock, errNo, request.mapId); | |
122 | TypedMsgHdr message; | |
123 | response.pack(message); | |
124 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
125 | } | |
126 | ||
8822ebee AR |
127 | void |
128 | Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request& request) | |
129 | { | |
130 | debugs(54, 4, HERE); | |
131 | ||
132 | // Let the strand know that we are now responsible for handling the request | |
133 | Mgr::Response response(request.requestId); | |
134 | TypedMsgHdr message; | |
135 | response.pack(message); | |
136 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
137 | ||
138 | Mgr::Action::Pointer action = | |
139 | CacheManager::GetInstance()->createRequestedAction(request.params); | |
51ea0904 | 140 | AsyncJob::Start(new Mgr::Inquirer(action, request, strands_)); |
8822ebee AR |
141 | } |
142 | ||
143 | void | |
144 | Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response) | |
145 | { | |
146 | Mgr::Inquirer::HandleRemoteAck(response); | |
147 | } | |
148 | ||
f738d783 | 149 | #if SQUID_SNMP |
51ea0904 CT |
150 | void |
151 | Ipc::Coordinator::handleSnmpRequest(const Snmp::Request& request) | |
152 | { | |
153 | debugs(54, 4, HERE); | |
154 | ||
155 | Snmp::Response response(request.requestId); | |
156 | TypedMsgHdr message; | |
157 | response.pack(message); | |
158 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
159 | ||
160 | AsyncJob::Start(new Snmp::Inquirer(request, strands_)); | |
161 | } | |
162 | ||
163 | void | |
164 | Ipc::Coordinator::handleSnmpResponse(const Snmp::Response& response) | |
165 | { | |
166 | debugs(54, 4, HERE); | |
167 | Snmp::Inquirer::HandleRemoteAck(response); | |
168 | } | |
f738d783 | 169 | #endif |
51ea0904 | 170 | |
0d0bce6a AR |
171 | int |
172 | Ipc::Coordinator::openListenSocket(const SharedListenRequest& request, | |
5667a628 | 173 | int &errNo) |
0d0bce6a AR |
174 | { |
175 | const OpenListenerParams &p = request.params; | |
176 | ||
177 | debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" << | |
5667a628 | 178 | request.requestorId); |
0d0bce6a | 179 | |
a67d2b2e | 180 | Ip::Address addr = p.addr; // comm_open_listener may modify it |
0d0bce6a AR |
181 | |
182 | enter_suid(); | |
183 | const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags, | |
5667a628 | 184 | FdNote(p.fdNote)); |
0d0bce6a AR |
185 | errNo = (sock >= 0) ? 0 : errno; |
186 | leave_suid(); | |
187 | ||
188 | // cache positive results | |
189 | if (sock >= 0) | |
190 | listeners[request.params] = sock; | |
191 | ||
192 | return sock; | |
193 | } | |
194 | ||
a2c48c98 AR |
195 | void Ipc::Coordinator::broadcastSignal(int sig) const |
196 | { | |
8822ebee AR |
197 | typedef StrandCoords::const_iterator SCI; |
198 | for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) { | |
a2c48c98 | 199 | debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId << |
5667a628 | 200 | ", PID=" << iter->pid); |
a2c48c98 AR |
201 | kill(iter->pid, sig); |
202 | } | |
203 | } | |
204 | ||
205 | Ipc::Coordinator* Ipc::Coordinator::Instance() | |
206 | { | |
207 | if (!TheInstance) | |
208 | TheInstance = new Coordinator; | |
209 | // XXX: if the Coordinator job quits, this pointer will become invalid | |
210 | // we could make Coordinator death fatal, except during exit, but since | |
211 | // Strands do not re-register, even process death would be pointless. | |
212 | return TheInstance; | |
213 | } | |
8822ebee AR |
214 | |
215 | const Ipc::StrandCoords& | |
216 | Ipc::Coordinator::strands() const | |
217 | { | |
218 | return strands_; | |
219 | } |