}
const auto& ds = conn->getDS();
- const auto& it = t_downstreamConnections.find(ds);
- if (it != t_downstreamConnections.end()) {
- auto& list = it->second;
- if (list.size() >= s_maxCachedConnectionsPerDownstream) {
- /* too many connections queued already */
- conn.reset();
- return;
- }
-
- list.push_back(std::move(conn));
+ auto& list = t_downstreamConnections[ds];
+ if (list.size() >= s_maxCachedConnectionsPerDownstream) {
+ /* too many connections queued already */
+ conn.reset();
}
else {
- t_downstreamConnections[ds].push_back(std::move(conn));
+ list.push_back(std::move(conn));
}
}
static void cleanupClosedTCPConnections()
{
- for(auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) {
+ for (auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) {
for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
if (*connIt && isTCPSocketUsable((*connIt)->getHandle())) {
++connIt;
downstream = getActiveDownstreamConnection(ds, tlvs);
if (!downstream) {
- /* we don't have a connection to this backend active yet, let's ask one (it might not be a fresh one, though) */
+ /* we don't have a connection to this backend active yet, let's get one (it might not be a fresh one, though) */
downstream = DownstreamConnectionsManager::getConnectionToDownstream(d_threadData.mplexer, ds, now);
registerActiveDownstreamConnection(downstream);
}
std::thread t1(tcpClientThread, pipefds[0]);
t1.detach();
}
- catch(const std::runtime_error& e) {
+ catch (const std::runtime_error& e) {
/* the thread creation failed, don't leak */
errlog("Error creating a TCP thread: %s", e.what());
if (!d_useSinglePipe) {
tcpClientCountIncremented = false;
try {
socklen_t remlen = remote.getSocklen();
- ci = std::unique_ptr<ConnectionInfo>(new ConnectionInfo(cs));
+ ci = std::make_unique<ConnectionInfo>(cs);
#ifdef HAVE_ACCEPT4
ci->fd = accept4(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen, SOCK_NONBLOCK);
#else
ci->fd = accept(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen);
#endif
+ // will be decremented when the ConnectionInfo object is destroyed, no matter the reason
++cs->tcpCurrentConnections;
- if(ci->fd < 0) {
+ if (ci->fd < 0) {
throw std::runtime_error((boost::format("accepting new connection on socket: %s") % stringerror()).str());
}
- if(!acl->match(remote)) {
+ if (!acl->match(remote)) {
++g_stats.aclDrops;
vinfolog("Dropped TCP connection from %s because of ACL", remote.toStringWithPort());
continue;
}
#endif
setTCPNoDelay(ci->fd); // disable NAGLE
- if(g_maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= g_maxTCPQueuedConnections) {
+ if (g_maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= g_maxTCPQueuedConnections) {
vinfolog("Dropping TCP connection from %s because we have too many queued already", remote.toStringWithPort());
continue;
}
queuedCounterIncremented = true;
auto tmp = ci.release();
try {
+ // throws on failure
writen2WithTimeout(pipe, &tmp, sizeof(tmp), 0);
}
- catch(...) {
+ catch (...) {
delete tmp;
tmp = nullptr;
throw;
else {
g_tcpclientthreads->decrementQueuedCount();
queuedCounterIncremented = false;
- if(tcpClientCountIncremented) {
+ if (tcpClientCountIncremented) {
decrementTCPClientCount(remote);
}
}
}
- catch(const std::exception& e) {
+ catch (const std::exception& e) {
errlog("While reading a TCP question: %s", e.what());
- if(tcpClientCountIncremented) {
+ if (tcpClientCountIncremented) {
decrementTCPClientCount(remote);
}
if (queuedCounterIncremented) {
g_tcpclientthreads->decrementQueuedCount();
}
}
- catch(...){}
+ catch (...){}
}
}