]>
Commit | Line | Data |
---|---|---|
e8dca475 CT |
1 | #include "squid.h" |
2 | #include "base/AsyncJobCalls.h" | |
3 | #include "base/RunnersRegistry.h" | |
4 | #include "CachePeer.h" | |
5 | #include "comm/Connection.h" | |
6 | #include "comm/ConnOpener.h" | |
7 | #include "Debug.h" | |
8 | #include "fd.h" | |
9 | #include "FwdState.h" | |
10 | #include "globals.h" | |
11 | #include "HttpRequest.h" | |
12 | #include "neighbors.h" | |
13 | #include "pconn.h" | |
14 | #include "PeerPoolMgr.h" | |
15 | #include "SquidConfig.h" | |
16 | #if USE_OPENSSL | |
17 | #include "ssl/PeerConnector.h" | |
18 | #endif | |
19 | ||
20 | CBDATA_CLASS_INIT(PeerPoolMgr); | |
21 | ||
22 | #if USE_OPENSSL | |
23 | /// Gives Ssl::PeerConnector access to Answer in the PeerPoolMgr callback dialer. | |
24 | class MyAnswerDialer: public UnaryMemFunT<PeerPoolMgr, Ssl::PeerConnectorAnswer, Ssl::PeerConnectorAnswer&>, | |
25 | public Ssl::PeerConnector::CbDialer | |
26 | { | |
27 | public: | |
28 | MyAnswerDialer(const JobPointer &aJob, Method aMethod): | |
e2849af8 | 29 | UnaryMemFunT<PeerPoolMgr, Ssl::PeerConnectorAnswer, Ssl::PeerConnectorAnswer&>(aJob, aMethod, Ssl::PeerConnectorAnswer()) {} |
e8dca475 CT |
30 | |
31 | /* Ssl::PeerConnector::CbDialer API */ | |
32 | virtual Ssl::PeerConnectorAnswer &answer() { return arg1; } | |
33 | }; | |
34 | #endif | |
35 | ||
36 | PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"), | |
37 | peer(cbdataReference(aPeer)), | |
38 | request(), | |
39 | opener(), | |
40 | securer(), | |
41 | closer(), | |
42 | addrUsed(0) | |
43 | { | |
44 | } | |
45 | ||
46 | PeerPoolMgr::~PeerPoolMgr() | |
47 | { | |
48 | cbdataReferenceDone(peer); | |
49 | } | |
50 | ||
51 | void | |
52 | PeerPoolMgr::start() | |
53 | { | |
54 | AsyncJob::start(); | |
55 | ||
56 | // ErrorState, getOutgoingAddress(), and other APIs may require a request. | |
57 | // We fake one. TODO: Optionally send this request to peers? | |
58 | request = new HttpRequest(Http::METHOD_OPTIONS, AnyP::PROTO_HTTP, "*"); | |
59 | request->SetHost(peer->host); | |
60 | ||
61 | checkpoint("peer initialized"); | |
62 | } | |
63 | ||
64 | void | |
65 | PeerPoolMgr::swanSong() | |
66 | { | |
67 | AsyncJob::swanSong(); | |
68 | } | |
69 | ||
70 | bool | |
e2849af8 A |
71 | PeerPoolMgr::validPeer() const |
72 | { | |
e8dca475 CT |
73 | return peer && cbdataReferenceValid(peer) && peer->standby.pool; |
74 | } | |
75 | ||
76 | bool | |
77 | PeerPoolMgr::doneAll() const | |
78 | { | |
79 | return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll(); | |
80 | } | |
81 | ||
82 | void | |
83 | PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms) | |
84 | { | |
85 | opener = NULL; | |
86 | ||
87 | if (!validPeer()) { | |
88 | debugs(48, 3, "peer gone"); | |
89 | if (params.conn != NULL) | |
90 | params.conn->close(); | |
91 | return; | |
92 | } | |
93 | ||
c8407295 | 94 | if (params.flag != Comm::OK) { |
e8dca475 CT |
95 | /* it might have been a timeout with a partially open link */ |
96 | if (params.conn != NULL) | |
97 | params.conn->close(); | |
98 | peerConnectFailed(peer); | |
99 | checkpoint("conn opening failure"); // may retry | |
100 | return; | |
101 | } | |
102 | ||
103 | Must(params.conn != NULL); | |
104 | ||
105 | #if USE_OPENSSL | |
106 | // Handle SSL peers. | |
107 | if (peer->use_ssl) { | |
108 | typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer; | |
109 | closer = JobCallback(48, 3, CloserDialer, this, | |
110 | PeerPoolMgr::handleSecureClosure); | |
111 | comm_add_close_handler(params.conn->fd, closer); | |
112 | ||
113 | securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer", | |
114 | MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer)); | |
115 | Ssl::PeerConnector *connector = | |
116 | new Ssl::PeerConnector(request, params.conn, securer); | |
117 | AsyncJob::Start(connector); // will call our callback | |
118 | return; | |
119 | } | |
120 | #endif | |
121 | ||
122 | pushNewConnection(params.conn); | |
123 | } | |
124 | ||
125 | void | |
126 | PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn) | |
127 | { | |
128 | Must(validPeer()); | |
129 | Must(Comm::IsConnOpen(conn)); | |
130 | peer->standby.pool->push(conn, NULL /* domain */); | |
131 | // push() will trigger a checkpoint() | |
132 | } | |
133 | ||
134 | #if USE_OPENSSL | |
135 | void | |
136 | PeerPoolMgr::handleSecuredPeer(Ssl::PeerConnectorAnswer &answer) | |
137 | { | |
138 | Must(securer != NULL); | |
139 | securer = NULL; | |
140 | ||
141 | if (closer != NULL) { | |
142 | if (answer.conn != NULL) | |
143 | comm_remove_close_handler(answer.conn->fd, closer); | |
144 | else | |
145 | closer->cancel("securing completed"); | |
146 | closer = NULL; | |
147 | } | |
148 | ||
149 | if (!validPeer()) { | |
150 | debugs(48, 3, "peer gone"); | |
151 | if (answer.conn != NULL) | |
152 | answer.conn->close(); | |
153 | return; | |
154 | } | |
155 | ||
156 | if (answer.error.get()) { | |
157 | if (answer.conn != NULL) | |
158 | answer.conn->close(); | |
159 | // PeerConnector calls peerConnectFailed() for us; | |
160 | checkpoint("conn securing failure"); // may retry | |
161 | return; | |
162 | } | |
163 | ||
164 | pushNewConnection(answer.conn); | |
165 | } | |
166 | ||
167 | void | |
168 | PeerPoolMgr::handleSecureClosure(const CommCloseCbParams ¶ms) | |
169 | { | |
170 | Must(closer != NULL); | |
171 | Must(securer != NULL); | |
172 | securer->cancel("conn closed by a 3rd party"); | |
173 | securer = NULL; | |
174 | closer = NULL; | |
175 | // allow the closing connection to fully close before we check again | |
176 | Checkpoint(this, "conn closure while securing"); | |
177 | } | |
178 | #endif | |
179 | ||
180 | void | |
181 | PeerPoolMgr::openNewConnection() | |
182 | { | |
183 | // KISS: Do nothing else when we are already doing something. | |
184 | if (opener != NULL || securer != NULL || shutting_down) { | |
185 | debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down); | |
186 | return; // there will be another checkpoint when we are done opening/securing | |
187 | } | |
188 | ||
189 | // Do not talk to a peer until it is ready. | |
190 | if (!neighborUp(peer)) // provides debugging | |
191 | return; // there will be another checkpoint when peer is up | |
192 | ||
193 | // Do not violate peer limits. | |
194 | if (!peerCanOpenMore(peer)) { // provides debugging | |
195 | peer->standby.waitingForClose = true; // may already be true | |
196 | return; // there will be another checkpoint when a peer conn closes | |
197 | } | |
198 | ||
199 | // Do not violate global restrictions. | |
200 | if (fdUsageHigh()) { | |
201 | debugs(48, 7, "overwhelmed"); | |
202 | peer->standby.waitingForClose = true; // may already be true | |
203 | // There will be another checkpoint when a peer conn closes OR when | |
204 | // a future pop() fails due to an empty pool. See PconnPool::pop(). | |
205 | return; | |
206 | } | |
207 | ||
208 | peer->standby.waitingForClose = false; | |
209 | ||
210 | Comm::ConnectionPointer conn = new Comm::Connection; | |
211 | Must(peer->n_addresses); // guaranteed by neighborUp() above | |
212 | // cycle through all available IP addresses | |
213 | conn->remote = peer->addresses[addrUsed++ % peer->n_addresses]; | |
214 | conn->remote.port(peer->http_port); | |
215 | conn->peerType = STANDBY_POOL; // should be reset by peerSelect() | |
216 | conn->setPeer(peer); | |
217 | getOutgoingAddress(request.getRaw(), conn); | |
218 | GetMarkingsToServer(request.getRaw(), *conn); | |
219 | ||
220 | const int ctimeout = peer->connect_timeout > 0 ? | |
221 | peer->connect_timeout : Config.Timeout.peer_connect; | |
222 | typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer; | |
223 | opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection); | |
224 | Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout); | |
225 | AsyncJob::Start(cs); | |
226 | } | |
227 | ||
228 | void | |
229 | PeerPoolMgr::closeOldConnections(const int howMany) | |
230 | { | |
231 | debugs(48, 8, howMany); | |
232 | peer->standby.pool->closeN(howMany); | |
233 | } | |
234 | ||
235 | void | |
236 | PeerPoolMgr::checkpoint(const char *reason) | |
237 | { | |
238 | if (!validPeer()) { | |
239 | debugs(48, 3, reason << " and peer gone"); | |
240 | return; // nothing to do after our owner dies; the job will quit | |
241 | } | |
242 | ||
243 | const int count = peer->standby.pool->count(); | |
244 | const int limit = peer->standby.limit; | |
245 | debugs(48, 7, reason << " with " << count << " ? " << limit); | |
246 | ||
247 | if (count < limit) | |
248 | openNewConnection(); | |
249 | else if (count > limit) | |
250 | closeOldConnections(count - limit); | |
251 | } | |
252 | ||
253 | void | |
254 | PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason) | |
255 | { | |
256 | CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason); | |
257 | } | |
258 | ||
259 | /// launches PeerPoolMgrs for peers configured with standby.limit | |
260 | class PeerPoolMgrsRr: public RegisteredRunner | |
261 | { | |
262 | public: | |
263 | /* RegisteredRunner API */ | |
264 | virtual void useConfig() { syncConfig(); } | |
265 | virtual void syncConfig(); | |
266 | }; | |
267 | ||
268 | RunnerRegistrationEntry(PeerPoolMgrsRr); | |
269 | ||
270 | void | |
271 | PeerPoolMgrsRr::syncConfig() | |
272 | { | |
273 | for (CachePeer *p = Config.peers; p; p = p->next) { | |
274 | // On reconfigure, Squid deletes the old config (and old peers in it), | |
275 | // so should always be dealing with a brand new configuration. | |
276 | assert(!p->standby.mgr); | |
277 | assert(!p->standby.pool); | |
278 | if (p->standby.limit) { | |
279 | p->standby.mgr = new PeerPoolMgr(p); | |
280 | p->standby.pool = new PconnPool(p->name, p->standby.mgr); | |
281 | AsyncJob::Start(p->standby.mgr.get()); | |
282 | } | |
283 | } | |
284 | } |