]>
Commit | Line | Data |
---|---|---|
bbc27441 | 1 | /* |
b8ae064d | 2 | * Copyright (C) 1996-2023 The Squid Software Foundation and contributors |
bbc27441 AJ |
3 | * |
4 | * Squid software is distributed under GPLv2+ license and includes | |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
7 | */ | |
8 | ||
e8dca475 | 9 | #include "squid.h" |
a72b6e88 | 10 | #include "AccessLogEntry.h" |
e5ddd4ce | 11 | #include "base/AsyncCallbacks.h" |
e8dca475 CT |
12 | #include "base/RunnersRegistry.h" |
13 | #include "CachePeer.h" | |
2e24d0bf | 14 | #include "CachePeers.h" |
e8dca475 CT |
15 | #include "comm/Connection.h" |
16 | #include "comm/ConnOpener.h" | |
675b8408 | 17 | #include "debug/Stream.h" |
e8dca475 CT |
18 | #include "fd.h" |
19 | #include "FwdState.h" | |
20 | #include "globals.h" | |
21 | #include "HttpRequest.h" | |
5ceaee75 | 22 | #include "MasterXaction.h" |
e8dca475 CT |
23 | #include "neighbors.h" |
24 | #include "pconn.h" | |
25 | #include "PeerPoolMgr.h" | |
a72b6e88 | 26 | #include "security/BlindPeerConnector.h" |
e8dca475 | 27 | #include "SquidConfig.h" |
e8dca475 CT |
28 | |
29 | CBDATA_CLASS_INIT(PeerPoolMgr); | |
30 | ||
e8dca475 | 31 | PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"), |
f53969cc SM |
32 | peer(cbdataReference(aPeer)), |
33 | request(), | |
2b6b1bcb AR |
34 | transportWait(), |
35 | encryptionWait(), | |
f53969cc | 36 | addrUsed(0) |
e8dca475 CT |
37 | { |
38 | } | |
39 | ||
40 | PeerPoolMgr::~PeerPoolMgr() | |
41 | { | |
42 | cbdataReferenceDone(peer); | |
43 | } | |
44 | ||
45 | void | |
46 | PeerPoolMgr::start() | |
47 | { | |
48 | AsyncJob::start(); | |
49 | ||
ad05b958 | 50 | const auto mx = MasterXaction::MakePortless<XactionInitiator::initPeerPool>(); |
e8dca475 CT |
51 | // ErrorState, getOutgoingAddress(), and other APIs may require a request. |
52 | // We fake one. TODO: Optionally send this request to peers? | |
5ceaee75 | 53 | request = new HttpRequest(Http::METHOD_OPTIONS, AnyP::PROTO_HTTP, "http", "*", mx); |
851feda6 | 54 | request->url.host(peer->host); |
e8dca475 CT |
55 | |
56 | checkpoint("peer initialized"); | |
57 | } | |
58 | ||
59 | void | |
60 | PeerPoolMgr::swanSong() | |
61 | { | |
62 | AsyncJob::swanSong(); | |
63 | } | |
64 | ||
65 | bool | |
e2849af8 A |
66 | PeerPoolMgr::validPeer() const |
67 | { | |
e8dca475 CT |
68 | return peer && cbdataReferenceValid(peer) && peer->standby.pool; |
69 | } | |
70 | ||
71 | bool | |
72 | PeerPoolMgr::doneAll() const | |
73 | { | |
74 | return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll(); | |
75 | } | |
76 | ||
77 | void | |
78 | PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms) | |
79 | { | |
2b6b1bcb | 80 | transportWait.finish(); |
e8dca475 CT |
81 | |
82 | if (!validPeer()) { | |
83 | debugs(48, 3, "peer gone"); | |
aee3523a | 84 | if (params.conn != nullptr) |
e8dca475 CT |
85 | params.conn->close(); |
86 | return; | |
87 | } | |
88 | ||
c8407295 | 89 | if (params.flag != Comm::OK) { |
022dbabd | 90 | NoteOutgoingConnectionFailure(peer, Http::scNone); |
e8dca475 CT |
91 | checkpoint("conn opening failure"); // may retry |
92 | return; | |
93 | } | |
94 | ||
aee3523a | 95 | Must(params.conn != nullptr); |
e8dca475 | 96 | |
a72b6e88 | 97 | // Handle TLS peers. |
1f1f29e8 | 98 | if (peer->secure.encryptTransport) { |
2b6b1bcb | 99 | // XXX: Exceptions orphan params.conn |
e5ddd4ce | 100 | const auto callback = asyncCallback(48, 4, PeerPoolMgr::handleSecuredPeer, this); |
8aec3e1b | 101 | |
5f5d319e | 102 | const auto peerTimeout = peer->connectTimeout(); |
8aec3e1b CT |
103 | const int timeUsed = squid_curtime - params.conn->startTime(); |
104 | // Use positive timeout when less than one second is left for conn. | |
0ce8e93b | 105 | const int timeLeft = positiveTimeout(peerTimeout - timeUsed); |
2b6b1bcb AR |
106 | const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft); |
107 | encryptionWait.start(connector, callback); | |
e8dca475 CT |
108 | return; |
109 | } | |
e8dca475 CT |
110 | |
111 | pushNewConnection(params.conn); | |
112 | } | |
113 | ||
114 | void | |
115 | PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn) | |
116 | { | |
117 | Must(validPeer()); | |
118 | Must(Comm::IsConnOpen(conn)); | |
aee3523a | 119 | peer->standby.pool->push(conn, nullptr /* domain */); |
e8dca475 CT |
120 | // push() will trigger a checkpoint() |
121 | } | |
122 | ||
e8dca475 | 123 | void |
fcfdf7f9 | 124 | PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer) |
e8dca475 | 125 | { |
2b6b1bcb | 126 | encryptionWait.finish(); |
e8dca475 CT |
127 | |
128 | if (!validPeer()) { | |
129 | debugs(48, 3, "peer gone"); | |
aee3523a | 130 | if (answer.conn != nullptr) |
e8dca475 CT |
131 | answer.conn->close(); |
132 | return; | |
133 | } | |
134 | ||
2b6b1bcb | 135 | assert(!answer.tunneled); |
e8dca475 | 136 | if (answer.error.get()) { |
2b6b1bcb | 137 | assert(!answer.conn); |
022dbabd | 138 | // PeerConnector calls NoteOutgoingConnectionFailure() for us |
e8dca475 CT |
139 | checkpoint("conn securing failure"); // may retry |
140 | return; | |
141 | } | |
142 | ||
2b6b1bcb | 143 | assert(answer.conn); |
e8dca475 | 144 | |
2b6b1bcb AR |
145 | // The socket could get closed while our callback was queued. Sync |
146 | // Connection. XXX: Connection::fd may already be stale/invalid here. | |
147 | if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) { | |
148 | answer.conn->noteClosure(); | |
149 | checkpoint("external connection closure"); // may retry | |
150 | return; | |
151 | } | |
152 | ||
153 | pushNewConnection(answer.conn); | |
e8dca475 | 154 | } |
e8dca475 CT |
155 | |
156 | void | |
157 | PeerPoolMgr::openNewConnection() | |
158 | { | |
159 | // KISS: Do nothing else when we are already doing something. | |
2b6b1bcb AR |
160 | if (transportWait || encryptionWait || shutting_down) { |
161 | debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down); | |
e8dca475 CT |
162 | return; // there will be another checkpoint when we are done opening/securing |
163 | } | |
164 | ||
165 | // Do not talk to a peer until it is ready. | |
166 | if (!neighborUp(peer)) // provides debugging | |
167 | return; // there will be another checkpoint when peer is up | |
168 | ||
169 | // Do not violate peer limits. | |
170 | if (!peerCanOpenMore(peer)) { // provides debugging | |
171 | peer->standby.waitingForClose = true; // may already be true | |
172 | return; // there will be another checkpoint when a peer conn closes | |
173 | } | |
174 | ||
175 | // Do not violate global restrictions. | |
176 | if (fdUsageHigh()) { | |
177 | debugs(48, 7, "overwhelmed"); | |
178 | peer->standby.waitingForClose = true; // may already be true | |
179 | // There will be another checkpoint when a peer conn closes OR when | |
180 | // a future pop() fails due to an empty pool. See PconnPool::pop(). | |
181 | return; | |
182 | } | |
183 | ||
184 | peer->standby.waitingForClose = false; | |
185 | ||
186 | Comm::ConnectionPointer conn = new Comm::Connection; | |
187 | Must(peer->n_addresses); // guaranteed by neighborUp() above | |
188 | // cycle through all available IP addresses | |
189 | conn->remote = peer->addresses[addrUsed++ % peer->n_addresses]; | |
190 | conn->remote.port(peer->http_port); | |
191 | conn->peerType = STANDBY_POOL; // should be reset by peerSelect() | |
192 | conn->setPeer(peer); | |
193 | getOutgoingAddress(request.getRaw(), conn); | |
194 | GetMarkingsToServer(request.getRaw(), *conn); | |
195 | ||
5f5d319e | 196 | const auto ctimeout = peer->connectTimeout(); |
e8dca475 | 197 | typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer; |
2b6b1bcb AR |
198 | AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection); |
199 | const auto cs = new Comm::ConnOpener(conn, callback, ctimeout); | |
200 | transportWait.start(cs, callback); | |
e8dca475 CT |
201 | } |
202 | ||
203 | void | |
204 | PeerPoolMgr::closeOldConnections(const int howMany) | |
205 | { | |
206 | debugs(48, 8, howMany); | |
207 | peer->standby.pool->closeN(howMany); | |
208 | } | |
209 | ||
210 | void | |
211 | PeerPoolMgr::checkpoint(const char *reason) | |
212 | { | |
213 | if (!validPeer()) { | |
214 | debugs(48, 3, reason << " and peer gone"); | |
215 | return; // nothing to do after our owner dies; the job will quit | |
216 | } | |
217 | ||
218 | const int count = peer->standby.pool->count(); | |
219 | const int limit = peer->standby.limit; | |
220 | debugs(48, 7, reason << " with " << count << " ? " << limit); | |
221 | ||
222 | if (count < limit) | |
223 | openNewConnection(); | |
224 | else if (count > limit) | |
225 | closeOldConnections(count - limit); | |
226 | } | |
227 | ||
228 | void | |
229 | PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason) | |
230 | { | |
231 | CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason); | |
232 | } | |
233 | ||
234 | /// launches PeerPoolMgrs for peers configured with standby.limit | |
235 | class PeerPoolMgrsRr: public RegisteredRunner | |
236 | { | |
237 | public: | |
238 | /* RegisteredRunner API */ | |
337b9aa4 AR |
239 | void useConfig() override { syncConfig(); } |
240 | void syncConfig() override; | |
e8dca475 CT |
241 | }; |
242 | ||
230d4410 | 243 | DefineRunnerRegistrator(PeerPoolMgrsRr); |
e8dca475 CT |
244 | |
245 | void | |
246 | PeerPoolMgrsRr::syncConfig() | |
247 | { | |
2e24d0bf EB |
248 | for (const auto &peer: CurrentCachePeers()) { |
249 | const auto p = peer.get(); | |
e8dca475 CT |
250 | // On reconfigure, Squid deletes the old config (and old peers in it), |
251 | // so should always be dealing with a brand new configuration. | |
252 | assert(!p->standby.mgr); | |
253 | assert(!p->standby.pool); | |
254 | if (p->standby.limit) { | |
255 | p->standby.mgr = new PeerPoolMgr(p); | |
256 | p->standby.pool = new PconnPool(p->name, p->standby.mgr); | |
257 | AsyncJob::Start(p->standby.mgr.get()); | |
258 | } | |
259 | } | |
260 | } | |
f53969cc | 261 |