#include <boost/enable_shared_from_this.hpp>
#include <boost/weak_ptr.hpp>
+#include <atomic>
#include <array>
#include <iostream>
#include <map>
/// @brief Checks if a transaction has been initiated over this connection.
///
/// @return true if transaction has been initiated, false otherwise.
- bool isTransactionOngoing();
+ bool isTransactionOngoing() const;
/// @brief Checks if a socket descriptor belongs to this connection.
///
/// Should be called in a thread safe context.
void closeInternal();
- /// @brief Checks if a transaction has been initiated over this connection.
- ///
- /// Should be called in a thread safe context.
- ///
- /// @return true if transaction has been initiated, false otherwise.
- bool isTransactionOngoingInternal() const;
-
/// @brief Checks and logs if premature transaction timeout is suspected.
///
/// Should be called in a thread safe context.
/// @brief User supplied callback.
HttpClient::CloseHandler close_callback_;
+ /// @brief Flag to indicate that a transaction is running.
+ std::atomic<bool> started_;
+
/// @brief Mutex to protect the internal state.
std::mutex mutex_;
};
///
/// @return true if the request for the given URL has been retrieved,
/// false if there are no more requests queued for this URL.
- bool getNextRequest(const Url& url,
- HttpRequestPtr& request,
- HttpResponsePtr& response,
- long& request_timeout,
- HttpClient::RequestHandler& callback,
- HttpClient::ConnectHandler& connect_callback,
- HttpClient::CloseHandler& close_callback) {
+ bool processNextRequest(const Url& url) {
if (MultiThreadingMgr::instance().getMode()) {
std::lock_guard<std::mutex> lk(mutex_);
- return (getNextRequestInternal(url, request, response,
- request_timeout, callback,
- connect_callback, close_callback));
+ return (processNextRequestInternal(url));
} else {
- return (getNextRequestInternal(url, request, response,
- request_timeout, callback,
- connect_callback, close_callback));
+ return (processNextRequestInternal(url));
}
}
///
/// @return true if the request for the given URL has been retrieved,
/// false if there are no more requests queued for this URL.
- bool getNextRequestInternal(const Url& url,
- HttpRequestPtr& request,
- HttpResponsePtr& response,
- long& request_timeout,
- HttpClient::RequestHandler& callback,
- HttpClient::ConnectHandler& connect_callback,
- HttpClient::CloseHandler& close_callback) {
+ bool processNextRequestInternal(const Url& url) {
// Check if there is a queue for this URL. If there is no queue, there
// is no request queued either.
auto it = queue_.find(url);
if (!it->second.empty()) {
RequestDescriptor desc = it->second.front();
it->second.pop();
- request = desc.request_;
- response = desc.response_;
- request_timeout = desc.request_timeout_,
- callback = desc.callback_;
- connect_callback = desc.connect_callback_;
- close_callback = desc.close_callback_;
+ desc.conn_->doTransaction(desc.request_, desc.response_,
+ desc.request_timeout_, desc.callback_,
+ desc.connect_callback_,
+ desc.close_callback_);
return (true);
}
}
// There is a connection for this URL already. Check if it is idle.
if (conn->isTransactionOngoing()) {
// Connection is busy, so let's queue the request.
- queue_[url].push(RequestDescriptor(request, response,
+ queue_[url].push(RequestDescriptor(conn, request, response,
request_timeout,
request_callback,
connect_callback,
/// when the client connects to the server.
/// @param close_callback pointer to the user callback to be invoked
/// when the client closes the connection to the server.
- RequestDescriptor(const HttpRequestPtr& request,
+ RequestDescriptor(const ConnectionPtr& conn,
+ const HttpRequestPtr& request,
const HttpResponsePtr& response,
- const long request_timeout,
+ const long& request_timeout,
const HttpClient::RequestHandler& callback,
const HttpClient::ConnectHandler& connect_callback,
const HttpClient::CloseHandler& close_callback)
- : request_(request), response_(response),
- request_timeout_(request_timeout),
- callback_(callback),
+ : conn_(conn), request_(request), response_(response),
+ request_timeout_(request_timeout), callback_(callback),
connect_callback_(connect_callback),
close_callback_(close_callback) {
}
+ /// @brief Holds the connection.
+ ConnectionPtr conn_;
+
/// @brief Holds pointer to the request.
HttpRequestPtr request_;
const Url& url)
: conn_pool_(conn_pool), url_(url), socket_(io_service), timer_(io_service),
current_request_(), current_response_(), parser_(), current_callback_(),
- buf_(), input_buf_(), current_transid_(0), close_callback_() {
+ buf_(), input_buf_(), current_transid_(0), close_callback_(),
+ started_(false) {
}
Connection::~Connection() {
void
Connection::resetState() {
+ started_ = false;
current_request_.reset();
current_response_.reset();
parser_.reset();
const HttpClient::ConnectHandler& connect_callback,
const HttpClient::CloseHandler& close_callback) {
try {
+ started_ = true;
current_request_ = request;
current_response_ = response;
parser_.reset(new HttpResponseParser(*current_response_));
}
bool
-Connection::isTransactionOngoing() {
- if (MultiThreadingMgr::instance().getMode()) {
- std::lock_guard<std::mutex> lk(mutex_);
- return (isTransactionOngoingInternal());
- } else {
- return (isTransactionOngoingInternal());
- }
-}
-
-bool
-Connection::isTransactionOngoingInternal() const {
- return (static_cast<bool>(current_request_));
+Connection::isTransactionOngoing() const {
+ return (started_);
}
bool
// Also, if there is a transaction in progress but the ID of that
// transaction doesn't match the one associated with the handler it,
// also means that the transaction timed out prematurely.
- if (!isTransactionOngoingInternal() || (transid != current_transid_)) {
+ if (!isTransactionOngoing() || (transid != current_transid_)) {
LOG_WARN(http_logger, HTTP_PREMATURE_CONNECTION_TIMEOUT_OCCURRED);
return (true);
}
const std::string& parsing_error) {
HttpResponsePtr response;
- if (isTransactionOngoingInternal()) {
+ if (isTransactionOngoing()) {
timer_.cancel();
socket_.cancel();
LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC_DATA,
HTTP_SERVER_RESPONSE_RECEIVED_DETAILS)
.arg(url_.toText())
- .arg((parser_ ?
- parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) :
- "[HttpResponseParser is null]"));
+ .arg(parser_ ?
+ parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) :
+ "[HttpResponseParser is null]");
} else {
std::string err = parsing_error.empty() ? ec.message() :
LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC_DATA,
HTTP_BAD_SERVER_RESPONSE_RECEIVED_DETAILS)
.arg(url_.toText())
- .arg((parser_ ? parser_->getBufferAsString() :
- "[HttpResponseParser is null]"));
+ .arg(parser_ ?
+ parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) :
+ "[HttpResponseParser is null]");
}
}
- // unlock mutex so that callback can call any locking function.
- if (MultiThreadingMgr::instance().getMode()) {
- mutex_.unlock();
- }
try {
// The callback should take care of its own exceptions but one
// never knows.
} catch (...) {
}
- // lock mutex so that we can continue processing.
- if (MultiThreadingMgr::instance().getMode()) {
- mutex_.lock();
- }
// If we're not requesting connection persistence, we should close the socket.
// We're going to reconnect for the next transaction.
resetState();
}
+ // unlock mutex so that the next request can be safely processed.
+ if (MultiThreadingMgr::instance().getMode()) {
+ mutex_.unlock();
+ }
+
// Check if there are any requests queued for this connection and start
// another transaction if there is at least one.
- HttpRequestPtr request;
- long request_timeout;
- HttpClient::RequestHandler callback;
- HttpClient::ConnectHandler connect_callback;
- HttpClient::CloseHandler close_callback;
ConnectionPoolPtr conn_pool = conn_pool_.lock();
- if (conn_pool && conn_pool->getNextRequest(url_, request, response,
- request_timeout, callback,
- connect_callback,
- close_callback)) {
- doTransactionInternal(request, response, request_timeout, callback,
- connect_callback, close_callback);
+ if (conn_pool) {
+ conn_pool->processNextRequest(url_);
+ }
+
+ // lock mutex so that processing can continue.
+ if (MultiThreadingMgr::instance().getMode()) {
+ mutex_.lock();
}
}