git.ipfire.org Git - thirdparty/squid.git/blob - src/PeerPoolMgr.cc
Boilerplate: update copyright blurbs on src/
[thirdparty/squid.git] / src / PeerPoolMgr.cc
1 /*
2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 #include "squid.h"
10 #include "base/AsyncJobCalls.h"
11 #include "base/RunnersRegistry.h"
12 #include "CachePeer.h"
13 #include "comm/Connection.h"
14 #include "comm/ConnOpener.h"
15 #include "Debug.h"
16 #include "fd.h"
17 #include "FwdState.h"
18 #include "globals.h"
19 #include "HttpRequest.h"
20 #include "neighbors.h"
21 #include "pconn.h"
22 #include "PeerPoolMgr.h"
23 #include "SquidConfig.h"
24 #include "SquidTime.h"
25 #if USE_OPENSSL
26 #include "ssl/PeerConnector.h"
27 #endif
28
29 CBDATA_CLASS_INIT(PeerPoolMgr);
30
31 #if USE_OPENSSL
32 /// Gives Ssl::PeerConnector access to Answer in the PeerPoolMgr callback dialer.
33 class MyAnswerDialer: public UnaryMemFunT<PeerPoolMgr, Ssl::PeerConnectorAnswer, Ssl::PeerConnectorAnswer&>,
34 public Ssl::PeerConnector::CbDialer
35 {
36 public:
37 MyAnswerDialer(const JobPointer &aJob, Method aMethod):
38 UnaryMemFunT<PeerPoolMgr, Ssl::PeerConnectorAnswer, Ssl::PeerConnectorAnswer&>(aJob, aMethod, Ssl::PeerConnectorAnswer()) {}
39
40 /* Ssl::PeerConnector::CbDialer API */
41 virtual Ssl::PeerConnectorAnswer &answer() { return arg1; }
42 };
43 #endif
44
45 PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
46 peer(cbdataReference(aPeer)),
47 request(),
48 opener(),
49 securer(),
50 closer(),
51 addrUsed(0)
52 {
53 }
54
55 PeerPoolMgr::~PeerPoolMgr()
56 {
57 cbdataReferenceDone(peer);
58 }
59
60 void
61 PeerPoolMgr::start()
62 {
63 AsyncJob::start();
64
65 // ErrorState, getOutgoingAddress(), and other APIs may require a request.
66 // We fake one. TODO: Optionally send this request to peers?
67 request = new HttpRequest(Http::METHOD_OPTIONS, AnyP::PROTO_HTTP, "*");
68 request->SetHost(peer->host);
69
70 checkpoint("peer initialized");
71 }
72
73 void
74 PeerPoolMgr::swanSong()
75 {
76 AsyncJob::swanSong();
77 }
78
79 bool
80 PeerPoolMgr::validPeer() const
81 {
82 return peer && cbdataReferenceValid(peer) && peer->standby.pool;
83 }
84
85 bool
86 PeerPoolMgr::doneAll() const
87 {
88 return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll();
89 }
90
91 void
92 PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
93 {
94 opener = NULL;
95
96 if (!validPeer()) {
97 debugs(48, 3, "peer gone");
98 if (params.conn != NULL)
99 params.conn->close();
100 return;
101 }
102
103 if (params.flag != Comm::OK) {
104 /* it might have been a timeout with a partially open link */
105 if (params.conn != NULL)
106 params.conn->close();
107 peerConnectFailed(peer);
108 checkpoint("conn opening failure"); // may retry
109 return;
110 }
111
112 Must(params.conn != NULL);
113
114 #if USE_OPENSSL
115 // Handle SSL peers.
116 if (peer->use_ssl) {
117 typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer;
118 closer = JobCallback(48, 3, CloserDialer, this,
119 PeerPoolMgr::handleSecureClosure);
120 comm_add_close_handler(params.conn->fd, closer);
121
122 securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
123 MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
124
125 const int peerTimeout = peer->connect_timeout > 0 ?
126 peer->connect_timeout : Config.Timeout.peer_connect;
127 const int timeUsed = squid_curtime - params.conn->startTime();
128 // Use positive timeout when less than one second is left for conn.
129 const int timeLeft = max(1, (peerTimeout - timeUsed));
130 Ssl::PeerConnector *connector =
131 new Ssl::PeerConnector(request, params.conn, NULL, securer, timeLeft);
132 AsyncJob::Start(connector); // will call our callback
133 return;
134 }
135 #endif
136
137 pushNewConnection(params.conn);
138 }
139
140 void
141 PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn)
142 {
143 Must(validPeer());
144 Must(Comm::IsConnOpen(conn));
145 peer->standby.pool->push(conn, NULL /* domain */);
146 // push() will trigger a checkpoint()
147 }
148
149 #if USE_OPENSSL
150 void
151 PeerPoolMgr::handleSecuredPeer(Ssl::PeerConnectorAnswer &answer)
152 {
153 Must(securer != NULL);
154 securer = NULL;
155
156 if (closer != NULL) {
157 if (answer.conn != NULL)
158 comm_remove_close_handler(answer.conn->fd, closer);
159 else
160 closer->cancel("securing completed");
161 closer = NULL;
162 }
163
164 if (!validPeer()) {
165 debugs(48, 3, "peer gone");
166 if (answer.conn != NULL)
167 answer.conn->close();
168 return;
169 }
170
171 if (answer.error.get()) {
172 if (answer.conn != NULL)
173 answer.conn->close();
174 // PeerConnector calls peerConnectFailed() for us;
175 checkpoint("conn securing failure"); // may retry
176 return;
177 }
178
179 pushNewConnection(answer.conn);
180 }
181
182 void
183 PeerPoolMgr::handleSecureClosure(const CommCloseCbParams &params)
184 {
185 Must(closer != NULL);
186 Must(securer != NULL);
187 securer->cancel("conn closed by a 3rd party");
188 securer = NULL;
189 closer = NULL;
190 // allow the closing connection to fully close before we check again
191 Checkpoint(this, "conn closure while securing");
192 }
193 #endif
194
195 void
196 PeerPoolMgr::openNewConnection()
197 {
198 // KISS: Do nothing else when we are already doing something.
199 if (opener != NULL || securer != NULL || shutting_down) {
200 debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down);
201 return; // there will be another checkpoint when we are done opening/securing
202 }
203
204 // Do not talk to a peer until it is ready.
205 if (!neighborUp(peer)) // provides debugging
206 return; // there will be another checkpoint when peer is up
207
208 // Do not violate peer limits.
209 if (!peerCanOpenMore(peer)) { // provides debugging
210 peer->standby.waitingForClose = true; // may already be true
211 return; // there will be another checkpoint when a peer conn closes
212 }
213
214 // Do not violate global restrictions.
215 if (fdUsageHigh()) {
216 debugs(48, 7, "overwhelmed");
217 peer->standby.waitingForClose = true; // may already be true
218 // There will be another checkpoint when a peer conn closes OR when
219 // a future pop() fails due to an empty pool. See PconnPool::pop().
220 return;
221 }
222
223 peer->standby.waitingForClose = false;
224
225 Comm::ConnectionPointer conn = new Comm::Connection;
226 Must(peer->n_addresses); // guaranteed by neighborUp() above
227 // cycle through all available IP addresses
228 conn->remote = peer->addresses[addrUsed++ % peer->n_addresses];
229 conn->remote.port(peer->http_port);
230 conn->peerType = STANDBY_POOL; // should be reset by peerSelect()
231 conn->setPeer(peer);
232 getOutgoingAddress(request.getRaw(), conn);
233 GetMarkingsToServer(request.getRaw(), *conn);
234
235 const int ctimeout = peer->connect_timeout > 0 ?
236 peer->connect_timeout : Config.Timeout.peer_connect;
237 typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
238 opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
239 Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout);
240 AsyncJob::Start(cs);
241 }
242
243 void
244 PeerPoolMgr::closeOldConnections(const int howMany)
245 {
246 debugs(48, 8, howMany);
247 peer->standby.pool->closeN(howMany);
248 }
249
250 void
251 PeerPoolMgr::checkpoint(const char *reason)
252 {
253 if (!validPeer()) {
254 debugs(48, 3, reason << " and peer gone");
255 return; // nothing to do after our owner dies; the job will quit
256 }
257
258 const int count = peer->standby.pool->count();
259 const int limit = peer->standby.limit;
260 debugs(48, 7, reason << " with " << count << " ? " << limit);
261
262 if (count < limit)
263 openNewConnection();
264 else if (count > limit)
265 closeOldConnections(count - limit);
266 }
267
268 void
269 PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason)
270 {
271 CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason);
272 }
273
274 /// launches PeerPoolMgrs for peers configured with standby.limit
275 class PeerPoolMgrsRr: public RegisteredRunner
276 {
277 public:
278 /* RegisteredRunner API */
279 virtual void useConfig() { syncConfig(); }
280 virtual void syncConfig();
281 };
282
283 RunnerRegistrationEntry(PeerPoolMgrsRr);
284
285 void
286 PeerPoolMgrsRr::syncConfig()
287 {
288 for (CachePeer *p = Config.peers; p; p = p->next) {
289 // On reconfigure, Squid deletes the old config (and old peers in it),
290 // so should always be dealing with a brand new configuration.
291 assert(!p->standby.mgr);
292 assert(!p->standby.pool);
293 if (p->standby.limit) {
294 p->standby.mgr = new PeerPoolMgr(p);
295 p->standby.pool = new PconnPool(p->name, p->standby.mgr);
296 AsyncJob::Start(p->standby.mgr.get());
297 }
298 }
299 }