]> git.ipfire.org Git - thirdparty/squid.git/blame - src/PeerPoolMgr.cc
Boilerplate: update copyright blurbs on src/
[thirdparty/squid.git] / src / PeerPoolMgr.cc
CommitLineData
bbc27441
AJ
1/*
2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
e8dca475
CT
9#include "squid.h"
10#include "base/AsyncJobCalls.h"
11#include "base/RunnersRegistry.h"
12#include "CachePeer.h"
13#include "comm/Connection.h"
14#include "comm/ConnOpener.h"
15#include "Debug.h"
16#include "fd.h"
17#include "FwdState.h"
18#include "globals.h"
19#include "HttpRequest.h"
20#include "neighbors.h"
21#include "pconn.h"
22#include "PeerPoolMgr.h"
23#include "SquidConfig.h"
8aec3e1b 24#include "SquidTime.h"
e8dca475
CT
25#if USE_OPENSSL
26#include "ssl/PeerConnector.h"
27#endif
28
29CBDATA_CLASS_INIT(PeerPoolMgr);
30
31#if USE_OPENSSL
32/// Gives Ssl::PeerConnector access to Answer in the PeerPoolMgr callback dialer.
33class MyAnswerDialer: public UnaryMemFunT<PeerPoolMgr, Ssl::PeerConnectorAnswer, Ssl::PeerConnectorAnswer&>,
e4f14091 34 public Ssl::PeerConnector::CbDialer
e8dca475
CT
35{
36public:
37 MyAnswerDialer(const JobPointer &aJob, Method aMethod):
e2849af8 38 UnaryMemFunT<PeerPoolMgr, Ssl::PeerConnectorAnswer, Ssl::PeerConnectorAnswer&>(aJob, aMethod, Ssl::PeerConnectorAnswer()) {}
e8dca475
CT
39
40 /* Ssl::PeerConnector::CbDialer API */
41 virtual Ssl::PeerConnectorAnswer &answer() { return arg1; }
42};
43#endif
44
45PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
46 peer(cbdataReference(aPeer)),
47 request(),
48 opener(),
49 securer(),
50 closer(),
51 addrUsed(0)
52{
53}
54
55PeerPoolMgr::~PeerPoolMgr()
56{
57 cbdataReferenceDone(peer);
58}
59
60void
61PeerPoolMgr::start()
62{
63 AsyncJob::start();
64
65 // ErrorState, getOutgoingAddress(), and other APIs may require a request.
66 // We fake one. TODO: Optionally send this request to peers?
67 request = new HttpRequest(Http::METHOD_OPTIONS, AnyP::PROTO_HTTP, "*");
68 request->SetHost(peer->host);
69
70 checkpoint("peer initialized");
71}
72
73void
74PeerPoolMgr::swanSong()
75{
76 AsyncJob::swanSong();
77}
78
79bool
e2849af8
A
80PeerPoolMgr::validPeer() const
81{
e8dca475
CT
82 return peer && cbdataReferenceValid(peer) && peer->standby.pool;
83}
84
85bool
86PeerPoolMgr::doneAll() const
87{
88 return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll();
89}
90
91void
92PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
93{
94 opener = NULL;
95
96 if (!validPeer()) {
97 debugs(48, 3, "peer gone");
98 if (params.conn != NULL)
99 params.conn->close();
100 return;
101 }
102
c8407295 103 if (params.flag != Comm::OK) {
e8dca475
CT
104 /* it might have been a timeout with a partially open link */
105 if (params.conn != NULL)
106 params.conn->close();
107 peerConnectFailed(peer);
108 checkpoint("conn opening failure"); // may retry
109 return;
110 }
111
112 Must(params.conn != NULL);
113
114#if USE_OPENSSL
115 // Handle SSL peers.
116 if (peer->use_ssl) {
117 typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer;
118 closer = JobCallback(48, 3, CloserDialer, this,
119 PeerPoolMgr::handleSecureClosure);
120 comm_add_close_handler(params.conn->fd, closer);
121
122 securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
123 MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
8aec3e1b
CT
124
125 const int peerTimeout = peer->connect_timeout > 0 ?
719dc243 126 peer->connect_timeout : Config.Timeout.peer_connect;
8aec3e1b
CT
127 const int timeUsed = squid_curtime - params.conn->startTime();
128 // Use positive timeout when less than one second is left for conn.
129 const int timeLeft = max(1, (peerTimeout - timeUsed));
e8dca475 130 Ssl::PeerConnector *connector =
d1f3d8f8 131 new Ssl::PeerConnector(request, params.conn, NULL, securer, timeLeft);
e8dca475
CT
132 AsyncJob::Start(connector); // will call our callback
133 return;
134 }
135#endif
136
137 pushNewConnection(params.conn);
138}
139
140void
141PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn)
142{
143 Must(validPeer());
144 Must(Comm::IsConnOpen(conn));
145 peer->standby.pool->push(conn, NULL /* domain */);
146 // push() will trigger a checkpoint()
147}
148
149#if USE_OPENSSL
150void
151PeerPoolMgr::handleSecuredPeer(Ssl::PeerConnectorAnswer &answer)
152{
153 Must(securer != NULL);
154 securer = NULL;
155
156 if (closer != NULL) {
157 if (answer.conn != NULL)
158 comm_remove_close_handler(answer.conn->fd, closer);
159 else
160 closer->cancel("securing completed");
161 closer = NULL;
162 }
163
164 if (!validPeer()) {
165 debugs(48, 3, "peer gone");
166 if (answer.conn != NULL)
167 answer.conn->close();
168 return;
169 }
170
171 if (answer.error.get()) {
172 if (answer.conn != NULL)
173 answer.conn->close();
174 // PeerConnector calls peerConnectFailed() for us;
175 checkpoint("conn securing failure"); // may retry
176 return;
177 }
178
179 pushNewConnection(answer.conn);
180}
181
182void
183PeerPoolMgr::handleSecureClosure(const CommCloseCbParams &params)
184{
185 Must(closer != NULL);
186 Must(securer != NULL);
187 securer->cancel("conn closed by a 3rd party");
188 securer = NULL;
189 closer = NULL;
190 // allow the closing connection to fully close before we check again
191 Checkpoint(this, "conn closure while securing");
192}
193#endif
194
195void
196PeerPoolMgr::openNewConnection()
197{
198 // KISS: Do nothing else when we are already doing something.
199 if (opener != NULL || securer != NULL || shutting_down) {
200 debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down);
201 return; // there will be another checkpoint when we are done opening/securing
202 }
203
204 // Do not talk to a peer until it is ready.
205 if (!neighborUp(peer)) // provides debugging
206 return; // there will be another checkpoint when peer is up
207
208 // Do not violate peer limits.
209 if (!peerCanOpenMore(peer)) { // provides debugging
210 peer->standby.waitingForClose = true; // may already be true
211 return; // there will be another checkpoint when a peer conn closes
212 }
213
214 // Do not violate global restrictions.
215 if (fdUsageHigh()) {
216 debugs(48, 7, "overwhelmed");
217 peer->standby.waitingForClose = true; // may already be true
218 // There will be another checkpoint when a peer conn closes OR when
219 // a future pop() fails due to an empty pool. See PconnPool::pop().
220 return;
221 }
222
223 peer->standby.waitingForClose = false;
224
225 Comm::ConnectionPointer conn = new Comm::Connection;
226 Must(peer->n_addresses); // guaranteed by neighborUp() above
227 // cycle through all available IP addresses
228 conn->remote = peer->addresses[addrUsed++ % peer->n_addresses];
229 conn->remote.port(peer->http_port);
230 conn->peerType = STANDBY_POOL; // should be reset by peerSelect()
231 conn->setPeer(peer);
232 getOutgoingAddress(request.getRaw(), conn);
233 GetMarkingsToServer(request.getRaw(), *conn);
234
235 const int ctimeout = peer->connect_timeout > 0 ?
236 peer->connect_timeout : Config.Timeout.peer_connect;
237 typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
238 opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
239 Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout);
240 AsyncJob::Start(cs);
241}
242
243void
244PeerPoolMgr::closeOldConnections(const int howMany)
245{
246 debugs(48, 8, howMany);
247 peer->standby.pool->closeN(howMany);
248}
249
250void
251PeerPoolMgr::checkpoint(const char *reason)
252{
253 if (!validPeer()) {
254 debugs(48, 3, reason << " and peer gone");
255 return; // nothing to do after our owner dies; the job will quit
256 }
257
258 const int count = peer->standby.pool->count();
259 const int limit = peer->standby.limit;
260 debugs(48, 7, reason << " with " << count << " ? " << limit);
261
262 if (count < limit)
263 openNewConnection();
264 else if (count > limit)
265 closeOldConnections(count - limit);
266}
267
268void
269PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason)
270{
271 CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason);
272}
273
274/// launches PeerPoolMgrs for peers configured with standby.limit
275class PeerPoolMgrsRr: public RegisteredRunner
276{
277public:
278 /* RegisteredRunner API */
279 virtual void useConfig() { syncConfig(); }
280 virtual void syncConfig();
281};
282
283RunnerRegistrationEntry(PeerPoolMgrsRr);
284
285void
286PeerPoolMgrsRr::syncConfig()
287{
288 for (CachePeer *p = Config.peers; p; p = p->next) {
289 // On reconfigure, Squid deletes the old config (and old peers in it),
290 // so should always be dealing with a brand new configuration.
291 assert(!p->standby.mgr);
292 assert(!p->standby.pool);
293 if (p->standby.limit) {
294 p->standby.mgr = new PeerPoolMgr(p);
295 p->standby.pool = new PconnPool(p->name, p->standby.mgr);
296 AsyncJob::Start(p->standby.mgr.get());
297 }
298 }
299}