class PDNSPBConnHandler(object):
- def __init__(self, conn, oturl, printjson):
+
+ def __init__(self, conn, oturl, printjson, frame4):
self._conn = conn
self._oturl = oturl
self._printjson = printjson
+ self._frame4 = frame4
messageTypeToStringMap = {
dnsmessage_pb2.PBDNSMessage.UNKNOWN: "Unknown",
def run(self):
while True:
- data = self._conn.recv(2)
- if not data or len(data) < 2:
- break
+ if self._frame4:
+ data = self._conn.recv(4)
+ if not data or len(data) < 4:
+ break
+ (datalen,) = struct.unpack("!L", data)
+ else:
+ data = self._conn.recv(2)
+ if not data or len(data) < 2:
+ break
+ (datalen,) = struct.unpack("!H", data)
- (datalen,) = struct.unpack("!H", data)
data = b""
remaining = datalen
self.convertIDs(values)
json_string = json.dumps(values, indent=True)
print("- openTelemetry: " + json_string)
- else:
- print(
- "- openTelemetry decoding not available, see the comments in ProtoBuffer.py to make it available."
- )
+ else:
+ print("- openTelemetry decoding not available, see the comments in ProtoBuffer.py to make it available.")
@staticmethod
def getAppliedPolicyTypeAsString(polType):
if (rrclass == 1 or rrclass == 255) and rr.HasField("rdata"):
if rrtype == 1:
rdatastr = socket.inet_ntop(socket.AF_INET, rr.rdata)
- elif rrtype in (5, 35, 64, 65):
+ elif rrtype in (5, 16, 35, 64, 65):
rdatastr = rr.rdata
elif rrtype == 28:
rdatastr = socket.inet_ntop(socket.AF_INET6, rr.rdata)
return requestorstr
-class PDNSPBListener(object):
- def __init__(self, addr, port, oturl, printjson):
+ def __init__(self, addr, port, oturl, printjson, frame4):
self._oturl = oturl
self._printjson = printjson
- res = socket.getaddrinfo(addr, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
+ self._frame4 = frame4
+ res = socket.getaddrinfo(addr, port, socket.AF_UNSPEC,
+ socket.SOCK_STREAM, 0,
+ socket.AI_PASSIVE)
if len(res) != 1:
print("Error parsing the supplied address")
sys.exit(1)
while True:
(conn, _) = self._sock.accept()
- handler = PDNSPBConnHandler(conn, self._oturl, self._printjson)
- thread = threading.Thread(name="Connection Handler", target=PDNSPBConnHandler.run, args=[handler])
+ handler = PDNSPBConnHandler(conn, self._oturl, self._printjson, self._frame4)
+ thread = threading.Thread(name='Connection Handler',
+ target=PDNSPBConnHandler.run,
+ args=[handler])
thread.daemon = True
thread.start()
epilog="URL is an optional url of a OpenTelemetry Trace collector endpoint",
)
- parser.add_argument("-json", action="store_true")
- parser.add_argument("address")
- parser.add_argument("port")
- parser.add_argument("-url")
- args = parser.parse_args()
- PDNSPBListener(args.address, args.port, args.url, args.json).run()
+ parser.add_argument('-json', action='store_true')
+ parser.add_argument('address')
+ parser.add_argument('port')
+ parser.add_argument('-url')
+ parser.add_argument('-frame4', action='store_true')
+ args = parser.parse_args();
+ PDNSPBListener(args.address, args.port, args.url, args.json, args.frame4).run()
sys.exit(0)
std::vector<std::shared_ptr<RemoteLoggerInterface>> loggers;
loggers.reserve(config.connection_count);
for (uint64_t i = 0; i < config.connection_count; i++) {
- loggers.push_back(std::make_shared<RemoteLogger>(ComboAddress(std::string(config.address)), config.timeout, config.max_queued_entries * 100, config.reconnect_wait_time, dnsdist::configuration::yaml::s_inClientMode));
+ loggers.push_back(std::make_shared<RemoteLogger>(ComboAddress(std::string(config.address)), config.timeout, config.max_queued_entries * 100, config.reconnect_wait_time, dnsdist::configuration::yaml::s_inClientMode, RemoteLogger::FrameSize::Two));
}
object = std::shared_ptr<RemoteLoggerInterface>(std::make_shared<RemoteLoggerPool>(std::move(loggers)));
}
else {
- object = std::shared_ptr<RemoteLoggerInterface>(std::make_shared<RemoteLogger>(ComboAddress(std::string(config.address)), config.timeout, config.max_queued_entries * 100, config.reconnect_wait_time, dnsdist::configuration::yaml::s_inClientMode));
+ object = std::shared_ptr<RemoteLoggerInterface>(std::make_shared<RemoteLogger>(ComboAddress(std::string(config.address)), config.timeout, config.max_queued_entries * 100, config.reconnect_wait_time, dnsdist::configuration::yaml::s_inClientMode, RemoteLogger::FrameSize::Two));
}
dnsdist::configuration::yaml::registerType<RemoteLoggerInterface>(object, config.name);
#endif
std::vector<std::shared_ptr<RemoteLoggerInterface>> loggers;
loggers.reserve(count);
for (uint64_t i = 0; i < count; i++) {
- loggers.push_back(std::make_shared<RemoteLogger>(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client));
+ loggers.push_back(std::make_shared<RemoteLogger>(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client, RemoteLogger::FrameSize::Two));
}
return std::shared_ptr<RemoteLoggerInterface>(new RemoteLoggerPool(std::move(loggers)));
}
- return std::shared_ptr<RemoteLoggerInterface>(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client));
+ return std::shared_ptr<RemoteLoggerInterface>(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client, RemoteLogger::FrameSize::Two));
});
luaCtx.writeFunction("newFrameStreamUnixLogger", [client, configCheck]([[maybe_unused]] const std::string& address, [[maybe_unused]] std::optional<LuaAssociativeTable<unsigned int>> params) {
m.setResponseCode(rcode);
}
+ const auto limit = (outgoingLoggers->size() > 0 ? outgoingLoggers->at(0)->maxSize() : std::numeric_limits<uint16_t>::max()) / 2;
for (const auto& record : records) {
- m.addRR(record, exportTypes, std::nullopt);
+ m.addRR(record, exportTypes, std::nullopt, limit);
}
m.commitResponse();
#endif /* NOD ENABLED */
if (t_protobufServers.servers) {
- // Max size is 64k, but we're conservative here, as other fields are added after the answers have been added
- // If a single answer causes a too big protobuf message, it will be dropped by queueData()
- // But note addRR has code to prevent that
- if (pbMessage.size() < std::numeric_limits<uint16_t>::max() / 2) {
- pbMessage.addRR(record, luaconfsLocal->protobufExportConfig.exportTypes, udr);
+ // Max size is 64k for 2 bytes frames, but we're conservative here, as other fields are
+ // added after the answers have been added. If a single answer causes a too big protobuf
+ // message, it will be dropped by queueData(), but note addRR has code to prevent that.
+ const auto limit = (t_protobufServers.servers->size() > 0 ? t_protobufServers.servers->at(0)->maxSize() : std::numeric_limits<uint16_t>::max()) / 2;
+ if (pbMessage.size() < limit) {
+ pbMessage.addRR(record, luaconfsLocal->protobufExportConfig.exportTypes, udr, limit);
}
}
}
configA.logQueries == configB.logQueries &&
configA.logResponses == configB.logResponses &&
configA.taggedOnly == configB.taggedOnly &&
- configA.logMappedFrom == configB.logMappedFrom;
+ configA.logMappedFrom == configB.logMappedFrom &&
+ configA.frame4 == configB.frame4;
// clang-format on
}
config.logMappedFrom = boost::get<bool>(have.at("logMappedFrom"));
}
+ if (have.count("frame4") != 0) {
+ config.frame4 = boost::get<bool>(have.at("frame4"));
+ }
+
if (have.count("exportTypes") != 0) {
config.exportTypes.clear();
bool logResponses{true};
bool taggedOnly{false};
bool logMappedFrom{false};
+ bool frame4{false};
};
bool operator==(const ProtobufExportConfig& configA, const ProtobufExportConfig& configB);
for (const auto& server : config.servers) {
try {
- auto logger = make_unique<RemoteLogger>(server, config.timeout, 100 * config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect);
+ auto logger = make_unique<RemoteLogger>(server, config.timeout, 100 * config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect, config.frame4 ? RemoteLogger::FrameSize::Four : RemoteLogger::FrameSize::Two);
logger->setLogQueries(config.logQueries);
logger->setLogResponses(config.logResponses);
result->emplace_back(std::move(logger));
#include "rec-protozero.hh"
#include <variant>
-void pdns::ProtoZero::RecMessage::addRR(const DNSRecord& record, const std::set<uint16_t>& exportTypes, [[maybe_unused]] std::optional<bool> udr)
+void pdns::ProtoZero::RecMessage::addRR(const DNSRecord& record, const std::set<uint16_t>& exportTypes, [[maybe_unused]] std::optional<bool> udr, size_t limit)
{
if (record.d_place != DNSResourceRecord::ANSWER || record.d_class != QClass::IN) {
return;
pbf_rr.add_uint32(static_cast<protozero::pbf_tag_type>(pdns::ProtoZero::Message::RRField::ttl), record.d_ttl);
auto add = [&](const std::string& str) {
- if (size() + str.length() < std::numeric_limits<uint16_t>::max() / 2) {
+ if (size() + str.length() < limit) {
pbf_rr.add_string(static_cast<protozero::pbf_tag_type>(pdns::ProtoZero::Message::RRField::rdata), str);
}
};
// DNSResponse related fields below
- void addRR(const DNSRecord& record, const std::set<uint16_t>& exportTypes, std::optional<bool> udr);
+ void addRR(const DNSRecord& record, const std::set<uint16_t>& exportTypes, std::optional<bool> udr, size_t limit);
void setAppliedPolicyType(const DNSFilterEngine::PolicyType type)
{
pbServer.exportTypes.emplace_back(QType(num).toString());
}
pbServer.logMappedFrom = pbConfig.logMappedFrom;
+ pbServer.frame4 = pbConfig.frame4;
}
void fromLuaToRust(const FrameStreamExportConfig& fsc, pdns::rust::settings::rec::DNSTapFrameStreamServer& dnstap)
exp.logResponses = pbServer.logResponses;
exp.taggedOnly = pbServer.taggedOnly;
exp.logMappedFrom = pbServer.logMappedFrom;
+ exp.frame4 = pbServer.frame4;
}
void fromRustToLuaConfig(const pdns::rust::settings::rec::DNSTapFrameStreamServer& dnstap, FrameStreamExportConfig& exp)
exportTypes: Vec<String>,
#[serde(default, skip_serializing_if = "crate::is_default", alias = "log_mapped_from")]
logMappedFrom: bool,
+ // Added in 5.5.0
+ #[serde(default, skip_serializing_if = "crate::is_default")]
+ frame4: bool,
}
// A dnstap logging server
}
insertseq(&mut map, "exportTypes", &seq2);
insertb(&mut map, "logMappedFrom", self.logMappedFrom);
+ insertb(&mut map, "frame4", self.frame4);
serde_yaml::Value::Mapping(map)
}
}
bool CircularWriteBuffer::hasRoomFor(const std::string& str) const
{
- return d_buffer.size() + 2 + str.size() <= d_buffer.capacity();
+ return d_buffer.size() + d_framesize + str.size() <= d_buffer.capacity();
+}
+
+bool CircularWriteBuffer::tooBig(const std::string& str) const
+{
+ return str.size() > (d_framesize == 2 ? std::numeric_limits<uint16_t>::max() : std::numeric_limits<uint32_t>::max());
}
bool CircularWriteBuffer::write(const std::string& str)
{
- if (str.size() > std::numeric_limits<uint16_t>::max() || !hasRoomFor(str)) {
+ if (tooBig(str) || !hasRoomFor(str)) {
return false;
}
- uint16_t len = htons(str.size());
- const char* ptr = reinterpret_cast<const char*>(&len); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
- d_buffer.insert(d_buffer.end(), ptr, ptr + sizeof(len)); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ if (d_framesize == 2) {
+ uint16_t len = htons(str.size());
+ const char* ptr = reinterpret_cast<const char*>(&len); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
+ d_buffer.insert(d_buffer.end(), ptr, ptr + sizeof(len)); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ }
+ else {
+ uint32_t len = htonl(str.size());
+ const char* ptr = reinterpret_cast<const char*>(&len); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
+ d_buffer.insert(d_buffer.end(), ptr, ptr + sizeof(len)); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ }
d_buffer.insert(d_buffer.end(), str.begin(), str.end());
-
return true;
}
return str.at(std::min(tmp, 4U));
}
-RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect) : d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr})
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect, RemoteLogger::FrameSize frame) : d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_runtime({CircularWriteBuffer(maxQueuedBytes, frame == FrameSize::Two ? 2 : 4), nullptr}), d_framesize(frame)
{
if (!d_asyncConnect) {
reconnect();
{
auto runtime = d_runtime.lock();
- if (data.size() > std::numeric_limits<uint16_t>::max()) {
+ if (runtime->d_writer.tooBig(data)) {
++runtime->d_stats.d_tooLarge;
return Result::TooLarge;
}
class CircularWriteBuffer
{
public:
- explicit CircularWriteBuffer(size_t size) : d_buffer(size)
+ explicit CircularWriteBuffer(size_t size, uint8_t frame) : d_buffer(size), d_framesize(frame)
{
}
[[nodiscard]] bool hasRoomFor(const std::string& str) const;
+ [[nodiscard]] bool tooBig(const std::string& str) const;
bool write(const std::string& str);
bool flush(int fileDesc);
private:
boost::circular_buffer<char> d_buffer;
+ uint8_t d_framesize;
};
class RemoteLoggerInterface
class RemoteLogger : public RemoteLoggerInterface
{
public:
+ enum class FrameSize : uint8_t
+ {
+ Two,
+ Four,
+ };
RemoteLogger(const RemoteLogger&) = delete;
RemoteLogger(RemoteLogger&&) = delete;
RemoteLogger& operator=(const RemoteLogger&) = delete;
RemoteLogger(const ComboAddress& remote, uint16_t timeout = 2,
uint64_t maxQueuedBytes = 100000,
uint8_t reconnectWaitTime = 1,
- bool asyncConnect = false);
+ bool asyncConnect = false,
+ FrameSize frame = FrameSize::Two);
~RemoteLogger() override;
std::string address() const override
}
[[nodiscard]] Result queueData(const std::string& data) override;
+ [[nodiscard]] size_t maxSize() const
+ {
+ return d_framesize == FrameSize::Two ? std::numeric_limits<uint16_t>::max() : std::numeric_limits<uint32_t>::max();
+ }
[[nodiscard]] std::string name() const override
{
return "protobuf";
LockGuarded<RuntimeData> d_runtime;
std::thread d_thread;
+ FrameSize d_framesize;
};