static thread_local map<ComboAddress, std::deque<std::unique_ptr<Socket>>> t_downstreamSockets;
static std::mutex tcpClientsCountMutex;
static std::map<ComboAddress,size_t,ComboAddress::addressOnlyLessThan> tcpClientsCount;
+static const size_t g_maxCachedConnectionsPerDownstream = 20;
uint64_t g_maxTCPQueuedConnections{1000};
size_t g_maxTCPQueriesPerConn{0};
size_t g_maxTCPConnectionDuration{0};
bool g_useTCPSinglePipe{false};
std::atomic<uint16_t> g_downstreamTCPCleanupInterval{60};
-static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState> ds, uint16_t& downstreamFailures, int timeout)
+static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, int timeout)
{
std::unique_ptr<Socket> result;
const auto& it = t_downstreamSockets.find(ds->remote);
if (it != t_downstreamSockets.end()) {
auto& list = it->second;
- if (list.size() >= 20) {
+ if (list.size() >= g_maxCachedConnectionsPerDownstream) {
/* too many connections queued already */
socket.reset();
return;
// XXX could probably be implemented as a TCPIOHandler
IOState tryRead(int fd, std::vector<uint8_t>& buffer, size_t& pos, size_t toRead)
{
+ if (buffer.size() < (pos + toRead)) {
+ throw std::out_of_range("Calling tryRead() with a too small buffer (" + std::to_string(buffer.size()) + ") for a read of " + std::to_string(toRead) + " bytes starting at " + std::to_string(pos));
+ }
+
size_t got = 0;
do {
ssize_t res = ::read(fd, reinterpret_cast<char*>(&buffer.at(pos)), toRead - got);
return res;
}
- bool maxConnectionDurationReached(unsigned int maxConnectionDuration, const struct timeval now)
+ bool maxConnectionDurationReached(unsigned int maxConnectionDuration, const struct timeval& now)
{
if (maxConnectionDuration) {
time_t curtime = now.tv_sec;
ssize_t got = read(pipefd, &citmp, sizeof(citmp));
if (got == 0) {
- throw std::runtime_error("EOF while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mod");
+ throw std::runtime_error("EOF while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
}
else if (got == -1) {
if (errno == EAGAIN || errno == EINTR) {
return;
}
- throw std::runtime_error("Error while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mde:" + strerror(errno));
+ throw std::runtime_error("Error while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode:" + strerror(errno));
}
else if (got != sizeof(citmp)) {
throw std::runtime_error("Partial read while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
}
- g_tcpclientthreads->decrementQueuedCount();
- auto ci = std::move(*citmp);
- delete citmp;
- citmp = nullptr;
+ try {
+ g_tcpclientthreads->decrementQueuedCount();
- struct timeval now;
- gettimeofday(&now, 0);
- auto state = std::make_shared<IncomingTCPConnectionState>(std::move(ci), *threadData, now);
+ struct timeval now;
+ gettimeofday(&now, 0);
+ auto state = std::make_shared<IncomingTCPConnectionState>(std::move(*citmp), *threadData, now);
+ delete citmp;
+ citmp = nullptr;
- /* let's update the remaining time */
- state->d_remainingTime = g_maxTCPConnectionDuration;
+ /* let's update the remaining time */
+ state->d_remainingTime = g_maxTCPConnectionDuration;
- handleIO(state, now);
+ handleIO(state, now);
+ }
+ catch(...) {
+ delete citmp;
+ citmp = nullptr;
+ throw;
+ }
}
void tcpClientThread(int pipefd)
TCPClientThreadData data;
data.mplexer->addReadFD(pipefd, handleIncomingTCPQuery, &data);
- time_t lastTCPCleanup = time(nullptr);
- time_t lastTimeoutScan = time(nullptr);
struct timeval now;
gettimeofday(&now, 0);
+ time_t lastTCPCleanup = now.tv_sec;
+ time_t lastTimeoutScan = now.tv_sec;
for (;;) {
data.mplexer->run(&now);