return downstream;
}
-static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD);
+static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD, std::vector<ClientState*> tcpAcceptStates);
-TCPClientCollection::TCPClientCollection(size_t maxThreads): d_tcpclientthreads(maxThreads), d_maxthreads(maxThreads)
+TCPClientCollection::TCPClientCollection(size_t maxThreads, std::vector<ClientState*> tcpAcceptStates): d_tcpclientthreads(maxThreads), d_maxthreads(maxThreads)
{
for (size_t idx = 0; idx < maxThreads; idx++) {
- addTCPClientThread();
+ addTCPClientThread(tcpAcceptStates);
}
}
-void TCPClientCollection::addTCPClientThread()
+void TCPClientCollection::addTCPClientThread(std::vector<ClientState*>& tcpAcceptStates)
{
auto preparePipe = [](int fds[2], const std::string& type) -> bool {
if (pipe(fds) < 0) {
no need to worry about it */
TCPWorkerThread worker(pipefds[1], crossProtocolQueriesFDs[1], crossProtocolResponsesFDs[1]);
try {
- std::thread t1(tcpClientThread, pipefds[0], crossProtocolQueriesFDs[0], crossProtocolResponsesFDs[0], crossProtocolResponsesFDs[1]);
+ std::thread t1(tcpClientThread, pipefds[0], crossProtocolQueriesFDs[0], crossProtocolResponsesFDs[0], crossProtocolResponsesFDs[1], tcpAcceptStates);
t1.detach();
}
catch (const std::runtime_error& e) {
}
}
-static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD)
+struct TCPAcceptorParam
+{
+ ClientState& cs;
+ ComboAddress local;
+ LocalStateHolder<NetmaskGroup>& acl;
+ int socket{-1};
+};
+
+static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData);
+
+static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD, std::vector<ClientState*> tcpAcceptStates)
{
/* we get launched with a pipe on which we receive file descriptors from clients that we own
from that point on */
data.mplexer->addReadFD(crossProtocolQueriesPipeFD, handleCrossProtocolQuery, &data);
data.mplexer->addReadFD(crossProtocolResponsesListenPipeFD, handleCrossProtocolResponse, &data);
+ /* only used in single acceptor mode for now */
+ auto acl = g_ACL.getLocal();
+ std::vector<TCPAcceptorParam> acceptParams;
+ acceptParams.reserve(tcpAcceptStates.size());
+
+ for (auto& state : tcpAcceptStates) {
+ acceptParams.emplace_back(TCPAcceptorParam{*state, state->local, acl, state->tcpFD});
+ for (const auto& [addr, socket] : state->d_additionalAddresses) {
+ acceptParams.emplace_back(TCPAcceptorParam{*state, addr, acl, socket});
+ }
+ }
+
+ auto acceptCallback = [&data](int socket, FDMultiplexer::funcparam_t& funcparam) {
+ auto acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
+ acceptNewConnection(*acceptorParam, &data);
+ };
+
+ for (size_t idx = 0; idx < acceptParams.size(); idx++) {
+ const auto& param = acceptParams.at(idx);
+ data.mplexer->addReadFD(param.socket, acceptCallback, ¶m);
+ }
+
struct timeval now;
gettimeofday(&now, nullptr);
time_t lastTimeoutScan = now.tv_sec;
}
}
-struct TCPAcceptorParam
-{
- ClientState& cs;
- ComboAddress local;
- LocalStateHolder<NetmaskGroup>& acl;
- int socket{-1};
-};
-
-static void acceptNewConnection(const TCPAcceptorParam& param)
+static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData)
{
auto& cs = param.cs;
auto& acl = param.acl;
vinfolog("Got TCP connection from %s", remote.toStringWithPort());
ci->remote = remote;
- if (!g_tcpclientthreads->passConnectionToThread(std::move(ci))) {
- if (tcpClientCountIncremented) {
- decrementTCPClientCount(remote);
+ if (threadData == nullptr) {
+ if (!g_tcpclientthreads->passConnectionToThread(std::move(ci))) {
+ if (tcpClientCountIncremented) {
+ decrementTCPClientCount(remote);
+ }
}
}
+ else {
+ struct timeval now;
+ gettimeofday(&now, nullptr);
+ auto state = std::make_shared<IncomingTCPConnectionState>(std::move(*ci), *threadData, now);
+ IncomingTCPConnectionState::handleIO(state, now);
+ }
}
catch (const std::exception& e) {
errlog("While reading a TCP question: %s", e.what());
/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
they will hand off to worker threads & spawn more of them if required
*/
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
void tcpAcceptorThread(std::vector<ClientState*> states)
{
setThreadName("dnsdist/tcpAcce");
if (params.size() == 1) {
while (true) {
- acceptNewConnection(params.at(0));
+ acceptNewConnection(params.at(0), nullptr);
}
}
else {
auto acceptCallback = [](int socket, FDMultiplexer::funcparam_t& funcparam) {
auto acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
- acceptNewConnection(*acceptorParam);
+ acceptNewConnection(*acceptorParam, nullptr);
};
auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
}
}
}
+#endif