]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Add Protobuf logger written in Rust.
authorKevin P. Fleming <kevin@km6g.us>
Sat, 5 Jul 2025 20:27:24 +0000 (16:27 -0400)
committerKevin P. Fleming <kevin@km6g.us>
Mon, 7 Jul 2025 10:15:37 +0000 (06:15 -0400)
This was inspired by the ProtobufLogger.py already present in the
'contrib' directory.

In addition to being written in Rust instead of Python, there are
various other differences:

* Each line of output is prefixed with the sender's 'socket address'
  (IP address and port number).

* Messages from multiple clients will be properly output, they will
  not be mixed.

* Timestamp format is slightly different (full ISO-8601 with UTC
  offset).

* Command-line arguments are handled by a full parser, which can
  generate help text and report the program's version.

* All 'optional' fields in the protobuf messages are checked for
  presence before being read.

* Output to stdout will never block reception/decoding/formatting of
  protobuf messages; if stdout blocks for some reason, incoming
  messages will be stored in memory until they can be printed.

* Summary, meta, and question lines are printed; responses are not,
  nor is OpenTelemetry data. Future work for another contributor!

* 'meta' output is untested.

* A Cargo feature 'opentelemetry' is available to be the starting
  point of OT support.

No AI or LLM tools were used in the creation or testing of this code.

contrib/pblogger-rs/.gitignore [new file with mode: 0644]
contrib/pblogger-rs/Cargo.toml [new file with mode: 0644]
contrib/pblogger-rs/build.rs [new file with mode: 0644]
contrib/pblogger-rs/rust-toolchain.toml [new file with mode: 0644]
contrib/pblogger-rs/rustfmt.toml [new file with mode: 0644]
contrib/pblogger-rs/src/display.rs [new file with mode: 0644]
contrib/pblogger-rs/src/listener.rs [new file with mode: 0644]
contrib/pblogger-rs/src/main.rs [new file with mode: 0644]

