]> git.ipfire.org Git - thirdparty/squid.git/blob - src/PeerPoolMgr.cc
Merged from trunk (r13515).
[thirdparty/squid.git] / src / PeerPoolMgr.cc
1 #include "squid.h"
2 #include "base/AsyncJobCalls.h"
3 #include "base/RunnersRegistry.h"
4 #include "CachePeer.h"
5 #include "comm/Connection.h"
6 #include "comm/ConnOpener.h"
7 #include "Debug.h"
8 #include "fd.h"
9 #include "FwdState.h"
10 #include "globals.h"
11 #include "HttpRequest.h"
12 #include "neighbors.h"
13 #include "pconn.h"
14 #include "PeerPoolMgr.h"
15 #include "SquidConfig.h"
16 #include "SquidTime.h"
17 #if USE_OPENSSL
18 #include "ssl/PeerConnector.h"
19 #endif
20
21 CBDATA_CLASS_INIT(PeerPoolMgr);
22
23 #if USE_OPENSSL
24 /// Gives Ssl::PeerConnector access to Answer in the PeerPoolMgr callback dialer.
25 class MyAnswerDialer: public UnaryMemFunT<PeerPoolMgr, Ssl::PeerConnectorAnswer, Ssl::PeerConnectorAnswer&>,
26 public Ssl::PeerConnector::CbDialer
27 {
28 public:
29 MyAnswerDialer(const JobPointer &aJob, Method aMethod):
30 UnaryMemFunT<PeerPoolMgr, Ssl::PeerConnectorAnswer, Ssl::PeerConnectorAnswer&>(aJob, aMethod, Ssl::PeerConnectorAnswer()) {}
31
32 /* Ssl::PeerConnector::CbDialer API */
33 virtual Ssl::PeerConnectorAnswer &answer() { return arg1; }
34 };
35 #endif
36
37 PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
38 peer(cbdataReference(aPeer)),
39 request(),
40 opener(),
41 securer(),
42 closer(),
43 addrUsed(0)
44 {
45 }
46
47 PeerPoolMgr::~PeerPoolMgr()
48 {
49 cbdataReferenceDone(peer);
50 }
51
52 void
53 PeerPoolMgr::start()
54 {
55 AsyncJob::start();
56
57 // ErrorState, getOutgoingAddress(), and other APIs may require a request.
58 // We fake one. TODO: Optionally send this request to peers?
59 request = new HttpRequest(Http::METHOD_OPTIONS, AnyP::PROTO_HTTP, "*");
60 request->SetHost(peer->host);
61
62 checkpoint("peer initialized");
63 }
64
65 void
66 PeerPoolMgr::swanSong()
67 {
68 AsyncJob::swanSong();
69 }
70
71 bool
72 PeerPoolMgr::validPeer() const
73 {
74 return peer && cbdataReferenceValid(peer) && peer->standby.pool;
75 }
76
77 bool
78 PeerPoolMgr::doneAll() const
79 {
80 return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll();
81 }
82
83 void
84 PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
85 {
86 opener = NULL;
87
88 if (!validPeer()) {
89 debugs(48, 3, "peer gone");
90 if (params.conn != NULL)
91 params.conn->close();
92 return;
93 }
94
95 if (params.flag != Comm::OK) {
96 /* it might have been a timeout with a partially open link */
97 if (params.conn != NULL)
98 params.conn->close();
99 peerConnectFailed(peer);
100 checkpoint("conn opening failure"); // may retry
101 return;
102 }
103
104 Must(params.conn != NULL);
105
106 #if USE_OPENSSL
107 // Handle SSL peers.
108 if (peer->use_ssl) {
109 typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer;
110 closer = JobCallback(48, 3, CloserDialer, this,
111 PeerPoolMgr::handleSecureClosure);
112 comm_add_close_handler(params.conn->fd, closer);
113
114 securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
115 MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
116
117 const int peerTimeout = peer->connect_timeout > 0 ?
118 peer->connect_timeout : Config.Timeout.peer_connect;
119 const int timeUsed = squid_curtime - params.conn->startTime();
120 // Use positive timeout when less than one second is left for conn.
121 const int timeLeft = max(1, (peerTimeout - timeUsed));
122 Ssl::PeerConnector *connector =
123 new Ssl::PeerConnector(request, params.conn, securer, timeLeft);
124 AsyncJob::Start(connector); // will call our callback
125 return;
126 }
127 #endif
128
129 pushNewConnection(params.conn);
130 }
131
132 void
133 PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn)
134 {
135 Must(validPeer());
136 Must(Comm::IsConnOpen(conn));
137 peer->standby.pool->push(conn, NULL /* domain */);
138 // push() will trigger a checkpoint()
139 }
140
141 #if USE_OPENSSL
142 void
143 PeerPoolMgr::handleSecuredPeer(Ssl::PeerConnectorAnswer &answer)
144 {
145 Must(securer != NULL);
146 securer = NULL;
147
148 if (closer != NULL) {
149 if (answer.conn != NULL)
150 comm_remove_close_handler(answer.conn->fd, closer);
151 else
152 closer->cancel("securing completed");
153 closer = NULL;
154 }
155
156 if (!validPeer()) {
157 debugs(48, 3, "peer gone");
158 if (answer.conn != NULL)
159 answer.conn->close();
160 return;
161 }
162
163 if (answer.error.get()) {
164 if (answer.conn != NULL)
165 answer.conn->close();
166 // PeerConnector calls peerConnectFailed() for us;
167 checkpoint("conn securing failure"); // may retry
168 return;
169 }
170
171 pushNewConnection(answer.conn);
172 }
173
174 void
175 PeerPoolMgr::handleSecureClosure(const CommCloseCbParams &params)
176 {
177 Must(closer != NULL);
178 Must(securer != NULL);
179 securer->cancel("conn closed by a 3rd party");
180 securer = NULL;
181 closer = NULL;
182 // allow the closing connection to fully close before we check again
183 Checkpoint(this, "conn closure while securing");
184 }
185 #endif
186
187 void
188 PeerPoolMgr::openNewConnection()
189 {
190 // KISS: Do nothing else when we are already doing something.
191 if (opener != NULL || securer != NULL || shutting_down) {
192 debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down);
193 return; // there will be another checkpoint when we are done opening/securing
194 }
195
196 // Do not talk to a peer until it is ready.
197 if (!neighborUp(peer)) // provides debugging
198 return; // there will be another checkpoint when peer is up
199
200 // Do not violate peer limits.
201 if (!peerCanOpenMore(peer)) { // provides debugging
202 peer->standby.waitingForClose = true; // may already be true
203 return; // there will be another checkpoint when a peer conn closes
204 }
205
206 // Do not violate global restrictions.
207 if (fdUsageHigh()) {
208 debugs(48, 7, "overwhelmed");
209 peer->standby.waitingForClose = true; // may already be true
210 // There will be another checkpoint when a peer conn closes OR when
211 // a future pop() fails due to an empty pool. See PconnPool::pop().
212 return;
213 }
214
215 peer->standby.waitingForClose = false;
216
217 Comm::ConnectionPointer conn = new Comm::Connection;
218 Must(peer->n_addresses); // guaranteed by neighborUp() above
219 // cycle through all available IP addresses
220 conn->remote = peer->addresses[addrUsed++ % peer->n_addresses];
221 conn->remote.port(peer->http_port);
222 conn->peerType = STANDBY_POOL; // should be reset by peerSelect()
223 conn->setPeer(peer);
224 getOutgoingAddress(request.getRaw(), conn);
225 GetMarkingsToServer(request.getRaw(), *conn);
226
227 const int ctimeout = peer->connect_timeout > 0 ?
228 peer->connect_timeout : Config.Timeout.peer_connect;
229 typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
230 opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
231 Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout);
232 AsyncJob::Start(cs);
233 }
234
235 void
236 PeerPoolMgr::closeOldConnections(const int howMany)
237 {
238 debugs(48, 8, howMany);
239 peer->standby.pool->closeN(howMany);
240 }
241
242 void
243 PeerPoolMgr::checkpoint(const char *reason)
244 {
245 if (!validPeer()) {
246 debugs(48, 3, reason << " and peer gone");
247 return; // nothing to do after our owner dies; the job will quit
248 }
249
250 const int count = peer->standby.pool->count();
251 const int limit = peer->standby.limit;
252 debugs(48, 7, reason << " with " << count << " ? " << limit);
253
254 if (count < limit)
255 openNewConnection();
256 else if (count > limit)
257 closeOldConnections(count - limit);
258 }
259
260 void
261 PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason)
262 {
263 CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason);
264 }
265
266 /// launches PeerPoolMgrs for peers configured with standby.limit
267 class PeerPoolMgrsRr: public RegisteredRunner
268 {
269 public:
270 /* RegisteredRunner API */
271 virtual void useConfig() { syncConfig(); }
272 virtual void syncConfig();
273 };
274
275 RunnerRegistrationEntry(PeerPoolMgrsRr);
276
277 void
278 PeerPoolMgrsRr::syncConfig()
279 {
280 for (CachePeer *p = Config.peers; p; p = p->next) {
281 // On reconfigure, Squid deletes the old config (and old peers in it),
282 // so should always be dealing with a brand new configuration.
283 assert(!p->standby.mgr);
284 assert(!p->standby.pool);
285 if (p->standby.limit) {
286 p->standby.mgr = new PeerPoolMgr(p);
287 p->standby.pool = new PconnPool(p->name, p->standby.mgr);
288 AsyncJob::Start(p->standby.mgr.get());
289 }
290 }
291 }