manager: forward dnstap logs to kafka
author    Aleš Mrázek <ales.mrazek@nic.cz>
Fri, 24 Oct 2025 13:25:54 +0000 (15:25 +0200)
committer Aleš Mrázek <ales.mrazek@nic.cz>
Wed, 29 Oct 2025 21:44:13 +0000 (22:44 +0100)
15 files changed:
.gitlab-ci.yml
distro/pkg/deb/control
distro/pkg/rpm/knot-resolver.spec
doc/_static/config.schema.json
etc/config/config.test-kafka.yaml
pyproject.toml
python/knot_resolver/constants.py
python/knot_resolver/constants.py.in
python/knot_resolver/datamodel/kafka_schema.py
python/knot_resolver/manager/dnstap/dnstap.proto [new file with mode: 0644]
python/knot_resolver/manager/dnstap/dnstap_pb2.py [new file with mode: 0644]
python/knot_resolver/manager/dnstap/listener.py [new file with mode: 0644]
python/knot_resolver/manager/kafka_client.py
python/knot_resolver/manager/server.py
setup.py

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index afd49a102dea5926376d1ea30ada82b14b6be981..5d1206f96e996a7fb8e5eefeceee44566492eab1 100644 (file)
@@ -917,7 +917,7 @@ python:unit:
   parallel:
     matrix:
       - PYTHON_VERSION:
-          - '3.8'
+          - '3.8'
           - '3.9'
           - '3.10'
           - '3.11'
diff --git a/distro/pkg/deb/control b/distro/pkg/deb/control
index abc929e2124b61a98af341bc3e1ea898fa179952..7b73363c877338b001766530e36d12be8006bdf4 100644 (file)
@@ -59,6 +59,8 @@ Recommends:
  python3-prometheus-client,
  python3-watchdog,
  python3-kafka,
+ python3-fstrm,
+ python3-protobuf,
 Suggests:
  knot-resolver6-module-http,
 Description: caching, DNSSEC-validating DNS resolver - core binaries
diff --git a/distro/pkg/rpm/knot-resolver.spec b/distro/pkg/rpm/knot-resolver.spec
index 608f87bf2b8b0a26855922b23c7b8d66e1a6854c..18e51319c8ee65285cc3dc62a7a4ca30b10071bf 100644 (file)
@@ -67,6 +67,8 @@ Requires:       python3-typing-extensions
 Recommends:     python3-prometheus_client
 Recommends:     python3-watchdog
 Recommends:     python3-kafka
+Recommends:     python3-fstrm
+Recommends:     python3-protobuf
 
 # dnstap module dependencies
 # SUSE is missing protoc protobuf compiler
diff --git a/doc/_static/config.schema.json b/doc/_static/config.schema.json
index ed17f54ec80c960f0403baa6da2f83dc2269d2b8..e0cfb2ca2fe574f5f90b9a2ab2e09d9437714a11 100644 (file)
                     "description": "Topic to subscribe data from.",
                     "default": "knot-resolver"
                 },
+                "topic-dnstap": {
+                    "type": "string",
+                    "description": "Topic to send dnstap data to.",
+                    "default": "dnstap"
+                },
                 "server": {
                     "anyOf": [
                         {
             "default": {
                 "enable": false,
                 "topic": "knot-resolver",
+                "topic_dnstap": "dnstap",
                 "server": [
                     "localhost@9092"
                 ],
diff --git a/etc/config/config.test-kafka.yaml b/etc/config/config.test-kafka.yaml
index 3409eebbc008b80b7063b0b02450ed5a110a1202..7723099ae4622a2216a22fc69ce5915beffcaab0 100644 (file)
@@ -1,4 +1,9 @@
+logging:
+  dnstap:
+    unix-socket: /tmp/dnstap.sock
+
 kafka:
   enable: true
   topic: knot-resolver
+  topic-dnstap: dnstap
   server: localhost@9092
diff --git a/pyproject.toml b/pyproject.toml
index 3b1f3dd31adc86e4ffc8def320dc8d9c9cdf2f0b..1da7318b17fa6375e20b240131adc20ab4cbb147 100644 (file)
@@ -27,7 +27,7 @@ script = "build_c_extensions.py"
 generate-setup-file = true
 
 [tool.poetry.dependencies]
-python = "^3.8"
+python = "^3.9"
 aiohttp = "*"
 jinja2 = "*"
 pyyaml = "*"
@@ -36,11 +36,14 @@ typing-extensions = "*"
 prometheus-client = { version = "*", optional = true }
 watchdog = { version = "*", optional = true }
 kafka-python = { version = "*", optional = true }
+fstrm = { version = "*", optional = true }
+protobuf = { version = "*", optional = true }
 
 [tool.poetry.extras]
 prometheus = ["prometheus-client"]
 watchdog = ["watchdog"]
 kafka = ["kafka-python"]
+dnstap = ["fstrm", "protobuf"]
 
 [tool.poetry.group.build.dependencies]
 poetry-core = ">=1.0.0"
@@ -62,6 +65,7 @@ toml = "^0.10.2"
 ruff = "^0.6.9"
 mypy = "^1.8.0"
 types-pyyaml = "^6.0.12.12"
+types-protobuf = "^6.32.1.20250918"
 
 [tool.poetry.group.docs.dependencies]
 sphinx = "^5.3.0"
@@ -93,7 +97,7 @@ clean = { cmd = "scripts/poe-tasks/clean", help="Cleanup build directories and f
 
 [tool.ruff]
 line-length = 120
-target-version = "py38"
+target-version = "py39"
 exclude = ["setup.py"]
 
 [tool.ruff.lint]
@@ -118,7 +122,7 @@ ignore = [
 known-first-party=["knot_resolver"]
 
 [tool.mypy]
-python_version = "3.8"
+python_version = "3.9"
 disallow_any_generics = true
 disallow_subclassing_any = true
 disallow_untyped_calls = false
diff --git a/python/knot_resolver/constants.py b/python/knot_resolver/constants.py
index 39343fb2d14044f5e04188bc152e54e59f4b05fe..33ebadd0fc974933cce5af14eb3c618a6624ef8c 100644 (file)
@@ -31,3 +31,11 @@ if importlib.util.find_spec("prometheus_client"):
 KAFKA_LIB = False
 if importlib.util.find_spec("kafka"):
     KAFKA_LIB = True
+
+FSTRM_LIB = False
+if importlib.util.find_spec("fstrm"):
+    FSTRM_LIB = True
+
+PROTOBUF_LIB = False
+if importlib.util.find_spec("google.protobuf"):
+    PROTOBUF_LIB = True
diff --git a/python/knot_resolver/constants.py.in b/python/knot_resolver/constants.py.in
index fcb4f13fa6afdfdd6817aeaba22a1ce1836b636e..7b67ebe60c4927ed90d7059ec04f0e730e79f8f7 100644 (file)
@@ -31,3 +31,11 @@ if importlib.util.find_spec("prometheus_client"):
 KAFKA_LIB = False
 if importlib.util.find_spec("kafka"):
     KAFKA_LIB = True
+
+FSTRM_LIB = False
+if importlib.util.find_spec("fstrm"):
+    FSTRM_LIB = True
+
+PROTOBUF_LIB = False
+if importlib.util.find_spec("google.protobuf"):
+    PROTOBUF_LIB = True
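
Both constants files add the same feature-detection flags for the optional dnstap dependencies. Below is a minimal sketch of the guard pattern these flags enable; the helper name is made up for illustration, and listener.py later in this commit uses the same pattern directly:

    # Sketch: import the optional dnstap dependencies only when the detection flags are set.
    from knot_resolver.constants import FSTRM_LIB, PROTOBUF_LIB

    def dnstap_support_available() -> bool:
        if FSTRM_LIB and PROTOBUF_LIB:
            import fstrm  # noqa: F401  # resolves, since find_spec() already located it
            from knot_resolver.manager.dnstap import dnstap_pb2  # noqa: F401
            return True
        return False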
diff --git a/python/knot_resolver/datamodel/kafka_schema.py b/python/knot_resolver/datamodel/kafka_schema.py
index a31b77bd1b8dca0053af514ff9e2ddf7ce2683cb..bf0a9aea80792f9793f52ec9c5f2831ed6f4a97d 100644 (file)
@@ -19,6 +19,7 @@ class KafkaSchema(ConfigSchema):
     ---
     enable: Enable/disable Kafka client.
     topic: Topic to subscribe data from.
+    topic_dnstap: Topic to send dnstap data to.
     server: Kafka server(s) to connect.
     files_dir: Directory for storing files received via Kafka.
     security_protocol: Protocol used to communicate with server(broker).
@@ -29,6 +30,7 @@ class KafkaSchema(ConfigSchema):
 
     enable: bool = False
     topic: EscapedStr = EscapedStr("knot-resolver")
+    topic_dnstap: EscapedStr = EscapedStr("dnstap")
     server: ListOrItem[Union[IPAddressOptionalPort, DomainNameOptionalPort]] = ListOrItem(
         DomainNameOptionalPort("localhost@9092")
     )
diff --git a/python/knot_resolver/manager/dnstap/dnstap.proto b/python/knot_resolver/manager/dnstap/dnstap.proto
new file mode 100644 (file)
index 0000000..4e98683
--- /dev/null
@@ -0,0 +1,343 @@
+// dnstap: flexible, structured event replication format for DNS software
+//
+// This file contains the protobuf schemas for the "dnstap" structured event
+// replication format for DNS software.
+
+// Written in 2013-2025 by the dnstap contributors.
+//
+// To the extent possible under law, the author(s) have dedicated all
+// copyright and related and neighboring rights to this file to the public
+// domain worldwide. This file is distributed without any warranty.
+//
+// You should have received a copy of the CC0 Public Domain Dedication along
+// with this file. If not, see:
+//
+// <https://creativecommons.org/publicdomain/zero/1.0/>.
+
+syntax = "proto2";
+package dnstap;
+
+// "Dnstap": this is the top-level dnstap type, which is a "union" type that
+// contains other kinds of dnstap payloads, although currently only one type
+// of dnstap payload is defined.
+// See: https://developers.google.com/protocol-buffers/docs/techniques#union
+message Dnstap {
+    // DNS server identity.
+    // If enabled, this is the identity string of the DNS server which generated
+    // this message. Typically this would be the same string as returned by an
+    // "NSID" (RFC 5001) query.
+    optional bytes      identity = 1;
+
+    // DNS server version.
+    // If enabled, this is the version string of the DNS server which generated
+    // this message. Typically this would be the same string as returned by a
+    // "version.bind" query.
+    optional bytes      version = 2;
+
+    // Extra data for this payload.
+    // This field can be used for adding an arbitrary byte-string annotation to
+    // the payload. No encoding or interpretation is applied or enforced.
+    optional bytes      extra = 3;
+
+    // Identifies which field below is filled in.
+    enum Type {
+        MESSAGE = 1;
+    }
+    required Type       type = 15;
+
+    // One of the following will be filled in.
+    optional Message    message = 14;
+}
+
+// SocketFamily: the network protocol family of a socket. This specifies how
+// to interpret "network address" fields.
+enum SocketFamily {
+    INET = 1;   // IPv4 (RFC 791)
+    INET6 = 2;  // IPv6 (RFC 2460)
+}
+
+// SocketProtocol: the protocol used to transport a DNS message.
+enum SocketProtocol {
+    UDP = 1;         // DNS over UDP transport (RFC 1035 section 4.2.1)
+    TCP = 2;         // DNS over TCP transport (RFC 1035 section 4.2.2)
+    DOT = 3;         // DNS over TLS (RFC 7858)
+    DOH = 4;         // DNS over HTTPS (RFC 8484)
+    DNSCryptUDP = 5; // DNSCrypt over UDP (https://dnscrypt.info/protocol)
+    DNSCryptTCP = 6; // DNSCrypt over TCP (https://dnscrypt.info/protocol)
+    DOQ = 7;         // DNS over QUIC (RFC 9250)
+}
+
+// HttpProtocol: the HTTP protocol version used to transport a DNS message over
+// an HTTP-based protocol such as DNS over HTTPS.
+enum HttpProtocol {
+    HTTP1 = 1; // HTTP/1
+    HTTP2 = 2; // HTTP/2
+    HTTP3 = 3; // HTTP/3
+}
+
+// Policy: information about any name server operator policy
+// applied to the processing of a DNS message.
+message Policy {
+
+    // Match: what aspect of the message or message exchange
+    // triggered the application of the Policy.
+    enum Match {
+        QNAME = 1;       // Name in question section of query
+        CLIENT_IP = 2;   // Client IP address
+        RESPONSE_IP = 3; // Address in A/AAAA RRSet
+        NS_NAME = 4;     // Authoritative name server, by name
+        NS_IP = 5;       // Authoritative name server, by IP address
+    }
+
+    // The Action taken to implement the Policy.
+    enum Action {
+        NXDOMAIN = 1;   // Respond with NXDOMAIN
+        NODATA = 2;     // Respond with empty answer section
+        PASS = 3;       // Do not alter the response (passthrough)
+        DROP = 4;       // Do not respond.
+        TRUNCATE = 5;   // Truncate UDP response, forcing TCP retry
+        LOCAL_DATA = 6; // Respond with local data from policy
+    }
+
+    // type: the type of policy applied, e.g. "RPZ" for a
+    // policy from a Response Policy Zone.
+    optional string type = 1;
+
+    // rule: the rule matched by the message.
+    //
+    // In a RPZ context, this is the owner name of the rule in
+    // the Response Policy Zone in wire format.
+    optional bytes rule = 2;
+
+    // action: the policy action taken in response to the
+    // rule match.
+    optional Action action = 3;
+
+    // match: the feature of the message exchange which matched the rule.
+    optional Match match = 4;
+
+    // The matched value. Format depends on the matched feature.
+    optional bytes value = 5;
+}
+
+// Message: a wire-format (RFC 1035 section 4) DNS message and associated
+// metadata. Applications generating "Message" payloads should follow
+// certain requirements based on the MessageType, see below.
+message Message {
+
+    // There are eight types of "Message" defined that correspond to the
+    // four arrows in the following diagram, slightly modified from RFC 1035
+    // section 2:
+
+    //    +---------+               +----------+           +--------+
+    //    |         |     query     |          |   query   |        |
+    //    | Stub    |-SQ--------CQ->| Recursive|-RQ----AQ->| Auth.  |
+    //    | Resolver|               | Server   |           | Name   |
+    //    |         |<-SR--------CR-|          |<-RR----AR-| Server |
+    //    +---------+    response   |          |  response |        |
+    //                              +----------+           +--------+
+
+    // Each arrow has two Type values each, one for each "end" of each arrow,
+    // because these are considered to be distinct events. Each end of each
+    // arrow on the diagram above has been marked with a two-letter Type
+    // mnemonic. Clockwise from upper left, these mnemonic values are:
+    //
+    //   SQ:        STUB_QUERY
+    //   CQ:      CLIENT_QUERY
+    //   RQ:    RESOLVER_QUERY
+    //   AQ:        AUTH_QUERY
+    //   AR:        AUTH_RESPONSE
+    //   RR:    RESOLVER_RESPONSE
+    //   CR:      CLIENT_RESPONSE
+    //   SR:        STUB_RESPONSE
+
+    // Two additional types of "Message" have been defined for the
+    // "forwarding" case where an upstream DNS server is responsible for
+    // further recursion. These are not shown on the diagram above, but have
+    // the following mnemonic values:
+
+    //   FQ:   FORWARDER_QUERY
+    //   FR:   FORWARDER_RESPONSE
+
+    // The "Message" Type values are defined below.
+
+    enum Type {
+        // AUTH_QUERY is a DNS query message received from a resolver by an
+        // authoritative name server, from the perspective of the authoritative
+        // name server.
+        AUTH_QUERY = 1;
+
+        // AUTH_RESPONSE is a DNS response message sent from an authoritative
+        // name server to a resolver, from the perspective of the authoritative
+        // name server.
+        AUTH_RESPONSE = 2;
+
+        // RESOLVER_QUERY is a DNS query message sent from a resolver to an
+        // authoritative name server, from the perspective of the resolver.
+        // Resolvers typically clear the RD (recursion desired) bit when
+        // sending queries.
+        RESOLVER_QUERY = 3;
+
+        // RESOLVER_RESPONSE is a DNS response message received from an
+        // authoritative name server by a resolver, from the perspective of
+        // the resolver.
+        RESOLVER_RESPONSE = 4;
+
+        // CLIENT_QUERY is a DNS query message sent from a client to a DNS
+        // server which is expected to perform further recursion, from the
+        // perspective of the DNS server. The client may be a stub resolver or
+        // forwarder or some other type of software which typically sets the RD
+        // (recursion desired) bit when querying the DNS server. The DNS server
+        // may be a simple forwarding proxy or it may be a full recursive
+        // resolver.
+        CLIENT_QUERY = 5;
+
+        // CLIENT_RESPONSE is a DNS response message sent from a DNS server to
+        // a client, from the perspective of the DNS server. The DNS server
+        // typically sets the RA (recursion available) bit when responding.
+        CLIENT_RESPONSE = 6;
+
+        // FORWARDER_QUERY is a DNS query message sent from a downstream DNS
+        // server to an upstream DNS server which is expected to perform
+        // further recursion, from the perspective of the downstream DNS
+        // server.
+        FORWARDER_QUERY = 7;
+
+        // FORWARDER_RESPONSE is a DNS response message sent from an upstream
+        // DNS server performing recursion to a downstream DNS server, from the
+        // perspective of the downstream DNS server.
+        FORWARDER_RESPONSE = 8;
+
+        // STUB_QUERY is a DNS query message sent from a stub resolver to a DNS
+        // server, from the perspective of the stub resolver.
+        STUB_QUERY = 9;
+
+        // STUB_RESPONSE is a DNS response message sent from a DNS server to a
+        // stub resolver, from the perspective of the stub resolver.
+        STUB_RESPONSE = 10;
+
+        // TOOL_QUERY is a DNS query message sent from a DNS software tool to a
+        // DNS server, from the perspective of the tool.
+        TOOL_QUERY = 11;
+
+        // TOOL_RESPONSE is a DNS response message received by a DNS software
+        // tool from a DNS server, from the perspective of the tool.
+        TOOL_RESPONSE = 12;
+
+        // UPDATE_QUERY is a Dynamic DNS Update request (RFC 2136) received
+        // by an authoritative name server, from the perspective of the
+        // authoritative name server.
+        UPDATE_QUERY = 13;
+
+        // UPDATE_RESPONSE is a Dynamic DNS Update response (RFC 2136) sent
+        // from an authoritative name server, from the perspective of the
+        // authoritative name server.
+        UPDATE_RESPONSE = 14;
+    }
+
+    // One of the Type values described above.
+    required Type               type = 1;
+
+    // One of the SocketFamily values described above.
+    optional SocketFamily       socket_family = 2;
+
+    // One of the SocketProtocol values described above.
+    optional SocketProtocol     socket_protocol = 3;
+
+    // The network address of the message initiator.
+    // For SocketFamily INET, this field is 4 octets (IPv4 address).
+    // For SocketFamily INET6, this field is 16 octets (IPv6 address).
+    optional bytes              query_address = 4;
+
+    // The network address of the message responder.
+    // For SocketFamily INET, this field is 4 octets (IPv4 address).
+    // For SocketFamily INET6, this field is 16 octets (IPv6 address).
+    optional bytes              response_address = 5;
+
+    // The transport port of the message initiator.
+    // This is a 16-bit UDP or TCP port number, depending on SocketProtocol.
+    optional uint32             query_port = 6;
+
+    // The transport port of the message responder.
+    // This is a 16-bit UDP or TCP port number, depending on SocketProtocol.
+    optional uint32             response_port = 7;
+
+    // The time at which the DNS query message was sent or received, depending
+    // on whether this is an AUTH_QUERY, RESOLVER_QUERY, or CLIENT_QUERY.
+    // This is the number of seconds since the UNIX epoch.
+    optional uint64             query_time_sec = 8;
+
+    // The time at which the DNS query message was sent or received.
+    // This is the seconds fraction, expressed as a count of nanoseconds.
+    optional fixed32            query_time_nsec = 9;
+
+    // The initiator's original wire-format DNS query message, verbatim.
+    optional bytes              query_message = 10;
+
+    // The "zone" or "bailiwick" pertaining to the DNS query message.
+    // This is a wire-format DNS domain name.
+    optional bytes              query_zone = 11;
+
+    // The time at which the DNS response message was sent or received,
+    // depending on whether this is an AUTH_RESPONSE, RESOLVER_RESPONSE, or
+    // CLIENT_RESPONSE.
+    // This is the number of seconds since the UNIX epoch.
+    optional uint64             response_time_sec = 12;
+
+    // The time at which the DNS response message was sent or received.
+    // This is the seconds fraction, expressed as a count of nanoseconds.
+    optional fixed32            response_time_nsec = 13;
+
+    // The responder's original wire-format DNS response message, verbatim.
+    optional bytes              response_message = 14;
+
+    // Operator policy applied to the processing of this message, if any.
+    optional Policy             policy = 15;
+
+    // One of the HttpProtocol values described above. This field should only be
+    // set if socket_protocol is set to DOH.
+    optional HttpProtocol       http_protocol = 16;
+}
+
+// All fields except for 'type' in the Message schema are optional.
+// It is recommended that at least the following fields be filled in for
+// particular types of Messages.
+
+// AUTH_QUERY:
+//      socket_family, socket_protocol
+//      query_address, query_port
+//      query_message
+//      query_time_sec, query_time_nsec
+
+// AUTH_RESPONSE:
+//      socket_family, socket_protocol
+//      query_address, query_port
+//      query_time_sec, query_time_nsec
+//      response_message
+//      response_time_sec, response_time_nsec
+
+// RESOLVER_QUERY:
+//      socket_family, socket_protocol
+//      query_message
+//      query_time_sec, query_time_nsec
+//      query_zone
+//      response_address, response_port
+
+// RESOLVER_RESPONSE:
+//      socket_family, socket_protocol
+//      query_time_sec, query_time_nsec
+//      query_zone
+//      response_address, response_port
+//      response_message
+//      response_time_sec, response_time_nsec
+
+// CLIENT_QUERY:
+//      socket_family, socket_protocol
+//      query_message
+//      query_time_sec, query_time_nsec
+
+// CLIENT_RESPONSE:
+//      socket_family, socket_protocol
+//      query_time_sec, query_time_nsec
+//      response_message
+//      response_time_sec, response_time_nsec
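
The generated Python bindings for this schema follow below. As a rough illustration of how they can be consumed, here is a hedged sketch that parses one serialized Dnstap payload (for example the payload of an fstrm DATA frame) and summarizes a few of the Message fields described above; the helper name and the output format are illustrative only:

    from knot_resolver.manager.dnstap import dnstap_pb2

    def describe_dnstap(payload: bytes) -> str:
        d = dnstap_pb2.Dnstap()
        d.ParseFromString(payload)  # raises google.protobuf.message.DecodeError on malformed input
        if d.type != dnstap_pb2.Dnstap.MESSAGE:
            return "unsupported dnstap payload type"
        m = d.message
        kind = dnstap_pb2.Message.Type.Name(m.type)  # e.g. "CLIENT_QUERY"
        proto = dnstap_pb2.SocketProtocol.Name(m.socket_protocol) if m.HasField("socket_protocol") else "?"
        # query_message / response_message carry the verbatim wire-format DNS message
        size = len(m.response_message) if m.HasField("response_message") else len(m.query_message)
        return f"{kind} over {proto}, {size} bytes of DNS wire data"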
diff --git a/python/knot_resolver/manager/dnstap/dnstap_pb2.py b/python/knot_resolver/manager/dnstap/dnstap_pb2.py
new file mode 100644 (file)
index 0000000..7fcfac0
--- /dev/null
@@ -0,0 +1,49 @@
+# ruff: noqa
+# type: ignore
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: dnstap.proto
+"""Generated protocol buffer code."""
+
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import runtime_version as _runtime_version
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+
+_runtime_version.ValidateProtobufRuntimeVersion(_runtime_version.Domain.PUBLIC, 6, 32, 0, "", "dnstap.proto")
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
+    b'\n\x0c\x64nstap.proto\x12\x06\x64nstap"\x94\x01\n\x06\x44nstap\x12\x10\n\x08identity\x18\x01 \x01(\x0c\x12\x0f\n\x07version\x18\x02 \x01(\x0c\x12\r\n\x05\x65xtra\x18\x03 \x01(\x0c\x12!\n\x04type\x18\x0f \x02(\x0e\x32\x13.dnstap.Dnstap.Type\x12 \n\x07message\x18\x0e \x01(\x0b\x32\x0f.dnstap.Message"\x13\n\x04Type\x12\x0b\n\x07MESSAGE\x10\x01"\xa1\x02\n\x06Policy\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0c\n\x04rule\x18\x02 \x01(\x0c\x12%\n\x06\x61\x63tion\x18\x03 \x01(\x0e\x32\x15.dnstap.Policy.Action\x12#\n\x05match\x18\x04 \x01(\x0e\x32\x14.dnstap.Policy.Match\x12\r\n\x05value\x18\x05 \x01(\x0c"J\n\x05Match\x12\t\n\x05QNAME\x10\x01\x12\r\n\tCLIENT_IP\x10\x02\x12\x0f\n\x0bRESPONSE_IP\x10\x03\x12\x0b\n\x07NS_NAME\x10\x04\x12\t\n\x05NS_IP\x10\x05"T\n\x06\x41\x63tion\x12\x0c\n\x08NXDOMAIN\x10\x01\x12\n\n\x06NODATA\x10\x02\x12\x08\n\x04PASS\x10\x03\x12\x08\n\x04\x44ROP\x10\x04\x12\x0c\n\x08TRUNCATE\x10\x05\x12\x0e\n\nLOCAL_DATA\x10\x06"\xf9\x05\n\x07Message\x12"\n\x04type\x18\x01 \x02(\x0e\x32\x14.dnstap.Message.Type\x12+\n\rsocket_family\x18\x02 \x01(\x0e\x32\x14.dnstap.SocketFamily\x12/\n\x0fsocket_protocol\x18\x03 \x01(\x0e\x32\x16.dnstap.SocketProtocol\x12\x15\n\rquery_address\x18\x04 \x01(\x0c\x12\x18\n\x10response_address\x18\x05 \x01(\x0c\x12\x12\n\nquery_port\x18\x06 \x01(\r\x12\x15\n\rresponse_port\x18\x07 \x01(\r\x12\x16\n\x0equery_time_sec\x18\x08 \x01(\x04\x12\x17\n\x0fquery_time_nsec\x18\t \x01(\x07\x12\x15\n\rquery_message\x18\n \x01(\x0c\x12\x12\n\nquery_zone\x18\x0b \x01(\x0c\x12\x19\n\x11response_time_sec\x18\x0c \x01(\x04\x12\x1a\n\x12response_time_nsec\x18\r \x01(\x07\x12\x18\n\x10response_message\x18\x0e \x01(\x0c\x12\x1e\n\x06policy\x18\x0f \x01(\x0b\x32\x0e.dnstap.Policy\x12+\n\rhttp_protocol\x18\x10 \x01(\x0e\x32\x14.dnstap.HttpProtocol"\x95\x02\n\x04Type\x12\x0e\n\nAUTH_QUERY\x10\x01\x12\x11\n\rAUTH_RESPONSE\x10\x02\x12\x12\n\x0eRESOLVER_QUERY\x10\x03\x12\x15\n\x11RESOLVER_RESPONSE\x10\x04\x12\x10\n\x0c\x43LIENT_QUERY\x10\x05\x12\x13\n\x0f\x43LIENT_RESPONSE\x10\x06\x12\x13\n\x0f\x46ORWARDER_QUERY\x10\x07\x12\x16\n\x12\x46ORWARDER_RESPONSE\x10\x08\x12\x0e\n\nSTUB_QUERY\x10\t\x12\x11\n\rSTUB_RESPONSE\x10\n\x12\x0e\n\nTOOL_QUERY\x10\x0b\x12\x11\n\rTOOL_RESPONSE\x10\x0c\x12\x10\n\x0cUPDATE_QUERY\x10\r\x12\x13\n\x0fUPDATE_RESPONSE\x10\x0e*#\n\x0cSocketFamily\x12\x08\n\x04INET\x10\x01\x12\t\n\x05INET6\x10\x02*_\n\x0eSocketProtocol\x12\x07\n\x03UDP\x10\x01\x12\x07\n\x03TCP\x10\x02\x12\x07\n\x03\x44OT\x10\x03\x12\x07\n\x03\x44OH\x10\x04\x12\x0f\n\x0b\x44NSCryptUDP\x10\x05\x12\x0f\n\x0b\x44NSCryptTCP\x10\x06\x12\x07\n\x03\x44OQ\x10\x07*/\n\x0cHttpProtocol\x12\t\n\x05HTTP1\x10\x01\x12\t\n\x05HTTP2\x10\x02\x12\t\n\x05HTTP3\x10\x03'
+)
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "dnstap_pb2", _globals)
+if not _descriptor._USE_C_DESCRIPTORS:
+    DESCRIPTOR._loaded_options = None
+    _globals["_SOCKETFAMILY"]._serialized_start = 1231
+    _globals["_SOCKETFAMILY"]._serialized_end = 1266
+    _globals["_SOCKETPROTOCOL"]._serialized_start = 1268
+    _globals["_SOCKETPROTOCOL"]._serialized_end = 1363
+    _globals["_HTTPPROTOCOL"]._serialized_start = 1365
+    _globals["_HTTPPROTOCOL"]._serialized_end = 1412
+    _globals["_DNSTAP"]._serialized_start = 25
+    _globals["_DNSTAP"]._serialized_end = 173
+    _globals["_DNSTAP_TYPE"]._serialized_start = 154
+    _globals["_DNSTAP_TYPE"]._serialized_end = 173
+    _globals["_POLICY"]._serialized_start = 176
+    _globals["_POLICY"]._serialized_end = 465
+    _globals["_POLICY_MATCH"]._serialized_start = 305
+    _globals["_POLICY_MATCH"]._serialized_end = 379
+    _globals["_POLICY_ACTION"]._serialized_start = 381
+    _globals["_POLICY_ACTION"]._serialized_end = 465
+    _globals["_MESSAGE"]._serialized_start = 468
+    _globals["_MESSAGE"]._serialized_end = 1229
+    _globals["_MESSAGE_TYPE"]._serialized_start = 952
+    _globals["_MESSAGE_TYPE"]._serialized_end = 1229
+# @@protoc_insertion_point(module_scope)
diff --git a/python/knot_resolver/manager/dnstap/listener.py b/python/knot_resolver/manager/dnstap/listener.py
new file mode 100644 (file)
index 0000000..c6c1493
--- /dev/null
@@ -0,0 +1,97 @@
+import asyncio
+import logging
+from pathlib import Path
+
+from knot_resolver.constants import FSTRM_LIB, PROTOBUF_LIB
+from knot_resolver.manager.config_store import ConfigStore
+from knot_resolver.utils import compat
+
+logger = logging.getLogger(__name__)
+
+
+if FSTRM_LIB and PROTOBUF_LIB:
+    import fstrm  # type: ignore[import-untyped]
+
+    from knot_resolver.manager.dnstap import dnstap_pb2
+
+    async def callback(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
+        peername = writer.get_extra_info("peername")
+        if not len(peername):
+            peername = "(unix-socket)"
+        logger.info(f"New connection from {peername}")
+
+        content_type = b"protobuf:dnstap.Dnstap"
+        fstrm_handler = fstrm.FstrmCodec()
+        # loop = asyncio.get_event_loop()
+        dnstap_protobuf = dnstap_pb2.Dnstap()  # type: ignore[attr-defined]
+
+        try:
+            running = True
+            while running:
+                read_task = asyncio.create_task(reader.read(fstrm_handler.pending_nb_bytes()))
+                data = await read_task
+                if not len(data):
+                    running = False
+                    break
+
+                # append data to the buffer
+                fstrm_handler.append(data=data)
+
+                # process the buffer, check if we have received a complete frame ?
+                if fstrm_handler.process():
+                    # Ok, the frame is complete so let's decode it
+                    ctrl, ct, payload = fstrm_handler.decode()
+
+                    # handle the DATA frame
+                    if ctrl == fstrm.FSTRM_DATA_FRAME:
+                        dnstap_protobuf.ParseFromString(payload)
+                        dm = dnstap_protobuf.message
+                        logger.debug(dm)
+
+                    # handle the control frame READY
+                    if ctrl == fstrm.FSTRM_CONTROL_READY:
+                        if content_type not in ct:
+                            raise Exception("content type error: %s" % ct)
+
+                        # todo, checking content type
+                        ctrl_accept = fstrm_handler.encode(ctrl=fstrm.FSTRM_CONTROL_ACCEPT, ct=[content_type])
+                        # respond with accept only if the content type is dnstap
+                        writer.write(ctrl_accept)
+                        await writer.drain()
+
+                    # handle the control frame STOP
+                    if ctrl == fstrm.FSTRM_CONTROL_STOP:
+                        fstrm_handler.reset()
+
+                        # send finish control
+                        ctrl_finish = fstrm_handler.encode(ctrl=fstrm.FSTRM_CONTROL_FINISH)
+                        writer.write(ctrl_finish)
+                        await writer.drain()
+
+        except asyncio.IncompleteReadError:
+            pass
+        except ConnectionError:
+            writer.close()
+        except asyncio.CancelledError:
+            writer.close()
+            await writer.wait_closed()
+
+    async def start_dnstap_listener(socket_path: Path) -> None:
+        if socket_path.exists():
+            socket_path.unlink()
+
+        server = await asyncio.start_unix_server(callback, path=socket_path)
+        logger.info(f"Listening dnstap on '{socket_path}'")
+
+        async with server:
+            await server.serve_forever()
+
+
+async def init_dnstap_listener(config_store: ConfigStore) -> None:
+    config = config_store.get()
+    if FSTRM_LIB and PROTOBUF_LIB and config.logging.dnstap:
+        socket_path = config.logging.dnstap.unix_socket.to_path()
+        if compat.asyncio.is_event_loop_running():
+            compat.asyncio.create_task(start_dnstap_listener(socket_path))
+        else:
+            compat.asyncio.run(start_dnstap_listener(socket_path))
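
Note that the listener above only logs the decoded message (logger.debug(dm)); the hand-off to Kafka is not part of this hunk. Below is a hedged sketch of one possible bridge, turning the decoded protobuf message into a JSON-serializable dict compatible with the json.dumps value serializer that kafka_client.py configures further down; the helper name is made up, and bytes fields come out base64-encoded per protobuf's standard JSON mapping:

    from typing import Any, Dict

    from google.protobuf.json_format import MessageToDict

    from knot_resolver.manager.dnstap import dnstap_pb2

    def dnstap_to_dict(dnstap_msg: dnstap_pb2.Dnstap) -> Dict[str, Any]:
        # enums become symbolic names (e.g. "CLIENT_RESPONSE"),
        # bytes fields (query_message, addresses, ...) become base64 strings
        return MessageToDict(dnstap_msg, preserving_proto_field_name=True)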
diff --git a/python/knot_resolver/manager/kafka_client.py b/python/knot_resolver/manager/kafka_client.py
index a33f8e667451e553154dbee2c666bf4929c7d5e5..59e01fccc8181e4da695cdf959fef4a87307d68e 100644 (file)
@@ -1,3 +1,4 @@
+import json
 import logging
 import shutil
 from pathlib import Path
@@ -24,7 +25,7 @@ def kafka_config(config: KresConfig) -> List[Any]:
 
 
 if KAFKA_LIB:
-    from kafka import KafkaConsumer  # type: ignore[import-untyped,import-not-found]
+    from kafka import KafkaConsumer, KafkaProducer  # type: ignore[import-untyped,import-not-found]
     from kafka.consumer.fetcher import ConsumerRecord  # type: ignore[import-untyped,import-not-found]
     from kafka.errors import KafkaError  # type: ignore[import-untyped,import-not-found]
     from kafka.structs import TopicPartition  # type: ignore[import-untyped,import-not-found]
@@ -245,6 +246,7 @@ if KAFKA_LIB:
             self._config = config
             self._consumer_timer: Optional[Timer] = None
             self._consumer: Optional[KafkaConsumer] = None
+            self._producer: Optional[KafkaProducer] = None
 
             # reduce the verbosity of kafka module logger
             kafka_logger = logging.getLogger("kafka")
@@ -258,9 +260,10 @@ if KAFKA_LIB:
                 brokers.append(broker.replace("@", ":") if server.port else f"{broker}:9092")
             self._brokers: List[str] = brokers
             self._consumer_run()
+            self._producer_connect()
 
         def _consumer_connect(self) -> None:
-            error_msg_prefix = f"Connecting to Kafka broker(s) '{self._brokers}' has failed with"
+            error_msg_prefix = f"Connecting consumer to Kafka broker(s) '{self._brokers}' has failed with"
             kafka_conf = self._config.kafka
 
             # close old consumer connection
@@ -268,7 +271,7 @@ if KAFKA_LIB:
                 self._consumer.close()
                 self._consumer = None
 
-            logger.info("Connecting to Kafka broker(s)...")
+            logger.info("Connecting consumer to Kafka broker(s)...")
             try:
                 consumer = KafkaConsumer(
                     str(kafka_conf.topic),
@@ -280,7 +283,7 @@ if KAFKA_LIB:
                     ssl_keyfile=str(kafka_conf.key_file) if kafka_conf.key_file else None,
                 )
                 self._consumer = consumer
-                logger.info("Successfully connected to Kafka broker")
+                logger.info("Successfully connected consumer to Kafka broker")
             except KafkaError as e:
                 logger.error(f"{error_msg_prefix} {e}")
             except Exception as e:
@@ -292,6 +295,29 @@ if KAFKA_LIB:
             if self._consumer:
                 self._consumer.close()
                 self._consumer = None
+            if self._producer:
+                self._producer.close()
+                self._producer = None
+
+        def _producer_connect(self) -> None:
+            error_msg_prefix = f"Connecting producer to Kafka broker(s) '{self._brokers}' has failed with"
+
+            # close old producer connection
+            if self._producer:
+                self._producer.close()
+                self._producer = None
+
+            logger.info("Connecting producer to Kafka broker(s)...")
+            try:
+                producer = KafkaProducer(
+                    bootstrap_servers=self._brokers, value_serializer=lambda v: json.dumps(v).encode("utf-8")
+                )
+                self._producer = producer
+                logger.info("Successfully connected producer to Kafka broker")
+            except KafkaError as e:
+                logger.error(f"{error_msg_prefix} {e}")
+            except Exception as e:
+                logger.error(f"{error_msg_prefix} unknown error:\n{e}")
 
         def _consumer_run(self) -> None:
             keep_consuming = False
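
The hunk above only establishes the producer connection; this changeset does not show a publish path. A hedged sketch of how the producer might be used together with the new topic_dnstap option, assuming a method added to the same class (the method name and payload shape are made up; send() and its returned future are standard kafka-python API):

    # Sketch: publish one dnstap record; self._producer already serializes values with json.dumps.
    def _producer_send_dnstap(self, record: dict) -> None:
        if not self._producer:
            logger.warning("Kafka producer is not connected, dropping dnstap record")
            return
        topic = str(self._config.kafka.topic_dnstap)
        try:
            future = self._producer.send(topic, value=record)
            future.get(timeout=10)  # optionally block to surface broker errors
        except KafkaError as e:
            logger.error(f"Sending dnstap record to topic '{topic}' failed with {e}")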
diff --git a/python/knot_resolver/manager/server.py b/python/knot_resolver/manager/server.py
index 83a66ba2afba7f1a77d91a19be6276d287bd7ecd..972fb42c0e9408937e3702bc41558aba3bb2bfd5 100644 (file)
@@ -29,6 +29,7 @@ from knot_resolver.datamodel.config_schema import KresConfig, get_rundir_without
 from knot_resolver.datamodel.globals import Context, set_global_validation_context
 from knot_resolver.datamodel.management_schema import ManagementSchema
 from knot_resolver.manager import files, kafka_client, metrics
+from knot_resolver.manager.dnstap.listener import init_dnstap_listener
 from knot_resolver.utils import custom_atexit as atexit
 from knot_resolver.utils import ignore_exceptions_optional
 from knot_resolver.utils.async_utils import readfile
@@ -632,6 +633,8 @@ async def start_server(config: List[str]) -> int:  # noqa: PLR0915
 
         await kafka_client.init_kafka_client(config_store)
 
+        await init_dnstap_listener(config_store)
+
         # After we have loaded the configuration, we can start worrying about subprocess management.
         manager = await _init_manager(config_store)
 
diff --git a/setup.py b/setup.py
index 67810ad068db6e035502df73e5e7f45683b69358..8398ea7f07a3a4d779cb9ccb3711e2e1e1181a9c 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -15,6 +15,7 @@ packages = \
  'knot_resolver.datamodel.templates',
  'knot_resolver.datamodel.types',
  'knot_resolver.manager',
+ 'knot_resolver.manager.dnstap',
  'knot_resolver.manager.files',
  'knot_resolver.manager.metrics',
  'knot_resolver.utils',
@@ -28,7 +29,8 @@ install_requires = \
 ['aiohttp', 'jinja2', 'pyyaml', 'supervisor', 'typing-extensions']
 
 extras_require = \
-{'kafka': ['kafka-python'],
+{'dnstap': ['fstrm', 'protobuf'],
+ 'kafka': ['kafka-python'],
  'prometheus': ['prometheus-client'],
  'watchdog': ['watchdog']}
 
@@ -52,7 +54,7 @@ setup_kwargs = {
     'install_requires': install_requires,
     'extras_require': extras_require,
     'entry_points': entry_points,
-    'python_requires': '>=3.8,<4.0',
+    'python_requires': '>=3.9,<4.0',
 }
 from build_c_extensions import *
 build(setup_kwargs)