]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#599,!320] Introduce transaction id into the HTTP client.
authorMarcin Siodelski <marcin@isc.org>
Tue, 14 May 2019 06:34:29 +0000 (08:34 +0200)
committerMarcin Siodelski <marcin@isc.org>
Tue, 14 May 2019 06:34:29 +0000 (08:34 +0200)
src/lib/http/client.cc

index adfd0ef5c7e1eb27ac1e149e5fee4d45bd693eef..91359e47f9ee8519ffa01a2d2e61dab975ccc26b 100644 (file)
@@ -146,8 +146,12 @@ public:
     /// it is participating in is still alive. If it is not, it should simply
     /// return. This method also logs such situation.
     ///
+    /// @param transid identifier of the transaction for which the handler
+    /// is being invoked. It is compared against the current transaction
+    /// id for this connection.
+    ///
     /// @return true if the premature timeout is suspected, false otherwise.
-    bool checkPrematureTimeout() const;
+    bool checkPrematureTimeout(const uint64_t transid);
 
 private:
 
@@ -177,12 +181,16 @@ private:
     /// @brief Asynchronously sends data over the socket.
     ///
     /// The data sent over the socket are stored in the @c buf_.
-    void doSend();
+    ///
+    /// @param transid Current transaction id.
+    void doSend(const uint64_t transid);
 
     /// @brief Asynchronously receives data over the socket.
     ///
     /// The data received over the socket are store into the @c input_buf_.
-    void doReceive();
+    ///
+    /// @param transid Current transaction id.
+    void doReceive(const uint64_t transid);
 
     /// @brief Local callback invoked when the connection is established.
     ///
@@ -191,8 +199,10 @@ private:
     ///
     /// @param Pointer to the callback to be invoked when client connects to
     /// the server.
+    /// @param transid Current transaction id.
     /// @param ec Error code being a result of the connection attempt.
     void connectCallback(HttpClient::ConnectHandler connect_callback,
+                         const uint64_t transid,
                          const boost::system::error_code& ec);
 
     /// @brief Local callback invoked when an attempt to send a portion of data
@@ -202,16 +212,20 @@ private:
     /// data from the buffer were sent, the callback will start to asynchronously
     /// receive a response from the server.
     ///
+    /// @param transid Current transaction id.
     /// @param ec Error code being a result of sending the data.
     /// @param length Number of bytes sent.
-    void sendCallback(const boost::system::error_code& ec, size_t length);
+    void sendCallback(const uint64_t transid, const boost::system::error_code& ec,
+                      size_t length);
 
     /// @brief Local callback invoked when an attempt to receive a portion of data
     /// over the socket has ended.
     ///
+    /// @param transid Current transaction id.
     /// @param ec Error code being a result of receiving the data.
     /// @param length Number of bytes received.
-    void receiveCallback(const boost::system::error_code& ec, size_t length);
+    void receiveCallback(const uint64_t transid, const boost::system::error_code& ec,
+                         size_t length);
 
     /// @brief Local callback invoked when request timeout occurs.
     void timerCallback();
@@ -248,6 +262,9 @@ private:
 
     /// @brief Input buffer.
     std::array<char, 32768> input_buf_;
+
+    /// @brief Identifier of the current transaction.
+    uint64_t current_transid_;
 };
 
 /// @brief Shared pointer to the connection.
@@ -449,7 +466,7 @@ Connection::Connection(IOService& io_service,
                        const Url& url)
     : conn_pool_(conn_pool), url_(url), socket_(io_service), timer_(io_service),
       current_request_(), current_response_(), parser_(), current_callback_(),
