]> git.ipfire.org Git - thirdparty/squid.git/blob - src/PeerPoolMgr.cc
Merge from trunk rev.13994
[thirdparty/squid.git] / src / PeerPoolMgr.cc
1 /*
2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 #include "squid.h"
10 #include "base/AsyncJobCalls.h"
11 #include "base/RunnersRegistry.h"
12 #include "CachePeer.h"
13 #include "comm/Connection.h"
14 #include "comm/ConnOpener.h"
15 #include "Debug.h"
16 #include "fd.h"
17 #include "FwdState.h"
18 #include "globals.h"
19 #include "HttpRequest.h"
20 #include "neighbors.h"
21 #include "pconn.h"
22 #include "PeerPoolMgr.h"
23 #include "SquidConfig.h"
24 #include "SquidTime.h"
25 #if USE_OPENSSL
26 #include "ssl/PeerConnector.h"
27 #else
28 #include "security/EncryptorAnswer.h"
29 #endif
30
31 CBDATA_CLASS_INIT(PeerPoolMgr);
32
33 #if USE_OPENSSL
34 /// Gives Ssl::PeerConnector access to Answer in the PeerPoolMgr callback dialer.
35 class MyAnswerDialer: public UnaryMemFunT<PeerPoolMgr, Security::EncryptorAnswer, Security::EncryptorAnswer&>,
36 public Ssl::PeerConnector::CbDialer
37 {
38 public:
39 MyAnswerDialer(const JobPointer &aJob, Method aMethod):
40 UnaryMemFunT<PeerPoolMgr, Security::EncryptorAnswer, Security::EncryptorAnswer&>(aJob, aMethod, Security::EncryptorAnswer()) {}
41
42 /* Ssl::PeerConnector::CbDialer API */
43 virtual Security::EncryptorAnswer &answer() { return arg1; }
44 };
45 #endif
46
47 PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
48 peer(cbdataReference(aPeer)),
49 request(),
50 opener(),
51 securer(),
52 closer(),
53 addrUsed(0)
54 {
55 }
56
57 PeerPoolMgr::~PeerPoolMgr()
58 {
59 cbdataReferenceDone(peer);
60 }
61
62 void
63 PeerPoolMgr::start()
64 {
65 AsyncJob::start();
66
67 // ErrorState, getOutgoingAddress(), and other APIs may require a request.
68 // We fake one. TODO: Optionally send this request to peers?
69 request = new HttpRequest(Http::METHOD_OPTIONS, AnyP::PROTO_HTTP, "*");
70 request->SetHost(peer->host);
71
72 checkpoint("peer initialized");
73 }
74
75 void
76 PeerPoolMgr::swanSong()
77 {
78 AsyncJob::swanSong();
79 }
80
81 bool
82 PeerPoolMgr::validPeer() const
83 {
84 return peer && cbdataReferenceValid(peer) && peer->standby.pool;
85 }
86
87 bool
88 PeerPoolMgr::doneAll() const
89 {
90 return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll();
91 }
92
93 void
94 PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
95 {
96 opener = NULL;
97
98 if (!validPeer()) {
99 debugs(48, 3, "peer gone");
100 if (params.conn != NULL)
101 params.conn->close();
102 return;
103 }
104
105 if (params.flag != Comm::OK) {
106 /* it might have been a timeout with a partially open link */
107 if (params.conn != NULL)
108 params.conn->close();
109 peerConnectFailed(peer);
110 checkpoint("conn opening failure"); // may retry
111 return;
112 }
113
114 Must(params.conn != NULL);
115
116 #if USE_OPENSSL
117 // Handle SSL peers.
118 if (peer->secure.encryptTransport) {
119 typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer;
120 closer = JobCallback(48, 3, CloserDialer, this,
121 PeerPoolMgr::handleSecureClosure);
122 comm_add_close_handler(params.conn->fd, closer);
123
124 securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
125 MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
126
127 const int peerTimeout = peer->connect_timeout > 0 ?
128 peer->connect_timeout : Config.Timeout.peer_connect;
129 const int timeUsed = squid_curtime - params.conn->startTime();
130 // Use positive timeout when less than one second is left for conn.
131 const int timeLeft = max(1, (peerTimeout - timeUsed));
132 Ssl::PeerConnector *connector =
133 new Ssl::PeerConnector(request, params.conn, NULL, securer, timeLeft);
134 AsyncJob::Start(connector); // will call our callback
135 return;
136 }
137 #endif
138
139 pushNewConnection(params.conn);
140 }
141
142 void
143 PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn)
144 {
145 Must(validPeer());
146 Must(Comm::IsConnOpen(conn));
147 peer->standby.pool->push(conn, NULL /* domain */);
148 // push() will trigger a checkpoint()
149 }
150
151 void
152 PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
153 {
154 Must(securer != NULL);
155 securer = NULL;
156
157 if (closer != NULL) {
158 if (answer.conn != NULL)
159 comm_remove_close_handler(answer.conn->fd, closer);
160 else
161 closer->cancel("securing completed");
162 closer = NULL;
163 }
164
165 if (!validPeer()) {
166 debugs(48, 3, "peer gone");
167 if (answer.conn != NULL)
168 answer.conn->close();
169 return;
170 }
171
172 if (answer.error.get()) {
173 if (answer.conn != NULL)
174 answer.conn->close();
175 // PeerConnector calls peerConnectFailed() for us;
176 checkpoint("conn securing failure"); // may retry
177 return;
178 }
179
180 pushNewConnection(answer.conn);
181 }
182
183 void
184 PeerPoolMgr::handleSecureClosure(const CommCloseCbParams &params)
185 {
186 Must(closer != NULL);
187 Must(securer != NULL);
188 securer->cancel("conn closed by a 3rd party");
189 securer = NULL;
190 closer = NULL;
191 // allow the closing connection to fully close before we check again
192 Checkpoint(this, "conn closure while securing");
193 }
194
195 void
196 PeerPoolMgr::openNewConnection()
197 {
198 // KISS: Do nothing else when we are already doing something.
199 if (opener != NULL || securer != NULL || shutting_down) {
200 debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down);
201 return; // there will be another checkpoint when we are done opening/securing
202 }
203
204 // Do not talk to a peer until it is ready.
205 if (!neighborUp(peer)) // provides debugging
206 return; // there will be another checkpoint when peer is up
207
208 // Do not violate peer limits.
209 if (!peerCanOpenMore(peer)) { // provides debugging
210 peer->standby.waitingForClose = true; // may already be true
211 return; // there will be another checkpoint when a peer conn closes
212 }
213
214 // Do not violate global restrictions.
215 if (fdUsageHigh()) {
216 debugs(48, 7, "overwhelmed");
217 peer->standby.waitingForClose = true; // may already be true
218 // There will be another checkpoint when a peer conn closes OR when
219 // a future pop() fails due to an empty pool. See PconnPool::pop().
220 return;
221 }
222
223 peer->standby.waitingForClose = false;
224
225 Comm::ConnectionPointer conn = new Comm::Connection;
226 Must(peer->n_addresses); // guaranteed by neighborUp() above
227 // cycle through all available IP addresses
228 conn->remote = peer->addresses[addrUsed++ % peer->n_addresses];
229 conn->remote.port(peer->http_port);
230 conn->peerType = STANDBY_POOL; // should be reset by peerSelect()
231 conn->setPeer(peer);
232 getOutgoingAddress(request.getRaw(), conn);
233 GetMarkingsToServer(request.getRaw(), *conn);
234
235 const int ctimeout = peer->connect_timeout > 0 ?
236 peer->connect_timeout : Config.Timeout.peer_connect;
237 typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
238 opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
239 Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout);
240 AsyncJob::Start(cs);
241 }
242
243 void
244 PeerPoolMgr::closeOldConnections(const int howMany)
245 {
246 debugs(48, 8, howMany);
247 peer->standby.pool->closeN(howMany);
248 }
249
250 void
251 PeerPoolMgr::checkpoint(const char *reason)
252 {
253 if (!validPeer()) {
254 debugs(48, 3, reason << " and peer gone");
255 return; // nothing to do after our owner dies; the job will quit
256 }
257
258 const int count = peer->standby.pool->count();
259 const int limit = peer->standby.limit;
260 debugs(48, 7, reason << " with " << count << " ? " << limit);
261
262 if (count < limit)
263 openNewConnection();
264 else if (count > limit)
265 closeOldConnections(count - limit);
266 }
267
268 void
269 PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason)
270 {
271 CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason);
272 }
273
274 /// launches PeerPoolMgrs for peers configured with standby.limit
275 class PeerPoolMgrsRr: public RegisteredRunner
276 {
277 public:
278 /* RegisteredRunner API */
279 virtual void useConfig() { syncConfig(); }
280 virtual void syncConfig();
281 };
282
283 RunnerRegistrationEntry(PeerPoolMgrsRr);
284
285 void
286 PeerPoolMgrsRr::syncConfig()
287 {
288 for (CachePeer *p = Config.peers; p; p = p->next) {
289 // On reconfigure, Squid deletes the old config (and old peers in it),
290 // so should always be dealing with a brand new configuration.
291 assert(!p->standby.mgr);
292 assert(!p->standby.pool);
293 if (p->standby.limit) {
294 p->standby.mgr = new PeerPoolMgr(p);
295 p->standby.pool = new PconnPool(p->name, p->standby.mgr);
296 AsyncJob::Start(p->standby.mgr.get());
297 }
298 }
299 }
300