From: Kevin P. Fleming Date: Sat, 5 Jul 2025 20:27:24 +0000 (-0400) Subject: Add Protobuf logger written in Rust. X-Git-Tag: rec-5.3.0-alpha2~1^2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=73e0f85707cd147b032f0e98d4898fe11e152188;p=thirdparty%2Fpdns.git Add Protobuf logger written in Rust. This was inspired by the ProtobufLogger.py already present in the 'contrib' directory. In addition to being written in Rust instead of Python, there are various other differences: * Each line of output is prefixed with the sender's 'socket address' (IP address and port number). * Messages from multiple clients will be properly output, they will not be mixed. * Timestamp format is slightly different (full ISO-8601 with UTC offset). * Command-line arguments are handled by a full parser, which can generate help text and report the program's version. * All 'optional' fields in the protobuf messages are checked for presence before being read. * Output to stdout will never block reception/decoding/formatting of protobuf messages; if stdout blocks for some reason, incoming messages will be stored in memory until they can be printed. * Summary, meta, and question lines are printed; responses are not, nor is OpenTelemetry data. Future work for another contributor! * 'meta' output is untested. * A Cargo feature 'opentelemetry' is available to be the starting point of OT support. No AI or LLM tools were used in the creation or testing of this code. --- diff --git a/contrib/pblogger-rs/.gitignore b/contrib/pblogger-rs/.gitignore new file mode 100644 index 0000000000..a9d37c560c --- /dev/null +++ b/contrib/pblogger-rs/.gitignore @@ -0,0 +1,2 @@ +target +Cargo.lock diff --git a/contrib/pblogger-rs/Cargo.toml b/contrib/pblogger-rs/Cargo.toml new file mode 100644 index 0000000000..ab57a92a80 --- /dev/null +++ b/contrib/pblogger-rs/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "pblogger-rs" +version = "0.1.0" +edition = "2024" + +[dependencies] +anyhow = "1.0.98" +byteorder = "1.5.0" +chrono = "0.4.41" +clap = { version = "4.5.40", features = ["derive"] } +clap-verbosity-flag = "3.0.3" +env_logger = "0.11.8" +log = "0.4.27" +prost = "0.14.1" +tokio = { version = "1.46.0", features = ["full"] } + +[build-dependencies] +prost-build = "0.14.1" + +[features] +opentelemetry = [] + +[lints.clippy] +pedantic = { level = "warn", priority = -1 } +cargo = { level = "warn", priority = -1 } +cargo_common_metadata = "allow" +multiple_crate_versions = "allow" # necessary for prost crates diff --git a/contrib/pblogger-rs/build.rs b/contrib/pblogger-rs/build.rs new file mode 100644 index 0000000000..5914979adb --- /dev/null +++ b/contrib/pblogger-rs/build.rs @@ -0,0 +1,13 @@ +use std::io::Result; + +fn main() -> Result<()> { + let mut prost_build = prost_build::Config::new(); + + prost_build.default_package_filename("pdns"); + prost_build.compile_protos(&["dnsmessage.proto"], &["../../pdns"])?; + + #[cfg(feature = "opentelemetry")] + todo!(); + + Ok(()) +} diff --git a/contrib/pblogger-rs/rust-toolchain.toml b/contrib/pblogger-rs/rust-toolchain.toml new file mode 100644 index 0000000000..02cb8fcb53 --- /dev/null +++ b/contrib/pblogger-rs/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "stable" +profile = "default" diff --git a/contrib/pblogger-rs/rustfmt.toml b/contrib/pblogger-rs/rustfmt.toml new file mode 100644 index 0000000000..f3e454b618 --- /dev/null +++ b/contrib/pblogger-rs/rustfmt.toml @@ -0,0 +1,2 @@ +edition = "2024" +style_edition = "2024" diff --git a/contrib/pblogger-rs/src/display.rs b/contrib/pblogger-rs/src/display.rs new file mode 100644 index 0000000000..d85f0ca62e --- /dev/null +++ b/contrib/pblogger-rs/src/display.rs @@ -0,0 +1,324 @@ +use crate::pdns::{PbdnsMessage, pbdns_message}; +use byteorder::{ByteOrder, NetworkEndian}; +use chrono::DateTime; +use std::fmt; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; + +pub struct ClientMessage { + pub client_addr: SocketAddr, + pub msg: PbdnsMessage, +} + +#[derive(Clone, Copy)] +enum Direction { + In, + Out, +} + +fn make_hex_string(bytes: &[u8]) -> String { + bytes + .iter() + .map(|b| format!("{b:02X}").to_string()) + .collect::() +} + +fn make_addr_port( + msg_family: Option, + msg_addr: Option<&Vec>, + msg_port: Option, +) -> String { + if let Some(family) = msg_family + && let Some(addr) = msg_addr + { + let fromaddr = match pbdns_message::SocketFamily::try_from(family) { + Ok(pbdns_message::SocketFamily::Inet) => { + Ipv4Addr::from_bits(NetworkEndian::read_u32(&addr[0..4])).to_string() + } + Ok(pbdns_message::SocketFamily::Inet6) => { + format!( + "[{}]", + Ipv6Addr::from_bits(NetworkEndian::read_u128(&addr[0..16])) + ) + } + Err(_) => "unsupported".into(), + }; + if let Some(port) = msg_port { + return format!("{fromaddr}:{port}"); + } + return fromaddr; + } + + "unknown".into() +} + +#[allow(clippy::too_many_lines)] +fn print_summary(cmsg: &ClientMessage, dir: Direction, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", cmsg.client_addr)?; + + write!( + f, + " {}", + match cmsg.msg.time_sec { + Some(epoch_secs) => { + let mut micros = i64::from(epoch_secs) * 1_000_000; + if let Some(epoch_usec) = cmsg.msg.time_usec { + micros += i64::from(epoch_usec); + } + DateTime::from_timestamp_micros(micros) + .unwrap() + .to_rfc3339_opts(chrono::SecondsFormat::Micros, false) + } + None => "unknown".into(), + } + )?; + + write!( + f, + " {} ({})", + match cmsg.msg.r#type() { + pbdns_message::Type::DnsQueryType | pbdns_message::Type::DnsOutgoingQueryType => + "Query", + pbdns_message::Type::DnsResponseType | pbdns_message::Type::DnsIncomingResponseType => + "Response", + }, + match dir { + Direction::In => "I", + Direction::Out => "O", + } + )?; + + write!( + f, + " {}", + match cmsg.msg.in_bytes { + Some(bytes) => bytes.to_string(), + None => "unknown".into(), + } + )?; + + write!( + f, + " {} {}", + match dir { + Direction::In => make_addr_port( + cmsg.msg.socket_family, + cmsg.msg.from.as_ref(), + cmsg.msg.from_port + ), + Direction::Out => make_addr_port( + cmsg.msg.socket_family, + cmsg.msg.to.as_ref(), + cmsg.msg.to_port + ), + }, + match dir { + Direction::In => make_addr_port( + cmsg.msg.socket_family, + cmsg.msg.to.as_ref(), + cmsg.msg.to_port + ), + Direction::Out => make_addr_port( + cmsg.msg.socket_family, + cmsg.msg.from.as_ref(), + cmsg.msg.from_port + ), + } + )?; + + write!( + f, + " {}", + match &cmsg.msg.original_requestor_subnet { + Some(addr) => match addr.len() { + 4 => Ipv4Addr::from_bits(NetworkEndian::read_u32(&addr[0..4])).to_string(), + 16 => format!( + "[{}]", + Ipv6Addr::from_bits(NetworkEndian::read_u128(&addr[0..16])) + ), + _ => "unsupported".into(), + }, + None => "N/A".into(), + } + )?; + + write!( + f, + " {}", + match cmsg.msg.socket_protocol { + Some(proto) => match pbdns_message::SocketProtocol::try_from(proto) { + Ok(p) => p.as_str_name(), + Err(_) => "unsupported", + }, + None => "unknown", + } + )?; + + write!( + f, + " {}", + // msg.id and message_id are optional in the protobuf schema, but will always be present + match ( + cmsg.msg.id, + &cmsg.msg.message_id, + &cmsg.msg.initial_request_id + ) { + (Some(id), Some(msg_id), None) => format!("id: {id} uuid: {}", make_hex_string(msg_id)), + (Some(id), Some(msg_id), Some(initial_id)) => format!( + "id: {id} uuid: {}, initial uuid: {}", + make_hex_string(msg_id), + make_hex_string(initial_id) + ), + (_, _, _) => unreachable!(), + } + )?; + + write!( + f, + " requestorid: {}", + match &cmsg.msg.requestor_id { + Some(id) => id.clone(), + None => "N/A".into(), + } + )?; + + write!( + f, + " deviceid: {}", + match &cmsg.msg.device_id { + Some(id) => make_hex_string(id), + None => "N/A".into(), + } + )?; + + write!( + f, + " devicename: {}", + match &cmsg.msg.device_name { + Some(name) => name.clone(), + None => "N/A".into(), + } + )?; + + write!( + f, + " serverid: {}", + // server_identity is not a string in the protobuf schema, but should be + match &cmsg.msg.server_identity { + Some(id) => String::from_utf8_lossy(id), + None => "N/A".into(), + } + )?; + + write!( + f, + " nod: {}", + match cmsg.msg.newly_observed_domain { + Some(nod) => nod.to_string(), + None => "N/A".into(), + } + )?; + + write!( + f, + " workerId: {}", + match &cmsg.msg.worker_id { + Some(id) => id.to_string(), + None => "N/A".into(), + } + )?; + + write!( + f, + " pcCacheHit: {}", + match cmsg.msg.packet_cache_hit { + Some(hit) => hit.to_string(), + None => "N/A".into(), + } + )?; + + write!( + f, + " outgoingQueries: {}", + match cmsg.msg.outgoing_queries { + Some(queries) => queries.to_string(), + None => "N/A".into(), + } + )?; + + write!( + f, + " headerFlags: {}", + match cmsg.msg.header_flags { + Some(flags) => format!("{:#08X}", u32::from_be(flags)), + None => "N/A".into(), + } + )?; + + write!( + f, + " ednsVersion: {}", + match cmsg.msg.edns_version { + Some(version) => format!("{:#08X}", u32::from_be(version)), + None => "N/A".into(), + } + )?; + + write!(f, " openTelemetryData: len N/A")?; + + writeln!(f)?; + + Ok(()) +} + +fn print_meta(cmsg: &ClientMessage, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for m in &cmsg.msg.meta { + let values = m + .value + .string_val + .clone() + .into_iter() + .chain(m.value.int_val.clone().into_iter().map(|v| v.to_string())) + .collect::>() + .join(", "); + writeln!(f, "{} - meta {} -> {}", cmsg.client_addr, m.key, values)?; + } + + Ok(()) +} + +fn print_query(cmsg: &ClientMessage, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &cmsg.msg.question { + Some(q) => writeln!( + f, + "{} - Question {}, {}. {}", + cmsg.client_addr, + q.q_class.unwrap_or(1), + match q.q_type { + Some(t) => t.to_string(), + None => "unknown".into(), + }, + match q.q_name.clone() { + Some(n) => n, + None => "unknown".into(), + } + ), + None => Ok(()), + } +} + +impl fmt::Display for ClientMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.msg.r#type() { + pbdns_message::Type::DnsQueryType | pbdns_message::Type::DnsIncomingResponseType => { + print_summary(self, Direction::In, f)?; + print_meta(self, f)?; + print_query(self, f) + } + pbdns_message::Type::DnsResponseType | pbdns_message::Type::DnsOutgoingQueryType => { + print_summary(self, Direction::Out, f)?; + print_meta(self, f)?; + print_query(self, f) + } + } + } +} diff --git a/contrib/pblogger-rs/src/listener.rs b/contrib/pblogger-rs/src/listener.rs new file mode 100644 index 0000000000..949c29ef41 --- /dev/null +++ b/contrib/pblogger-rs/src/listener.rs @@ -0,0 +1,59 @@ +use crate::display::ClientMessage; +use anyhow::Result; +use byteorder::{BigEndian, ByteOrder}; +use log::{debug, error, info, warn}; +use prost::Message; +use std::net::IpAddr; +use tokio::io::AsyncReadExt; +use tokio::net::TcpListener; +use tokio::sync::mpsc; + +pub async fn listen(address: IpAddr, port: u16) -> Result<()> { + let listener = TcpListener::bind((address, port)).await?; + let (tx, mut rx) = mpsc::unbounded_channel::(); + + tokio::spawn(async move { + while let Some(message) = rx.recv().await { + print!("{message}"); + } + }); + + info!("Ready to accept connections on {address}:{port}"); + + loop { + let (mut socket, client_addr) = listener.accept().await?; + let task_tx = tx.clone(); + + tokio::spawn(async move { + let mut length_buf = [0; 2]; + info!("{client_addr}: connection accepted"); + + loop { + if let Err(e) = socket.read_exact(&mut length_buf).await { + error!("{client_addr}: error reading message length: {e}"); + return; + } + let message_length = BigEndian::read_u16(&length_buf) as usize; + debug!("{client_addr}: message is {message_length} bytes"); + + let mut message_buf = vec![0u8; message_length]; + if let Err(e) = socket.read_exact(&mut message_buf).await { + error!("{client_addr}: error reading message: {e}"); + return; + } + + match crate::pdns::PbdnsMessage::decode(&message_buf[..]) { + Ok(msg) => { + debug!("{client_addr}: {msg:?}"); + task_tx + .send(format!("{}", ClientMessage { client_addr, msg })) + .unwrap(); + } + Err(e) => { + warn!("{client_addr}: error decoding message: {e}"); + } + } + } + }); + } +} diff --git a/contrib/pblogger-rs/src/main.rs b/contrib/pblogger-rs/src/main.rs new file mode 100644 index 0000000000..890f5d629d --- /dev/null +++ b/contrib/pblogger-rs/src/main.rs @@ -0,0 +1,68 @@ +use anyhow::Result; +use clap::Parser; +use log::error; +use std::net::IpAddr; +use std::ops::RangeInclusive; + +mod display; +mod listener; +mod pdns { + #![allow(clippy::all, clippy::pedantic)] + include!(concat!(env!("OUT_DIR"), "/pdns.rs")); +} + +#[derive(Parser)] +#[command(version, about)] +struct Cli { + #[arg(value_parser = ip_address)] + listen_address: IpAddr, + #[arg(value_parser = port_in_range)] + listen_port: u16, + #[command(flatten)] + verbosity: clap_verbosity_flag::Verbosity, +} + +fn ip_address(s: &str) -> Result { + let addr: IpAddr = s + .parse() + .map_err(|_| format!("`{s}` isn't a valid IP address"))?; + + Ok(addr) +} + +const PORT_RANGE: RangeInclusive = 1..=65535; + +fn port_in_range(s: &str) -> Result { + let port: usize = s + .parse() + .map_err(|_| format!("`{s}` isn't a port number"))?; + if PORT_RANGE.contains(&port) { + Ok(u16::try_from(port).unwrap()) + } else { + Err(format!( + "port not in range {}-{}", + PORT_RANGE.start(), + PORT_RANGE.end() + )) + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Cli::parse(); + env_logger::Builder::new() + .filter_level(args.verbosity.into()) + .init(); + + if let Err(e) = listener::listen(args.listen_address, args.listen_port).await { + error!("Listener failed with error: `{e}`"); + } + + Ok(()) +} + +#[test] +fn verify_cli() { + use clap::CommandFactory; + Cli::command().debug_assert(); +}