]>
Commit | Line | Data |
---|---|---|
1 | /* | |
2 | * Copyright (C) 1996-2023 The Squid Software Foundation and contributors | |
3 | * | |
4 | * Squid software is distributed under GPLv2+ license and includes | |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
7 | */ | |
8 | ||
9 | #include "squid.h" | |
10 | #include "AccessLogEntry.h" | |
11 | #include "base/AsyncCallbacks.h" | |
12 | #include "base/RunnersRegistry.h" | |
13 | #include "CachePeer.h" | |
14 | #include "CachePeers.h" | |
15 | #include "comm/Connection.h" | |
16 | #include "comm/ConnOpener.h" | |
17 | #include "debug/Stream.h" | |
18 | #include "fd.h" | |
19 | #include "FwdState.h" | |
20 | #include "globals.h" | |
21 | #include "HttpRequest.h" | |
22 | #include "MasterXaction.h" | |
23 | #include "neighbors.h" | |
24 | #include "pconn.h" | |
25 | #include "PeerPoolMgr.h" | |
26 | #include "security/BlindPeerConnector.h" | |
27 | #include "SquidConfig.h" | |
28 | ||
29 | CBDATA_CLASS_INIT(PeerPoolMgr); | |
30 | ||
// Maintains the standby connection pool of a single cache_peer.
// Takes a counted cbdata reference to the peer so that later callbacks can
// detect peer destruction (see validPeer()); released in the destructor.
PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
    peer(cbdataReference(aPeer)), // counted reference; paired with cbdataReferenceDone()
    request(), // fake request created in start()
    transportWait(),
    encryptionWait(),
    addrUsed(0) // index for cycling through peer IP addresses
{
}
39 | ||
PeerPoolMgr::~PeerPoolMgr()
{
    // release the counted peer reference acquired in the constructor
    cbdataReferenceDone(peer);
}
44 | ||
45 | void | |
46 | PeerPoolMgr::start() | |
47 | { | |
48 | AsyncJob::start(); | |
49 | ||
50 | const auto mx = MasterXaction::MakePortless<XactionInitiator::initPeerPool>(); | |
51 | // ErrorState, getOutgoingAddress(), and other APIs may require a request. | |
52 | // We fake one. TODO: Optionally send this request to peers? | |
53 | request = new HttpRequest(Http::METHOD_OPTIONS, AnyP::PROTO_HTTP, "http", "*", mx); | |
54 | request->url.host(peer->host); | |
55 | ||
56 | checkpoint("peer initialized"); | |
57 | } | |
58 | ||
// AsyncJob cleanup hook; no manager-specific cleanup is needed beyond the
// base class (the destructor releases the peer reference).
void
PeerPoolMgr::swanSong()
{
    AsyncJob::swanSong();
}
64 | ||
65 | bool | |
66 | PeerPoolMgr::validPeer() const | |
67 | { | |
68 | return peer && cbdataReferenceValid(peer) && peer->standby.pool; | |
69 | } | |
70 | ||
71 | bool | |
72 | PeerPoolMgr::doneAll() const | |
73 | { | |
74 | return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll(); | |
75 | } | |
76 | ||
77 | void | |
78 | PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms) | |
79 | { | |
80 | transportWait.finish(); | |
81 | ||
82 | if (!validPeer()) { | |
83 | debugs(48, 3, "peer gone"); | |
84 | if (params.conn != nullptr) | |
85 | params.conn->close(); | |
86 | return; | |
87 | } | |
88 | ||
89 | if (params.flag != Comm::OK) { | |
90 | NoteOutgoingConnectionFailure(peer, Http::scNone); | |
91 | checkpoint("conn opening failure"); // may retry | |
92 | return; | |
93 | } | |
94 | ||
95 | Must(params.conn != nullptr); | |
96 | ||
97 | // Handle TLS peers. | |
98 | if (peer->secure.encryptTransport) { | |
99 | // XXX: Exceptions orphan params.conn | |
100 | const auto callback = asyncCallback(48, 4, PeerPoolMgr::handleSecuredPeer, this); | |
101 | ||
102 | const auto peerTimeout = peer->connectTimeout(); | |
103 | const int timeUsed = squid_curtime - params.conn->startTime(); | |
104 | // Use positive timeout when less than one second is left for conn. | |
105 | const int timeLeft = positiveTimeout(peerTimeout - timeUsed); | |
106 | const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft); | |
107 | encryptionWait.start(connector, callback); | |
108 | return; | |
109 | } | |
110 | ||
111 | pushNewConnection(params.conn); | |
112 | } | |
113 | ||
// Adds a freshly established (and, for TLS peers, secured) connection to the
// peer's standby pool. Callers must have verified validPeer() implicitly or
// explicitly; we re-assert it here.
void
PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn)
{
    Must(validPeer());
    Must(Comm::IsConnOpen(conn));
    peer->standby.pool->push(conn, nullptr /* domain */);
    // push() will trigger a checkpoint()
}
122 | ||
// Reacts to a Security::BlindPeerConnector answer: pools the now-encrypted
// connection on success, or schedules a retry via checkpoint() on failure.
void
PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
{
    encryptionWait.finish();

    if (!validPeer()) {
        debugs(48, 3, "peer gone");
        // nobody left to own the secured connection; close it if still present
        if (answer.conn != nullptr)
            answer.conn->close();
        return;
    }

    // we never ask the connector to tunnel, so a tunneled answer is a bug
    assert(!answer.tunneled);
    if (answer.error.get()) {
        // on errors, the connector closes/forgets the connection
        assert(!answer.conn);
        // PeerConnector calls NoteOutgoingConnectionFailure() for us
        checkpoint("conn securing failure"); // may retry
        return;
    }

    assert(answer.conn);

    // The socket could get closed while our callback was queued. Sync
    // Connection. XXX: Connection::fd may already be stale/invalid here.
    if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
        answer.conn->noteClosure();
        checkpoint("external connection closure"); // may retry
        return;
    }

    pushNewConnection(answer.conn);
}
155 | ||
156 | void | |
157 | PeerPoolMgr::openNewConnection() | |
158 | { | |
159 | // KISS: Do nothing else when we are already doing something. | |
160 | if (transportWait || encryptionWait || shutting_down) { | |
161 | debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down); | |
162 | return; // there will be another checkpoint when we are done opening/securing | |
163 | } | |
164 | ||
165 | // Do not talk to a peer until it is ready. | |
166 | if (!neighborUp(peer)) // provides debugging | |
167 | return; // there will be another checkpoint when peer is up | |
168 | ||
169 | // Do not violate peer limits. | |
170 | if (!peerCanOpenMore(peer)) { // provides debugging | |
171 | peer->standby.waitingForClose = true; // may already be true | |
172 | return; // there will be another checkpoint when a peer conn closes | |
173 | } | |
174 | ||
175 | // Do not violate global restrictions. | |
176 | if (fdUsageHigh()) { | |
177 | debugs(48, 7, "overwhelmed"); | |
178 | peer->standby.waitingForClose = true; // may already be true | |
179 | // There will be another checkpoint when a peer conn closes OR when | |
180 | // a future pop() fails due to an empty pool. See PconnPool::pop(). | |
181 | return; | |
182 | } | |
183 | ||
184 | peer->standby.waitingForClose = false; | |
185 | ||
186 | Comm::ConnectionPointer conn = new Comm::Connection; | |
187 | Must(peer->n_addresses); // guaranteed by neighborUp() above | |
188 | // cycle through all available IP addresses | |
189 | conn->remote = peer->addresses[addrUsed++ % peer->n_addresses]; | |
190 | conn->remote.port(peer->http_port); | |
191 | conn->peerType = STANDBY_POOL; // should be reset by peerSelect() | |
192 | conn->setPeer(peer); | |
193 | getOutgoingAddress(request.getRaw(), conn); | |
194 | GetMarkingsToServer(request.getRaw(), *conn); | |
195 | ||
196 | const auto ctimeout = peer->connectTimeout(); | |
197 | typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer; | |
198 | AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection); | |
199 | const auto cs = new Comm::ConnOpener(conn, callback, ctimeout); | |
200 | transportWait.start(cs, callback); | |
201 | } | |
202 | ||
// Closes excess standby connections; called by checkpoint() when the pool
// exceeds the configured standby.limit (e.g., after reconfiguration).
void
PeerPoolMgr::closeOldConnections(const int howMany)
{
    debugs(48, 8, howMany);
    peer->standby.pool->closeN(howMany);
}
209 | ||
210 | void | |
211 | PeerPoolMgr::checkpoint(const char *reason) | |
212 | { | |
213 | if (!validPeer()) { | |
214 | debugs(48, 3, reason << " and peer gone"); | |
215 | return; // nothing to do after our owner dies; the job will quit | |
216 | } | |
217 | ||
218 | const int count = peer->standby.pool->count(); | |
219 | const int limit = peer->standby.limit; | |
220 | debugs(48, 7, reason << " with " << count << " ? " << limit); | |
221 | ||
222 | if (count < limit) | |
223 | openNewConnection(); | |
224 | else if (count > limit) | |
225 | closeOldConnections(count - limit); | |
226 | } | |
227 | ||
// Class-level convenience wrapper: asynchronously schedules checkpoint() on
// the given manager (if any), keeping callers decoupled from the job's state.
void
PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason)
{
    CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason);
}
233 | ||
/// launches PeerPoolMgrs for peers configured with standby.limit
class PeerPoolMgrsRr: public RegisteredRunner
{
public:
    /* RegisteredRunner API */
    // initial startup is handled the same way as a reconfiguration
    void useConfig() override { syncConfig(); }
    void syncConfig() override;
};
242 | ||
243 | DefineRunnerRegistrator(PeerPoolMgrsRr); | |
244 | ||
245 | void | |
246 | PeerPoolMgrsRr::syncConfig() | |
247 | { | |
248 | for (const auto &peer: CurrentCachePeers()) { | |
249 | const auto p = peer.get(); | |
250 | // On reconfigure, Squid deletes the old config (and old peers in it), | |
251 | // so should always be dealing with a brand new configuration. | |
252 | assert(!p->standby.mgr); | |
253 | assert(!p->standby.pool); | |
254 | if (p->standby.limit) { | |
255 | p->standby.mgr = new PeerPoolMgr(p); | |
256 | p->standby.pool = new PconnPool(p->name, p->standby.mgr); | |
257 | AsyncJob::Start(p->standby.mgr.get()); | |
258 | } | |
259 | } | |
260 | } | |
261 |