From: Remi Gacogne Date: Wed, 29 Aug 2018 12:29:55 +0000 (+0200) Subject: ixfrdist: Speedup and optimize memory usage when writing answers X-Git-Tag: dnsdist-1.3.3~122^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ba779b1202755305cf9b97ee240d74279ef9b604;p=thirdparty%2Fpdns.git ixfrdist: Speedup and optimize memory usage when writing answers We now group as many records as possible inside one DNS message, and send a message as soon as it is ready, so we can reduce the memory usage by not keeping all outgoing messages in memory, and send a lower number of messages. --- diff --git a/pdns/ixfrdist.cc b/pdns/ixfrdist.cc index a3e0a5db72..27047007c3 100644 --- a/pdns/ixfrdist.cc +++ b/pdns/ixfrdist.cc @@ -135,7 +135,7 @@ bool g_exiting = false; NetmaskGroup g_acl; -void handleSignal(int signum) { +static void handleSignal(int signum) { g_log< info_msg; g_log< that represents the full response to a SOA * query. QNAME is read from mdp. */ -bool makeSOAPacket(const MOADNSParser& mdp, vector& packet) { +static bool makeSOAPacket(const MOADNSParser& mdp, vector& packet) { auto zoneInfo = getCurrentZoneInfo(mdp.d_qname); if (zoneInfo == nullptr) { @@ -470,7 +470,7 @@ bool makeSOAPacket(const MOADNSParser& mdp, vector& packet) { return true; } -vector getSOAPacket(const MOADNSParser& mdp, const shared_ptr& soa) { +static vector getSOAPacket(const MOADNSParser& mdp, const shared_ptr& soa) { vector packet; DNSPacketWriter pw(packet, mdp.d_qname, mdp.d_qtype); pw.getHeader()->id = mdp.d_header.id; @@ -484,7 +484,75 @@ vector getSOAPacket(const MOADNSParser& mdp, const shared_ptr>& packets) { +static bool sendPacketOverTCP(int fd, const std::vector& packet) +{ + char sendBuf[2]; + sendBuf[0]=packet.size()/256; + sendBuf[1]=packet.size()%256; + + ssize_t send = writen2(fd, sendBuf, 2); + send += writen2(fd, &packet[0], packet.size()); + return true; +} + +static bool addRecordToWriter(DNSPacketWriter& pw, const DNSName& zoneName, const DNSRecord& record) +{ + pw.startRecord(record.d_name + zoneName, record.d_type); + record.d_content->toPacket(pw); + if (pw.size() > 65535) { + pw.rollback(); + return false; + } + return true; +} + +template static bool sendRecordsOverTCP(int fd, const MOADNSParser& mdp, const T& records) +{ + vector packet; + + for (auto it = records.cbegin(); it != records.cend();) { + bool recordsAdded = false; + packet.clear(); + DNSPacketWriter pw(packet, mdp.d_qname, mdp.d_qtype); + pw.getHeader()->id = mdp.d_header.id; + pw.getHeader()->rd = mdp.d_header.rd; + pw.getHeader()->qr = 1; + + while (it != records.cend()) { + if (it->d_type == QType::SOA) { + it++; + continue; + } + + if (addRecordToWriter(pw, mdp.d_qname, *it)) { + recordsAdded = true; + it++; + } + else { + if (recordsAdded) { + pw.commit(); + sendPacketOverTCP(fd, packet); + } + if (it == records.cbegin()) { + /* something is wrong */ + return false; + } + + break; + } + } + + if (it == records.cend() && recordsAdded) { + pw.commit(); + sendPacketOverTCP(fd, packet); + } + } + + return true; +} + + +static bool handleAXFR(int fd, const MOADNSParser& mdp) { /* we get a shared pointer of the zone info that we can't modify, ever. A newer one may arise in the meantime, but this one will stay valid until we release it. @@ -496,56 +564,29 @@ bool makeAXFRPackets(const MOADNSParser& mdp, vector>& packets) shared_ptr soa = zoneInfo->soa; const records_t& records = zoneInfo->latestAXFR; - packets.reserve(packets.size() + /* SOAs */ 2 + records.size()); // Initial SOA const auto soaPacket = getSOAPacket(mdp, soa); - packets.push_back(soaPacket); + if (!sendPacketOverTCP(fd, soaPacket)) { + return false; + } - for (auto const &record : records) { - if (record.d_type == QType::SOA) { - continue; - } - vector packet; - DNSPacketWriter pw(packet, mdp.d_qname, mdp.d_qtype); - pw.getHeader()->id = mdp.d_header.id; - pw.getHeader()->rd = mdp.d_header.rd; - pw.getHeader()->qr = 1; - pw.startRecord(record.d_name + mdp.d_qname, record.d_type); - record.d_content->toPacket(pw); - pw.commit(); - packets.push_back(packet); + if (!sendRecordsOverTCP(fd, mdp, records)) { + return false; } // Final SOA - packets.push_back(soaPacket); + if (!sendPacketOverTCP(fd, soaPacket)) { + return false; + } return true; } -void makeXFRPacketsFromDNSRecords(const MOADNSParser& mdp, const vector& records, vector>& packets) { - - for(const auto& r : records) { - if (r.d_type == QType::SOA) { - continue; - } - - vector packet; - DNSPacketWriter pw(packet, mdp.d_qname, mdp.d_qtype); - pw.getHeader()->id = mdp.d_header.id; - pw.getHeader()->rd = mdp.d_header.rd; - pw.getHeader()->qr = 1; - pw.startRecord(r.d_name + mdp.d_qname, r.d_type); - r.d_content->toPacket(pw); - pw.commit(); - packets.push_back(packet); - } -} - /* Produces an IXFR if one can be made according to the rules in RFC 1995 and * creates a SOA or AXFR packet when required by the RFC. */ -bool makeIXFRPackets(const MOADNSParser& mdp, const shared_ptr& clientSOA, vector>& packets) { +static bool handleIXFR(int fd, const ComboAddress& destination, const MOADNSParser& mdp, const shared_ptr& clientSOA) { vector> toSend; /* we get a shared pointer of the zone info that we can't modify, ever. @@ -568,7 +609,7 @@ bool makeIXFRPackets(const MOADNSParser& mdp, const shared_ptr vector packet; bool ret = makeSOAPacket(mdp, packet); if (ret) { - packets.push_back(packet); + sendPacketOverTCP(fd, packet); } return ret; } @@ -590,9 +631,10 @@ bool makeIXFRPackets(const MOADNSParser& mdp, const shared_ptr if (toSend.empty()) { g_log<d_st.serial<<" for zone "<> packets; for (const auto& diff : toSend) { /* An IXFR packet's ANSWER section looks as follows: * SOA new_serial @@ -602,24 +644,43 @@ bool makeIXFRPackets(const MOADNSParser& mdp, const shared_ptr * ... added records ... * SOA new_serial */ - packets.reserve(packets.size() + /* SOAs */ 4 + diff->removals.size() + diff->additions.size()); - packets.push_back(getSOAPacket(mdp, diff->newSOA)); - packets.push_back(getSOAPacket(mdp, diff->oldSOA)); - makeXFRPacketsFromDNSRecords(mdp, diff->removals, packets); - packets.push_back(getSOAPacket(mdp, diff->newSOA)); - makeXFRPacketsFromDNSRecords(mdp, diff->additions, packets); - packets.push_back(getSOAPacket(mdp, diff->newSOA)); + const auto newSOAPacket = getSOAPacket(mdp, diff->newSOA); + const auto oldSOAPacket = getSOAPacket(mdp, diff->oldSOA); + + if (!sendPacketOverTCP(fd, newSOAPacket)) { + return false; + } + + if (!sendPacketOverTCP(fd, oldSOAPacket)) { + return false; + } + + if (!sendRecordsOverTCP(fd, mdp, diff->removals)) { + return false; + } + + if (!sendPacketOverTCP(fd, newSOAPacket)) { + return false; + } + + if (!sendRecordsOverTCP(fd, mdp, diff->additions)) { + return false; + } + + if (!sendPacketOverTCP(fd, newSOAPacket)) { + return false; + } } return true; } -bool allowedByACL(const ComboAddress& addr) { +static bool allowedByACL(const ComboAddress& addr) { return g_acl.match(addr); } -void handleUDPRequest(int fd, boost::any&) { +static void handleUDPRequest(int fd, boost::any&) { // TODO make the buffer-size configurable char buf[4096]; ComboAddress saddr; @@ -672,7 +733,7 @@ void handleUDPRequest(int fd, boost::any&) { return; } -void handleTCPRequest(int fd, boost::any&) { +static void handleTCPRequest(int fd, boost::any&) { ComboAddress saddr; int cfd = 0; @@ -705,7 +766,7 @@ void handleTCPRequest(int fd, boost::any&) { /* Thread to handle TCP traffic */ -void tcpWorker(int tid) { +static void tcpWorker(int tid) { string prefix = "TCP Worker " + std::to_string(tid) + ": "; while(true) { @@ -746,7 +807,6 @@ void tcpWorker(int tid) { continue; } - vector> packets; if (mdp.d_qtype == QType::SOA) { vector packet; bool ret = makeSOAPacket(mdp, packet); @@ -754,17 +814,15 @@ void tcpWorker(int tid) { close(cfd); continue; } - packets.push_back(packet); + sendPacketOverTCP(cfd, packet); } - - if (mdp.d_qtype == QType::AXFR) { - if (!makeAXFRPackets(mdp, packets)) { + else if (mdp.d_qtype == QType::AXFR) { + if (!handleAXFR(cfd, mdp)) { close(cfd); continue; } } - - if (mdp.d_qtype == QType::IXFR) { + else if (mdp.d_qtype == QType::IXFR) { /* RFC 1995 section 3: * The IXFR query packet format is the same as that of a normal DNS * query, but with the query type being IXFR and the authority section @@ -788,21 +846,12 @@ void tcpWorker(int tid) { continue; } - if (!makeIXFRPackets(mdp, clientSOA, packets)) { + if (!handleIXFR(cfd, saddr, mdp, clientSOA)) { close(cfd); continue; } } /* if (mdp.d_qtype == QType::IXFR) */ - g_log<