http = nullptr;
}
-void
-Downloader::CbDialer::print(std::ostream &os) const
+std::ostream &
+operator <<(std::ostream &os, const DownloaderAnswer &answer)
{
- os << " Http Status:" << status << Raw("body data", object.rawContent(), 64).hex();
+ os << "outcome=" << answer.outcome;
+ if (answer.outcome == Http::scOkay)
+ os << ", resource.size=" << answer.resource.length();
+ return os;
}
-Downloader::Downloader(const SBuf &url, const AsyncCall::Pointer &aCallback, const MasterXactionPointer &masterXaction, unsigned int level):
+Downloader::Downloader(const SBuf &url, const AsyncCallback<Answer> &cb, const MasterXactionPointer &mx, const unsigned int level):
AsyncJob("Downloader"),
url_(url),
- callback_(aCallback),
+ callback_(cb),
level_(level),
- masterXaction_(masterXaction)
+ masterXaction_(mx)
{
}
Downloader::callBack(Http::StatusCode const statusCode)
{
assert(callback_);
- CbDialer *dialer = dynamic_cast<CbDialer*>(callback_->getDialer());
- Must(dialer);
- dialer->status = statusCode;
+ auto &answer = callback_.answer();
+ answer.outcome = statusCode;
if (statusCode == Http::scOkay)
- dialer->object = object_;
- ScheduleCallHere(callback_);
- callback_ = nullptr;
+ answer.resource = object_;
+ ScheduleCallHere(callback_.release());
// We cannot deleteThis() because we may be called synchronously from
// doCallouts() via handleReply() (XXX), and doCallouts() may crash if we
#ifndef SQUID_DOWNLOADER_H
#define SQUID_DOWNLOADER_H
+#include "base/AsyncCallbacks.h"
#include "base/AsyncJob.h"
#include "defines.h"
#include "http/forward.h"
class MasterXaction;
using MasterXactionPointer = RefCount<MasterXaction>;
+/// download result
+class DownloaderAnswer {
+public:
+ // The content of a successfully received HTTP 200 OK reply to our GET request.
+ // Unused unless outcome is Http::scOkay.
+ SBuf resource;
+
+ /// Download result summary.
+ /// May differ from the status code of the downloaded HTTP reply.
+ Http::StatusCode outcome = Http::scNone;
+};
+
+std::ostream &operator <<(std::ostream &, const DownloaderAnswer &);
+
/// The Downloader class fetches SBuf-storable things for other Squid
/// components/transactions using internal requests. For example, it is used
/// to fetch missing intermediate certificates when validating origin server
{
CBDATA_CLASS(Downloader);
public:
+ using Answer = DownloaderAnswer;
- /// Callback data to use with Downloader callbacks.
- class CbDialer: public CallDialer {
- public:
- CbDialer(): status(Http::scNone) {}
- virtual ~CbDialer() {}
-
- /* CallDialer API */
- virtual bool canDial(AsyncCall &call) = 0;
- virtual void dial(AsyncCall &call) = 0;
- virtual void print(std::ostream &os) const;
-
- SBuf object;
- Http::StatusCode status;
- };
-
- Downloader(const SBuf &url, const AsyncCall::Pointer &aCallback, const MasterXactionPointer &, unsigned int level = 0);
+ Downloader(const SBuf &url, const AsyncCallback<Answer> &, const MasterXactionPointer &, unsigned int level = 0);
virtual ~Downloader();
virtual void swanSong();
static const size_t MaxObjectSize = 1*1024*1024;
SBuf url_; ///< the url to download
- AsyncCall::Pointer callback_; ///< callback to call when download finishes
+
+ /// answer destination
+ AsyncCallback<Answer> callback_;
+
SBuf object_; ///< the object body data
const unsigned int level_; ///< holds the nested downloads level
MasterXactionPointer masterXaction_; ///< download transaction context
#include "acl/FilledChecklist.h"
#include "acl/Gadgets.h"
#include "anyp/PortCfg.h"
+#include "base/AsyncCallbacks.h"
+#include "base/AsyncCbdataCalls.h"
#include "CacheManager.h"
#include "CachePeer.h"
#include "client_side.h"
CBDATA_CLASS_INIT(FwdState);
-class FwdStatePeerAnswerDialer: public CallDialer, public Security::PeerConnector::CbDialer
-{
-public:
- typedef void (FwdState::*Method)(Security::EncryptorAnswer &);
-
- FwdStatePeerAnswerDialer(Method method, FwdState *fwd):
- method_(method), fwd_(fwd), answer_() {}
-
- /* CallDialer API */
- virtual bool canDial(AsyncCall &) { return fwd_.valid(); }
- void dial(AsyncCall &) { ((&(*fwd_))->*method_)(answer_); }
- virtual void print(std::ostream &os) const {
- os << '(' << fwd_.get() << ", " << answer_ << ')';
- }
-
- /* Security::PeerConnector::CbDialer API */
- virtual Security::EncryptorAnswer &answer() { return answer_; }
-
-private:
- Method method_;
- CbcPointer<FwdState> fwd_;
- Security::EncryptorAnswer answer_;
-};
-
void
FwdState::HandleStoreAbort(FwdState *fwd)
{
void
FwdState::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
{
- AsyncCall::Pointer callback = asyncCall(17,4,
- "FwdState::tunnelEstablishmentDone",
- Http::Tunneler::CbDialer<FwdState>(&FwdState::tunnelEstablishmentDone, this));
+ const auto callback = asyncCallback(17, 4, FwdState::tunnelEstablishmentDone, this);
HttpRequest::Pointer requestPointer = request;
const auto tunneler = new Http::Tunneler(conn, requestPointer, callback, connectingTimeout(conn), al);
FwdState::secureConnectionToPeer(const Comm::ConnectionPointer &conn)
{
HttpRequest::Pointer requestPointer = request;
- AsyncCall::Pointer callback = asyncCall(17,4,
- "FwdState::ConnectedToPeer",
- FwdStatePeerAnswerDialer(&FwdState::connectedToPeer, this));
+ const auto callback = asyncCallback(17, 4, FwdState::connectedToPeer, this);
const auto sslNegotiationTimeout = connectingTimeout(conn);
Security::PeerConnector *connector = nullptr;
#if USE_OPENSSL
request->hier.startPeerClock();
- AsyncCall::Pointer callback = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this));
-
+ const auto callback = asyncCallback(17, 5, FwdState::noteConnection, this);
HttpRequest::Pointer cause = request;
const auto cs = new HappyConnOpener(destinations, callback, cause, start_t, n_tries, al);
cs->setHost(request->url.host());
#include "squid.h"
#include "AccessLogEntry.h"
+#include "base/AsyncCallbacks.h"
#include "base/CodeContext.h"
#include "CachePeer.h"
#include "errorpage.h"
/* HappyConnOpener */
-HappyConnOpener::HappyConnOpener(const ResolvedPeers::Pointer &dests, const AsyncCall::Pointer &aCall, HttpRequest::Pointer &request, const time_t aFwdStart, int tries, const AccessLogEntry::Pointer &anAle):
+HappyConnOpener::HappyConnOpener(const ResolvedPeers::Pointer &dests, const AsyncCallback<Answer> &callback, const HttpRequest::Pointer &request, const time_t aFwdStart, const int tries, const AccessLogEntry::Pointer &anAle):
AsyncJob("HappyConnOpener"),
fwdStart(aFwdStart),
- callback_(aCall),
+ callback_(callback),
destinations(dests),
prime(&HappyConnOpener::notePrimeConnectDone, "HappyConnOpener::notePrimeConnectDone"),
spare(&HappyConnOpener::noteSpareConnectDone, "HappyConnOpener::noteSpareConnectDone"),
n_tries(tries)
{
assert(destinations);
- assert(dynamic_cast<Answer*>(callback_->getDialer()));
}
HappyConnOpener::~HappyConnOpener()
HappyConnOpener::futureAnswer(const PeerConnectionPointer &conn)
{
if (callback_ && !callback_->canceled()) {
- const auto answer = dynamic_cast<Answer *>(callback_->getDialer());
- assert(answer);
- answer->conn = conn;
- answer->n_tries = n_tries;
- return answer;
+ auto &answer = callback_.answer();
+ answer.conn = conn;
+ answer.n_tries = n_tries;
+ return &answer;
}
+ (void)callback_.release();
return nullptr;
}
if (auto *answer = futureAnswer(conn)) {
answer->reused = reused;
assert(!answer->error);
- ScheduleCallHere(callback_);
+ ScheduleCallHere(callback_.release());
}
- callback_ = nullptr;
}
/// cancels the in-progress attempt, making its path a future candidate
answer->error = lastError;
assert(answer->error.valid());
lastError = nullptr; // the answer owns it now
- ScheduleCallHere(callback_);
+ ScheduleCallHere(callback_.release());
}
- callback_ = nullptr;
}
void
public:
typedef HappyConnOpenerAnswer Answer;
- /// AsyncCall dialer for our callback. Gives us access to callback Answer.
- template <class Initiator>
- class CbDialer: public CallDialer, public Answer {
- public:
- // initiator method to receive our answer
- typedef void (Initiator::*Method)(Answer &);
-
- CbDialer(Method method, Initiator *initiator): initiator_(initiator), method_(method) {}
- virtual ~CbDialer() = default;
-
- /* CallDialer API */
- bool canDial(AsyncCall &) { return initiator_.valid(); }
- void dial(AsyncCall &) {((*initiator_).*method_)(*this); }
- virtual void print(std::ostream &os) const override {
- os << '(' << static_cast<const Answer&>(*this) << ')';
- }
-
- private:
- CbcPointer<Initiator> initiator_; ///< object to deliver the answer to
- Method method_; ///< initiator_ method to call with the answer
- };
-
public:
- HappyConnOpener(const ResolvedPeersPointer &, const AsyncCall::Pointer &, HttpRequestPointer &, const time_t aFwdStart, int tries, const AccessLogEntryPointer &al);
+ HappyConnOpener(const ResolvedPeersPointer &, const AsyncCallback<Answer> &, const HttpRequestPointer &, time_t aFwdStart, int tries, const AccessLogEntryPointer &);
virtual ~HappyConnOpener() override;
/// configures reuse of old connections
const time_t fwdStart; ///< requestor start time
- AsyncCall::Pointer callback_; ///< handler to be called on connection completion.
+ /// answer destination
+ AsyncCallback<Answer> callback_;
/// Candidate paths. Shared with the initiator. May not be finalized yet.
ResolvedPeersPointer destinations;
#include "squid.h"
#include "AccessLogEntry.h"
-#include "base/AsyncJobCalls.h"
+#include "base/AsyncCallbacks.h"
#include "base/RunnersRegistry.h"
#include "CachePeer.h"
#include "comm/Connection.h"
CBDATA_CLASS_INIT(PeerPoolMgr);
-/// Gives Security::PeerConnector access to Answer in the PeerPoolMgr callback dialer.
-class MyAnswerDialer: public UnaryMemFunT<PeerPoolMgr, Security::EncryptorAnswer, Security::EncryptorAnswer&>,
- public Security::PeerConnector::CbDialer
-{
-public:
- MyAnswerDialer(const JobPointer &aJob, Method aMethod):
- UnaryMemFunT<PeerPoolMgr, Security::EncryptorAnswer, Security::EncryptorAnswer&>(aJob, aMethod, Security::EncryptorAnswer()) {}
-
- /* Security::PeerConnector::CbDialer API */
- virtual Security::EncryptorAnswer &answer() { return arg1; }
-};
-
PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
peer(cbdataReference(aPeer)),
request(),
// Handle TLS peers.
if (peer->secure.encryptTransport) {
// XXX: Exceptions orphan params.conn
- AsyncCall::Pointer callback = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
- MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
+ const auto callback = asyncCallback(48, 4, PeerPoolMgr::handleSecuredPeer, this);
const auto peerTimeout = peer->connectTimeout();
const int timeUsed = squid_curtime - params.conn->startTime();
#include "base/forward.h"
#include "base/Optional.h"
+#include "base/TypeTraits.h"
#include <limits>
-#include <type_traits>
// TODO: Move to src/base/Math.h and drop the Math namespace
// If Sum() performance becomes important, consider using GCC and clang
// built-ins like __builtin_add_overflow() instead of manual overflow checks.
-/// std::enable_if_t replacement until C++14
-/// simplifies declarations further below
-template <bool B, class T = void>
-using EnableIfType = typename std::enable_if<B,T>::type;
-
/// detects a pair of unsigned types
/// reduces code duplication in declarations further below
template <typename T, typename U>
#include "adaptation/icap/Config.h"
#include "adaptation/icap/Launcher.h"
#include "adaptation/icap/Xaction.h"
+#include "base/AsyncCallbacks.h"
#include "base/JobWait.h"
#include "base/Optional.h"
#include "base/TextException.h"
#include "security/PeerConnector.h"
#include "SquidConfig.h"
-/// Gives Security::PeerConnector access to Answer in the PeerPoolMgr callback dialer.
-class MyIcapAnswerDialer: public UnaryMemFunT<Adaptation::Icap::Xaction, Security::EncryptorAnswer, Security::EncryptorAnswer&>,
- public Security::PeerConnector::CbDialer
-{
-public:
- MyIcapAnswerDialer(const JobPointer &aJob, Method aMethod):
- UnaryMemFunT<Adaptation::Icap::Xaction, Security::EncryptorAnswer, Security::EncryptorAnswer&>(aJob, aMethod, Security::EncryptorAnswer()) {}
-
- /* Security::PeerConnector::CbDialer API */
- virtual Security::EncryptorAnswer &answer() { return arg1; }
-};
-
namespace Ssl
{
/// A simple PeerConnector for Secure ICAP services. No SslBump capabilities.
IcapPeerConnector(
Adaptation::Icap::ServiceRep::Pointer &service,
const Comm::ConnectionPointer &aServerConn,
- AsyncCall::Pointer &aCallback,
+ const AsyncCallback<Security::EncryptorAnswer> &aCallback,
AccessLogEntry::Pointer const &alp,
const time_t timeout = 0):
AsyncJob("Ssl::IcapPeerConnector"),
const auto &ssl = fd_table[conn->fd].ssl;
if (!ssl && service().cfg().secure.encryptTransport) {
// XXX: Exceptions orphan conn.
- CbcPointer<Adaptation::Icap::Xaction> me(this);
- AsyncCall::Pointer callback = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer",
- MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
-
+ const auto callback = asyncCallback(93, 4, Adaptation::Icap::Xaction::handleSecuredPeer, this);
const auto sslConnector = new Ssl::IcapPeerConnector(theService, conn, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
encryptionWait.start(sslConnector, callback);
}
bool
-ScheduleCall(const char *fileName, int fileLine, AsyncCall::Pointer &call)
+ScheduleCall(const char *fileName, int fileLine, const AsyncCall::Pointer &call)
{
debugs(call->debugSection, call->debugLevel, fileName << "(" << fileLine <<
") will call " << *call << " [" << call->id << ']' );
// can be called from canFire() for debugging; always returns false
bool cancel(const char *reason);
- bool canceled() { return isCanceled != nullptr; }
+ bool canceled() const { return isCanceled != nullptr; }
virtual CallDialer *getDialer() = 0;
\ingroup AsyncCallAPI
* This template implements an AsyncCall using a specified Dialer class
*/
-template <class Dialer>
+template <class DialerClass>
class AsyncCallT: public AsyncCall
{
public:
+ using Dialer = DialerClass;
+
AsyncCallT(int aDebugSection, int aDebugLevel, const char *aName,
const Dialer &aDialer): AsyncCall(aDebugSection, aDebugLevel, aName),
dialer(aDialer) {}
CallDialer *getDialer() { return &dialer; }
+ Dialer dialer;
+
protected:
virtual bool canFire() {
return AsyncCall::canFire() &&
}
virtual void fire() { dialer.dial(*this); }
- Dialer dialer;
-
private:
AsyncCallT & operator=(const AsyncCallT &); // not defined. call assignments not permitted.
};
template <class Dialer>
-inline
-AsyncCall *
+inline RefCount< AsyncCallT<Dialer> >
asyncCall(int aDebugSection, int aDebugLevel, const char *aName,
const Dialer &aDialer)
{
}
/** Call scheduling helper. Use ScheduleCallHere if you can. */
-bool ScheduleCall(const char *fileName, int fileLine, AsyncCall::Pointer &call);
+bool ScheduleCall(const char *fileName, int fileLine, const AsyncCall::Pointer &);
/** Call scheduling helper. */
#define ScheduleCallHere(call) ScheduleCall(__FILE__, __LINE__, (call))
--- /dev/null
+/*
+ * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_SRC_BASE_ASYNCCALLBACKS_H
+#define SQUID_SRC_BASE_ASYNCCALLBACKS_H
+
+#include "base/AsyncCall.h"
+#include "base/AsyncJobCalls.h"
+#include "base/TypeTraits.h"
+
+/// access to a callback result carried by an asynchronous CallDialer
+template <typename AnswerT>
+class WithAnswer
+{
+public:
+ using Answer = AnswerT;
+
+ virtual ~WithAnswer() = default;
+
+ /// callback results setter
+ virtual Answer &answer() = 0;
+};
+
+/// a smart AsyncCall pointer for delivery of future results
+template <typename Answer>
+class AsyncCallback
+{
+public:
+ // all generated copying/moving functions are correct
+ AsyncCallback() = default;
+
+ template <class Call>
+ explicit AsyncCallback(const RefCount<Call> &call):
+ call_(call),
+ answer_(&(call->dialer.answer()))
+ {
+ }
+
+ Answer &answer()
+ {
+ assert(answer_);
+ return *answer_;
+ }
+
+ /// make this smart pointer nil
+ /// \return the AsyncCall pointer we used to manage before this call
+ AsyncCall::Pointer release()
+ {
+ answer_ = nullptr;
+ const auto call = call_;
+ call_ = nullptr;
+ return call;
+ }
+
+ /// whether the callback has been set but not released
+ explicit operator bool() const { return answer_; }
+
+ /* methods for decaying into an AsyncCall pointer w/o access to answer */
+ operator const AsyncCall::Pointer &() const { return call_; }
+ const AsyncCall &operator *() const { return call_.operator*(); }
+ const AsyncCall *operator ->() const { return call_.operator->(); }
+
+private:
+ /// callback carrying the answer
+ AsyncCall::Pointer call_;
+
+ /// (future) answer inside this->call, obtained when it was still possible
+ /// to reach it without dynamic casts and virtual methods
+ Answer *answer_ = nullptr;
+};
+
+/// CallDialer for single-parameter callback functions
+template <typename Argument1>
+class UnaryFunCallbackDialer:
+ public CallDialer,
+ public WithAnswer<Argument1>
+{
+public:
+ // stand-alone function that receives our answer
+ using Handler = void (Argument1 &);
+
+ explicit UnaryFunCallbackDialer(Handler * const aHandler): handler(aHandler) {}
+ virtual ~UnaryFunCallbackDialer() = default;
+
+ /* CallDialer API */
+ bool canDial(AsyncCall &) { return bool(handler); }
+ void dial(AsyncCall &) { handler(arg1); }
+ virtual void print(std::ostream &os) const final { os << '(' << arg1 << ')'; }
+
+ /* WithAnswer API */
+ virtual Argument1 &answer() final { return arg1; }
+
+private:
+ Handler *handler; ///< the function to call
+ Argument1 arg1; ///< actual call parameter
+};
+
+/// CallDialer for single-parameter callback methods of cbdata-protected classes
+/// that are not AsyncJobs (use UnaryJobCallbackDialer for the latter).
+template <class Destination, typename Argument1>
+class UnaryCbcCallbackDialer:
+ public CallDialer,
+ public WithAnswer<Argument1>
+{
+public:
+ // class member function that receives our answer
+ typedef void (Destination::*Method)(Argument1 &);
+
+ UnaryCbcCallbackDialer(Method method, Destination *destination): destination_(destination), method_(method) {}
+ virtual ~UnaryCbcCallbackDialer() = default;
+
+ /* CallDialer API */
+ bool canDial(AsyncCall &) { return destination_.valid(); }
+ void dial(AsyncCall &) {((*destination_).*method_)(arg1_); }
+ virtual void print(std::ostream &os) const final { os << '(' << arg1_ << ')'; }
+
+ /* WithAnswer API */
+ virtual Argument1 &answer() final { return arg1_; }
+
+private:
+ CbcPointer<Destination> destination_; ///< object to deliver the answer to
+ Method method_; ///< Destination method to call with the answer
+ Argument1 arg1_;
+};
+
+/// CallDialer for single-parameter callback methods of AsyncJob classes.
+/// \sa UnaryCbcCallbackDialer and UnaryFunCallbackDialer.
+template <class Job, typename Argument1>
+class UnaryJobCallbackDialer:
+ public UnaryMemFunT<Job, Argument1, Argument1&>,
+ public WithAnswer<Argument1>
+{
+public:
+ using Base = UnaryMemFunT<Job, Argument1, Argument1&>;
+
+ UnaryJobCallbackDialer(const CbcPointer<Job> &aJob, typename Base::Method aMethod):
+ Base(aJob, aMethod, {}) {}
+
+ /* WithAnswer API */
+ virtual Argument1 &answer() final { return this->arg1; }
+};
+
+/// whether the given type is an AsyncJob
+/// reduces code duplication in declarations further below
+template <typename T>
+using IsAsyncJob = typename std::conditional<
+ std::is_base_of<AsyncJob, T>::value,
+ std::true_type,
+ std::false_type
+ >::type;
+
+/// helper function to simplify UnaryCbcCallbackDialer creation
+template <class Destination, typename Argument1, EnableIfType<!IsAsyncJob<Destination>::value, int> = 0>
+UnaryCbcCallbackDialer<Destination, Argument1>
+callbackDialer(void (Destination::*method)(Argument1 &), Destination * const destination)
+{
+ static_assert(!std::is_base_of<AsyncJob, Destination>::value, "wrong wrapper");
+ return UnaryCbcCallbackDialer<Destination, Argument1>(method, destination);
+}
+
+/// helper function to simplify UnaryJobCallbackDialer creation
+template <class Destination, typename Argument1, EnableIfType<IsAsyncJob<Destination>::value, int> = 0>
+UnaryJobCallbackDialer<Destination, Argument1>
+callbackDialer(void (Destination::*method)(Argument1 &), Destination * const destination)
+{
+ static_assert(std::is_base_of<AsyncJob, Destination>::value, "wrong wrapper");
+ return UnaryJobCallbackDialer<Destination, Argument1>(destination, method);
+}
+
+/// helper function to simplify UnaryFunCallbackDialer creation
+template <typename Argument1>
+UnaryFunCallbackDialer<Argument1>
+callbackDialer(void (*destination)(Argument1 &))
+{
+ return UnaryFunCallbackDialer<Argument1>(destination);
+}
+
+/// helper function to create an AsyncCallback object that matches an AsyncCall
+/// based on a WithAnswer answer dialer.
+template <class Call>
+AsyncCallback<typename Call::Dialer::Answer>
+AsyncCallback_(const RefCount<Call> &call)
+{
+ return AsyncCallback<typename Call::Dialer::Answer>(call);
+}
+
+/// AsyncCall for calling back a class method compatible with
+/// callbackDialer(). TODO: Unify with JobCallback() which requires dialers
+/// that feed the job pointer to the non-default CommCommonCbParams constructor.
+#define asyncCallback(dbgSection, dbgLevel, method, object) \
+ AsyncCallback_(asyncCall((dbgSection), (dbgLevel), #method, \
+ callbackDialer(&method, (object))))
+
+// TODO: Use C++20 __VA_OPT__ to merge this with asyncCallback().
+/// AsyncCall for calling back a function
+#define asyncCallbackFun(dbgSection, dbgLevel, function) \
+ AsyncCallback_(asyncCall((dbgSection), (dbgLevel), #function, \
+ callbackDialer(&function)))
+
+#endif // SQUID_SRC_BASE_ASYNCCALLBACKS_H
+
AsyncCallList.h \
AsyncCallQueue.cc \
AsyncCallQueue.h \
+ AsyncCallbacks.h \
AsyncCbdataCalls.h \
AsyncFunCalls.h \
AsyncJob.cc \
#ifndef SQUID_SRC_BASE_TYPETRAITS_H
#define SQUID_SRC_BASE_TYPETRAITS_H
+#include <type_traits>
+
namespace TypeTraits_ { // a hack to prevent "unintended ADL"
// TODO: Extract reusable paradigms into other mixins (e.g., NonCopyable).
using Interface = TypeTraits_::Interface;
+/// std::enable_if_t replacement until C++14
+template <bool B, class T = void>
+using EnableIfType = typename std::enable_if<B,T>::type;
+
#endif /* SQUID_SRC_BASE_TYPETRAITS_H */
template<class Cbc> class CbcPointer;
template<class RefCountableKid> class RefCount;
template<class Job> class JobWait;
+template<class Answer> class AsyncCallback;
typedef CbcPointer<AsyncJob> AsyncJobPointer;
typedef RefCount<CodeContext> CodeContextPointer;
#include "squid.h"
#include "acl/FilledChecklist.h"
#include "anyp/PortCfg.h"
+#include "base/AsyncCallbacks.h"
#include "base/Subscription.h"
#include "base/TextException.h"
#include "CachePeer.h"
#include <systemd/sd-daemon.h>
#endif
+// TODO: Remove this custom dialer and simplify by creating the TcpAcceptor
+// subscription later, inside clientListenerConnectionOpened() callback, just
+// like htcpOpenPorts(), icpOpenPorts(), and snmpPortOpened() do it.
/// dials clientListenerConnectionOpened call
-class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb
+class ListeningStartedDialer:
+ public CallDialer,
+ public WithAnswer<Ipc::StartListeningAnswer>
{
public:
typedef void (*Handler)(AnyP::PortCfgPointer &portCfg, const Ipc::FdNoteId note, const Subscription::Pointer &sub);
ListeningStartedDialer(Handler aHandler, AnyP::PortCfgPointer &aPortCfg, const Ipc::FdNoteId note, const Subscription::Pointer &aSub):
handler(aHandler), portCfg(aPortCfg), portTypeNote(note), sub(aSub) {}
+ /* CallDialer API */
virtual void print(std::ostream &os) const {
- startPrint(os) <<
- ", " << FdNote(portTypeNote) << " port=" << (void*)&portCfg << ')';
+ os << '(' << answer_ << ", " << FdNote(portTypeNote) << " port=" << (void*)&portCfg << ')';
}
virtual bool canDial(AsyncCall &) const { return true; }
virtual void dial(AsyncCall &) { (handler)(portCfg, portTypeNote, sub); }
+ /* WithAnswer API */
+ virtual Ipc::StartListeningAnswer &answer() { return answer_; }
+
public:
Handler handler;
private:
+ // answer_.conn (set/updated by IPC code) is portCfg.listenConn (used by us)
+ Ipc::StartListeningAnswer answer_; ///< StartListening() results
AnyP::PortCfgPointer portCfg; ///< from HttpPortList
Ipc::FdNoteId portTypeNote; ///< Type of IPC socket being opened
Subscription::Pointer sub; ///< The handler to be subscribed for this connection listener
// route new connections to subCall
typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
- AsyncCall::Pointer listenCall =
+ const auto listenCall =
asyncCall(33, 2, "clientListenerConnectionOpened",
ListeningStartedDialer(&clientListenerConnectionOpened,
port, fdNote, sub));
- Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, port->listenConn, fdNote, listenCall);
+ AsyncCallback<Ipc::StartListeningAnswer> callback(listenCall);
+ Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, port->listenConn, fdNote, callback);
assert(NHttpSockets < MAXTCPLISTENPORTS);
HttpSockets[NHttpSockets] = -1;
typedef CommCbMemFunT<Gateway, CommAcceptCbParams> AcceptDialer;
typedef AsyncCallT<AcceptDialer> AcceptCall;
- RefCount<AcceptCall> call = static_cast<AcceptCall*>(JobCallback(11, 5, AcceptDialer, this, Ftp::Gateway::ftpAcceptDataConnection));
+ const auto call = JobCallback(11, 5, AcceptDialer, this, Ftp::Gateway::ftpAcceptDataConnection);
Subscription::Pointer sub = new CallSubscription<AcceptCall>(call);
const char *note = entry->url();
CBDATA_NAMESPACED_CLASS_INIT(Http, Tunneler);
-Http::Tunneler::Tunneler(const Comm::ConnectionPointer &conn, const HttpRequest::Pointer &req, AsyncCall::Pointer &aCallback, time_t timeout, const AccessLogEntryPointer &alp):
+Http::Tunneler::Tunneler(const Comm::ConnectionPointer &conn, const HttpRequest::Pointer &req, const AsyncCallback<Answer> &aCallback, const time_t timeout, const AccessLogEntryPointer &alp):
AsyncJob("Http::Tunneler"),
noteFwdPconnUse(false),
connection(conn),
tunnelEstablished(false)
{
debugs(83, 5, "Http::Tunneler constructed, this=" << (void*)this);
- // detect callers supplying cb dialers that are not our CbDialer
assert(request);
assert(connection);
- assert(callback);
- assert(dynamic_cast<Http::TunnelerAnswer *>(callback->getDialer()));
url = request->url.authority(true);
watchForClosures();
}
return !callback || (requestWritten && tunnelEstablished);
}
-/// convenience method to get to the answer fields
-Http::TunnelerAnswer &
-Http::Tunneler::answer()
-{
- Must(callback);
- const auto tunnelerAnswer = dynamic_cast<Http::TunnelerAnswer *>(callback->getDialer());
- Must(tunnelerAnswer);
- return *tunnelerAnswer;
-}
-
void
Http::Tunneler::start()
{
}
// CONNECT response was successfully parsed
- auto &futureAnswer = answer();
+ auto &futureAnswer = callback.answer();
futureAnswer.peerResponseStatus = rep->sline.status();
request->hier.peer_reply_status = rep->sline.status();
Http::Tunneler::bailWith(ErrorState *error)
{
Must(error);
- answer().squidError = error;
+ callback.answer().squidError = error;
if (const auto failingConnection = connection) {
// TODO: Reuse to-peer connections after a CONNECT error response.
void
Http::Tunneler::sendSuccess()
{
- assert(answer().positive());
+ assert(callback.answer().positive());
assert(Comm::IsConnOpen(connection));
- answer().conn = connection;
+ callback.answer().conn = connection;
disconnect();
callBack();
}
void
Http::Tunneler::callBack()
{
- debugs(83, 5, answer().conn << status());
- assert(!connection); // returned inside answer() or gone
- auto cb = callback;
- callback = nullptr;
- ScheduleCallHere(cb);
+ debugs(83, 5, callback.answer().conn << status());
+ assert(!connection); // returned inside callback.answer() or gone
+ ScheduleCallHere(callback.release());
}
void
#ifndef SQUID_SRC_CLIENTS_HTTP_TUNNELER_H
#define SQUID_SRC_CLIENTS_HTTP_TUNNELER_H
-#include "base/AsyncCbdataCalls.h"
+#include "base/AsyncCallbacks.h"
#include "base/AsyncJob.h"
#include "clients/forward.h"
#include "clients/HttpTunnelerAnswer.h"
CBDATA_CLASS(Tunneler);
public:
- /// Callback dialer API to allow Tunneler to set the answer.
- template <class Initiator>
- class CbDialer: public CallDialer, public Http::TunnelerAnswer
- {
- public:
- // initiator method to receive our answer
- typedef void (Initiator::*Method)(Http::TunnelerAnswer &);
-
- CbDialer(Method method, Initiator *initiator): initiator_(initiator), method_(method) {}
- virtual ~CbDialer() = default;
-
- /* CallDialer API */
- bool canDial(AsyncCall &) { return initiator_.valid(); }
- void dial(AsyncCall &) {((*initiator_).*method_)(*this); }
- virtual void print(std::ostream &os) const override {
- os << '(' << static_cast<const Http::TunnelerAnswer&>(*this) << ')';
- }
- private:
- CbcPointer<Initiator> initiator_; ///< object to deliver the answer to
- Method method_; ///< initiator_ method to call with the answer
- };
+ using Answer = TunnelerAnswer;
-public:
- Tunneler(const Comm::ConnectionPointer &conn, const HttpRequestPointer &req, AsyncCall::Pointer &aCallback, time_t timeout, const AccessLogEntryPointer &alp);
+ Tunneler(const Comm::ConnectionPointer &, const HttpRequestPointer &, const AsyncCallback<Answer> &, time_t timeout, const AccessLogEntryPointer &);
Tunneler(const Tunneler &) = delete;
Tunneler &operator =(const Tunneler &) = delete;
/// updates connection usage history before the connection is closed
void countFailingConnection();
- TunnelerAnswer &answer();
-
AsyncCall::Pointer writer; ///< called when the request has been written
AsyncCall::Pointer reader; ///< called when the response should be read
AsyncCall::Pointer closer; ///< called when the connection is being closed
Comm::ConnectionPointer connection; ///< TCP connection to the cache_peer
HttpRequestPointer request; ///< peer connection trigger or cause
- AsyncCall::Pointer callback; ///< we call this with the results
+ AsyncCallback<Answer> callback; ///< answer destination
SBuf url; ///< request-target for the CONNECT request
time_t lifetimeLimit; ///< do not run longer than this
AccessLogEntryPointer al; ///< info for the future access.log entry
Tcp.h \
TcpAcceptor.cc \
TcpAcceptor.h \
- UdpOpenDialer.h \
Write.cc \
Write.h \
comm_internal.h \
+++ /dev/null
-/*
- * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
- *
- * Squid software is distributed under GPLv2+ license and includes
- * contributions from numerous individuals and organizations.
- * Please see the COPYING and CONTRIBUTORS files for details.
- */
-
-#ifndef SQUID_COMM_UDPOPENDIALER_H
-#define SQUID_COMM_UDPOPENDIALER_H
-
-#include "ipc/StartListening.h"
-
-namespace Comm
-{
-
-/// dials a UDP port-opened call
-class UdpOpenDialer: public CallDialer,
- public Ipc::StartListeningCb
-{
-public:
- typedef void (*Handler)(const Comm::ConnectionPointer &conn, int errNo);
- UdpOpenDialer(Handler aHandler): handler(aHandler) {}
-
- virtual void print(std::ostream &os) const { startPrint(os) << ')'; }
- virtual bool canDial(AsyncCall &) const { return true; }
- virtual void dial(AsyncCall &) { (handler)(conn, errNo); }
-
-public:
- Handler handler;
-};
-
-} // namespace Comm
-
-#endif /* SQUID_COMM_UDPOPENDIALER_H */
-
#include "AccessLogEntry.h"
#include "acl/Acl.h"
#include "acl/FilledChecklist.h"
+#include "base/AsyncCallbacks.h"
#include "CachePeer.h"
#include "comm.h"
#include "comm/Connection.h"
#include "comm/Loops.h"
-#include "comm/UdpOpenDialer.h"
#include "compat/xalloc.h"
#include "debug/Messages.h"
#include "globals.h"
#include "HttpRequest.h"
#include "icmp/net_db.h"
#include "ip/tools.h"
+#include "ipc/StartListening.h"
#include "md5.h"
#include "mem/forward.h"
#include "MemBuf.h"
RR_RESPONSE
};
-static void htcpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo);
+static void htcpIncomingConnectionOpened(Ipc::StartListeningAnswer&);
static uint32_t msg_id_counter = 0;
static Comm::ConnectionPointer htcpOutgoingConn = nullptr;
htcpIncomingConn->local.setIPv4();
}
- AsyncCall::Pointer call = asyncCall(31, 2,
- "htcpIncomingConnectionOpened",
- Comm::UdpOpenDialer(&htcpIncomingConnectionOpened));
-
+ auto call = asyncCallbackFun(31, 2, htcpIncomingConnectionOpened);
Ipc::StartListening(SOCK_DGRAM,
IPPROTO_UDP,
htcpIncomingConn,
}
static void
-htcpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int)
+htcpIncomingConnectionOpened(Ipc::StartListeningAnswer &answer)
{
+ const auto &conn = answer.conn;
+
if (!Comm::IsConnOpen(conn))
fatal("Cannot open HTCP Socket");
#include "AccessLogEntry.h"
#include "acl/Acl.h"
#include "acl/FilledChecklist.h"
+#include "base/AsyncCallbacks.h"
#include "client_db.h"
#include "comm.h"
#include "comm/Connection.h"
#include "comm/Loops.h"
-#include "comm/UdpOpenDialer.h"
#include "fd.h"
#include "HttpRequest.h"
#include "icmp/net_db.h"
#include "ICP.h"
#include "ip/Address.h"
#include "ip/tools.h"
+#include "ipc/StartListening.h"
#include "ipcache.h"
#include "md5.h"
#include "multicast.h"
struct timeval queue_time = {}; ///< queuing timestamp
};
-static void icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo);
+static void icpIncomingConnectionOpened(Ipc::StartListeningAnswer &);
/// \ingroup ServerProtocolICPInternal2
static void icpLogIcp(const Ip::Address &, const LogTags_ot, int, const char *, const int, AccessLogEntryPointer &);
icpIncomingConn->local.setIPv4();
}
- AsyncCall::Pointer call = asyncCall(12, 2,
- "icpIncomingConnectionOpened",
- Comm::UdpOpenDialer(&icpIncomingConnectionOpened));
-
+ auto call = asyncCallbackFun(12, 2, icpIncomingConnectionOpened);
Ipc::StartListening(SOCK_DGRAM,
IPPROTO_UDP,
icpIncomingConn,
}
static void
-icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int)
+icpIncomingConnectionOpened(Ipc::StartListeningAnswer &answer)
{
+ const auto &conn = answer.conn;
+
if (!Comm::IsConnOpen(conn))
fatal("Cannot open ICP Port");
#include "ipc/Inquirer.h"
#include "ipc/Port.h"
#include "ipc/TypedMsgHdr.h"
+#include "mem/PoolingAllocator.h"
#include "MemBuf.h"
+
#include <algorithm>
+#include <unordered_map>
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Inquirer);
-Ipc::Inquirer::RequestsMap Ipc::Inquirer::TheRequestsMap;
Ipc::RequestId::Index Ipc::Inquirer::LastRequestId = 0;
+namespace Ipc {
+
+/// maps request->id to the Inquirer waiting for the response to that request
+using InquirerPointer = CbcPointer<Inquirer>;
+using WaitingInquiriesItem = std::pair<const RequestId::Index, InquirerPointer>;
+using WaitingInquiries = std::unordered_map<
+ RequestId::Index,
+ InquirerPointer,
+ std::hash<RequestId::Index>,
+ std::equal_to<RequestId::Index>,
+ PoolingAllocator<WaitingInquiriesItem> >;
+
+/// pending Inquirer requests for this process
+static WaitingInquiries TheWaitingInquirers;
+
+/// returns and forgets the Inquirer waiting for the given requests
+static InquirerPointer
+DequeueRequest(const RequestId::Index requestId)
+{
+ debugs(54, 3, "requestId " << requestId);
+ Assure(requestId != 0);
+ const auto request = TheWaitingInquirers.find(requestId);
+ if (request != TheWaitingInquirers.end()) {
+ const auto inquirer = request->second;
+ TheWaitingInquirers.erase(request);
+ return inquirer; // may already be gone by now
+ }
+ return nullptr;
+}
+
+} // namespace Ipc
+
/// compare Ipc::StrandCoord using kidId, for std::sort() below
static bool
LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
}
Must(request->requestId == 0);
- AsyncCall::Pointer callback = asyncCall(54, 5, "Mgr::Inquirer::handleRemoteAck",
- HandleAckDialer(this, &Inquirer::handleRemoteAck, nullptr));
if (++LastRequestId == 0) // don't use zero value as request->requestId
++LastRequestId;
request->requestId = LastRequestId;
const int kidId = pos->kidId;
debugs(54, 4, "inquire kid: " << kidId << status());
- TheRequestsMap[request->requestId] = callback;
+ TheWaitingInquirers[request->requestId] = this;
TypedMsgHdr message;
request->pack(message);
SendMessage(Port::MakeAddr(strandAddrLabel, kidId), message);
AsyncJob::callException(e);
}
-/// returns and forgets the right Inquirer callback for strand request
-AsyncCall::Pointer
-Ipc::Inquirer::DequeueRequest(const RequestId::Index requestId)
-{
- debugs(54, 3, " requestId " << requestId);
- Must(requestId != 0);
- AsyncCall::Pointer call;
- RequestsMap::iterator request = TheRequestsMap.find(requestId);
- if (request != TheRequestsMap.end()) {
- call = request->second;
- Must(call != nullptr);
- TheRequestsMap.erase(request);
- }
- return call;
-}
-
void
Ipc::Inquirer::HandleRemoteAck(const Response& response)
{
Must(response.requestId != 0);
- AsyncCall::Pointer call = DequeueRequest(response.requestId);
- if (call != nullptr) {
- HandleAckDialer* dialer = dynamic_cast<HandleAckDialer*>(call->getDialer());
- Must(dialer);
- dialer->arg1 = response.clone();
- ScheduleCallHere(call);
+ const auto inquirer = DequeueRequest(response.requestId);
+ if (inquirer.valid()) {
+ CallService(inquirer->codeContext, [&] {
+ const auto call = asyncCall(54, 5, "Ipc::Inquirer::handleRemoteAck",
+ JobMemFun(inquirer, &Inquirer::handleRemoteAck, response.clone()));
+ ScheduleCallHere(call);
+ });
}
}
#include "ipc/Request.h"
#include "ipc/Response.h"
#include "ipc/StrandCoords.h"
-#include <map>
namespace Ipc
{
virtual bool aggregate(Response::Pointer aResponse) = 0;
private:
- typedef UnaryMemFunT<Inquirer, Response::Pointer, Response::Pointer> HandleAckDialer;
-
void handleRemoteAck(Response::Pointer response);
- static AsyncCall::Pointer DequeueRequest(RequestId::Index);
-
static void RequestTimedOut(void* param);
void requestTimedOut();
void removeTimeoutEvent();
const double timeout; ///< number of seconds to wait for strand response
- /// maps request->id to Inquirer::handleRemoteAck callback
- typedef std::map<RequestId::Index, AsyncCall::Pointer> RequestsMap;
- static RequestsMap TheRequestsMap; ///< pending strand requests
-
static RequestId::Index LastRequestId; ///< last requestId used
};
/* DEBUG: section 54 Interprocess Communication */
#include "squid.h"
+#include "base/AsyncCallbacks.h"
#include "base/TextException.h"
#include "comm.h"
#include "comm/Connection.h"
{
public:
Ipc::OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
- AsyncCall::Pointer callback; // who to notify
+ Ipc::StartListeningCallback callback; // who to notify
};
/// maps ID assigned at request time to the response callback
}
void
-Ipc::JoinSharedListen(const OpenListenerParams ¶ms, AsyncCall::Pointer &cb)
+Ipc::JoinSharedListen(const OpenListenerParams ¶ms, StartListeningCallback &cb)
{
PendingOpenRequest por;
por.params = params;
Must(por.callback);
TheSharedListenRequestMap.erase(pori);
- StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(por.callback->getDialer());
- assert(cbd && cbd->conn != nullptr);
- Must(cbd && cbd->conn != nullptr);
- cbd->conn->fd = response.fd;
+ auto &answer = por.callback.answer();
+ Assure(answer.conn);
+ auto &conn = answer.conn;
+ conn->fd = response.fd;
- if (Comm::IsConnOpen(cbd->conn)) {
+ if (Comm::IsConnOpen(conn)) {
OpenListenerParams &p = por.params;
- cbd->conn->local = p.addr;
- cbd->conn->flags = p.flags;
+ conn->local = p.addr;
+ conn->flags = p.flags;
// XXX: leave the comm AI stuff to comm_import_opened()?
struct addrinfo *AI = nullptr;
p.addr.getAddrInfo(AI);
AI->ai_socktype = p.sock_type;
AI->ai_protocol = p.proto;
- comm_import_opened(cbd->conn, FdNote(p.fdNote), AI);
+ comm_import_opened(conn, FdNote(p.fdNote), AI);
Ip::Address::FreeAddr(AI);
}
- cbd->errNo = response.errNo;
- ScheduleCallHere(por.callback);
+ answer.errNo = response.errNo;
+ ScheduleCallHere(por.callback.release());
kickDelayedRequest();
}
#include "base/Subscription.h"
#include "ipc/QuestionerId.h"
#include "ipc/RequestId.h"
+#include "ipc/StartListening.h"
namespace Ipc
{
};
/// prepare and send SharedListenRequest to Coordinator
-void JoinSharedListen(const OpenListenerParams &, AsyncCall::Pointer &);
+void JoinSharedListen(const OpenListenerParams &, StartListeningCallback &);
/// process Coordinator response to SharedListenRequest
void SharedListenJoined(const SharedListenResponse &response);
/* DEBUG: section 54 Interprocess Communication */
#include "squid.h"
+#include "base/AsyncCallbacks.h"
#include "base/TextException.h"
#include "comm.h"
#include "comm/Connection.h"
#include <cerrno>
-Ipc::StartListeningCb::StartListeningCb(): conn(nullptr), errNo(0)
+std::ostream &
+Ipc::operator <<(std::ostream &os, const StartListeningAnswer &answer)
{
-}
-
-Ipc::StartListeningCb::~StartListeningCb()
-{
-}
-
-std::ostream &Ipc::StartListeningCb::startPrint(std::ostream &os) const
-{
- return os << "(" << conn << ", err=" << errNo;
+ os << answer.conn;
+ if (answer.errNo)
+ os << ", err=" << answer.errNo;
+ return os;
}
void
Ipc::StartListening(int sock_type, int proto, const Comm::ConnectionPointer &listenConn,
- FdNoteId fdNote, AsyncCall::Pointer &callback)
+ const FdNoteId fdNote, StartListeningCallback &callback)
{
- StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(callback->getDialer());
- Must(cbd);
- cbd->conn = listenConn;
+ auto &answer = callback.answer();
+ answer.conn = listenConn;
const auto giveEachWorkerItsOwnQueue = listenConn->flags & COMM_REUSEPORT;
if (!giveEachWorkerItsOwnQueue && UsingSmp()) {
p.addr = listenConn->local;
p.flags = listenConn->flags;
p.fdNote = fdNote;
- Ipc::JoinSharedListen(p, callback);
+ JoinSharedListen(p, callback);
return; // wait for the call back
}
enter_suid();
- comm_open_listener(sock_type, proto, cbd->conn, FdNote(fdNote));
- cbd->errNo = Comm::IsConnOpen(cbd->conn) ? 0 : errno;
+ comm_open_listener(sock_type, proto, answer.conn, FdNote(fdNote));
+ const auto savedErrno = errno;
leave_suid();
- debugs(54, 3, "opened listen " << cbd->conn);
- ScheduleCallHere(callback);
+ answer.errNo = Comm::IsConnOpen(answer.conn) ? 0 : savedErrno;
+
+ debugs(54, 3, "opened listen " << answer);
+ ScheduleCallHere(callback.release());
}
#define SQUID_IPC_START_LISTENING_H
#include "base/AsyncCall.h"
+#include "base/forward.h"
#include "base/Subscription.h"
#include "comm/forward.h"
#include "ip/forward.h"
namespace Ipc
{
-/// common API for all StartListening() callbacks
-class StartListeningCb
+/// StartListening() result
+class StartListeningAnswer
{
-public:
- StartListeningCb();
- virtual ~StartListeningCb();
-
- /// starts printing arguments, return os
- std::ostream &startPrint(std::ostream &os) const;
-
public:
Comm::ConnectionPointer conn; ///< opened listening socket
- int errNo; ///< errno value from the comm_open_listener() call
+ int errNo = 0; ///< errno value from the comm_open_listener() call
};
+using StartListeningCallback = AsyncCallback<StartListeningAnswer>;
+
/// Depending on whether SMP is on, either ask Coordinator to send us
/// the listening FD or open a listening socket directly.
void StartListening(int sock_type, int proto, const Comm::ConnectionPointer &listenConn,
- FdNoteId fdNote, AsyncCall::Pointer &callback);
+ FdNoteId, StartListeningCallback &);
+
+std::ostream &operator <<(std::ostream &, const StartListeningAnswer &);
} // namespace Ipc;
public:
BlindPeerConnector(HttpRequestPointer &aRequest,
const Comm::ConnectionPointer &aServerConn,
- AsyncCall::Pointer &aCallback,
+ const AsyncCallback<EncryptorAnswer> &aCallback,
const AccessLogEntryPointer &alp,
const time_t timeout = 0) :
AsyncJob("Security::BlindPeerConnector"),
#include "squid.h"
#include "acl/FilledChecklist.h"
+#include "base/AsyncCallbacks.h"
#include "base/IoManip.h"
#include "comm/Loops.h"
#include "comm/Read.h"
CBDATA_NAMESPACED_CLASS_INIT(Security, PeerConnector);
-Security::PeerConnector::PeerConnector(const Comm::ConnectionPointer &aServerConn, AsyncCall::Pointer &aCallback, const AccessLogEntryPointer &alp, const time_t timeout) :
+Security::PeerConnector::PeerConnector(const Comm::ConnectionPointer &aServerConn, const AsyncCallback<EncryptorAnswer> &aCallback, const AccessLogEntryPointer &alp, const time_t timeout):
AsyncJob("Security::PeerConnector"),
noteFwdPconnUse(false),
serverConn(aServerConn),
{
debugs(83, 5, serverConn);
- // if this throws, the caller's cb dialer is not our CbDialer
- Must(dynamic_cast<CbDialer*>(callback->getDialer()));
-
// watch for external connection closures
Must(Comm::IsConnOpen(serverConn));
Must(!fd_table[serverConn->fd].closing());
validationRequest.errors = errs;
try {
debugs(83, 5, "Sending SSL certificate for validation to ssl_crtvd.");
- AsyncCall::Pointer call = asyncCall(83,5, "Security::PeerConnector::sslCrtvdHandleReply", Ssl::CertValidationHelper::CbDialer(this, &Security::PeerConnector::sslCrtvdHandleReply, nullptr));
+ const auto call = asyncCallback(83, 5, Security::PeerConnector::sslCrtvdHandleReply, this);
Ssl::CertValidationHelper::Submit(validationRequest, call);
return false;
} catch (const std::exception &e) {
#if USE_OPENSSL
void
-Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointer validationResponse)
+Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointer &validationResponse)
{
Must(validationResponse != nullptr);
Must(Comm::IsConnOpen(serverConnection()));
Security::PeerConnector::answer()
{
assert(callback);
- const auto dialer = dynamic_cast<CbDialer*>(callback->getDialer());
- assert(dialer);
- return dialer->answer();
+ return callback.answer();
}
void
Security::PeerConnector::callBack()
{
debugs(83, 5, "TLS setup ended for " << answer().conn);
-
- AsyncCall::Pointer cb = callback;
- // Do this now so that if we throw below, swanSong() assert that we _tried_
- // to call back holds.
- callback = nullptr; // this should make done() true
- ScheduleCallHere(cb);
+ ScheduleCallHere(callback.release());
+ Assure(done());
}
void
}
#if USE_OPENSSL
-/// CallDialer to allow use Downloader objects within PeerConnector class.
-class PeerConnectorCertDownloaderDialer: public Downloader::CbDialer
-{
-public:
- typedef void (Security::PeerConnector::*Method)(SBuf &object, int status);
-
- PeerConnectorCertDownloaderDialer(Method method, Security::PeerConnector *pc):
- method_(method),
- peerConnector_(pc) {}
-
- /* CallDialer API */
- virtual bool canDial(AsyncCall &) { return peerConnector_.valid(); }
- virtual void dial(AsyncCall &) { ((&(*peerConnector_))->*method_)(object, status); }
- Method method_; ///< The Security::PeerConnector method to dial
- CbcPointer<Security::PeerConnector> peerConnector_; ///< The Security::PeerConnector object
-};
-
/// the number of concurrent PeerConnector jobs waiting for us
unsigned int
Security::PeerConnector::certDownloadNestingLevel() const
void
Security::PeerConnector::startCertDownloading(SBuf &url)
{
- AsyncCall::Pointer certCallback = asyncCall(81, 4,
- "Security::PeerConnector::certDownloadingDone",
- PeerConnectorCertDownloaderDialer(&Security::PeerConnector::certDownloadingDone, this));
-
+ const auto certCallback = asyncCallback(81, 4, Security::PeerConnector::certDownloadingDone, this);
const auto dl = new Downloader(url, certCallback,
MasterXaction::MakePortless<XactionInitiator::initCertFetcher>(),
certDownloadNestingLevel() + 1);
}
void
-Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus)
+Security::PeerConnector::certDownloadingDone(DownloaderAnswer &downloaderAnswer)
{
certDownloadWait.finish();
++certsDownloads;
- debugs(81, 5, "Certificate downloading status: " << downloadStatus << " certificate size: " << obj.length());
+ debugs(81, 5, "outcome: " << downloaderAnswer.outcome << "; certificate size: " << downloaderAnswer.resource.length());
Must(Comm::IsConnOpen(serverConnection()));
const auto &sconn = *fd_table[serverConnection()->fd].ssl;
+ // XXX: Do not parse the response when the download has failed.
// Parse Certificate. Assume that it is in DER format.
// According to RFC 4325:
// The server must provide a DER encoded certificate or a collection
// The applications MUST accept DER encoded certificates and SHOULD
// be able to accept collection of certificates.
// TODO: support collection of certificates
- const unsigned char *raw = (const unsigned char*)obj.rawContent();
- if (X509 *cert = d2i_X509(nullptr, &raw, obj.length())) {
+ auto raw = reinterpret_cast<const unsigned char*>(downloaderAnswer.resource.rawContent());
+ if (auto cert = d2i_X509(nullptr, &raw, downloaderAnswer.resource.length())) {
debugs(81, 5, "Retrieved certificate: " << *cert);
if (!downloadedCerts)
#include "acl/Acl.h"
#include "acl/ChecklistFiller.h"
-#include "base/AsyncCbdataCalls.h"
+#include "base/AsyncCallbacks.h"
#include "base/AsyncJob.h"
#include "base/JobWait.h"
#include "CommCalls.h"
class ErrorState;
class Downloader;
+class DownloaderAnswer;
class AccessLogEntry;
typedef RefCount<AccessLogEntry> AccessLogEntryPointer;
public:
typedef CbcPointer<PeerConnector> Pointer;
- /// Callback dialer API to allow PeerConnector to set the answer.
- class CbDialer
- {
- public:
- virtual ~CbDialer() {}
- /// gives PeerConnector access to the in-dialer answer
- virtual Security::EncryptorAnswer &answer() = 0;
- };
-
-public:
PeerConnector(const Comm::ConnectionPointer &aServerConn,
- AsyncCall::Pointer &aCallback,
+ const AsyncCallback<EncryptorAnswer> &,
const AccessLogEntryPointer &alp,
const time_t timeout = 0);
virtual ~PeerConnector();
void startCertDownloading(SBuf &url);
/// Called by Downloader after a certificate object downloaded.
- void certDownloadingDone(SBuf &object, int status);
+ void certDownloadingDone(DownloaderAnswer &);
#endif
/// Called when the openSSL SSL_connect function needs to write data to
HttpRequestPointer request; ///< peer connection trigger or cause
Comm::ConnectionPointer serverConn; ///< TCP connection to the peer
AccessLogEntryPointer al; ///< info for the future access.log entry
- AsyncCall::Pointer callback; ///< we call this with the results
+
+ /// answer destination
+ AsyncCallback<EncryptorAnswer> callback;
+
private:
PeerConnector(const PeerConnector &); // not implemented
PeerConnector &operator =(const PeerConnector &); // not implemented
unsigned int certDownloadNestingLevel() const;
/// Process response from cert validator helper
- void sslCrtvdHandleReply(Ssl::CertValidationResponsePointer);
+ void sslCrtvdHandleReply(Ssl::CertValidationResponsePointer &);
/// Check SSL errors returned from cert validator against sslproxy_cert_error access list
Security::CertErrors *sslCrtvdCheckForErrors(Ssl::CertValidationResponse const &, ErrorDetailPointer &);
typedef CommCbMemFunT<Server, CommAcceptCbParams> AcceptDialer;
typedef AsyncCallT<AcceptDialer> AcceptCall;
- RefCount<AcceptCall> call = static_cast<AcceptCall*>(JobCallback(5, 5, AcceptDialer, this, Ftp::Server::acceptDataConnection));
+ const auto call = JobCallback(5, 5, AcceptDialer, this, Ftp::Server::acceptDataConnection);
Subscription::Pointer sub = new CallSubscription<AcceptCall>(call);
listener = call.getRaw();
dataListenConn = conn;
#include "squid.h"
#include "acl/FilledChecklist.h"
+#include "base/AsyncCallbacks.h"
#include "base/CbcPointer.h"
#include "CachePeer.h"
#include "client_db.h"
#include "comm.h"
#include "comm/Connection.h"
#include "comm/Loops.h"
-#include "comm/UdpOpenDialer.h"
#include "fatal.h"
#include "ip/Address.h"
#include "ip/tools.h"
+#include "ipc/StartListening.h"
#include "snmp/Forwarder.h"
#include "snmp_agent.h"
#include "snmp_core.h"
#include "SquidConfig.h"
#include "tools.h"
-static void snmpPortOpened(const Comm::ConnectionPointer &conn, int errNo);
+static void snmpPortOpened(Ipc::StartListeningAnswer&);
mib_tree_entry *mib_tree_head;
mib_tree_entry *mib_tree_last;
snmpIncomingConn->local.setIPv4();
}
- AsyncCall::Pointer call = asyncCall(49, 2, "snmpIncomingConnectionOpened",
- Comm::UdpOpenDialer(&snmpPortOpened));
+ auto call = asyncCallbackFun(49, 2, snmpPortOpened);
Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, snmpIncomingConn, Ipc::fdnInSnmpSocket, call);
if (!Config.Addrs.snmp_outgoing.isNoAddr()) {
if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && snmpOutgoingConn->local.isAnyAddr()) {
snmpOutgoingConn->local.setIPv4();
}
- AsyncCall::Pointer c = asyncCall(49, 2, "snmpOutgoingConnectionOpened",
- Comm::UdpOpenDialer(&snmpPortOpened));
+ // TODO: Add/use snmpOutgoingPortOpened() instead of snmpPortOpened().
+ auto c = asyncCallbackFun(49, 2, snmpPortOpened);
Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, snmpOutgoingConn, Ipc::fdnOutSnmpSocket, c);
} else {
snmpOutgoingConn = snmpIncomingConn;
}
static void
-snmpPortOpened(const Comm::ConnectionPointer &conn, int)
+snmpPortOpened(Ipc::StartListeningAnswer &answer)
{
+ const auto &conn = answer.conn;
+
if (!Comm::IsConnOpen(conn))
fatalf("Cannot open SNMP %s Port",(conn->fd == snmpIncomingConn->fd?"receiving":"sending"));
Ssl::PeekingPeerConnector::PeekingPeerConnector(HttpRequestPointer &aRequest,
const Comm::ConnectionPointer &aServerConn,
const Comm::ConnectionPointer &aClientConn,
- AsyncCall::Pointer &aCallback,
+ const AsyncCallback<Security::EncryptorAnswer> &aCallback,
const AccessLogEntryPointer &alp,
const time_t timeout):
AsyncJob("Ssl::PeekingPeerConnector"),
PeekingPeerConnector(HttpRequestPointer &aRequest,
const Comm::ConnectionPointer &aServerConn,
const Comm::ConnectionPointer &aClientConn,
- AsyncCall::Pointer &aCallback,
+ const AsyncCallback<Security::EncryptorAnswer> &aCallback,
const AccessLogEntryPointer &alp,
time_t timeout = 0);
#include "squid.h"
#include "../helper.h"
#include "anyp/PortCfg.h"
+#include "base/AsyncCallbacks.h"
#include "cache_cf.h"
#include "fs_io.h"
#include "helper/Reply.h"
public:
SBuf query;
- AsyncCall::Pointer callback;
+ Ssl::CertValidationHelper::Callback callback;
Security::SessionPointer ssl;
};
CBDATA_CLASS_INIT(submitData);
} else
validationResponse->resultCode = reply.result;
- Ssl::CertValidationHelper::CbDialer *dialer = dynamic_cast<Ssl::CertValidationHelper::CbDialer*>(crtdvdData->callback->getDialer());
- Must(dialer);
- dialer->arg1 = validationResponse;
- ScheduleCallHere(crtdvdData->callback);
+ crtdvdData->callback.answer() = validationResponse;
+ ScheduleCallHere(crtdvdData->callback.release());
if (Ssl::CertValidationHelper::HelperCache &&
(validationResponse->resultCode == ::Helper::Okay || validationResponse->resultCode == ::Helper::Error)) {
delete crtdvdData;
}
-void Ssl::CertValidationHelper::Submit(Ssl::CertValidationRequest const &request, AsyncCall::Pointer &callback)
+void
+Ssl::CertValidationHelper::Submit(const Ssl::CertValidationRequest &request, const Callback &callback)
{
Ssl::CertValidationMsg message(Ssl::CrtdMessage::REQUEST);
message.setCode(Ssl::CertValidationMsg::code_cert_validate);
if (CertValidationHelper::HelperCache &&
(validationResponse = CertValidationHelper::HelperCache->get(crtdvdData->query))) {
- CertValidationHelper::CbDialer *dialer = dynamic_cast<CertValidationHelper::CbDialer*>(callback->getDialer());
- Must(dialer);
- dialer->arg1 = *validationResponse;
- ScheduleCallHere(callback);
+ crtdvdData->callback.answer() = *validationResponse;
+ ScheduleCallHere(crtdvdData->callback.release());
delete crtdvdData;
return;
}
Ssl::CertValidationResponse::Pointer resp = new Ssl::CertValidationResponse(crtdvdData->ssl);
resp->resultCode = ::Helper::BrokenHelper;
- Ssl::CertValidationHelper::CbDialer *dialer = dynamic_cast<Ssl::CertValidationHelper::CbDialer*>(callback->getDialer());
- Must(dialer);
- dialer->arg1 = resp;
- ScheduleCallHere(callback);
+ crtdvdData->callback.answer() = resp;
+ ScheduleCallHere(crtdvdData->callback.release());
delete crtdvdData;
return;
}
class CertValidationHelper
{
public:
- typedef UnaryMemFunT<Security::PeerConnector, CertValidationResponse::Pointer> CbDialer;
+ using Answer = CertValidationResponse::Pointer;
+ using Callback = AsyncCallback<Answer>;
typedef void CVHCB(void *, Ssl::CertValidationResponse const &);
static void Init(); ///< Init helper structure.
static void Shutdown(); ///< Shutdown helper structure.
static void Reconfigure(); ///< Reconfigure helper structure
/// Submit crtd request message to external crtd server.
- static void Submit(Ssl::CertValidationRequest const & request, AsyncCall::Pointer &);
+ static void Submit(const Ssl::CertValidationRequest &, const Callback &);
private:
static helper * ssl_crt_validator; ///< helper for management of ssl_crtd.
public:
#include "squid.h"
#include "AccessLogEntry.h"
#include "comm/Connection.h"
-#include "Downloader.h"
#include "HttpRequest.h"
#define STUB_API "security/libsecurity.la"
CBDATA_NAMESPACED_CLASS_INIT(Security, PeerConnector);
namespace Security
{
-PeerConnector::PeerConnector(const Comm::ConnectionPointer &, AsyncCall::Pointer &, const AccessLogEntryPointer &, const time_t) :
+PeerConnector::PeerConnector(const Comm::ConnectionPointer &, const AsyncCallback<EncryptorAnswer> &, const AccessLogEntryPointer &, const time_t):
AsyncJob("Security::PeerConnector") {STUB}
PeerConnector::~PeerConnector() STUB
void PeerConnector::start() STUB
#include "squid.h"
#include "acl/FilledChecklist.h"
+#include "base/AsyncCallbacks.h"
#include "base/CbcPointer.h"
#include "base/JobWait.h"
#include "base/Raw.h"
void sendError(ErrorState *finalError, const char *reason);
private:
- /// Gives Security::PeerConnector access to Answer in the TunnelStateData callback dialer.
- class MyAnswerDialer: public CallDialer, public Security::PeerConnector::CbDialer
- {
- public:
- typedef void (TunnelStateData::*Method)(Security::EncryptorAnswer &);
-
- MyAnswerDialer(Method method, TunnelStateData *tunnel):
- method_(method), tunnel_(tunnel), answer_() {}
-
- /* CallDialer API */
- virtual bool canDial(AsyncCall &) { return tunnel_.valid(); }
- void dial(AsyncCall &) { ((&(*tunnel_))->*method_)(answer_); }
- virtual void print(std::ostream &os) const {
- os << '(' << tunnel_.get() << ", " << answer_ << ')';
- }
-
- /* Security::PeerConnector::CbDialer API */
- virtual Security::EncryptorAnswer &answer() { return answer_; }
-
- private:
- Method method_;
- CbcPointer<TunnelStateData> tunnel_;
- Security::EncryptorAnswer answer_;
- };
-
void usePinned();
/// callback handler for the Security::PeerConnector encryptor
void
TunnelStateData::secureConnectionToPeer(const Comm::ConnectionPointer &conn)
{
- AsyncCall::Pointer callback = asyncCall(5,4, "TunnelStateData::noteSecurityPeerConnectorAnswer",
- MyAnswerDialer(&TunnelStateData::noteSecurityPeerConnectorAnswer, this));
+ const auto callback = asyncCallback(5, 4, TunnelStateData::noteSecurityPeerConnectorAnswer, this);
const auto connector = new Security::BlindPeerConnector(request, conn, callback, al);
encryptionWait.start(connector, callback);
}
void
TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
{
- AsyncCall::Pointer callback = asyncCall(5,4,
- "TunnelStateData::tunnelEstablishmentDone",
- Http::Tunneler::CbDialer<TunnelStateData>(&TunnelStateData::tunnelEstablishmentDone, this));
+ const auto callback = asyncCallback(5, 4, TunnelStateData::tunnelEstablishmentDone, this);
const auto tunneler = new Http::Tunneler(conn, request, callback, Config.Timeout.lifetime, al);
#if USE_DELAY_POOLS
tunneler->setDelayId(server.delayId);
assert(!destinations->empty());
assert(!transporting());
- AsyncCall::Pointer callback = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
+ const auto callback = asyncCallback(17, 5, TunnelStateData::noteConnection, this);
const auto cs = new HappyConnOpener(destinations, callback, request, startTime, 0, al);
cs->setHost(request->url.host());
cs->setRetriable(false);