From: Remi Gacogne Date: Tue, 6 Mar 2018 15:20:48 +0000 (+0100) Subject: dnsdist: Use the best multiplexer available instead of poll() X-Git-Tag: dnsdist-1.3.0~35^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=refs%2Fpull%2F6317%2Fhead;p=thirdparty%2Fpdns.git dnsdist: Use the best multiplexer available instead of poll() --- diff --git a/pdns/devpollmplexer.cc b/pdns/devpollmplexer.cc index 5e11404d1c..f9234965a8 100644 --- a/pdns/devpollmplexer.cc +++ b/pdns/devpollmplexer.cc @@ -46,11 +46,12 @@ public: close(d_devpollfd); } - virtual int run(struct timeval* tv, int timeout=500); + virtual int run(struct timeval* tv, int timeout=500) override; + virtual void getAvailableFDs(std::vector& fds, int timeout) override; - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter); - virtual void removeFD(callbackmap_t& cbmap, int fd); - string getName() + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override; + virtual void removeFD(callbackmap_t& cbmap, int fd) override; + string getName() const override { return "/dev/poll"; } @@ -112,6 +113,26 @@ void DevPollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) } } +void DevPollFDMultiplexer::getAvailableFDs(std::vector& fds, int timeout) +{ + struct dvpoll dvp; + dvp.dp_nfds = d_readCallbacks.size() + d_writeCallbacks.size(); + dvp.dp_fds = new pollfd[dvp.dp_nfds]; + dvp.dp_timeout = timeout; + int ret=ioctl(d_devpollfd, DP_POLL, &dvp); + + if(ret < 0 && errno!=EINTR) { + delete[] dvp.dp_fds; + throw FDMultiplexerException("/dev/poll returned error: "+stringerror()); + } + + for(int n=0; n < ret; ++n) { + fds.push_back(dvp.dp_fds[n].fd); + } + + delete[] dvp.dp_fds; +} + int DevPollFDMultiplexer::run(struct timeval* now, int timeout) { if(d_inrun) { @@ -121,9 +142,9 @@ int DevPollFDMultiplexer::run(struct timeval* now, int timeout) dvp.dp_nfds = d_readCallbacks.size() + d_writeCallbacks.size(); dvp.dp_fds = new pollfd[dvp.dp_nfds]; dvp.dp_timeout = timeout; - int ret=ioctl(d_devpollfd, DP_POLL, &dvp); + int ret=ioctl(d_devpollfd, DP_POLL, &dvp); gettimeofday(now,0); // MANDATORY! - + if(ret < 0 && errno!=EINTR) { delete[] dvp.dp_fds; throw FDMultiplexerException("/dev/poll returned error: "+stringerror()); diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 449779a27a..b91222cdff 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -386,29 +386,22 @@ static bool sendUDPResponse(int origFD, char* response, uint16_t responseLen, in static int pickBackendSocketForSending(DownstreamState* state) { - return state->fds[state->fdOffset++ % state->fds.size()]; + return state->sockets[state->socketsOffset++ % state->sockets.size()]; } -static int pickBackendSocketForReceiving(const std::shared_ptr& state) +static void pickBackendSocketsReadyForReceiving(const std::shared_ptr& state, std::vector& ready) { - if (state->fds.size() == 1) { - return state->fds[0]; - } + ready.clear(); - std::set fds; - for (auto fd : state->fds) { - if (fd >= 0) { - fds.insert(fd); - } + if (state->sockets.size() == 1) { + ready.push_back(state->sockets[0]); + return ; } - int selected = -1; - int res = waitForMultiData(fds, -1, -1, &selected); - if (res != 1) { - throw std::runtime_error("Error selecting a socket for a backend " + state->remote.toStringWithPort() + ": " + strerror(errno)); + { + std::lock_guard lock(state->socketsLock); + state->mplexer->getAvailableFDs(ready, -1); } - - return selected; } // listens on a dedicated socket, lobs answers from downstream servers to original requestors @@ -427,115 +420,120 @@ try { vector rewrittenResponse; uint16_t queryId = 0; + std::vector sockets; + sockets.reserve(dss->sockets.size()); + for(;;) { dnsheader* dh = reinterpret_cast(packet); bool outstandingDecreased = false; try { - int fd = pickBackendSocketForReceiving(dss); - ssize_t got = recv(fd, packet, sizeof(packet), 0); - char * response = packet; - size_t responseSize = sizeof(packet); + pickBackendSocketsReadyForReceiving(dss, sockets); + for (const auto& fd : sockets) { + ssize_t got = recv(fd, packet, sizeof(packet), 0); + char * response = packet; + size_t responseSize = sizeof(packet); - if (got < (ssize_t) sizeof(dnsheader)) - continue; + if (got < (ssize_t) sizeof(dnsheader)) + continue; - uint16_t responseLen = (uint16_t) got; - queryId = dh->id; + uint16_t responseLen = (uint16_t) got; + queryId = dh->id; - if(queryId >= dss->idStates.size()) - continue; + if(queryId >= dss->idStates.size()) + continue; - IDState* ids = &dss->idStates[queryId]; - int origFD = ids->origFD; + IDState* ids = &dss->idStates[queryId]; + int origFD = ids->origFD; - if(origFD < 0) // duplicate - continue; + if(origFD < 0) // duplicate + continue; - /* setting age to 0 to prevent the maintainer thread from - cleaning this IDS while we process the response. - We have already a copy of the origFD, so it would - mostly mess up the outstanding counter. - */ - ids->age = 0; + /* setting age to 0 to prevent the maintainer thread from + cleaning this IDS while we process the response. + We have already a copy of the origFD, so it would + mostly mess up the outstanding counter. + */ + ids->age = 0; - if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote)) { - continue; - } + if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote)) { + continue; + } - --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket - outstandingDecreased = true; + --dss->outstanding; // you'd think an attacker could game this, but we're using connected socket + outstandingDecreased = true; - if(dh->tc && g_truncateTC) { - truncateTC(response, &responseLen); - } + if(dh->tc && g_truncateTC) { + truncateTC(response, &responseLen); + } - dh->id = ids->origID; + dh->id = ids->origID; - uint16_t addRoom = 0; - DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start); + uint16_t addRoom = 0; + DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start); #ifdef HAVE_PROTOBUF - dr.uniqueId = ids->uniqueId; + dr.uniqueId = ids->uniqueId; #endif - dr.qTag = ids->qTag; + dr.qTag = ids->qTag; - if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) { - continue; - } + if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) { + continue; + } #ifdef HAVE_DNSCRYPT - if (ids->dnsCryptQuery) { - addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE; - } + if (ids->dnsCryptQuery) { + addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE; + } #endif - if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) { - continue; - } + if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) { + continue; + } - if (ids->packetCache && !ids->skipCache) { - ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode, ids->tempFailureTTL); - } + if (ids->packetCache && !ids->skipCache) { + ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode, ids->tempFailureTTL); + } - if (ids->cs && !ids->cs->muted) { + if (ids->cs && !ids->cs->muted) { #ifdef HAVE_DNSCRYPT - if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) { - continue; - } + if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) { + continue; + } #endif - ComboAddress empty; - empty.sin4.sin_family = 0; - /* if ids->destHarvested is false, origDest holds the listening address. - We don't want to use that as a source since it could be 0.0.0.0 for example. */ - sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote); - } + ComboAddress empty; + empty.sin4.sin_family = 0; + /* if ids->destHarvested is false, origDest holds the listening address. + We don't want to use that as a source since it could be 0.0.0.0 for example. */ + sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote); + } - g_stats.responses++; + g_stats.responses++; - double udiff = ids->sentTime.udiff(); - vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff); + double udiff = ids->sentTime.udiff(); + vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff); - { - struct timespec ts; - gettime(&ts); - std::lock_guard lock(g_rings.respMutex); - g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote}); - } + { + struct timespec ts; + gettime(&ts); + std::lock_guard lock(g_rings.respMutex); + g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote}); + } - if(dh->rcode == RCode::ServFail) - g_stats.servfailResponses++; - dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0; + if(dh->rcode == RCode::ServFail) + g_stats.servfailResponses++; + dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0; - doLatencyStats(udiff); + doLatencyStats(udiff); - if (ids->origFD == origFD) { + if (ids->origFD == origFD) { #ifdef HAVE_DNSCRYPT - ids->dnsCryptQuery = nullptr; + ids->dnsCryptQuery = nullptr; #endif - ids->origFD = -1; - outstandingDecreased = false; - } + ids->origFD = -1; + outstandingDecreased = false; + } - rewrittenResponse.clear(); + rewrittenResponse.clear(); + } } catch(const std::exception& e){ vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what()); @@ -570,8 +568,12 @@ catch(...) void DownstreamState::reconnect() { connected = false; - for (auto& fd : fds) { + for (auto& fd : sockets) { if (fd != -1) { + { + std::lock_guard lock(socketsLock); + mplexer->removeReadFD(fd); + } /* shutdown() is needed to wake up recv() in the responderThread */ shutdown(fd, SHUT_RDWR); close(fd); @@ -585,6 +587,10 @@ void DownstreamState::reconnect() } try { SConnect(fd, remote); + { + std::lock_guard lock(socketsLock); + mplexer->addReadFD(fd, [](int, boost::any) {}); + } connected = true; } catch(const std::runtime_error& error) { @@ -597,7 +603,7 @@ void DownstreamState::reconnect() /* if at least one (re-)connection failed, close all sockets */ if (!connected) { - for (auto& fd : fds) { + for (auto& fd : sockets) { if (fd != -1) { /* shutdown() is needed to wake up recv() in the responderThread */ shutdown(fd, SHUT_RDWR); @@ -610,8 +616,10 @@ void DownstreamState::reconnect() DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_) { - fds.resize(numberOfSockets); - for (auto& fd : fds) { + mplexer = std::unique_ptr(FDMultiplexer::getMultiplexerSilent()); + + sockets.resize(numberOfSockets); + for (auto& fd : sockets) { fd = -1; } @@ -1842,9 +1850,13 @@ void* healthChecksThread() warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down"); if (newState && !dss->connected) { - for (auto& fd : dss->fds) { + for (auto& fd : dss->sockets) { try { SConnect(fd, dss->remote); + { + std::lock_guard lock(dss->socketsLock); + dss->mplexer->addReadFD(fd, [](int, boost::any) {}); + } dss->connected = true; } catch(const std::runtime_error& error) { @@ -1989,7 +2001,7 @@ static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCoun /* UDP sockets to backends */ size_t backendUDPSocketsCount = 0; for (const auto& backend : backends) { - backendUDPSocketsCount += backend->fds.size(); + backendUDPSocketsCount += backend->sockets.size(); } requiredFDsCount += backendUDPSocketsCount; /* TCP sockets to backends */ diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 131b280a95..49e028d8fc 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -24,6 +24,7 @@ #include "ext/luawrapper/include/LuaContext.hpp" #include #include "misc.hh" +#include "mplexer.hh" #include "iputils.hh" #include "dnsname.hh" #include @@ -527,7 +528,7 @@ struct DownstreamState DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0, 1) {} ~DownstreamState() { - for (auto& fd : fds) { + for (auto& fd : sockets) { if (fd >= 0) { close(fd); fd = -1; @@ -535,7 +536,9 @@ struct DownstreamState } } - std::vector fds; + std::vector sockets; + std::mutex socketsLock; + std::unique_ptr mplexer{nullptr}; std::thread tid; ComboAddress remote; QPSLimiter qps; @@ -555,7 +558,7 @@ struct DownstreamState std::atomic queries{0}; } prev; string name; - size_t fdOffset{0}; + size_t socketsOffset{0}; double queryLoad{0.0}; double dropRate{0.0}; double latencyUsec{0.0}; diff --git a/pdns/epollmplexer.cc b/pdns/epollmplexer.cc index 2deae83b76..97d7e82ff6 100644 --- a/pdns/epollmplexer.cc +++ b/pdns/epollmplexer.cc @@ -42,11 +42,12 @@ public: close(d_epollfd); } - virtual int run(struct timeval* tv, int timeout=500); + virtual int run(struct timeval* tv, int timeout=500) override; + virtual void getAvailableFDs(std::vector& fds, int timeout) override; - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter); - virtual void removeFD(callbackmap_t& cbmap, int fd); - string getName() + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override; + virtual void removeFD(callbackmap_t& cbmap, int fd) override; + string getName() const override { return "epoll"; } @@ -69,8 +70,8 @@ static struct EpollRegisterOurselves } } doItEpoll; - int EpollFDMultiplexer::s_maxevents=1024; + EpollFDMultiplexer::EpollFDMultiplexer() : d_eevents(new epoll_event[s_maxevents]) { d_epollfd=epoll_create(s_maxevents); // not hard max @@ -123,6 +124,18 @@ void EpollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) throw FDMultiplexerException("Removing fd from epoll set: "+stringerror()); } +void EpollFDMultiplexer::getAvailableFDs(std::vector& fds, int timeout) +{ + int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, timeout); + + if(ret < 0 && errno!=EINTR) + throw FDMultiplexerException("epoll returned error: "+stringerror()); + + for(int n=0; n < ret; ++n) { + fds.push_back(d_eevents[n].data.fd); + } +} + int EpollFDMultiplexer::run(struct timeval* now, int timeout) { if(d_inrun) { @@ -131,7 +144,7 @@ int EpollFDMultiplexer::run(struct timeval* now, int timeout) int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, timeout); gettimeofday(now,0); // MANDATORY - + if(ret < 0 && errno!=EINTR) throw FDMultiplexerException("epoll returned error: "+stringerror()); diff --git a/pdns/kqueuemplexer.cc b/pdns/kqueuemplexer.cc index 2fb66c8c78..44d3f46735 100644 --- a/pdns/kqueuemplexer.cc +++ b/pdns/kqueuemplexer.cc @@ -44,11 +44,12 @@ public: close(d_kqueuefd); } - virtual int run(struct timeval* tv, int timeout=500); + virtual int run(struct timeval* tv, int timeout=500) override; + virtual void getAvailableFDs(std::vector& fds, int timeout) override; - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter); - virtual void removeFD(callbackmap_t& cbmap, int fd); - string getName() + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter) override; + virtual void removeFD(callbackmap_t& cbmap, int fd) override; + string getName() const override { return "kqueue"; } @@ -85,7 +86,7 @@ void KqueueFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toD struct kevent kqevent; EV_SET(&kqevent, fd, (&cbmap == &d_readCallbacks) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0,0,0); - + if(kevent(d_kqueuefd, &kqevent, 1, 0, 0, 0) < 0) { cbmap.erase(fd); throw FDMultiplexerException("Adding fd to kqueue set: "+stringerror()); @@ -103,6 +104,22 @@ void KqueueFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) throw FDMultiplexerException("Removing fd from kqueue set: "+stringerror()); } +void KqueueFDMultiplexer::getAvailableFDs(std::vector& fds, int timeout) +{ + struct timespec ts; + ts.tv_sec=timeout/1000; + ts.tv_nsec=(timeout % 1000) * 1000000; + + int ret = kevent(d_kqueuefd, 0, 0, d_kevents.get(), s_maxevents, &ts); + + if(ret < 0 && errno != EINTR) + throw FDMultiplexerException("kqueue returned error: "+stringerror()); + + for(int n=0; n < ret; ++n) { + fds.push_back(d_kevents[n].ident); + } +} + int KqueueFDMultiplexer::run(struct timeval* now, int timeout) { if(d_inrun) { @@ -115,7 +132,7 @@ int KqueueFDMultiplexer::run(struct timeval* now, int timeout) int ret=kevent(d_kqueuefd, 0, 0, d_kevents.get(), s_maxevents, &ts); gettimeofday(now,0); // MANDATORY! - + if(ret < 0 && errno!=EINTR) throw FDMultiplexerException("kqueue returned error: "+stringerror()); diff --git a/pdns/mplexer.hh b/pdns/mplexer.hh index ed0f2a02a7..d70143d46b 100644 --- a/pdns/mplexer.hh +++ b/pdns/mplexer.hh @@ -73,6 +73,9 @@ public: /* timeout is in ms */ virtual int run(struct timeval* tv, int timeout=500) = 0; + /* timeout is in ms, 0 will return immediatly, -1 will block until at least one FD is ready */ + virtual void getAvailableFDs(std::vector& fds, int timeout) = 0; + //! Add an fd to the read watch list - currently an fd can only be on one list at a time! virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t()) { @@ -132,7 +135,7 @@ public: return theMap; } - virtual std::string getName() = 0; + virtual std::string getName() const = 0; protected: typedef std::map callbackmap_t; diff --git a/pdns/pollmplexer.cc b/pdns/pollmplexer.cc index 37f78290aa..e155072a77 100644 --- a/pdns/pollmplexer.cc +++ b/pdns/pollmplexer.cc @@ -34,15 +34,18 @@ public: { } - virtual int run(struct timeval* tv, int timeout=500); + virtual int run(struct timeval* tv, int timeout=500) override; + virtual void getAvailableFDs(std::vector& fds, int timeout) override; - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter); - virtual void removeFD(callbackmap_t& cbmap, int fd); - string getName() + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override; + virtual void removeFD(callbackmap_t& cbmap, int fd) override; + + string getName() const override { return "poll"; } private: + vector preparePollFD() const; }; static FDMultiplexer* make() @@ -77,32 +80,50 @@ void PollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer"); } -bool pollfdcomp(const struct pollfd& a, const struct pollfd& b) +vector PollFDMultiplexer::preparePollFD() const { - return a.fd < b.fd; -} - -int PollFDMultiplexer::run(struct timeval* now, int timeout) -{ - if(d_inrun) { - throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n"); - } - vector pollfds; - + pollfds.reserve(d_readCallbacks.size() + d_writeCallbacks.size()); + struct pollfd pollfd; - for(callbackmap_t::const_iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) { - pollfd.fd = i->first; + for(const auto& cb : d_readCallbacks) { + pollfd.fd = cb.first; pollfd.events = POLLIN; pollfds.push_back(pollfd); } - for(callbackmap_t::const_iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) { - pollfd.fd = i->first; + for(const auto& cb : d_writeCallbacks) { + pollfd.fd = cb.first; pollfd.events = POLLOUT; pollfds.push_back(pollfd); } + return pollfds; +} + +void PollFDMultiplexer::getAvailableFDs(std::vector& fds, int timeout) +{ + auto pollfds = preparePollFD(); + int ret = poll(&pollfds[0], pollfds.size(), timeout); + + if (ret < 0 && errno != EINTR) + throw FDMultiplexerException("poll returned error: " + stringerror()); + + for(const auto& pollfd : pollfds) { + if (pollfd.revents == POLLIN || pollfd.revents == POLLOUT) { + fds.push_back(pollfd.fd); + } + } +} + +int PollFDMultiplexer::run(struct timeval* now, int timeout) +{ + if(d_inrun) { + throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n"); + } + + auto pollfds = preparePollFD(); + int ret=poll(&pollfds[0], pollfds.size(), timeout); gettimeofday(now, 0); // MANDATORY! @@ -112,17 +133,17 @@ int PollFDMultiplexer::run(struct timeval* now, int timeout) d_iter=d_readCallbacks.end(); d_inrun=true; - for(unsigned int n = 0; n < pollfds.size(); ++n) { - if(pollfds[n].revents == POLLIN) { - d_iter=d_readCallbacks.find(pollfds[n].fd); + for(const auto& pollfd : pollfds) { + if(pollfd.revents == POLLIN) { + d_iter=d_readCallbacks.find(pollfd.fd); if(d_iter != d_readCallbacks.end()) { d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); continue; // so we don't refind ourselves as writable! } } - else if(pollfds[n].revents == POLLOUT) { - d_iter=d_writeCallbacks.find(pollfds[n].fd); + else if(pollfd.revents == POLLOUT) { + d_iter=d_writeCallbacks.find(pollfd.fd); if(d_iter != d_writeCallbacks.end()) { d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);