namespace isc {
namespace tcp {
+/// @brief Implement a simple length:data input stream message.
+///
+/// This class can be used to receive a single message from a TCP
+/// stream where the message consists of a 16-bit unsigned length (in
+/// network order), followed by that number of bytes of data.
class TcpStreamRequest : public TcpRequest {
public:
/// @brief Constructor.
/// @brief Returns request contents formatted for log output
///
- /// @param limit Maximum length of the buffer to be output. If the limit
+ /// @param limit Maximum length of the buffer to be output. If the limit
/// is 0, the length of the output is unlimited.
/// @return Textual representation of the input buffer.
virtual std::string logFormatRequest(const size_t limit = 0) const;
+ /// @brief Unpacks the wire data into a string request.
virtual void unpack();
+ /// @brief Fetches the unpacked string request.
std::string getRequest() const {
return(request_);
};
protected:
+ /// @brief Unpacked request string.
std::string request_;
private:
size_t expected_size_;
};
+/// @brief Pointer to a TcpStreamRequest.
typedef boost::shared_ptr<TcpStreamRequest> TcpStreamRequestPtr;
+/// @brief Implements a simple length:data output stream message.
+///
+/// This class can be used to send a single message on a TCP
+/// stream where the message consists of a 16-bit unsigned length (in
+/// network order), followed by that number of bytes of data.
class TcpStreamResponse : public TcpResponse {
public:
/// @brief Constructor.
virtual void pack();
private:
+ /// @brief Unpacked response data to send.
std::string response_;
};
+/// @brief Pointer to a TcpStreamResponse.
typedef boost::shared_ptr<TcpStreamResponse> TcpStreamResponsePtr;
} // end of namespace isc::tcp
namespace {
-/// @brief IP address to which HTTP service is bound.
+/// @brief IP address to which service is bound.
const std::string SERVER_ADDRESS = "127.0.0.1";
-/// @brief IPv6 address to whch HTTP service is bound.
+/// @brief IPv6 address to whch service is bound.
const std::string IPV6_SERVER_ADDRESS = "::1";
-/// @brief Port number to which HTTP service is bound.
+/// @brief Port number to which service is bound.
const unsigned short SERVER_PORT = 18123;
/// @brief Request Timeout used in most of the tests (ms).
req->unpack();
auto request_str = req->getRequest();
- std::cout << "request_str: [" << request_str << "]" << std::endl;
std::ostringstream os;
if (request_str == "I am done") {
os << "good bye";
os << "echo " << request_str;
}
- std::cout << "send client this: [" << os.str() << "]" << std::endl;
TcpStreamResponsePtr resp(new TcpStreamResponse());
resp->setResponseData(os.str());
resp->pack();
/// @brief Destructor.
///
- /// Removes active HTTP clients.
+ /// Removes active clients.
virtual ~TcpListenerTest() {
for (auto client = clients_.begin(); client != clients_.end();
++client) {
/// This method creates TcpTestClient instance and retains it in the clients_
/// list.
///
- /// @param request String containing the HTTP request to be sent.
+ /// @param request String containing the request to be sent.
void startRequest(const std::string& request) {
TcpTestClientPtr client(new TcpTestClient(io_service_,
std::bind(&TcpListenerTest::clientDone, this)));
clients_.back()->startRequest(request);
}
+ void startRequests(const std::list<std::string>& requests) {
+ TcpTestClientPtr client(new TcpTestClient(io_service_,
+ std::bind(&TcpListenerTest::clientDone, this)));
+ clients_.push_back(client);
+ clients_.back()->startRequests(requests);
+ }
+
/// @brief Callback function invoke upon test timeout.
///
/// It stops the IO service and reports test timeout.
io_service_.poll();
}
+TEST_F(TcpListenerTest, multipleRequetsPerClients) {
+ std::list<std::string>requests{ "one", "two", "three", "I am done"};
+
+ TcpTestListener listener(io_service_, IOAddress(SERVER_ADDRESS), SERVER_PORT,
+ TlsContextPtr(), TcpListener::IdleTimeout(IDLE_TIMEOUT));
+
+ ASSERT_NO_THROW(listener.start());
+ ASSERT_EQ(SERVER_ADDRESS, listener.getLocalAddress().toText());
+ ASSERT_EQ(SERVER_PORT, listener.getLocalPort());
+ size_t num_clients = 5;
+ for ( auto i = 0; i < num_clients; ++i ) {
+ ASSERT_NO_THROW(startRequests(requests));
+ }
+
+ ASSERT_NO_THROW(runIOService());
+ ASSERT_EQ(num_clients, clients_.size());
+
+ std::list<std::string>expected_responses{ "echo one", "echo two", "echo three", "good bye"};
+ for (auto client = clients_.begin(); client != clients_.end(); ++client) {
+ EXPECT_TRUE((*client)->receiveDone());
+ EXPECT_FALSE((*client)->expectedEof());
+ EXPECT_EQ(expected_responses, (*client)->getResponses());
+ }
+
+ listener.stop();
+ io_service_.poll();
+}
}
/// @brief Connect to the listener.
///
- /// @param request HTTP request in the textual format.
+ /// @param request request string to send.
void connect() {
tcp::endpoint endpoint(address::from_string(server_address_), server_port_);
socket_.async_connect(endpoint,
});
}
- /// @brief Send HTTP request specified in textual format.
+ /// @brief Send request specified in textual format.
///
- /// @param request HTTP request in the textual format.
+ /// @param request request in the textual format.
void startRequest(const std::string& request) {
tcp::endpoint endpoint(address::from_string(server_address_), server_port_);
socket_.async_connect(endpoint,
});
}
+ /// @brief Send request specified in textual format.
+ ///
+ /// @param request request in the textual format.
+ void startRequests(const std::list<std::string>& requests) {
+ requests_to_send_ = requests;
+
+ tcp::endpoint endpoint(address::from_string(server_address_), server_port_);
+ socket_.async_connect(endpoint,
+ [this](const boost::system::error_code& ec) {
+ receive_done_ = false;
+ expected_eof_ = false;
+ if (ec) {
+ // One would expect that async_connect wouldn't return
+ // EINPROGRESS error code, but simply wait for the connection
+ // to get established before the handler is invoked. It turns out,
+ // however, that on some OSes the connect handler may receive this
+ // error code which doesn't necessarily indicate a problem.
+ // Making an attempt to write and read from this socket will
+ // typically succeed. So, we ignore this error.
+ if (ec.value() != boost::asio::error::in_progress) {
+ ADD_FAILURE() << "error occurred while connecting: "
+ << ec.message();
+ done_callback_();
+ return;
+ }
+ }
+
+ sendNextRequest();
+ });
+ }
+
+ /// @brief Sends the next request from the list of requests to send.
+ void sendNextRequest() {
+ // If there are any requests left to send, send them.
+ if (!requests_to_send_.empty()) {
+ std::string request = requests_to_send_.front();
+ requests_to_send_.pop_front();
+ sendRequest(request);
+ }
+ }
+
/// @brief Send a stream request.
///
/// @param request request data to send textual format.
/// @brief Wait for a server to close the connection.
void waitForEof() {
+ stream_response_.reset(new TcpStreamRequest());
receivePartialResponse(true);
}
sendPartialRequest(wire_request);
} else {
// Request has been sent. Start receiving response.
- response_.clear();
receivePartialResponse();
}
});
socket_.async_read_some(boost::asio::buffer(buf_.data(), buf_.size()),
[this, expect_eof](const boost::system::error_code& ec,
std::size_t bytes_transferred) {
+ if (!stream_response_) {
+ stream_response_.reset(new TcpStreamRequest());
+ }
+
if (ec) {
// IO service stopped so simply return.
if (ec.value() == boost::asio::error::operation_aborted) {
return;
-
} else if ((ec.value() == boost::asio::error::try_again) ||
(ec.value() == boost::asio::error::would_block)) {
// If we should try again, make sure that there is no garbage
}
}
+ // Post received data to the current response.
if (bytes_transferred > 0) {
- response_.insert(response_.end(), buf_.data(),
- buf_.data() + bytes_transferred);
+ stream_response_->postBuffer(buf_.data(), bytes_transferred);
}
- // Two consecutive new lines end the part of the response we're
- // expecting.
- if (response_.find("good bye", 0) != std::string::npos) {
- receive_done_ = true;
- done_callback_();
- } else {
+ if (stream_response_->needData()) {
+ // Response is incomplete, keep reading.
receivePartialResponse();
+ } else {
+ // Response is complete, process it.
+ responseReceived();
}
});
}
+ /// @brief Process a completed response received from the server.
+ void responseReceived() {
+ /// Unpack wire data into a string.
+ ASSERT_NO_THROW(stream_response_->unpack());
+ std::string response = stream_response_->getRequest();
+ responses_received_.push_back(response);
+
+ // Quit if server tells us "good bye".
+ if (response.find("good bye", 0) != std::string::npos) {
+ receive_done_ = true;
+ done_callback_();
+ }
+
+ // Clear out for the next one.
+ stream_response_.reset();
+ sendNextRequest();
+ }
+
/// @brief Checks if the TCP connection is still open.
///
/// Tests the TCP connection by trying to read from the socket.
socket_.close();
}
- /// @brief Returns the HTTP response string.
- ///
- /// @return string containing the response.
- std::string getResponse() const {
- return (response_);
- }
-
/// @brief Returns true if the receive completed without error.
///
/// @return True if the receive completed successfully, false
return (expected_eof_);
}
+ /// @brief Returns the list of received responses.
+ ///
+ /// @return list of string responses.
+ const std::list<std::string>& getResponses() {
+ return (responses_received_);
+ }
+
private:
/// @brief Holds reference to the IO service.
/// @brief IP port of the server.
uint16_t server_port_;
-
/// @brief Set to true when the receive has completed successfully.
bool receive_done_;
/// expected it to do.
bool expected_eof_;
+ /// @brief Pointer to the server response currently being received.
+ TcpStreamRequestPtr stream_response_;
+
+ /// @brief List of string requests to send.
+ std::list<std::string> requests_to_send_;
+
+ /// @brief List of string responses received.
+ std::list<std::string> responses_received_;
};
/// @brief Pointer to the TcpTestClient.