public:
TCPClientCollection(size_t maxThreads, std::vector<ClientState*> tcpStates);
- int getThread()
- {
- if (d_numthreads == 0) {
- throw std::runtime_error("No TCP worker thread yet");
- }
-
- uint64_t pos = d_pos++;
- ++d_queued;
- return d_tcpclientthreads.at(pos % d_numthreads).d_newConnectionPipe.getHandle();
- }
-
bool passConnectionToThread(std::unique_ptr<ConnectionInfo>&& conn)
{
if (d_numthreads == 0) {
auto pipe = d_tcpclientthreads.at(pos % d_numthreads).d_newConnectionPipe.getHandle();
auto tmp = conn.release();
+ /* we need to increment this counter _before_ writing to the pipe,
+ otherwise there is a very real possiblity that the other end
+ decrement the counter before we can increment it, leading to an underflow */
+ ++d_queued;
if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+ --d_queued;
++g_stats.tcpQueryPipeFull;
delete tmp;
tmp = nullptr;
return false;
}
- ++d_queued;
return true;
}