From: Marcin Siodelski Date: Tue, 14 May 2019 06:34:29 +0000 (+0200) Subject: [#599,!320] Introduce transaction id into the HTTP client. X-Git-Tag: Kea-1.6.0-beta~163 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1e0d64d773918a75db2c6bed818385d61915b93f;p=thirdparty%2Fkea.git [#599,!320] Introduce transaction id into the HTTP client. --- diff --git a/src/lib/http/client.cc b/src/lib/http/client.cc index adfd0ef5c7..91359e47f9 100644 --- a/src/lib/http/client.cc +++ b/src/lib/http/client.cc @@ -146,8 +146,12 @@ public: /// it is participating in is still alive. If it is not, it should simply /// return. This method also logs such situation. /// + /// @param transid identifier of the transaction for which the handler + /// is being invoked. It is compared against the current transaction + /// id for this connection. + /// /// @return true if the premature timeout is suspected, false otherwise. - bool checkPrematureTimeout() const; + bool checkPrematureTimeout(const uint64_t transid); private: @@ -177,12 +181,16 @@ private: /// @brief Asynchronously sends data over the socket. /// /// The data sent over the socket are stored in the @c buf_. - void doSend(); + /// + /// @param transid Current transaction id. + void doSend(const uint64_t transid); /// @brief Asynchronously receives data over the socket. /// /// The data received over the socket are store into the @c input_buf_. - void doReceive(); + /// + /// @param transid Current transaction id. + void doReceive(const uint64_t transid); /// @brief Local callback invoked when the connection is established. /// @@ -191,8 +199,10 @@ private: /// /// @param Pointer to the callback to be invoked when client connects to /// the server. + /// @param transid Current transaction id. /// @param ec Error code being a result of the connection attempt. void connectCallback(HttpClient::ConnectHandler connect_callback, + const uint64_t transid, const boost::system::error_code& ec); /// @brief Local callback invoked when an attempt to send a portion of data @@ -202,16 +212,20 @@ private: /// data from the buffer were sent, the callback will start to asynchronously /// receive a response from the server. /// + /// @param transid Current transaction id. /// @param ec Error code being a result of sending the data. /// @param length Number of bytes sent. - void sendCallback(const boost::system::error_code& ec, size_t length); + void sendCallback(const uint64_t transid, const boost::system::error_code& ec, + size_t length); /// @brief Local callback invoked when an attempt to receive a portion of data /// over the socket has ended. /// + /// @param transid Current transaction id. /// @param ec Error code being a result of receiving the data. /// @param length Number of bytes received. - void receiveCallback(const boost::system::error_code& ec, size_t length); + void receiveCallback(const uint64_t transid, const boost::system::error_code& ec, + size_t length); /// @brief Local callback invoked when request timeout occurs. void timerCallback(); @@ -248,6 +262,9 @@ private: /// @brief Input buffer. std::array input_buf_; + + /// @brief Identifier of the current transaction. + uint64_t current_transid_; }; /// @brief Shared pointer to the connection. @@ -449,7 +466,7 @@ Connection::Connection(IOService& io_service, const Url& url) : conn_pool_(conn_pool), url_(url), socket_(io_service), timer_(io_service), current_request_(), current_response_(), parser_(), current_callback_(), - buf_(), input_buf_() { + buf_(), input_buf_(), current_transid_(0) { } Connection::~Connection() { @@ -477,6 +494,9 @@ Connection::doTransaction(const HttpRequestPtr& request, parser_->initModel(); current_callback_ = callback; + // Starting new transaction. Generate new transaction id. + ++current_transid_; + buf_ = request->toString(); // If the socket is open we check if it is possible to transmit the data @@ -509,7 +529,7 @@ Connection::doTransaction(const HttpRequestPtr& request, TCPEndpoint endpoint(url_.getStrippedHostname(), static_cast(url_.getPort())); SocketCallback socket_cb(boost::bind(&Connection::connectCallback, shared_from_this(), - connect_callback, _1)); + connect_callback, current_transid_, _1)); // Establish new connection or use existing connection. socket_.open(&endpoint, socket_cb); @@ -533,9 +553,13 @@ Connection::isTransactionOngoing() const { } bool -Connection::checkPrematureTimeout() const { - if (!isTransactionOngoing()) { - // The transaction state is was reset, so we need to log a warning message. +Connection::checkPrematureTimeout(const uint64_t transid) { + // If there is no transaction but the handlers are invoked it means + // that the last transaction in the queue timed out prematurely. + // Also, if there is a transaction in progress but the ID of that + // transaction doesn't match the one associated with the handler it, + // also means that the transaction timed out prematurely. + if (!isTransactionOngoing() || (transid != current_transid_)) { LOG_WARN(http_logger, HTTP_PREMATURE_CONNECTION_TIMEOUT_OCCURRED); return (true); } @@ -622,9 +646,9 @@ Connection::scheduleTimer(const long request_timeout) { } void -Connection::doSend() { +Connection::doSend(const uint64_t transid) { SocketCallback socket_cb(boost::bind(&Connection::sendCallback, shared_from_this(), - _1, _2)); + transid, _1, _2)); try { socket_.asyncSend(&buf_[0], buf_.size(), socket_cb); @@ -634,10 +658,10 @@ Connection::doSend() { } void -Connection::doReceive() { +Connection::doReceive(const uint64_t transid) { TCPEndpoint endpoint; SocketCallback socket_cb(boost::bind(&Connection::receiveCallback, shared_from_this(), - _1, _2)); + transid, _1, _2)); try { socket_.asyncReceive(static_cast(input_buf_.data()), input_buf_.size(), 0, @@ -649,8 +673,9 @@ Connection::doReceive() { void Connection::connectCallback(HttpClient::ConnectHandler connect_callback, + const uint64_t transid, const boost::system::error_code& ec) { - if (checkPrematureTimeout()) { + if (checkPrematureTimeout(transid)) { return; } @@ -677,13 +702,14 @@ Connection::connectCallback(HttpClient::ConnectHandler connect_callback, } else { // Start sending the request asynchronously. - doSend(); + doSend(transid); } } void -Connection::sendCallback(const boost::system::error_code& ec, size_t length) { - if (checkPrematureTimeout()) { +Connection::sendCallback(const uint64_t transid, const boost::system::error_code& ec, + size_t length) { + if (checkPrematureTimeout(transid)) { return; } @@ -716,16 +742,17 @@ Connection::sendCallback(const boost::system::error_code& ec, size_t length) { // If there is no more data to be sent, start receiving a response. Otherwise, // continue sending. if (buf_.empty()) { - doReceive(); + doReceive(transid); } else { - doSend(); + doSend(transid); } } void -Connection::receiveCallback(const boost::system::error_code& ec, size_t length) { - if (checkPrematureTimeout()) { +Connection::receiveCallback(const uint64_t transid, const boost::system::error_code& ec, + size_t length) { + if (checkPrematureTimeout(transid)) { return; } @@ -758,7 +785,7 @@ Connection::receiveCallback(const boost::system::error_code& ec, size_t length) // If the parser still needs data, let's schedule another receive. if (parser_->needData()) { - doReceive(); + doReceive(transid); } else if (parser_->httpParseOk()) { // No more data needed and parsing has been successful so far. Let's