diff --git a/contrib/pblogger-rs/.gitignore b/contrib/pblogger-rs/.gitignore
new file mode 100644 (file)
index 0000000..a9d37c5
--- /dev/null
@@ -0,0 +1,2 @@
+target
+Cargo.lock
diff --git a/contrib/pblogger-rs/Cargo.toml b/contrib/pblogger-rs/Cargo.toml
new file mode 100644 (file)
index 0000000..ab57a92
--- /dev/null
@@ -0,0 +1,27 @@
+[package]
+name = "pblogger-rs"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+anyhow = "1.0.98"
+byteorder = "1.5.0"
+chrono = "0.4.41"
+clap = { version = "4.5.40", features = ["derive"] }
+clap-verbosity-flag = "3.0.3"
+env_logger = "0.11.8"
+log = "0.4.27"
+prost = "0.14.1"
+tokio = { version = "1.46.0", features = ["full"] }
+
+[build-dependencies]
+prost-build = "0.14.1"
+
+[features]
+opentelemetry = []
+
+[lints.clippy]
+pedantic = { level = "warn", priority = -1 }
+cargo = { level = "warn", priority = -1 }
+cargo_common_metadata = "allow"
+multiple_crate_versions = "allow" # necessary for prost crates
diff --git a/contrib/pblogger-rs/build.rs b/contrib/pblogger-rs/build.rs
new file mode 100644 (file)
index 0000000..5914979
--- /dev/null
@@ -0,0 +1,13 @@
+use std::io::Result;
+
+fn main() -> Result<()> {
+    let mut prost_build = prost_build::Config::new();
+
+    prost_build.default_package_filename("pdns");
+    prost_build.compile_protos(&["dnsmessage.proto"], &["../../pdns"])?;
+
+    #[cfg(feature = "opentelemetry")]
+    todo!();
+
+    Ok(())
+}
diff --git a/contrib/pblogger-rs/rust-toolchain.toml b/contrib/pblogger-rs/rust-toolchain.toml
new file mode 100644 (file)
index 0000000..02cb8fc
--- /dev/null
@@ -0,0 +1,3 @@
+[toolchain]
+channel = "stable"
+profile = "default"
diff --git a/contrib/pblogger-rs/rustfmt.toml b/contrib/pblogger-rs/rustfmt.toml
new file mode 100644 (file)
index 0000000..f3e454b
--- /dev/null
@@ -0,0 +1,2 @@
+edition = "2024"
+style_edition = "2024"
diff --git a/contrib/pblogger-rs/src/display.rs b/contrib/pblogger-rs/src/display.rs
new file mode 100644 (file)
index 0000000..d85f0ca
--- /dev/null
@@ -0,0 +1,324 @@
+use crate::pdns::{PbdnsMessage, pbdns_message};
+use byteorder::{ByteOrder, NetworkEndian};
+use chrono::DateTime;
+use std::fmt;
+use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
+
+pub struct ClientMessage {
+    pub client_addr: SocketAddr,
+    pub msg: PbdnsMessage,
+}
+
+#[derive(Clone, Copy)]
+enum Direction {
+    In,
+    Out,
+}
+
+fn make_hex_string(bytes: &[u8]) -> String {
+    bytes
+        .iter()
+        .map(|b| format!("{b:02X}").to_string())
+        .collect::<String>()
+}
+
+fn make_addr_port(
+    msg_family: Option<i32>,
+    msg_addr: Option<&Vec<u8>>,
+    msg_port: Option<u32>,
+) -> String {
+    if let Some(family) = msg_family
+        && let Some(addr) = msg_addr
+    {
+        let fromaddr = match pbdns_message::SocketFamily::try_from(family) {
+            Ok(pbdns_message::SocketFamily::Inet) => {
+                Ipv4Addr::from_bits(NetworkEndian::read_u32(&addr[0..4])).to_string()
+            }
+            Ok(pbdns_message::SocketFamily::Inet6) => {
+                format!(
+                    "[{}]",
+                    Ipv6Addr::from_bits(NetworkEndian::read_u128(&addr[0..16]))
+                )
+            }
+            Err(_) => "unsupported".into(),
+        };
+        if let Some(port) = msg_port {
+            return format!("{fromaddr}:{port}");
+        }
+        return fromaddr;
+    }
+
+    "unknown".into()
+}
+
+#[allow(clippy::too_many_lines)]
+fn print_summary(cmsg: &ClientMessage, dir: Direction, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+    write!(f, "{}", cmsg.client_addr)?;
+
+    write!(
+        f,
+        " {}",
+        match cmsg.msg.time_sec {
+            Some(epoch_secs) => {
+                let mut micros = i64::from(epoch_secs) * 1_000_000;
+                if let Some(epoch_usec) = cmsg.msg.time_usec {
+                    micros += i64::from(epoch_usec);
+                }
+                DateTime::from_timestamp_micros(micros)
+                    .unwrap()
+                    .to_rfc3339_opts(chrono::SecondsFormat::Micros, false)
+            }
+            None => "unknown".into(),
+        }
+    )?;
+
+    write!(
+        f,
+        " {} ({})",
+        match cmsg.msg.r#type() {
+            pbdns_message::Type::DnsQueryType | pbdns_message::Type::DnsOutgoingQueryType =>
+                "Query",
+            pbdns_message::Type::DnsResponseType | pbdns_message::Type::DnsIncomingResponseType =>
+                "Response",
+        },
+        match dir {
+            Direction::In => "I",
+            Direction::Out => "O",
+        }
+    )?;
+
+    write!(
+        f,
+        " {}",
+        match cmsg.msg.in_bytes {
+            Some(bytes) => bytes.to_string(),
+            None => "unknown".into(),
+        }
+    )?;
+
+    write!(
+        f,
+        " {} {}",
+        match dir {
+            Direction::In => make_addr_port(
+                cmsg.msg.socket_family,
+                cmsg.msg.from.as_ref(),
+                cmsg.msg.from_port
+            ),
+            Direction::Out => make_addr_port(
+                cmsg.msg.socket_family,
+                cmsg.msg.to.as_ref(),
+                cmsg.msg.to_port
+            ),
+        },
+        match dir {
+            Direction::In => make_addr_port(
+                cmsg.msg.socket_family,
+                cmsg.msg.to.as_ref(),
+                cmsg.msg.to_port
+            ),
+            Direction::Out => make_addr_port(
+                cmsg.msg.socket_family,
+                cmsg.msg.from.as_ref(),
+                cmsg.msg.from_port
+            ),
+        }
+    )?;
+
+    write!(
+        f,
+        " {}",
+        match &cmsg.msg.original_requestor_subnet {
+            Some(addr) => match addr.len() {
+                4 => Ipv4Addr::from_bits(NetworkEndian::read_u32(&addr[0..4])).to_string(),
+                16 => format!(
+                    "[{}]",
+                    Ipv6Addr::from_bits(NetworkEndian::read_u128(&addr[0..16]))
+                ),
+                _ => "unsupported".into(),
+            },
+            None => "N/A".into(),
+        }
+    )?;
+
+    write!(
+        f,
+        " {}",
+        match cmsg.msg.socket_protocol {
+            Some(proto) => match pbdns_message::SocketProtocol::try_from(proto) {
+                Ok(p) => p.as_str_name(),
+                Err(_) => "unsupported",
+            },
+            None => "unknown",
+        }
+    )?;
+
+    write!(
+        f,
+        " {}",
+        // msg.id and message_id are optional in the protobuf schema, but will always be present
+        match (
+            cmsg.msg.id,
+            &cmsg.msg.message_id,
+            &cmsg.msg.initial_request_id
+        ) {
+            (Some(id), Some(msg_id), None) => format!("id: {id} uuid: {}", make_hex_string(msg_id)),
+            (Some(id), Some(msg_id), Some(initial_id)) => format!(
+                "id: {id} uuid: {}, initial uuid: {}",
+                make_hex_string(msg_id),
+                make_hex_string(initial_id)
+            ),
+            (_, _, _) => unreachable!(),
+        }
+    )?;
+
+    write!(
+        f,
+        " requestorid: {}",
+        match &cmsg.msg.requestor_id {
+            Some(id) => id.clone(),
+            None => "N/A".into(),
+        }
+    )?;
+
+    write!(
+        f,
+        " deviceid: {}",
+        match &cmsg.msg.device_id {
+            Some(id) => make_hex_string(id),
+            None => "N/A".into(),
+        }
+    )?;
+
+    write!(
+        f,
+        " devicename: {}",
+        match &cmsg.msg.device_name {
+            Some(name) => name.clone(),
+            None => "N/A".into(),
+        }
+    )?;
+
+    write!(
+        f,
+        " serverid: {}",
+        // server_identity is not a string in the protobuf schema, but should be
+        match &cmsg.msg.server_identity {
+            Some(id) => String::from_utf8_lossy(id),
+            None => "N/A".into(),
+        }
+    )?;
+
+    write!(
+        f,
+        " nod: {}",
+        match cmsg.msg.newly_observed_domain {
+            Some(nod) => nod.to_string(),
+            None => "N/A".into(),
+        }
+    )?;
+
+    write!(
+        f,
+        " workerId: {}",
+        match &cmsg.msg.worker_id {
+            Some(id) => id.to_string(),
+            None => "N/A".into(),
+        }
+    )?;
+
+    write!(
+        f,
+        " pcCacheHit: {}",
+        match cmsg.msg.packet_cache_hit {
+            Some(hit) => hit.to_string(),
+            None => "N/A".into(),
+        }
+    )?;
+
+    write!(
+        f,
+        " outgoingQueries: {}",
+        match cmsg.msg.outgoing_queries {
+            Some(queries) => queries.to_string(),
+            None => "N/A".into(),
+        }
+    )?;
+
+    write!(
+        f,
+        " headerFlags: {}",
+        match cmsg.msg.header_flags {
+            Some(flags) => format!("{:#08X}", u32::from_be(flags)),
+            None => "N/A".into(),
+        }
+    )?;
+
+    write!(
+        f,
+        " ednsVersion: {}",
+        match cmsg.msg.edns_version {
+            Some(version) => format!("{:#08X}", u32::from_be(version)),
+            None => "N/A".into(),
+        }
+    )?;
+
+    write!(f, " openTelemetryData: len N/A")?;
+
+    writeln!(f)?;
+
+    Ok(())
+}
+
+fn print_meta(cmsg: &ClientMessage, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+    for m in &cmsg.msg.meta {
+        let values = m
+            .value
+            .string_val
+            .clone()
+            .into_iter()
+            .chain(m.value.int_val.clone().into_iter().map(|v| v.to_string()))
+            .collect::<Vec<String>>()
+            .join(", ");
+        writeln!(f, "{} - meta {} -> {}", cmsg.client_addr, m.key, values)?;
+    }
+
+    Ok(())
+}
+
+fn print_query(cmsg: &ClientMessage, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+    match &cmsg.msg.question {
+        Some(q) => writeln!(
+            f,
+            "{} - Question {}, {}. {}",
+            cmsg.client_addr,
+            q.q_class.unwrap_or(1),
+            match q.q_type {
+                Some(t) => t.to_string(),
+                None => "unknown".into(),
+            },
+            match q.q_name.clone() {
+                Some(n) => n,
+                None => "unknown".into(),
+            }
+        ),
+        None => Ok(()),
+    }
+}
+
+impl fmt::Display for ClientMessage {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self.msg.r#type() {
+            pbdns_message::Type::DnsQueryType | pbdns_message::Type::DnsIncomingResponseType => {
+                print_summary(self, Direction::In, f)?;
+                print_meta(self, f)?;
+                print_query(self, f)
+            }
+            pbdns_message::Type::DnsResponseType | pbdns_message::Type::DnsOutgoingQueryType => {
+                print_summary(self, Direction::Out, f)?;
+                print_meta(self, f)?;
+                print_query(self, f)
+            }
+        }
+    }
+}
diff --git a/contrib/pblogger-rs/src/listener.rs b/contrib/pblogger-rs/src/listener.rs
new file mode 100644 (file)
index 0000000..949c29e
--- /dev/null
@@ -0,0 +1,59 @@
+use crate::display::ClientMessage;
+use anyhow::Result;
+use byteorder::{BigEndian, ByteOrder};
+use log::{debug, error, info, warn};
+use prost::Message;
+use std::net::IpAddr;
+use tokio::io::AsyncReadExt;
+use tokio::net::TcpListener;
+use tokio::sync::mpsc;
+
+pub async fn listen(address: IpAddr, port: u16) -> Result<()> {
+    let listener = TcpListener::bind((address, port)).await?;
+    let (tx, mut rx) = mpsc::unbounded_channel::<String>();
+
+    tokio::spawn(async move {
+        while let Some(message) = rx.recv().await {
+            print!("{message}");
+        }
+    });
+
+    info!("Ready to accept connections on {address}:{port}");
+
+    loop {
+        let (mut socket, client_addr) = listener.accept().await?;
+        let task_tx = tx.clone();
+
+        tokio::spawn(async move {
+            let mut length_buf = [0; 2];
+            info!("{client_addr}: connection accepted");
+
+            loop {
+                if let Err(e) = socket.read_exact(&mut length_buf).await {
+                    error!("{client_addr}: error reading message length: {e}");
+                    return;
+                }
+                let message_length = BigEndian::read_u16(&length_buf) as usize;
+                debug!("{client_addr}: message is {message_length} bytes");
+
+                let mut message_buf = vec![0u8; message_length];
+                if let Err(e) = socket.read_exact(&mut message_buf).await {
+                    error!("{client_addr}: error reading message: {e}");
+                    return;
+                }
+
+                match crate::pdns::PbdnsMessage::decode(&message_buf[..]) {
+                    Ok(msg) => {
+                        debug!("{client_addr}: {msg:?}");
+                        task_tx
+                            .send(format!("{}", ClientMessage { client_addr, msg }))
+                            .unwrap();
+                    }
+                    Err(e) => {
+                        warn!("{client_addr}: error decoding message: {e}");
+                    }
+                }
+            }
+        });
+    }
+}
diff --git a/contrib/pblogger-rs/src/main.rs b/contrib/pblogger-rs/src/main.rs
new file mode 100644 (file)
index 0000000..890f5d6
--- /dev/null
@@ -0,0 +1,68 @@
+use anyhow::Result;
+use clap::Parser;
+use log::error;
+use std::net::IpAddr;
+use std::ops::RangeInclusive;
+
+mod display;
+mod listener;
+mod pdns {
+    #![allow(clippy::all, clippy::pedantic)]
+    include!(concat!(env!("OUT_DIR"), "/pdns.rs"));
+}
+
+#[derive(Parser)]
+#[command(version, about)]
+struct Cli {
+    #[arg(value_parser = ip_address)]
+    listen_address: IpAddr,
+    #[arg(value_parser = port_in_range)]
+    listen_port: u16,
+    #[command(flatten)]
+    verbosity: clap_verbosity_flag::Verbosity,
+}
+
+fn ip_address(s: &str) -> Result<IpAddr, String> {
+    let addr: IpAddr = s
+        .parse()
+        .map_err(|_| format!("`{s}` isn't a valid IP address"))?;
+
+    Ok(addr)
+}
+
+const PORT_RANGE: RangeInclusive<usize> = 1..=65535;
+
+fn port_in_range(s: &str) -> Result<u16, String> {
+    let port: usize = s
+        .parse()
+        .map_err(|_| format!("`{s}` isn't a port number"))?;
+    if PORT_RANGE.contains(&port) {
+        Ok(u16::try_from(port).unwrap())
+    } else {
+        Err(format!(
+            "port not in range {}-{}",
+            PORT_RANGE.start(),
+            PORT_RANGE.end()
+        ))
+    }
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let args = Cli::parse();
+    env_logger::Builder::new()
+        .filter_level(args.verbosity.into())
+        .init();
+
+    if let Err(e) = listener::listen(args.listen_address, args.listen_port).await {
+        error!("Listener failed with error: `{e}`");
+    }
+
+    Ok(())
+}
+
+#[test]
+fn verify_cli() {
+    use clap::CommandFactory;
+    Cli::command().debug_assert();
+}