]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/PeerPoolMgr.cc
2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
10 #include "AccessLogEntry.h"
11 #include "base/AsyncJobCalls.h"
12 #include "base/RunnersRegistry.h"
13 #include "CachePeer.h"
14 #include "comm/Connection.h"
15 #include "comm/ConnOpener.h"
16 #include "debug/Stream.h"
20 #include "HttpRequest.h"
21 #include "MasterXaction.h"
22 #include "neighbors.h"
24 #include "PeerPoolMgr.h"
25 #include "security/BlindPeerConnector.h"
26 #include "SquidConfig.h"
28 CBDATA_CLASS_INIT(PeerPoolMgr
);
30 /// Gives Security::PeerConnector access to Answer in the PeerPoolMgr callback dialer.
31 class MyAnswerDialer
: public UnaryMemFunT
<PeerPoolMgr
, Security::EncryptorAnswer
, Security::EncryptorAnswer
&>,
32 public Security::PeerConnector::CbDialer
35 MyAnswerDialer(const JobPointer
&aJob
, Method aMethod
):
36 UnaryMemFunT
<PeerPoolMgr
, Security::EncryptorAnswer
, Security::EncryptorAnswer
&>(aJob
, aMethod
, Security::EncryptorAnswer()) {}
38 /* Security::PeerConnector::CbDialer API */
39 virtual Security::EncryptorAnswer
&answer() { return arg1
; }
42 PeerPoolMgr::PeerPoolMgr(CachePeer
*aPeer
): AsyncJob("PeerPoolMgr"),
43 peer(cbdataReference(aPeer
)),
51 PeerPoolMgr::~PeerPoolMgr()
53 cbdataReferenceDone(peer
);
61 const auto mx
= MasterXaction::MakePortless
<XactionInitiator::initPeerPool
>();
62 // ErrorState, getOutgoingAddress(), and other APIs may require a request.
63 // We fake one. TODO: Optionally send this request to peers?
64 request
= new HttpRequest(Http::METHOD_OPTIONS
, AnyP::PROTO_HTTP
, "http", "*", mx
);
65 request
->url
.host(peer
->host
);
67 checkpoint("peer initialized");
71 PeerPoolMgr::swanSong()
77 PeerPoolMgr::validPeer() const
79 return peer
&& cbdataReferenceValid(peer
) && peer
->standby
.pool
;
83 PeerPoolMgr::doneAll() const
85 return !(validPeer() && peer
->standby
.limit
) && AsyncJob::doneAll();
89 PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams
¶ms
)
91 transportWait
.finish();
94 debugs(48, 3, "peer gone");
95 if (params
.conn
!= NULL
)
100 if (params
.flag
!= Comm::OK
) {
101 peerConnectFailed(peer
);
102 checkpoint("conn opening failure"); // may retry
106 Must(params
.conn
!= NULL
);
109 if (peer
->secure
.encryptTransport
) {
110 // XXX: Exceptions orphan params.conn
111 AsyncCall::Pointer callback
= asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
112 MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer
));
114 const auto peerTimeout
= peer
->connectTimeout();
115 const int timeUsed
= squid_curtime
- params
.conn
->startTime();
116 // Use positive timeout when less than one second is left for conn.
117 const int timeLeft
= positiveTimeout(peerTimeout
- timeUsed
);
118 const auto connector
= new Security::BlindPeerConnector(request
, params
.conn
, callback
, nullptr, timeLeft
);
119 encryptionWait
.start(connector
, callback
);
123 pushNewConnection(params
.conn
);
127 PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer
&conn
)
130 Must(Comm::IsConnOpen(conn
));
131 peer
->standby
.pool
->push(conn
, NULL
/* domain */);
132 // push() will trigger a checkpoint()
136 PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer
&answer
)
138 encryptionWait
.finish();
141 debugs(48, 3, "peer gone");
142 if (answer
.conn
!= NULL
)
143 answer
.conn
->close();
147 assert(!answer
.tunneled
);
148 if (answer
.error
.get()) {
149 assert(!answer
.conn
);
150 // PeerConnector calls peerConnectFailed() for us;
151 checkpoint("conn securing failure"); // may retry
157 // The socket could get closed while our callback was queued. Sync
158 // Connection. XXX: Connection::fd may already be stale/invalid here.
159 if (answer
.conn
->isOpen() && fd_table
[answer
.conn
->fd
].closing()) {
160 answer
.conn
->noteClosure();
161 checkpoint("external connection closure"); // may retry
165 pushNewConnection(answer
.conn
);
169 PeerPoolMgr::openNewConnection()
171 // KISS: Do nothing else when we are already doing something.
172 if (transportWait
|| encryptionWait
|| shutting_down
) {
173 debugs(48, 7, "busy: " << transportWait
<< '|' << encryptionWait
<< '|' << shutting_down
);
174 return; // there will be another checkpoint when we are done opening/securing
177 // Do not talk to a peer until it is ready.
178 if (!neighborUp(peer
)) // provides debugging
179 return; // there will be another checkpoint when peer is up
181 // Do not violate peer limits.
182 if (!peerCanOpenMore(peer
)) { // provides debugging
183 peer
->standby
.waitingForClose
= true; // may already be true
184 return; // there will be another checkpoint when a peer conn closes
187 // Do not violate global restrictions.
189 debugs(48, 7, "overwhelmed");
190 peer
->standby
.waitingForClose
= true; // may already be true
191 // There will be another checkpoint when a peer conn closes OR when
192 // a future pop() fails due to an empty pool. See PconnPool::pop().
196 peer
->standby
.waitingForClose
= false;
198 Comm::ConnectionPointer conn
= new Comm::Connection
;
199 Must(peer
->n_addresses
); // guaranteed by neighborUp() above
200 // cycle through all available IP addresses
201 conn
->remote
= peer
->addresses
[addrUsed
++ % peer
->n_addresses
];
202 conn
->remote
.port(peer
->http_port
);
203 conn
->peerType
= STANDBY_POOL
; // should be reset by peerSelect()
205 getOutgoingAddress(request
.getRaw(), conn
);
206 GetMarkingsToServer(request
.getRaw(), *conn
);
208 const auto ctimeout
= peer
->connectTimeout();
209 typedef CommCbMemFunT
<PeerPoolMgr
, CommConnectCbParams
> Dialer
;
210 AsyncCall::Pointer callback
= JobCallback(48, 5, Dialer
, this, PeerPoolMgr::handleOpenedConnection
);
211 const auto cs
= new Comm::ConnOpener(conn
, callback
, ctimeout
);
212 transportWait
.start(cs
, callback
);
216 PeerPoolMgr::closeOldConnections(const int howMany
)
218 debugs(48, 8, howMany
);
219 peer
->standby
.pool
->closeN(howMany
);
223 PeerPoolMgr::checkpoint(const char *reason
)
226 debugs(48, 3, reason
<< " and peer gone");
227 return; // nothing to do after our owner dies; the job will quit
230 const int count
= peer
->standby
.pool
->count();
231 const int limit
= peer
->standby
.limit
;
232 debugs(48, 7, reason
<< " with " << count
<< " ? " << limit
);
236 else if (count
> limit
)
237 closeOldConnections(count
- limit
);
241 PeerPoolMgr::Checkpoint(const Pointer
&mgr
, const char *reason
)
243 CallJobHere1(48, 5, mgr
, PeerPoolMgr
, checkpoint
, reason
);
246 /// launches PeerPoolMgrs for peers configured with standby.limit
247 class PeerPoolMgrsRr
: public RegisteredRunner
250 /* RegisteredRunner API */
251 virtual void useConfig() { syncConfig(); }
252 virtual void syncConfig();
255 RunnerRegistrationEntry(PeerPoolMgrsRr
);
258 PeerPoolMgrsRr::syncConfig()
260 for (CachePeer
*p
= Config
.peers
; p
; p
= p
->next
) {
261 // On reconfigure, Squid deletes the old config (and old peers in it),
262 // so should always be dealing with a brand new configuration.
263 assert(!p
->standby
.mgr
);
264 assert(!p
->standby
.pool
);
265 if (p
->standby
.limit
) {
266 p
->standby
.mgr
= new PeerPoolMgr(p
);
267 p
->standby
.pool
= new PconnPool(p
->name
, p
->standby
.mgr
);
268 AsyncJob::Start(p
->standby
.mgr
.get());