From: Thomas Markwalder Date: Sat, 22 Jun 2019 15:38:55 +0000 (-0400) Subject: [#691,!395] Add Connection socket exposure and close_callback handler X-Git-Tag: Kea-1.6.0-beta2~202 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=770aeae623c1c5f046149abeff50458175225731;p=thirdparty%2Fkea.git [#691,!395] Add Connection socket exposure and close_callback handler Addes close_callback and exposes Connectin's TCP socket to it and connect_callback. src/lib/http/client.h b/src/lib/http/client.h HttpClient: Added second parameter, socket FD, to ConnectHandler Added CloseHandler typedef asyncSendRequest() - added close_callback parameter src/lib/http/client.cc Connection - added close_callback parameter to all methods that accept connect_callback parameter Added invocation of close_callback wherever the connection's socket is closed. src/lib/http/tests/server_client_unittests.cc TEST_F(HttpClientTest, connectCloseCallbacks) - new test that verifies connect and close callback operations --- diff --git a/configure.ac b/configure.ac index 737bc64f6d..455a9fa3e8 100644 --- a/configure.ac +++ b/configure.ac @@ -1327,7 +1327,7 @@ fi # # Doesn't seem to be required? -CPPFLAGS="$CPPFLAGS -DBOOST_ASIO_HEADER_ONLY" +#CPPFLAGS="$CPPFLAGS -DBOOST_ASIO_HEADER_ONLY" # # Disable threads: they seems to break things on some systems # As now we use threads in boost ASIO this is commented out... diff --git a/src/lib/http/client.cc b/src/lib/http/client.cc index 971e493e11..db6473115e 100644 --- a/src/lib/http/client.cc +++ b/src/lib/http/client.cc @@ -124,9 +124,12 @@ public: /// transaction completes. /// @param connect_callback Pointer to the callback function to be invoked when /// the client connects to the server. + /// @param close_callback Pointer to the callback function to be invoked when + /// the client closes the socket to the server. void doTransaction(const HttpRequestPtr& request, const HttpResponsePtr& response, const long request_timeout, const HttpClient::RequestHandler& callback, - const HttpClient::ConnectHandler& connect_callback); + const HttpClient::ConnectHandler& connect_callback, + const HttpClient::CloseHandler& close_callback); /// @brief Closes the socket and cancels the request timer. void close(); @@ -265,6 +268,9 @@ private: /// @brief Identifier of the current transaction. uint64_t current_transid_; + + /// @brief User supplied callback. + HttpClient::CloseHandler close_callback_; }; /// @brief Shared pointer to the connection. @@ -305,6 +311,8 @@ public: /// @param callback Pointer to the user callback for this request. /// @param connect_callback Pointer to the user callback invoked when /// the client connects to the server. + /// @param close_callback Pointer to the user callback invoked when + /// the client closes the connection to the server. /// /// @return true if the request for the given URL has been retrieved, /// false if there are no more requests queued for this URL. @@ -313,7 +321,8 @@ public: HttpResponsePtr& response, long& request_timeout, HttpClient::RequestHandler& callback, - HttpClient::ConnectHandler& connect_callback) { + HttpClient::ConnectHandler& connect_callback, + HttpClient::CloseHandler& close_callback) { // Check if there is a queue for this URL. If there is no queue, there // is no request queued either. auto it = queue_.find(url); @@ -327,6 +336,7 @@ public: request_timeout = desc.request_timeout_, callback = desc.callback_; connect_callback = desc.connect_callback_; + close_callback = desc.close_callback_; return (true); } } @@ -349,12 +359,15 @@ public: /// transaction ends. /// @param connect_callback Pointer to the user callback to be invoked when the /// client connects to the server. + /// @param close_callback Pointer to the user callback to be invoked when the + /// client closes the connection to the server. void queueRequest(const Url& url, const HttpRequestPtr& request, const HttpResponsePtr& response, const long request_timeout, const HttpClient::RequestHandler& request_callback, - const HttpClient::ConnectHandler& connect_callback) { + const HttpClient::ConnectHandler& connect_callback, + const HttpClient::CloseHandler& close_callback) { auto it = conns_.find(url); if (it != conns_.end()) { ConnectionPtr conn = it->second; @@ -364,12 +377,13 @@ public: queue_[url].push(RequestDescriptor(request, response, request_timeout, request_callback, - connect_callback)); + connect_callback, + close_callback)); } else { // Connection is idle, so we can start the transaction. conn->doTransaction(request, response, request_timeout, - request_callback, connect_callback); + request_callback, connect_callback, close_callback); } } else { @@ -378,7 +392,7 @@ public: ConnectionPtr conn(new Connection(io_service_, shared_from_this(), url)); conn->doTransaction(request, response, request_timeout, request_callback, - connect_callback); + connect_callback, close_callback); conns_[url] = conn; } } @@ -434,15 +448,19 @@ private: /// @param callback Pointer to the user callback. /// @param connect_callback pointer to the user callback to be invoked /// when the client connects to the server. + /// @param close_callback pointer to the user callback to be invoked + /// when the client closes the connection to the server. RequestDescriptor(const HttpRequestPtr& request, const HttpResponsePtr& response, const long request_timeout, const HttpClient::RequestHandler& callback, - const HttpClient::ConnectHandler& connect_callback) + const HttpClient::ConnectHandler& connect_callback, + const HttpClient::CloseHandler& close_callback) : request_(request), response_(response), request_timeout_(request_timeout), callback_(callback), - connect_callback_(connect_callback) { + connect_callback_(connect_callback), + close_callback_(close_callback) { } /// @brief Holds pointer to the request. @@ -455,6 +473,9 @@ private: HttpClient::RequestHandler callback_; /// @brief Holds pointer to the user callback for connect. HttpClient::ConnectHandler connect_callback_; + + /// @brief Holds pointer to the user callback for close. + HttpClient::CloseHandler close_callback_; }; /// @brief Holds the queue of requests for different URLs. @@ -466,7 +487,7 @@ Connection::Connection(IOService& io_service, const Url& url) : conn_pool_(conn_pool), url_(url), socket_(io_service), timer_(io_service), current_request_(), current_response_(), parser_(), current_callback_(), - buf_(), input_buf_(), current_transid_(0) { + buf_(), input_buf_(), current_transid_(0), close_callback_() { } Connection::~Connection() { @@ -486,13 +507,15 @@ Connection::doTransaction(const HttpRequestPtr& request, const HttpResponsePtr& response, const long request_timeout, const HttpClient::RequestHandler& callback, - const HttpClient::ConnectHandler& connect_callback) { + const HttpClient::ConnectHandler& connect_callback, + const HttpClient::CloseHandler& close_callback) { try { current_request_ = request; current_response_ = response; parser_.reset(new HttpResponseParser(*current_response_)); parser_->initModel(); current_callback_ = callback; + close_callback_ = close_callback; // Starting new transaction. Generate new transaction id. ++current_transid_; @@ -506,6 +529,9 @@ Connection::doTransaction(const HttpRequestPtr& request, // data over this socket, when the peer may close the connection. In this // case we'll need to re-transmit but we don't handle it here. if (socket_.getASIOSocket().is_open() && !socket_.isUsable()) { + if (close_callback) { + close_callback(socket_.getNative()); + } socket_.close(); } @@ -542,6 +568,10 @@ Connection::doTransaction(const HttpRequestPtr& request, void Connection::close() { + if (close_callback_) { + close_callback_(socket_.getNative()); + } + timer_.cancel(); socket_.close(); resetState(); @@ -632,10 +662,12 @@ Connection::terminate(const boost::system::error_code& ec, long request_timeout; HttpClient::RequestHandler callback; HttpClient::ConnectHandler connect_callback; + HttpClient::CloseHandler close_callback; ConnectionPoolPtr conn_pool = conn_pool_.lock(); if (conn_pool && conn_pool->getNextRequest(url_, request, response, request_timeout, - callback, connect_callback)) { - doTransaction(request, response, request_timeout, callback, connect_callback); + callback, connect_callback, close_callback)) { + doTransaction(request, response, request_timeout, callback, + connect_callback, close_callback); } } @@ -685,7 +717,7 @@ Connection::connectCallback(HttpClient::ConnectHandler connect_callback, if (connect_callback) { // If the user defined callback indicates that the connection // should not be continued. - if (!connect_callback(ec)) { + if (!connect_callback(ec, socket_.getNative())) { return; } } @@ -844,7 +876,8 @@ HttpClient::asyncSendRequest(const Url& url, const HttpRequestPtr& request, const HttpResponsePtr& response, const HttpClient::RequestHandler& request_callback, const HttpClient::RequestTimeout& request_timeout, - const HttpClient::ConnectHandler& connect_callback) { + const HttpClient::ConnectHandler& connect_callback, + const HttpClient::CloseHandler& close_callback) { if (!url.isValid()) { isc_throw(HttpClientError, "invalid URL specified for the HTTP client"); } @@ -862,7 +895,7 @@ HttpClient::asyncSendRequest(const Url& url, const HttpRequestPtr& request, } impl_->conn_pool_->queueRequest(url, request, response, request_timeout.value_, - request_callback, connect_callback); + request_callback, connect_callback, close_callback); } void diff --git a/src/lib/http/client.h b/src/lib/http/client.h index ac33c9f9a3..46ec5c8615 100644 --- a/src/lib/http/client.h +++ b/src/lib/http/client.h @@ -1,4 +1,4 @@ -// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC") +// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC") // // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this @@ -86,9 +86,18 @@ public: /// /// Returned boolean value indicates whether the client should continue /// connecting to the server (if true) or not (false). + /// It is passed the IO error code along with the native socket handle of + /// the connection's TCP socket. This always the socket's event readiness + /// to be monitored via select() or epoll. + /// /// @note Beware that the IO error code can be set to "in progress" /// so a not null error code does not always mean the connect failed. - typedef std::function ConnectHandler; + typedef std::function ConnectHandler; + + /// @brief Optional handler invoked when client closes the connection to the server. + /// + /// It is passed the native socket handler of the connection's TCP socket. + typedef std::function CloseHandler; /// @brief Constructor. /// @@ -167,7 +176,9 @@ public: const RequestTimeout& request_timeout = RequestTimeout(10000), const ConnectHandler& connect_callback = - ConnectHandler()); + ConnectHandler(), + const CloseHandler& close_callback = + CloseHandler()); /// @brief Closes all connections. void stop(); diff --git a/src/lib/http/tests/server_client_unittests.cc b/src/lib/http/tests/server_client_unittests.cc index a0bdbb20bf..d31de574a7 100644 --- a/src/lib/http/tests/server_client_unittests.cc +++ b/src/lib/http/tests/server_client_unittests.cc @@ -724,7 +724,7 @@ public: "Content-Length: 3\r\n\r\n" "{ }"; - // Use custom listener and the specialized connection object. + // Use custom listener and the specialized connection object. HttpListenerCustom listener(io_service_, IOAddress(SERVER_ADDRESS), SERVER_PORT, factory_, HttpListener::RequestTimeout(REQUEST_TIMEOUT), @@ -1375,6 +1375,123 @@ public: ASSERT_NO_THROW(runIOService()); } + /// @brief Tests that underlying TCP socket can be registered and + /// unregsitered via connection and close callbacks. + /// + /// It conducts to consequetive requests over the same client. + /// + /// @param version HTTP version to be used. + void testConnectCloseCallbacks(const HttpVersion& version) { + // Start the server. + ASSERT_NO_THROW(listener_.start()); + + // Create a client and specify the URL on which the server can be reached. + HttpClient client(io_service_); + Url url("http://127.0.0.1:18123"); + + // Initiate request to the server. + PostHttpRequestJsonPtr request1 = createRequest("sequence", 1, version); + HttpResponseJsonPtr response1(new HttpResponseJson()); + unsigned resp_num = 0; + ExternalMonitor monitor; + ASSERT_NO_THROW(client.asyncSendRequest(url, request1, response1, + [this, &resp_num](const boost::system::error_code& ec, + const HttpResponsePtr&, + const std::string&) { + if (++resp_num > 1) { + io_service_.stop(); + } + EXPECT_FALSE(ec); + }, + HttpClient::RequestTimeout(10000), + boost::bind(&ExternalMonitor::connectHandler, &monitor, _1, _2), + boost::bind(&ExternalMonitor::closeHandler, &monitor, _1) + )); + + // Initiate another request to the destination. + PostHttpRequestJsonPtr request2 = createRequest("sequence", 2, version); + HttpResponseJsonPtr response2(new HttpResponseJson()); + ASSERT_NO_THROW(client.asyncSendRequest(url, request2, response2, + [this, &resp_num](const boost::system::error_code& ec, + const HttpResponsePtr&, + const std::string&) { + if (++resp_num > 1) { + io_service_.stop(); + } + EXPECT_FALSE(ec); + }, + HttpClient::RequestTimeout(10000), + boost::bind(&ExternalMonitor::connectHandler, &monitor, _1, _2), + boost::bind(&ExternalMonitor::closeHandler, &monitor, _1) + )); + + // Actually trigger the requests. The requests should be handlded by the + // server one after another. While the first request is being processed + // the server should queue another one. + ASSERT_NO_THROW(runIOService()); + + // Make sure that the received responses are different. We check that by + // comparing value of the sequence parameters. + ASSERT_TRUE(response1); + ConstElementPtr sequence1 = response1->getJsonElement("sequence"); + ASSERT_TRUE(sequence1); + + ASSERT_TRUE(response2); + ConstElementPtr sequence2 = response2->getJsonElement("sequence"); + ASSERT_TRUE(sequence2); + + EXPECT_NE(sequence1->intValue(), sequence2->intValue()); + } + + /// @brief Simulates external registery of Connection TCP sockets + /// + /// Provides methods compatible with Connection callbacks for connnect + /// and close operations. + class ExternalMonitor { + public: + /// @breif Constructor + ExternalMonitor() : registered_fd_(-1) {}; + + /// @brief Connect callback handler + /// @param ec Error status of the ASIO connect + /// @param tcp_native_fd socket descriptor to register + bool connectHandler(const boost::system::error_code& ec, int tcp_native_fd) { + if (!ec || ec.value() == boost::asio::error::in_progress) { + if (tcp_native_fd >= 0) { + registered_fd_ = tcp_native_fd; + return (true); + } + + // Invalid fd?, this really should not be possible. EXPECT makes + // sure we log it. + EXPECT_TRUE (tcp_native_fd >= 0) << "no ec error but invalid fd?"; + return (false); + + } else if (ec.value() == boost::asio::error::already_connected) { + if (registered_fd_ != tcp_native_fd) { + return (false); + } + } + + // ec indicates an error, return true, so that error can be handled + // by Connection logic. + return (true); + } + + /// @brief Close callback handler + /// + /// @param tcp_native_fd socket descriptor to register + void closeHandler(int tcp_native_fd) { + EXPECT_EQ(tcp_native_fd, registered_fd_) << "closeHandler fd mismatch"; + if (tcp_native_fd >= 0) { + registered_fd_ = -1; + } + } + + /// @brief Keeps track of socket currently "registered" for external monitoring. + int registered_fd_; + }; + /// @brief Instance of the listener used in the tests. HttpListener listener_; @@ -1384,6 +1501,7 @@ public: /// @brief Instance of the third listener used in the tests (with short idle /// timeout). HttpListener listener3_; + }; // Test that two conscutive requests can be sent over the same (persistent) @@ -1671,7 +1789,7 @@ TEST_F(HttpClientTest, clientConnectTimeout) { // try to send a request to the server. This simulates the // case of connect() taking very long and should eventually // cause the transaction to time out. - [](const boost::system::error_code& /*ec*/) { + [](const boost::system::error_code& /*ec*/, int) { return (false); })); @@ -1688,5 +1806,9 @@ TEST_F(HttpClientTest, clientConnectTimeout) { ASSERT_NO_THROW(runIOService()); } +/// Tests that connect and close callbacks work correctly. +TEST_F(HttpClientTest, connectCloseCallbacks) { + ASSERT_NO_FATAL_FAILURE(testConnectCloseCallbacks(HttpVersion(1, 1))); +} }