]> git.ipfire.org Git - thirdparty/squid.git/blob - src/PeerPoolMgr.cc
merge from trunk r13423
[thirdparty/squid.git] / src / PeerPoolMgr.cc
1 #include "squid.h"
2 #include "base/AsyncJobCalls.h"
3 #include "base/RunnersRegistry.h"
4 #include "CachePeer.h"
5 #include "comm/Connection.h"
6 #include "comm/ConnOpener.h"
7 #include "Debug.h"
8 #include "fd.h"
9 #include "FwdState.h"
10 #include "globals.h"
11 #include "HttpRequest.h"
12 #include "neighbors.h"
13 #include "pconn.h"
14 #include "PeerPoolMgr.h"
15 #include "SquidConfig.h"
16 #if USE_OPENSSL
17 #include "ssl/PeerConnector.h"
18 #endif
19
20 CBDATA_CLASS_INIT(PeerPoolMgr);
21
22 #if USE_OPENSSL
23 /// Gives Ssl::PeerConnector access to Answer in the PeerPoolMgr callback dialer.
24 class MyAnswerDialer: public UnaryMemFunT<PeerPoolMgr, Ssl::PeerConnectorAnswer, Ssl::PeerConnectorAnswer&>,
25 public Ssl::PeerConnector::CbDialer
26 {
27 public:
28 MyAnswerDialer(const JobPointer &aJob, Method aMethod):
29 UnaryMemFunT<PeerPoolMgr, Ssl::PeerConnectorAnswer, Ssl::PeerConnectorAnswer&>(aJob, aMethod, Ssl::PeerConnectorAnswer()) {}
30
31 /* Ssl::PeerConnector::CbDialer API */
32 virtual Ssl::PeerConnectorAnswer &answer() { return arg1; }
33 };
34 #endif
35
36 PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
37 peer(cbdataReference(aPeer)),
38 request(),
39 opener(),
40 securer(),
41 closer(),
42 addrUsed(0)
43 {
44 }
45
46 PeerPoolMgr::~PeerPoolMgr()
47 {
48 cbdataReferenceDone(peer);
49 }
50
51 void
52 PeerPoolMgr::start()
53 {
54 AsyncJob::start();
55
56 // ErrorState, getOutgoingAddress(), and other APIs may require a request.
57 // We fake one. TODO: Optionally send this request to peers?
58 request = new HttpRequest(Http::METHOD_OPTIONS, AnyP::PROTO_HTTP, "*");
59 request->SetHost(peer->host);
60
61 checkpoint("peer initialized");
62 }
63
64 void
65 PeerPoolMgr::swanSong()
66 {
67 AsyncJob::swanSong();
68 }
69
70 bool
71 PeerPoolMgr::validPeer() const
72 {
73 return peer && cbdataReferenceValid(peer) && peer->standby.pool;
74 }
75
76 bool
77 PeerPoolMgr::doneAll() const
78 {
79 return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll();
80 }
81
82 void
83 PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
84 {
85 opener = NULL;
86
87 if (!validPeer()) {
88 debugs(48, 3, "peer gone");
89 if (params.conn != NULL)
90 params.conn->close();
91 return;
92 }
93
94 if (params.flag != COMM_OK) {
95 /* it might have been a timeout with a partially open link */
96 if (params.conn != NULL)
97 params.conn->close();
98 peerConnectFailed(peer);
99 checkpoint("conn opening failure"); // may retry
100 return;
101 }
102
103 Must(params.conn != NULL);
104
105 #if USE_OPENSSL
106 // Handle SSL peers.
107 if (peer->use_ssl) {
108 typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer;
109 closer = JobCallback(48, 3, CloserDialer, this,
110 PeerPoolMgr::handleSecureClosure);
111 comm_add_close_handler(params.conn->fd, closer);
112
113 securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
114 MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
115 Ssl::PeerConnector *connector =
116 new Ssl::PeerConnector(request, params.conn, securer);
117 AsyncJob::Start(connector); // will call our callback
118 return;
119 }
120 #endif
121
122 pushNewConnection(params.conn);
123 }
124
125 void
126 PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn)
127 {
128 Must(validPeer());
129 Must(Comm::IsConnOpen(conn));
130 peer->standby.pool->push(conn, NULL /* domain */);
131 // push() will trigger a checkpoint()
132 }
133
134 #if USE_OPENSSL
135 void
136 PeerPoolMgr::handleSecuredPeer(Ssl::PeerConnectorAnswer &answer)
137 {
138 Must(securer != NULL);
139 securer = NULL;
140
141 if (closer != NULL) {
142 if (answer.conn != NULL)
143 comm_remove_close_handler(answer.conn->fd, closer);
144 else
145 closer->cancel("securing completed");
146 closer = NULL;
147 }
148
149 if (!validPeer()) {
150 debugs(48, 3, "peer gone");
151 if (answer.conn != NULL)
152 answer.conn->close();
153 return;
154 }
155
156 if (answer.error.get()) {
157 if (answer.conn != NULL)
158 answer.conn->close();
159 // PeerConnector calls peerConnectFailed() for us;
160 checkpoint("conn securing failure"); // may retry
161 return;
162 }
163
164 pushNewConnection(answer.conn);
165 }
166
167 void
168 PeerPoolMgr::handleSecureClosure(const CommCloseCbParams &params)
169 {
170 Must(closer != NULL);
171 Must(securer != NULL);
172 securer->cancel("conn closed by a 3rd party");
173 securer = NULL;
174 closer = NULL;
175 // allow the closing connection to fully close before we check again
176 Checkpoint(this, "conn closure while securing");
177 }
178 #endif
179
180 void
181 PeerPoolMgr::openNewConnection()
182 {
183 // KISS: Do nothing else when we are already doing something.
184 if (opener != NULL || securer != NULL || shutting_down) {
185 debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down);
186 return; // there will be another checkpoint when we are done opening/securing
187 }
188
189 // Do not talk to a peer until it is ready.
190 if (!neighborUp(peer)) // provides debugging
191 return; // there will be another checkpoint when peer is up
192
193 // Do not violate peer limits.
194 if (!peerCanOpenMore(peer)) { // provides debugging
195 peer->standby.waitingForClose = true; // may already be true
196 return; // there will be another checkpoint when a peer conn closes
197 }
198
199 // Do not violate global restrictions.
200 if (fdUsageHigh()) {
201 debugs(48, 7, "overwhelmed");
202 peer->standby.waitingForClose = true; // may already be true
203 // There will be another checkpoint when a peer conn closes OR when
204 // a future pop() fails due to an empty pool. See PconnPool::pop().
205 return;
206 }
207
208 peer->standby.waitingForClose = false;
209
210 Comm::ConnectionPointer conn = new Comm::Connection;
211 Must(peer->n_addresses); // guaranteed by neighborUp() above
212 // cycle through all available IP addresses
213 conn->remote = peer->addresses[addrUsed++ % peer->n_addresses];
214 conn->remote.port(peer->http_port);
215 conn->peerType = STANDBY_POOL; // should be reset by peerSelect()
216 conn->setPeer(peer);
217 getOutgoingAddress(request.getRaw(), conn);
218 GetMarkingsToServer(request.getRaw(), *conn);
219
220 const int ctimeout = peer->connect_timeout > 0 ?
221 peer->connect_timeout : Config.Timeout.peer_connect;
222 typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
223 opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
224 Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout);
225 AsyncJob::Start(cs);
226 }
227
228 void
229 PeerPoolMgr::closeOldConnections(const int howMany)
230 {
231 debugs(48, 8, howMany);
232 peer->standby.pool->closeN(howMany);
233 }
234
235 void
236 PeerPoolMgr::checkpoint(const char *reason)
237 {
238 if (!validPeer()) {
239 debugs(48, 3, reason << " and peer gone");
240 return; // nothing to do after our owner dies; the job will quit
241 }
242
243 const int count = peer->standby.pool->count();
244 const int limit = peer->standby.limit;
245 debugs(48, 7, reason << " with " << count << " ? " << limit);
246
247 if (count < limit)
248 openNewConnection();
249 else if (count > limit)
250 closeOldConnections(count - limit);
251 }
252
253 void
254 PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason)
255 {
256 CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason);
257 }
258
259 /// launches PeerPoolMgrs for peers configured with standby.limit
260 class PeerPoolMgrsRr: public RegisteredRunner
261 {
262 public:
263 /* RegisteredRunner API */
264 virtual void useConfig() { syncConfig(); }
265 virtual void syncConfig();
266 };
267
268 RunnerRegistrationEntry(PeerPoolMgrsRr);
269
270 void
271 PeerPoolMgrsRr::syncConfig()
272 {
273 for (CachePeer *p = Config.peers; p; p = p->next) {
274 // On reconfigure, Squid deletes the old config (and old peers in it),
275 // so should always be dealing with a brand new configuration.
276 assert(!p->standby.mgr);
277 assert(!p->standby.pool);
278 if (p->standby.limit) {
279 p->standby.mgr = new PeerPoolMgr(p);
280 p->standby.pool = new PconnPool(p->name, p->standby.mgr);
281 AsyncJob::Start(p->standby.mgr.get());
282 }
283 }
284 }