]>
Commit | Line | Data |
---|---|---|
bbc27441 | 1 | /* |
bf95c10a | 2 | * Copyright (C) 1996-2022 The Squid Software Foundation and contributors |
bbc27441 AJ |
3 | * |
4 | * Squid software is distributed under GPLv2+ license and includes | |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
7 | */ | |
8 | ||
e8dca475 | 9 | #include "squid.h" |
a72b6e88 | 10 | #include "AccessLogEntry.h" |
e5ddd4ce | 11 | #include "base/AsyncCallbacks.h" |
e8dca475 CT |
12 | #include "base/RunnersRegistry.h" |
13 | #include "CachePeer.h" | |
14 | #include "comm/Connection.h" | |
15 | #include "comm/ConnOpener.h" | |
675b8408 | 16 | #include "debug/Stream.h" |
e8dca475 CT |
17 | #include "fd.h" |
18 | #include "FwdState.h" | |
19 | #include "globals.h" | |
20 | #include "HttpRequest.h" | |
5ceaee75 | 21 | #include "MasterXaction.h" |
e8dca475 CT |
22 | #include "neighbors.h" |
23 | #include "pconn.h" | |
24 | #include "PeerPoolMgr.h" | |
a72b6e88 | 25 | #include "security/BlindPeerConnector.h" |
e8dca475 | 26 | #include "SquidConfig.h" |
e8dca475 CT |
27 | |
28 | CBDATA_CLASS_INIT(PeerPoolMgr); | |
29 | ||
e8dca475 | 30 | PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"), |
f53969cc SM |
31 | peer(cbdataReference(aPeer)), |
32 | request(), | |
2b6b1bcb AR |
33 | transportWait(), |
34 | encryptionWait(), | |
f53969cc | 35 | addrUsed(0) |
e8dca475 CT |
36 | { |
37 | } | |
38 | ||
39 | PeerPoolMgr::~PeerPoolMgr() | |
40 | { | |
41 | cbdataReferenceDone(peer); | |
42 | } | |
43 | ||
44 | void | |
45 | PeerPoolMgr::start() | |
46 | { | |
47 | AsyncJob::start(); | |
48 | ||
ad05b958 | 49 | const auto mx = MasterXaction::MakePortless<XactionInitiator::initPeerPool>(); |
e8dca475 CT |
50 | // ErrorState, getOutgoingAddress(), and other APIs may require a request. |
51 | // We fake one. TODO: Optionally send this request to peers? | |
5ceaee75 | 52 | request = new HttpRequest(Http::METHOD_OPTIONS, AnyP::PROTO_HTTP, "http", "*", mx); |
851feda6 | 53 | request->url.host(peer->host); |
e8dca475 CT |
54 | |
55 | checkpoint("peer initialized"); | |
56 | } | |
57 | ||
58 | void | |
59 | PeerPoolMgr::swanSong() | |
60 | { | |
61 | AsyncJob::swanSong(); | |
62 | } | |
63 | ||
64 | bool | |
e2849af8 A |
65 | PeerPoolMgr::validPeer() const |
66 | { | |
e8dca475 CT |
67 | return peer && cbdataReferenceValid(peer) && peer->standby.pool; |
68 | } | |
69 | ||
70 | bool | |
71 | PeerPoolMgr::doneAll() const | |
72 | { | |
73 | return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll(); | |
74 | } | |
75 | ||
76 | void | |
77 | PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms) | |
78 | { | |
2b6b1bcb | 79 | transportWait.finish(); |
e8dca475 CT |
80 | |
81 | if (!validPeer()) { | |
82 | debugs(48, 3, "peer gone"); | |
aee3523a | 83 | if (params.conn != nullptr) |
e8dca475 CT |
84 | params.conn->close(); |
85 | return; | |
86 | } | |
87 | ||
c8407295 | 88 | if (params.flag != Comm::OK) { |
022dbabd | 89 | NoteOutgoingConnectionFailure(peer, Http::scNone); |
e8dca475 CT |
90 | checkpoint("conn opening failure"); // may retry |
91 | return; | |
92 | } | |
93 | ||
aee3523a | 94 | Must(params.conn != nullptr); |
e8dca475 | 95 | |
a72b6e88 | 96 | // Handle TLS peers. |
1f1f29e8 | 97 | if (peer->secure.encryptTransport) { |
2b6b1bcb | 98 | // XXX: Exceptions orphan params.conn |
e5ddd4ce | 99 | const auto callback = asyncCallback(48, 4, PeerPoolMgr::handleSecuredPeer, this); |
8aec3e1b | 100 | |
5f5d319e | 101 | const auto peerTimeout = peer->connectTimeout(); |
8aec3e1b CT |
102 | const int timeUsed = squid_curtime - params.conn->startTime(); |
103 | // Use positive timeout when less than one second is left for conn. | |
0ce8e93b | 104 | const int timeLeft = positiveTimeout(peerTimeout - timeUsed); |
2b6b1bcb AR |
105 | const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft); |
106 | encryptionWait.start(connector, callback); | |
e8dca475 CT |
107 | return; |
108 | } | |
e8dca475 CT |
109 | |
110 | pushNewConnection(params.conn); | |
111 | } | |
112 | ||
113 | void | |
114 | PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn) | |
115 | { | |
116 | Must(validPeer()); | |
117 | Must(Comm::IsConnOpen(conn)); | |
aee3523a | 118 | peer->standby.pool->push(conn, nullptr /* domain */); |
e8dca475 CT |
119 | // push() will trigger a checkpoint() |
120 | } | |
121 | ||
e8dca475 | 122 | void |
fcfdf7f9 | 123 | PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer) |
e8dca475 | 124 | { |
2b6b1bcb | 125 | encryptionWait.finish(); |
e8dca475 CT |
126 | |
127 | if (!validPeer()) { | |
128 | debugs(48, 3, "peer gone"); | |
aee3523a | 129 | if (answer.conn != nullptr) |
e8dca475 CT |
130 | answer.conn->close(); |
131 | return; | |
132 | } | |
133 | ||
2b6b1bcb | 134 | assert(!answer.tunneled); |
e8dca475 | 135 | if (answer.error.get()) { |
2b6b1bcb | 136 | assert(!answer.conn); |
022dbabd | 137 | // PeerConnector calls NoteOutgoingConnectionFailure() for us |
e8dca475 CT |
138 | checkpoint("conn securing failure"); // may retry |
139 | return; | |
140 | } | |
141 | ||
2b6b1bcb | 142 | assert(answer.conn); |
e8dca475 | 143 | |
2b6b1bcb AR |
144 | // The socket could get closed while our callback was queued. Sync |
145 | // Connection. XXX: Connection::fd may already be stale/invalid here. | |
146 | if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) { | |
147 | answer.conn->noteClosure(); | |
148 | checkpoint("external connection closure"); // may retry | |
149 | return; | |
150 | } | |
151 | ||
152 | pushNewConnection(answer.conn); | |
e8dca475 | 153 | } |
e8dca475 CT |
154 | |
155 | void | |
156 | PeerPoolMgr::openNewConnection() | |
157 | { | |
158 | // KISS: Do nothing else when we are already doing something. | |
2b6b1bcb AR |
159 | if (transportWait || encryptionWait || shutting_down) { |
160 | debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down); | |
e8dca475 CT |
161 | return; // there will be another checkpoint when we are done opening/securing |
162 | } | |
163 | ||
164 | // Do not talk to a peer until it is ready. | |
165 | if (!neighborUp(peer)) // provides debugging | |
166 | return; // there will be another checkpoint when peer is up | |
167 | ||
168 | // Do not violate peer limits. | |
169 | if (!peerCanOpenMore(peer)) { // provides debugging | |
170 | peer->standby.waitingForClose = true; // may already be true | |
171 | return; // there will be another checkpoint when a peer conn closes | |
172 | } | |
173 | ||
174 | // Do not violate global restrictions. | |
175 | if (fdUsageHigh()) { | |
176 | debugs(48, 7, "overwhelmed"); | |
177 | peer->standby.waitingForClose = true; // may already be true | |
178 | // There will be another checkpoint when a peer conn closes OR when | |
179 | // a future pop() fails due to an empty pool. See PconnPool::pop(). | |
180 | return; | |
181 | } | |
182 | ||
183 | peer->standby.waitingForClose = false; | |
184 | ||
185 | Comm::ConnectionPointer conn = new Comm::Connection; | |
186 | Must(peer->n_addresses); // guaranteed by neighborUp() above | |
187 | // cycle through all available IP addresses | |
188 | conn->remote = peer->addresses[addrUsed++ % peer->n_addresses]; | |
189 | conn->remote.port(peer->http_port); | |
190 | conn->peerType = STANDBY_POOL; // should be reset by peerSelect() | |
191 | conn->setPeer(peer); | |
192 | getOutgoingAddress(request.getRaw(), conn); | |
193 | GetMarkingsToServer(request.getRaw(), *conn); | |
194 | ||
5f5d319e | 195 | const auto ctimeout = peer->connectTimeout(); |
e8dca475 | 196 | typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer; |
2b6b1bcb AR |
197 | AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection); |
198 | const auto cs = new Comm::ConnOpener(conn, callback, ctimeout); | |
199 | transportWait.start(cs, callback); | |
e8dca475 CT |
200 | } |
201 | ||
202 | void | |
203 | PeerPoolMgr::closeOldConnections(const int howMany) | |
204 | { | |
205 | debugs(48, 8, howMany); | |
206 | peer->standby.pool->closeN(howMany); | |
207 | } | |
208 | ||
209 | void | |
210 | PeerPoolMgr::checkpoint(const char *reason) | |
211 | { | |
212 | if (!validPeer()) { | |
213 | debugs(48, 3, reason << " and peer gone"); | |
214 | return; // nothing to do after our owner dies; the job will quit | |
215 | } | |
216 | ||
217 | const int count = peer->standby.pool->count(); | |
218 | const int limit = peer->standby.limit; | |
219 | debugs(48, 7, reason << " with " << count << " ? " << limit); | |
220 | ||
221 | if (count < limit) | |
222 | openNewConnection(); | |
223 | else if (count > limit) | |
224 | closeOldConnections(count - limit); | |
225 | } | |
226 | ||
227 | void | |
228 | PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason) | |
229 | { | |
230 | CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason); | |
231 | } | |
232 | ||
233 | /// launches PeerPoolMgrs for peers configured with standby.limit | |
234 | class PeerPoolMgrsRr: public RegisteredRunner | |
235 | { | |
236 | public: | |
237 | /* RegisteredRunner API */ | |
337b9aa4 AR |
238 | void useConfig() override { syncConfig(); } |
239 | void syncConfig() override; | |
e8dca475 CT |
240 | }; |
241 | ||
242 | RunnerRegistrationEntry(PeerPoolMgrsRr); | |
243 | ||
244 | void | |
245 | PeerPoolMgrsRr::syncConfig() | |
246 | { | |
247 | for (CachePeer *p = Config.peers; p; p = p->next) { | |
248 | // On reconfigure, Squid deletes the old config (and old peers in it), | |
249 | // so should always be dealing with a brand new configuration. | |
250 | assert(!p->standby.mgr); | |
251 | assert(!p->standby.pool); | |
252 | if (p->standby.limit) { | |
253 | p->standby.mgr = new PeerPoolMgr(p); | |
254 | p->standby.pool = new PconnPool(p->name, p->standby.mgr); | |
255 | AsyncJob::Start(p->standby.mgr.get()); | |
256 | } | |
257 | } | |
258 | } | |
f53969cc | 259 |