/// transaction completes.
/// @param connect_callback Pointer to the callback function to be invoked when
/// the client connects to the server.
+ /// @param close_callback Pointer to the callback function to be invoked when
+ /// the client closes the socket to the server.
void doTransaction(const HttpRequestPtr& request, const HttpResponsePtr& response,
const long request_timeout, const HttpClient::RequestHandler& callback,
- const HttpClient::ConnectHandler& connect_callback);
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::CloseHandler& close_callback);
/// @brief Closes the socket and cancels the request timer.
void close();
/// @brief Identifier of the current transaction.
uint64_t current_transid_;
+
+ /// @brief User supplied callback.
+ HttpClient::CloseHandler close_callback_;
};
/// @brief Shared pointer to the connection.
/// @param callback Pointer to the user callback for this request.
/// @param connect_callback Pointer to the user callback invoked when
/// the client connects to the server.
+ /// @param close_callback Pointer to the user callback invoked when
+ /// the client closes the connection to the server.
///
/// @return true if the request for the given URL has been retrieved,
/// false if there are no more requests queued for this URL.
HttpResponsePtr& response,
long& request_timeout,
HttpClient::RequestHandler& callback,
- HttpClient::ConnectHandler& connect_callback) {
+ HttpClient::ConnectHandler& connect_callback,
+ HttpClient::CloseHandler& close_callback) {
// Check if there is a queue for this URL. If there is no queue, there
// is no request queued either.
auto it = queue_.find(url);
request_timeout = desc.request_timeout_,
callback = desc.callback_;
connect_callback = desc.connect_callback_;
+ close_callback = desc.close_callback_;
return (true);
}
}
/// transaction ends.
/// @param connect_callback Pointer to the user callback to be invoked when the
/// client connects to the server.
+ /// @param close_callback Pointer to the user callback to be invoked when the
+ /// client closes the connection to the server.
void queueRequest(const Url& url,
const HttpRequestPtr& request,
const HttpResponsePtr& response,
const long request_timeout,
const HttpClient::RequestHandler& request_callback,
- const HttpClient::ConnectHandler& connect_callback) {
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::CloseHandler& close_callback) {
auto it = conns_.find(url);
if (it != conns_.end()) {
ConnectionPtr conn = it->second;
queue_[url].push(RequestDescriptor(request, response,
request_timeout,
request_callback,
- connect_callback));
+ connect_callback,
+ close_callback));
} else {
// Connection is idle, so we can start the transaction.
conn->doTransaction(request, response, request_timeout,
- request_callback, connect_callback);
+ request_callback, connect_callback, close_callback);
}
} else {
ConnectionPtr conn(new Connection(io_service_, shared_from_this(),
url));
conn->doTransaction(request, response, request_timeout, request_callback,
- connect_callback);
+ connect_callback, close_callback);
conns_[url] = conn;
}
}
/// @param callback Pointer to the user callback.
/// @param connect_callback pointer to the user callback to be invoked
/// when the client connects to the server.
+ /// @param close_callback pointer to the user callback to be invoked
+ /// when the client closes the connection to the server.
RequestDescriptor(const HttpRequestPtr& request,
const HttpResponsePtr& response,
const long request_timeout,
const HttpClient::RequestHandler& callback,
- const HttpClient::ConnectHandler& connect_callback)
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::CloseHandler& close_callback)
: request_(request), response_(response),
request_timeout_(request_timeout),
callback_(callback),
- connect_callback_(connect_callback) {
+ connect_callback_(connect_callback),
+ close_callback_(close_callback) {
}
/// @brief Holds pointer to the request.
HttpClient::RequestHandler callback_;
/// @brief Holds pointer to the user callback for connect.
HttpClient::ConnectHandler connect_callback_;
+
+ /// @brief Holds pointer to the user callback for close.
+ HttpClient::CloseHandler close_callback_;
};
/// @brief Holds the queue of requests for different URLs.
const Url& url)
: conn_pool_(conn_pool), url_(url), socket_(io_service), timer_(io_service),
current_request_(), current_response_(), parser_(), current_callback_(),
- buf_(), input_buf_(), current_transid_(0) {
+ buf_(), input_buf_(), current_transid_(0), close_callback_() {
}
Connection::~Connection() {
const HttpResponsePtr& response,
const long request_timeout,
const HttpClient::RequestHandler& callback,
- const HttpClient::ConnectHandler& connect_callback) {
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::CloseHandler& close_callback) {
try {
current_request_ = request;
current_response_ = response;
parser_.reset(new HttpResponseParser(*current_response_));
parser_->initModel();
current_callback_ = callback;
+ close_callback_ = close_callback;
// Starting new transaction. Generate new transaction id.
++current_transid_;
// data over this socket, when the peer may close the connection. In this
// case we'll need to re-transmit but we don't handle it here.
if (socket_.getASIOSocket().is_open() && !socket_.isUsable()) {
+ if (close_callback) {
+ close_callback(socket_.getNative());
+ }
socket_.close();
}
void
Connection::close() {
+ if (close_callback_) {
+ close_callback_(socket_.getNative());
+ }
+
timer_.cancel();
socket_.close();
resetState();
long request_timeout;
HttpClient::RequestHandler callback;
HttpClient::ConnectHandler connect_callback;
+ HttpClient::CloseHandler close_callback;
ConnectionPoolPtr conn_pool = conn_pool_.lock();
if (conn_pool && conn_pool->getNextRequest(url_, request, response, request_timeout,
- callback, connect_callback)) {
- doTransaction(request, response, request_timeout, callback, connect_callback);
+ callback, connect_callback, close_callback)) {
+ doTransaction(request, response, request_timeout, callback,
+ connect_callback, close_callback);
}
}
if (connect_callback) {
// If the user defined callback indicates that the connection
// should not be continued.
- if (!connect_callback(ec)) {
+ if (!connect_callback(ec, socket_.getNative())) {
return;
}
}
const HttpResponsePtr& response,
const HttpClient::RequestHandler& request_callback,
const HttpClient::RequestTimeout& request_timeout,
- const HttpClient::ConnectHandler& connect_callback) {
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::CloseHandler& close_callback) {
if (!url.isValid()) {
isc_throw(HttpClientError, "invalid URL specified for the HTTP client");
}
}
impl_->conn_pool_->queueRequest(url, request, response, request_timeout.value_,
- request_callback, connect_callback);
+ request_callback, connect_callback, close_callback);
}
void
"Content-Length: 3\r\n\r\n"
"{ }";
- // Use custom listener and the specialized connection object.
+ // Use custom listener and the specialized connection object.
HttpListenerCustom<HttpConnectionType>
listener(io_service_, IOAddress(SERVER_ADDRESS), SERVER_PORT,
factory_, HttpListener::RequestTimeout(REQUEST_TIMEOUT),
ASSERT_NO_THROW(runIOService());
}
+ /// @brief Tests that underlying TCP socket can be registered and
+ /// unregsitered via connection and close callbacks.
+ ///
+ /// It conducts to consequetive requests over the same client.
+ ///
+ /// @param version HTTP version to be used.
+ void testConnectCloseCallbacks(const HttpVersion& version) {
+ // Start the server.
+ ASSERT_NO_THROW(listener_.start());
+
+ // Create a client and specify the URL on which the server can be reached.
+ HttpClient client(io_service_);
+ Url url("http://127.0.0.1:18123");
+
+ // Initiate request to the server.
+ PostHttpRequestJsonPtr request1 = createRequest("sequence", 1, version);
+ HttpResponseJsonPtr response1(new HttpResponseJson());
+ unsigned resp_num = 0;
+ ExternalMonitor monitor;
+ ASSERT_NO_THROW(client.asyncSendRequest(url, request1, response1,
+ [this, &resp_num](const boost::system::error_code& ec,
+ const HttpResponsePtr&,
+ const std::string&) {
+ if (++resp_num > 1) {
+ io_service_.stop();
+ }
+ EXPECT_FALSE(ec);
+ },
+ HttpClient::RequestTimeout(10000),
+ boost::bind(&ExternalMonitor::connectHandler, &monitor, _1, _2),
+ boost::bind(&ExternalMonitor::closeHandler, &monitor, _1)
+ ));
+
+ // Initiate another request to the destination.
+ PostHttpRequestJsonPtr request2 = createRequest("sequence", 2, version);
+ HttpResponseJsonPtr response2(new HttpResponseJson());
+ ASSERT_NO_THROW(client.asyncSendRequest(url, request2, response2,
+ [this, &resp_num](const boost::system::error_code& ec,
+ const HttpResponsePtr&,
+ const std::string&) {
+ if (++resp_num > 1) {
+ io_service_.stop();
+ }
+ EXPECT_FALSE(ec);
+ },
+ HttpClient::RequestTimeout(10000),
+ boost::bind(&ExternalMonitor::connectHandler, &monitor, _1, _2),
+ boost::bind(&ExternalMonitor::closeHandler, &monitor, _1)
+ ));
+
+ // Actually trigger the requests. The requests should be handlded by the
+ // server one after another. While the first request is being processed
+ // the server should queue another one.
+ ASSERT_NO_THROW(runIOService());
+
+ // Make sure that the received responses are different. We check that by
+ // comparing value of the sequence parameters.
+ ASSERT_TRUE(response1);
+ ConstElementPtr sequence1 = response1->getJsonElement("sequence");
+ ASSERT_TRUE(sequence1);
+
+ ASSERT_TRUE(response2);
+ ConstElementPtr sequence2 = response2->getJsonElement("sequence");
+ ASSERT_TRUE(sequence2);
+
+ EXPECT_NE(sequence1->intValue(), sequence2->intValue());
+ }
+
+ /// @brief Simulates external registery of Connection TCP sockets
+ ///
+ /// Provides methods compatible with Connection callbacks for connnect
+ /// and close operations.
+ class ExternalMonitor {
+ public:
+ /// @breif Constructor
+ ExternalMonitor() : registered_fd_(-1) {};
+
+ /// @brief Connect callback handler
+ /// @param ec Error status of the ASIO connect
+ /// @param tcp_native_fd socket descriptor to register
+ bool connectHandler(const boost::system::error_code& ec, int tcp_native_fd) {
+ if (!ec || ec.value() == boost::asio::error::in_progress) {
+ if (tcp_native_fd >= 0) {
+ registered_fd_ = tcp_native_fd;
+ return (true);
+ }
+
+ // Invalid fd?, this really should not be possible. EXPECT makes
+ // sure we log it.
+ EXPECT_TRUE (tcp_native_fd >= 0) << "no ec error but invalid fd?";
+ return (false);
+
+ } else if (ec.value() == boost::asio::error::already_connected) {
+ if (registered_fd_ != tcp_native_fd) {
+ return (false);
+ }
+ }
+
+ // ec indicates an error, return true, so that error can be handled
+ // by Connection logic.
+ return (true);
+ }
+
+ /// @brief Close callback handler
+ ///
+ /// @param tcp_native_fd socket descriptor to register
+ void closeHandler(int tcp_native_fd) {
+ EXPECT_EQ(tcp_native_fd, registered_fd_) << "closeHandler fd mismatch";
+ if (tcp_native_fd >= 0) {
+ registered_fd_ = -1;
+ }
+ }
+
+ /// @brief Keeps track of socket currently "registered" for external monitoring.
+ int registered_fd_;
+ };
+
/// @brief Instance of the listener used in the tests.
HttpListener listener_;
/// @brief Instance of the third listener used in the tests (with short idle
/// timeout).
HttpListener listener3_;
+
};
// Test that two conscutive requests can be sent over the same (persistent)
// try to send a request to the server. This simulates the
// case of connect() taking very long and should eventually
// cause the transaction to time out.
- [](const boost::system::error_code& /*ec*/) {
+ [](const boost::system::error_code& /*ec*/, int) {
return (false);
}));
ASSERT_NO_THROW(runIOService());
}
+/// Tests that connect and close callbacks work correctly.
+TEST_F(HttpClientTest, connectCloseCallbacks) {
+ ASSERT_NO_FATAL_FAILURE(testConnectCloseCallbacks(HttpVersion(1, 1)));
+}
}