-      buf_(), input_buf_() {
+      buf_(), input_buf_(), current_transid_(0) {
 }
 
 Connection::~Connection() {
@@ -477,6 +494,9 @@ Connection::doTransaction(const HttpRequestPtr& request,
         parser_->initModel();
         current_callback_ = callback;
 
+        // Starting new transaction. Generate new transaction id.
+        ++current_transid_;
+
         buf_ = request->toString();
 
         // If the socket is open we check if it is possible to transmit the data
@@ -509,7 +529,7 @@ Connection::doTransaction(const HttpRequestPtr& request,
         TCPEndpoint endpoint(url_.getStrippedHostname(),
                              static_cast<unsigned short>(url_.getPort()));
         SocketCallback socket_cb(boost::bind(&Connection::connectCallback, shared_from_this(),
-                                             connect_callback, _1));
+                                             connect_callback, current_transid_, _1));
 
         // Establish new connection or use existing connection.
         socket_.open(&endpoint, socket_cb);
@@ -533,9 +553,13 @@ Connection::isTransactionOngoing() const {
 }
 
 bool
-Connection::checkPrematureTimeout() const {
-    if (!isTransactionOngoing()) {
-        // The transaction state is was reset, so we need to log a warning message.
+Connection::checkPrematureTimeout(const uint64_t transid) {
+    // If there is no transaction but the handlers are invoked it means
+    // that the last transaction in the queue timed out prematurely.
+    // Also, if there is a transaction in progress but the ID of that
+    // transaction doesn't match the one associated with the handler it,
+    // also means that the transaction timed out prematurely.
+    if (!isTransactionOngoing() || (transid != current_transid_)) {
         LOG_WARN(http_logger, HTTP_PREMATURE_CONNECTION_TIMEOUT_OCCURRED);
         return (true);
     }
@@ -622,9 +646,9 @@ Connection::scheduleTimer(const long request_timeout) {
 }
 
 void
-Connection::doSend() {
+Connection::doSend(const uint64_t transid) {
     SocketCallback socket_cb(boost::bind(&Connection::sendCallback, shared_from_this(),
-                                         _1, _2));
+                                         transid, _1, _2));
     try {
         socket_.asyncSend(&buf_[0], buf_.size(), socket_cb);
 
@@ -634,10 +658,10 @@ Connection::doSend() {
 }
 
 void
-Connection::doReceive() {
+Connection::doReceive(const uint64_t transid) {
     TCPEndpoint endpoint;
     SocketCallback socket_cb(boost::bind(&Connection::receiveCallback, shared_from_this(),
-                                         _1, _2));
+                                         transid, _1, _2));
 
     try {
         socket_.asyncReceive(static_cast<void*>(input_buf_.data()), input_buf_.size(), 0,
@@ -649,8 +673,9 @@ Connection::doReceive() {
 
 void
 Connection::connectCallback(HttpClient::ConnectHandler connect_callback,
+                            const uint64_t transid,
                             const boost::system::error_code& ec) {
-    if (checkPrematureTimeout()) {
+    if (checkPrematureTimeout(transid)) {
         return;
     }
 
@@ -677,13 +702,14 @@ Connection::connectCallback(HttpClient::ConnectHandler connect_callback,
 
     } else {
         // Start sending the request asynchronously.
-        doSend();
+        doSend(transid);
     }
 }
 
 void
-Connection::sendCallback(const boost::system::error_code& ec, size_t length) {
-    if (checkPrematureTimeout()) {
+Connection::sendCallback(const uint64_t transid, const boost::system::error_code& ec,
+                         size_t length) {
+    if (checkPrematureTimeout(transid)) {
         return;
     }
 
@@ -716,16 +742,17 @@ Connection::sendCallback(const boost::system::error_code& ec, size_t length) {
     // If there is no more data to be sent, start receiving a response. Otherwise,
     // continue sending.
     if (buf_.empty()) {
-        doReceive();
+        doReceive(transid);
 
     } else {
-        doSend();
+        doSend(transid);
     }
 }
 
 void
-Connection::receiveCallback(const boost::system::error_code& ec, size_t length) {
-    if (checkPrematureTimeout()) {
+Connection::receiveCallback(const uint64_t transid, const boost::system::error_code& ec,
+                            size_t length) {
+    if (checkPrematureTimeout(transid)) {
         return;
     }
 
@@ -758,7 +785,7 @@ Connection::receiveCallback(const boost::system::error_code& ec, size_t length)
 
     // If the parser still needs data, let's schedule another receive.
     if (parser_->needData()) {
-        doReceive();
+        doReceive(transid);
 
     } else if (parser_->httpParseOk()) {
         // No more data needed and parsing has been successful so far. Let's