]> git.ipfire.org Git - thirdparty/squid.git/blob - src/PeerPoolMgr.cc
Ensure initClient MasterXactions have listening ports (#993)
[thirdparty/squid.git] / src / PeerPoolMgr.cc
1 /*
2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 #include "squid.h"
10 #include "AccessLogEntry.h"
11 #include "base/AsyncJobCalls.h"
12 #include "base/RunnersRegistry.h"
13 #include "CachePeer.h"
14 #include "comm/Connection.h"
15 #include "comm/ConnOpener.h"
16 #include "debug/Stream.h"
17 #include "fd.h"
18 #include "FwdState.h"
19 #include "globals.h"
20 #include "HttpRequest.h"
21 #include "MasterXaction.h"
22 #include "neighbors.h"
23 #include "pconn.h"
24 #include "PeerPoolMgr.h"
25 #include "security/BlindPeerConnector.h"
26 #include "SquidConfig.h"
27
28 CBDATA_CLASS_INIT(PeerPoolMgr);
29
30 /// Gives Security::PeerConnector access to Answer in the PeerPoolMgr callback dialer.
31 class MyAnswerDialer: public UnaryMemFunT<PeerPoolMgr, Security::EncryptorAnswer, Security::EncryptorAnswer&>,
32 public Security::PeerConnector::CbDialer
33 {
34 public:
35 MyAnswerDialer(const JobPointer &aJob, Method aMethod):
36 UnaryMemFunT<PeerPoolMgr, Security::EncryptorAnswer, Security::EncryptorAnswer&>(aJob, aMethod, Security::EncryptorAnswer()) {}
37
38 /* Security::PeerConnector::CbDialer API */
39 virtual Security::EncryptorAnswer &answer() { return arg1; }
40 };
41
42 PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
43 peer(cbdataReference(aPeer)),
44 request(),
45 transportWait(),
46 encryptionWait(),
47 addrUsed(0)
48 {
49 }
50
51 PeerPoolMgr::~PeerPoolMgr()
52 {
53 cbdataReferenceDone(peer);
54 }
55
56 void
57 PeerPoolMgr::start()
58 {
59 AsyncJob::start();
60
61 const auto mx = MasterXaction::MakePortless<XactionInitiator::initPeerPool>();
62 // ErrorState, getOutgoingAddress(), and other APIs may require a request.
63 // We fake one. TODO: Optionally send this request to peers?
64 request = new HttpRequest(Http::METHOD_OPTIONS, AnyP::PROTO_HTTP, "http", "*", mx);
65 request->url.host(peer->host);
66
67 checkpoint("peer initialized");
68 }
69
70 void
71 PeerPoolMgr::swanSong()
72 {
73 AsyncJob::swanSong();
74 }
75
76 bool
77 PeerPoolMgr::validPeer() const
78 {
79 return peer && cbdataReferenceValid(peer) && peer->standby.pool;
80 }
81
82 bool
83 PeerPoolMgr::doneAll() const
84 {
85 return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll();
86 }
87
88 void
89 PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
90 {
91 transportWait.finish();
92
93 if (!validPeer()) {
94 debugs(48, 3, "peer gone");
95 if (params.conn != NULL)
96 params.conn->close();
97 return;
98 }
99
100 if (params.flag != Comm::OK) {
101 peerConnectFailed(peer);
102 checkpoint("conn opening failure"); // may retry
103 return;
104 }
105
106 Must(params.conn != NULL);
107
108 // Handle TLS peers.
109 if (peer->secure.encryptTransport) {
110 // XXX: Exceptions orphan params.conn
111 AsyncCall::Pointer callback = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
112 MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
113
114 const auto peerTimeout = peer->connectTimeout();
115 const int timeUsed = squid_curtime - params.conn->startTime();
116 // Use positive timeout when less than one second is left for conn.
117 const int timeLeft = positiveTimeout(peerTimeout - timeUsed);
118 const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft);
119 encryptionWait.start(connector, callback);
120 return;
121 }
122
123 pushNewConnection(params.conn);
124 }
125
126 void
127 PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn)
128 {
129 Must(validPeer());
130 Must(Comm::IsConnOpen(conn));
131 peer->standby.pool->push(conn, NULL /* domain */);
132 // push() will trigger a checkpoint()
133 }
134
135 void
136 PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
137 {
138 encryptionWait.finish();
139
140 if (!validPeer()) {
141 debugs(48, 3, "peer gone");
142 if (answer.conn != NULL)
143 answer.conn->close();
144 return;
145 }
146
147 assert(!answer.tunneled);
148 if (answer.error.get()) {
149 assert(!answer.conn);
150 // PeerConnector calls peerConnectFailed() for us;
151 checkpoint("conn securing failure"); // may retry
152 return;
153 }
154
155 assert(answer.conn);
156
157 // The socket could get closed while our callback was queued. Sync
158 // Connection. XXX: Connection::fd may already be stale/invalid here.
159 if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
160 answer.conn->noteClosure();
161 checkpoint("external connection closure"); // may retry
162 return;
163 }
164
165 pushNewConnection(answer.conn);
166 }
167
168 void
169 PeerPoolMgr::openNewConnection()
170 {
171 // KISS: Do nothing else when we are already doing something.
172 if (transportWait || encryptionWait || shutting_down) {
173 debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down);
174 return; // there will be another checkpoint when we are done opening/securing
175 }
176
177 // Do not talk to a peer until it is ready.
178 if (!neighborUp(peer)) // provides debugging
179 return; // there will be another checkpoint when peer is up
180
181 // Do not violate peer limits.
182 if (!peerCanOpenMore(peer)) { // provides debugging
183 peer->standby.waitingForClose = true; // may already be true
184 return; // there will be another checkpoint when a peer conn closes
185 }
186
187 // Do not violate global restrictions.
188 if (fdUsageHigh()) {
189 debugs(48, 7, "overwhelmed");
190 peer->standby.waitingForClose = true; // may already be true
191 // There will be another checkpoint when a peer conn closes OR when
192 // a future pop() fails due to an empty pool. See PconnPool::pop().
193 return;
194 }
195
196 peer->standby.waitingForClose = false;
197
198 Comm::ConnectionPointer conn = new Comm::Connection;
199 Must(peer->n_addresses); // guaranteed by neighborUp() above
200 // cycle through all available IP addresses
201 conn->remote = peer->addresses[addrUsed++ % peer->n_addresses];
202 conn->remote.port(peer->http_port);
203 conn->peerType = STANDBY_POOL; // should be reset by peerSelect()
204 conn->setPeer(peer);
205 getOutgoingAddress(request.getRaw(), conn);
206 GetMarkingsToServer(request.getRaw(), *conn);
207
208 const auto ctimeout = peer->connectTimeout();
209 typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
210 AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
211 const auto cs = new Comm::ConnOpener(conn, callback, ctimeout);
212 transportWait.start(cs, callback);
213 }
214
215 void
216 PeerPoolMgr::closeOldConnections(const int howMany)
217 {
218 debugs(48, 8, howMany);
219 peer->standby.pool->closeN(howMany);
220 }
221
222 void
223 PeerPoolMgr::checkpoint(const char *reason)
224 {
225 if (!validPeer()) {
226 debugs(48, 3, reason << " and peer gone");
227 return; // nothing to do after our owner dies; the job will quit
228 }
229
230 const int count = peer->standby.pool->count();
231 const int limit = peer->standby.limit;
232 debugs(48, 7, reason << " with " << count << " ? " << limit);
233
234 if (count < limit)
235 openNewConnection();
236 else if (count > limit)
237 closeOldConnections(count - limit);
238 }
239
240 void
241 PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason)
242 {
243 CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason);
244 }
245
246 /// launches PeerPoolMgrs for peers configured with standby.limit
247 class PeerPoolMgrsRr: public RegisteredRunner
248 {
249 public:
250 /* RegisteredRunner API */
251 virtual void useConfig() { syncConfig(); }
252 virtual void syncConfig();
253 };
254
255 RunnerRegistrationEntry(PeerPoolMgrsRr);
256
257 void
258 PeerPoolMgrsRr::syncConfig()
259 {
260 for (CachePeer *p = Config.peers; p; p = p->next) {
261 // On reconfigure, Squid deletes the old config (and old peers in it),
262 // so should always be dealing with a brand new configuration.
263 assert(!p->standby.mgr);
264 assert(!p->standby.pool);
265 if (p->standby.limit) {
266 p->standby.mgr = new PeerPoolMgr(p);
267 p->standby.pool = new PconnPool(p->name, p->standby.mgr);
268 AsyncJob::Start(p->standby.mgr.get());
269 }
270 }
271 }
272