]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/PeerPoolMgr.cc
2 #include "base/AsyncJobCalls.h"
3 #include "base/RunnersRegistry.h"
5 #include "comm/Connection.h"
6 #include "comm/ConnOpener.h"
11 #include "HttpRequest.h"
12 #include "neighbors.h"
14 #include "PeerPoolMgr.h"
15 #include "SquidConfig.h"
17 #include "ssl/PeerConnector.h"
// Applies the CBDATA_CLASS_INIT macro to PeerPoolMgr (presumably the cbdata
// bookkeeping definition for the class -- confirm against the cbdata API).
20 CBDATA_CLASS_INIT(PeerPoolMgr
);
23 /// Gives Ssl::PeerConnector access to Answer in the PeerPoolMgr callback dialer.
// Inherits from both UnaryMemFunT (a PeerPoolMgr method taking an
// Ssl::PeerConnectorAnswer reference) and Ssl::PeerConnector::CbDialer.
24 class MyAnswerDialer
: public UnaryMemFunT
<PeerPoolMgr
, Ssl::PeerConnectorAnswer
, Ssl::PeerConnectorAnswer
&>,
25 public Ssl::PeerConnector::CbDialer
// Passes the job and method on to UnaryMemFunT and seeds the third base
// argument with a default-constructed Ssl::PeerConnectorAnswer.
28 MyAnswerDialer(const JobPointer
&aJob
, Method aMethod
):
29 UnaryMemFunT
<PeerPoolMgr
, Ssl::PeerConnectorAnswer
, Ssl::PeerConnectorAnswer
&>(aJob
, aMethod
, Ssl::PeerConnectorAnswer()) {}
31 /* Ssl::PeerConnector::CbDialer API */
// Returns arg1 by reference; arg1 is not declared in this class and is
// presumably the argument stored by UnaryMemFunT -- confirm.
32 virtual Ssl::PeerConnectorAnswer
&answer() { return arg1
; }
/// Constructs the manager for aPeer: names the AsyncJob "PeerPoolMgr" and
/// keeps a cbdataReference() to the peer.
// NOTE(review): the initializer list ends on a trailing comma in this extract;
// the remaining initializers and the constructor body are not visible.
36 PeerPoolMgr::PeerPoolMgr(CachePeer
*aPeer
): AsyncJob("PeerPoolMgr"),
37 peer(cbdataReference(aPeer
)),
/// Releases the peer reference with cbdataReferenceDone(); the reference was
/// taken with cbdataReference() in the constructor.
46 PeerPoolMgr::~PeerPoolMgr()
48 cbdataReferenceDone(peer
);
// NOTE(review): the signature of the function enclosing these statements is
// not visible in this extract.
56 // ErrorState, getOutgoingAddress(), and other APIs may require a request.
57 // We fake one. TODO: Optionally send this request to peers?
// The placeholder request is built with Http::METHOD_OPTIONS,
// AnyP::PROTO_HTTP and "*" as constructor arguments.
58 request
= new HttpRequest(Http::METHOD_OPTIONS
, AnyP::PROTO_HTTP
, "*");
// The request host is taken from the peer.
59 request
->SetHost(peer
->host
);
// Evaluate the pool right away; checkpoint() compares the pool count with
// the peer's standby limit.
61 checkpoint("peer initialized");
// NOTE(review): only the signature of swanSong() survives in this extract;
// its return type and body are not visible.
65 PeerPoolMgr::swanSong()
/// Whether the peer can still be used: the pointer is set,
/// cbdataReferenceValid() accepts it, and peer->standby.pool is non-null.
71 PeerPoolMgr::validPeer() const
73 return peer
&& cbdataReferenceValid(peer
) && peer
->standby
.pool
;
/// Never done while the peer is valid and has a non-zero standby.limit;
/// otherwise the answer comes from AsyncJob::doneAll().
77 PeerPoolMgr::doneAll() const
79 return !(validPeer() && peer
->standby
.limit
) && AsyncJob::doneAll();
/// Callback for the connection-opening attempt. params carries the resulting
/// connection (params.conn) and the outcome (params.flag).
// NOTE(review): this extract omits some lines of the function (for example
// the condition guarding the "peer gone" message and several statements that
// follow the bare "if" lines), so the comments below describe only what is
// visible.
83 PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams
&params
)
88 debugs(48, 3, "peer gone");
89 if (params
.conn
!= NULL
)
// Opening failed (flag is not Comm::OK): record the failure against the peer
// and let checkpoint() re-evaluate the pool.
94 if (params
.flag
!= Comm::OK
) {
95 /* it might have been a timeout with a partially open link */
96 if (params
.conn
!= NULL
)
98 peerConnectFailed(peer
);
99 checkpoint("conn opening failure"); // may retry
// From here on a connection object is required.
103 Must(params
.conn
!= NULL
);
// Securing path: install a close handler (closer) on the descriptor so that
// handleSecureClosure() is called if the connection closes meanwhile.
108 typedef CommCbMemFunT
<PeerPoolMgr
, CommCloseCbParams
> CloserDialer
;
109 closer
= JobCallback(48, 3, CloserDialer
, this,
110 PeerPoolMgr::handleSecureClosure
);
111 comm_add_close_handler(params
.conn
->fd
, closer
);
// Start an Ssl::PeerConnector job that reports back to handleSecuredPeer()
// through the securer call.
113 securer
= asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
114 MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer
));
115 Ssl::PeerConnector
*connector
=
116 new Ssl::PeerConnector(request
, params
.conn
, securer
);
117 AsyncJob::Start(connector
); // will call our callback
// Hand the open connection to the standby pool.
122 pushNewConnection(params
.conn
);
/// Adds conn to the peer's standby pool. conn must be open; this is enforced
/// with Must(Comm::IsConnOpen()).
126 PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer
&conn
)
129 Must(Comm::IsConnOpen(conn
));
// The second push() argument (the domain) is passed as NULL.
130 peer
->standby
.pool
->push(conn
, NULL
/* domain */);
131 // push() will trigger a checkpoint()
/// Callback for the Ssl::PeerConnector job. answer carries the connection
/// (answer.conn) and, on failure, an error (answer.error).
// NOTE(review): this extract omits some lines of the function, so the
// comments below describe only the visible statements.
136 PeerPoolMgr::handleSecuredPeer(Ssl::PeerConnectorAnswer
&answer
)
// This callback is only expected while a securer call is pending.
138 Must(securer
!= NULL
);
// Dispose of the close handler installed for the securing step: it is removed
// from the descriptor when a connection is present. closer->cancel() is also
// visible here, but the line between the two statements is missing from this
// extract -- confirm how they relate.
141 if (closer
!= NULL
) {
142 if (answer
.conn
!= NULL
)
143 comm_remove_close_handler(answer
.conn
->fd
, closer
);
145 closer
->cancel("securing completed");
// "peer gone" path: close the connection if there is one.
150 debugs(48, 3, "peer gone");
151 if (answer
.conn
!= NULL
)
152 answer
.conn
->close();
// Securing failed: close the connection (if any) and let checkpoint()
// re-evaluate the pool.
156 if (answer
.error
.get()) {
157 if (answer
.conn
!= NULL
)
158 answer
.conn
->close();
159 // PeerConnector calls peerConnectFailed() for us;
160 checkpoint("conn securing failure"); // may retry
// Hand the secured connection to the standby pool.
164 pushNewConnection(answer
.conn
);
/// Close handler for a connection that is being secured: cancels the pending
/// securer callback and requests another checkpoint through
/// Checkpoint(mgr, reason).
// params is not used by any statement visible in this extract.
168 PeerPoolMgr::handleSecureClosure(const CommCloseCbParams
&params
)
// Both the close handler and the securing callback must be pending here.
170 Must(closer
!= NULL
);
171 Must(securer
!= NULL
);
172 securer
->cancel("conn closed by a 3rd party");
175 // allow the closing connection to fully close before we check again
176 Checkpoint(this, "conn closure while securing");
/// Prepares one more standby connection to the peer, unless one of the guards
/// below returns first (already busy or shutting down, peer not up, peer
/// limits reached, global restrictions).
// NOTE(review): this extract omits some lines of the function (for example
// the condition guarding the "overwhelmed" branch), so the comments describe
// only the visible statements.
181 PeerPoolMgr::openNewConnection()
183 // KISS: Do nothing else when we are already doing something.
184 if (opener
!= NULL
|| securer
!= NULL
|| shutting_down
) {
185 debugs(48, 7, "busy: " << opener
<< '|' << securer
<< '|' << shutting_down
);
186 return; // there will be another checkpoint when we are done opening/securing
189 // Do not talk to a peer until it is ready.
190 if (!neighborUp(peer
)) // provides debugging
191 return; // there will be another checkpoint when peer is up
193 // Do not violate peer limits.
194 if (!peerCanOpenMore(peer
)) { // provides debugging
195 peer
->standby
.waitingForClose
= true; // may already be true
196 return; // there will be another checkpoint when a peer conn closes
199 // Do not violate global restrictions.
201 debugs(48, 7, "overwhelmed");
202 peer
->standby
.waitingForClose
= true; // may already be true
203 // There will be another checkpoint when a peer conn closes OR when
204 // a future pop() fails due to an empty pool. See PconnPool::pop().
// Clear the waiting flag before building the new connection.
208 peer
->standby
.waitingForClose
= false;
210 Comm::ConnectionPointer conn
= new Comm::Connection
;
211 Must(peer
->n_addresses
); // guaranteed by neighborUp() above
212 // cycle through all available IP addresses
// addrUsed is post-incremented on every call; the modulo keeps the index
// within the n_addresses entries.
213 conn
->remote
= peer
->addresses
[addrUsed
++ % peer
->n_addresses
];
214 conn
->remote
.port(peer
->http_port
);
215 conn
->peerType
= STANDBY_POOL
; // should be reset by peerSelect()
// Let the request-based helpers fill in the connection; their exact effect on
// conn is not visible in this file.
217 getOutgoingAddress(request
.getRaw(), conn
);
218 GetMarkingsToServer(request
.getRaw(), *conn
);
// Use the peer-specific connect timeout when it is positive; otherwise use
// Config.Timeout.peer_connect.
220 const int ctimeout
= peer
->connect_timeout
> 0 ?
221 peer
->connect_timeout
: Config
.Timeout
.peer_connect
;
// The opener callback delivers the result to handleOpenedConnection().
222 typedef CommCbMemFunT
<PeerPoolMgr
, CommConnectCbParams
> Dialer
;
223 opener
= JobCallback(48, 5, Dialer
, this, PeerPoolMgr::handleOpenedConnection
);
// NOTE(review): no statement that starts the ConnOpener job is visible in
// this extract.
224 Comm::ConnOpener
*cs
= new Comm::ConnOpener(conn
, opener
, ctimeout
);
/// Asks the peer's standby pool to close howMany connections via closeN(),
/// after logging the number at debug level 8.
229 PeerPoolMgr::closeOldConnections(const int howMany
)
231 debugs(48, 8, howMany
);
232 peer
->standby
.pool
->closeN(howMany
);
/// Compares the standby pool's current connection count with
/// peer->standby.limit and closes the surplus when the pool is over the limit.
/// reason appears only in the debugging output visible here.
// NOTE(review): this extract omits some lines of the function, including the
// condition guarding the early return and the branch before "else if".
236 PeerPoolMgr::checkpoint(const char *reason
)
239 debugs(48, 3, reason
<< " and peer gone");
240 return; // nothing to do after our owner dies; the job will quit
243 const int count
= peer
->standby
.pool
->count();
244 const int limit
= peer
->standby
.limit
;
245 debugs(48, 7, reason
<< " with " << count
<< " ? " << limit
);
// Over the limit: close exactly the excess (count - limit).
249 else if (count
> limit
)
250 closeOldConnections(count
- limit
);
/// Requests checkpoint(reason) on the mgr job through CallJobHere1()
/// (presumably as an asynchronous job call rather than a direct call --
/// confirm against the AsyncJob call API).
254 PeerPoolMgr::Checkpoint(const Pointer
&mgr
, const char *reason
)
256 CallJobHere1(48, 5, mgr
, PeerPoolMgr
, checkpoint
, reason
);
259 /// launches PeerPoolMgrs for peers configured with standby.limit
260 class PeerPoolMgrsRr
: public RegisteredRunner
263 /* RegisteredRunner API */
// useConfig() simply delegates to syncConfig().
264 virtual void useConfig() { syncConfig(); }
265 virtual void syncConfig();
// Registers PeerPoolMgrsRr through RunnerRegistrationEntry (presumably with
// the runners registry declared in base/RunnersRegistry.h -- confirm).
268 RunnerRegistrationEntry(PeerPoolMgrsRr
);
/// Walks the Config.peers list and, for every peer with a non-zero
/// standby.limit, creates its PeerPoolMgr and PconnPool and starts the
/// manager job.
271 PeerPoolMgrsRr::syncConfig()
273 for (CachePeer
*p
= Config
.peers
; p
; p
= p
->next
) {
274 // On reconfigure, Squid deletes the old config (and old peers in it),
275 // so should always be dealing with a brand new configuration.
276 assert(!p
->standby
.mgr
);
277 assert(!p
->standby
.pool
);
// Peers without a standby limit get neither a manager nor a pool.
278 if (p
->standby
.limit
) {
279 p
->standby
.mgr
= new PeerPoolMgr(p
);
// The pool is named after the peer and is given the manager just created.
280 p
->standby
.pool
= new PconnPool(p
->name
, p
->standby
.mgr
);
281 AsyncJob::Start(p
->standby
.mgr
.get());