--- /dev/null
+use crate::pdns::{PbdnsMessage, pbdns_message};
+use byteorder::{ByteOrder, NetworkEndian};
+use chrono::DateTime;
+use std::fmt;
+use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
+
+pub struct ClientMessage {
+ pub client_addr: SocketAddr,
+ pub msg: PbdnsMessage,
+}
+
+#[derive(Clone, Copy)]
+enum Direction {
+ In,
+ Out,
+}
+
+fn make_hex_string(bytes: &[u8]) -> String {
+ bytes
+ .iter()
+ .map(|b| format!("{b:02X}").to_string())
+ .collect::<String>()
+}
+
+fn make_addr_port(
+ msg_family: Option<i32>,
+ msg_addr: Option<&Vec<u8>>,
+ msg_port: Option<u32>,
+) -> String {
+ if let Some(family) = msg_family
+ && let Some(addr) = msg_addr
+ {
+ let fromaddr = match pbdns_message::SocketFamily::try_from(family) {
+ Ok(pbdns_message::SocketFamily::Inet) => {
+ Ipv4Addr::from_bits(NetworkEndian::read_u32(&addr[0..4])).to_string()
+ }
+ Ok(pbdns_message::SocketFamily::Inet6) => {
+ format!(
+ "[{}]",
+ Ipv6Addr::from_bits(NetworkEndian::read_u128(&addr[0..16]))
+ )
+ }
+ Err(_) => "unsupported".into(),
+ };
+ if let Some(port) = msg_port {
+ return format!("{fromaddr}:{port}");
+ }
+ return fromaddr;
+ }
+
+ "unknown".into()
+}
+
+#[allow(clippy::too_many_lines)]
+fn print_summary(cmsg: &ClientMessage, dir: Direction, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "{}", cmsg.client_addr)?;
+
+ write!(
+ f,
+ " {}",
+ match cmsg.msg.time_sec {
+ Some(epoch_secs) => {
+ let mut micros = i64::from(epoch_secs) * 1_000_000;
+ if let Some(epoch_usec) = cmsg.msg.time_usec {
+ micros += i64::from(epoch_usec);
+ }
+ DateTime::from_timestamp_micros(micros)
+ .unwrap()
+ .to_rfc3339_opts(chrono::SecondsFormat::Micros, false)
+ }
+ None => "unknown".into(),
+ }
+ )?;
+
+ write!(
+ f,
+ " {} ({})",
+ match cmsg.msg.r#type() {
+ pbdns_message::Type::DnsQueryType | pbdns_message::Type::DnsOutgoingQueryType =>
+ "Query",
+ pbdns_message::Type::DnsResponseType | pbdns_message::Type::DnsIncomingResponseType =>
+ "Response",
+ },
+ match dir {
+ Direction::In => "I",
+ Direction::Out => "O",
+ }
+ )?;
+
+ write!(
+ f,
+ " {}",
+ match cmsg.msg.in_bytes {
+ Some(bytes) => bytes.to_string(),
+ None => "unknown".into(),
+ }
+ )?;
+
+ write!(
+ f,
+ " {} {}",
+ match dir {
+ Direction::In => make_addr_port(
+ cmsg.msg.socket_family,
+ cmsg.msg.from.as_ref(),
+ cmsg.msg.from_port
+ ),
+ Direction::Out => make_addr_port(
+ cmsg.msg.socket_family,
+ cmsg.msg.to.as_ref(),
+ cmsg.msg.to_port
+ ),
+ },
+ match dir {
+ Direction::In => make_addr_port(
+ cmsg.msg.socket_family,
+ cmsg.msg.to.as_ref(),
+ cmsg.msg.to_port
+ ),
+ Direction::Out => make_addr_port(
+ cmsg.msg.socket_family,
+ cmsg.msg.from.as_ref(),
+ cmsg.msg.from_port
+ ),
+ }
+ )?;
+
+ write!(
+ f,
+ " {}",
+ match &cmsg.msg.original_requestor_subnet {
+ Some(addr) => match addr.len() {
+ 4 => Ipv4Addr::from_bits(NetworkEndian::read_u32(&addr[0..4])).to_string(),
+ 16 => format!(
+ "[{}]",
+ Ipv6Addr::from_bits(NetworkEndian::read_u128(&addr[0..16]))
+ ),
+ _ => "unsupported".into(),
+ },
+ None => "N/A".into(),
+ }
+ )?;
+
+ write!(
+ f,
+ " {}",
+ match cmsg.msg.socket_protocol {
+ Some(proto) => match pbdns_message::SocketProtocol::try_from(proto) {
+ Ok(p) => p.as_str_name(),
+ Err(_) => "unsupported",
+ },
+ None => "unknown",
+ }
+ )?;
+
+ write!(
+ f,
+ " {}",
+ // msg.id and message_id are optional in the protobuf schema, but will always be present
+ match (
+ cmsg.msg.id,
+ &cmsg.msg.message_id,
+ &cmsg.msg.initial_request_id
+ ) {
+ (Some(id), Some(msg_id), None) => format!("id: {id} uuid: {}", make_hex_string(msg_id)),
+ (Some(id), Some(msg_id), Some(initial_id)) => format!(
+ "id: {id} uuid: {}, initial uuid: {}",
+ make_hex_string(msg_id),
+ make_hex_string(initial_id)
+ ),
+ (_, _, _) => unreachable!(),
+ }
+ )?;
+
+ write!(
+ f,
+ " requestorid: {}",
+ match &cmsg.msg.requestor_id {
+ Some(id) => id.clone(),
+ None => "N/A".into(),
+ }
+ )?;
+
+ write!(
+ f,
+ " deviceid: {}",
+ match &cmsg.msg.device_id {
+ Some(id) => make_hex_string(id),
+ None => "N/A".into(),
+ }
+ )?;
+
+ write!(
+ f,
+ " devicename: {}",
+ match &cmsg.msg.device_name {
+ Some(name) => name.clone(),
+ None => "N/A".into(),
+ }
+ )?;
+
+ write!(
+ f,
+ " serverid: {}",
+ // server_identity is not a string in the protobuf schema, but should be
+ match &cmsg.msg.server_identity {
+ Some(id) => String::from_utf8_lossy(id),
+ None => "N/A".into(),
+ }
+ )?;
+
+ write!(
+ f,
+ " nod: {}",
+ match cmsg.msg.newly_observed_domain {
+ Some(nod) => nod.to_string(),
+ None => "N/A".into(),
+ }
+ )?;
+
+ write!(
+ f,
+ " workerId: {}",
+ match &cmsg.msg.worker_id {
+ Some(id) => id.to_string(),
+ None => "N/A".into(),
+ }
+ )?;
+
+ write!(
+ f,
+ " pcCacheHit: {}",
+ match cmsg.msg.packet_cache_hit {
+ Some(hit) => hit.to_string(),
+ None => "N/A".into(),
+ }
+ )?;
+
+ write!(
+ f,
+ " outgoingQueries: {}",
+ match cmsg.msg.outgoing_queries {
+ Some(queries) => queries.to_string(),
+ None => "N/A".into(),
+ }
+ )?;
+
+ write!(
+ f,
+ " headerFlags: {}",
+ match cmsg.msg.header_flags {
+ Some(flags) => format!("{:#08X}", u32::from_be(flags)),
+ None => "N/A".into(),
+ }
+ )?;
+
+ write!(
+ f,
+ " ednsVersion: {}",
+ match cmsg.msg.edns_version {
+ Some(version) => format!("{:#08X}", u32::from_be(version)),
+ None => "N/A".into(),
+ }
+ )?;
+
+ write!(f, " openTelemetryData: len N/A")?;
+
+ writeln!(f)?;
+
+ Ok(())
+}
+
+fn print_meta(cmsg: &ClientMessage, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ for m in &cmsg.msg.meta {
+ let values = m
+ .value
+ .string_val
+ .clone()
+ .into_iter()
+ .chain(m.value.int_val.clone().into_iter().map(|v| v.to_string()))
+ .collect::<Vec<String>>()
+ .join(", ");
+ writeln!(f, "{} - meta {} -> {}", cmsg.client_addr, m.key, values)?;
+ }
+
+ Ok(())
+}
+
+fn print_query(cmsg: &ClientMessage, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match &cmsg.msg.question {
+ Some(q) => writeln!(
+ f,
+ "{} - Question {}, {}. {}",
+ cmsg.client_addr,
+ q.q_class.unwrap_or(1),
+ match q.q_type {
+ Some(t) => t.to_string(),
+ None => "unknown".into(),
+ },
+ match q.q_name.clone() {
+ Some(n) => n,
+ None => "unknown".into(),
+ }
+ ),
+ None => Ok(()),
+ }
+}
+
+impl fmt::Display for ClientMessage {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self.msg.r#type() {
+ pbdns_message::Type::DnsQueryType | pbdns_message::Type::DnsIncomingResponseType => {
+ print_summary(self, Direction::In, f)?;
+ print_meta(self, f)?;
+ print_query(self, f)
+ }
+ pbdns_message::Type::DnsResponseType | pbdns_message::Type::DnsOutgoingQueryType => {
+ print_summary(self, Direction::Out, f)?;
+ print_meta(self, f)?;
+ print_query(self, f)
+ }
+ }
+ }
+}
--- /dev/null
+use crate::display::ClientMessage;
+use anyhow::Result;
+use byteorder::{BigEndian, ByteOrder};
+use log::{debug, error, info, warn};
+use prost::Message;
+use std::net::IpAddr;
+use tokio::io::AsyncReadExt;
+use tokio::net::TcpListener;
+use tokio::sync::mpsc;
+
+pub async fn listen(address: IpAddr, port: u16) -> Result<()> {
+ let listener = TcpListener::bind((address, port)).await?;
+ let (tx, mut rx) = mpsc::unbounded_channel::<String>();
+
+ tokio::spawn(async move {
+ while let Some(message) = rx.recv().await {
+ print!("{message}");
+ }
+ });
+
+ info!("Ready to accept connections on {address}:{port}");
+
+ loop {
+ let (mut socket, client_addr) = listener.accept().await?;
+ let task_tx = tx.clone();
+
+ tokio::spawn(async move {
+ let mut length_buf = [0; 2];
+ info!("{client_addr}: connection accepted");
+
+ loop {
+ if let Err(e) = socket.read_exact(&mut length_buf).await {
+ error!("{client_addr}: error reading message length: {e}");
+ return;
+ }
+ let message_length = BigEndian::read_u16(&length_buf) as usize;
+ debug!("{client_addr}: message is {message_length} bytes");
+
+ let mut message_buf = vec![0u8; message_length];
+ if let Err(e) = socket.read_exact(&mut message_buf).await {
+ error!("{client_addr}: error reading message: {e}");
+ return;
+ }
+
+ match crate::pdns::PbdnsMessage::decode(&message_buf[..]) {
+ Ok(msg) => {
+ debug!("{client_addr}: {msg:?}");
+ task_tx
+ .send(format!("{}", ClientMessage { client_addr, msg }))
+ .unwrap();
+ }
+ Err(e) => {
+ warn!("{client_addr}: error decoding message: {e}");
+ }
+ }
+ }
+ });
+ }
+}
--- /dev/null
+use anyhow::Result;
+use clap::Parser;
+use log::error;
+use std::net::IpAddr;
+use std::ops::RangeInclusive;
+
+mod display;
+mod listener;
+mod pdns {
+ #![allow(clippy::all, clippy::pedantic)]
+ include!(concat!(env!("OUT_DIR"), "/pdns.rs"));
+}
+
+#[derive(Parser)]
+#[command(version, about)]
+struct Cli {
+ #[arg(value_parser = ip_address)]
+ listen_address: IpAddr,
+ #[arg(value_parser = port_in_range)]
+ listen_port: u16,
+ #[command(flatten)]
+ verbosity: clap_verbosity_flag::Verbosity,
+}
+
+fn ip_address(s: &str) -> Result<IpAddr, String> {
+ let addr: IpAddr = s
+ .parse()
+ .map_err(|_| format!("`{s}` isn't a valid IP address"))?;
+
+ Ok(addr)
+}
+
+const PORT_RANGE: RangeInclusive<usize> = 1..=65535;
+
+fn port_in_range(s: &str) -> Result<u16, String> {
+ let port: usize = s
+ .parse()
+ .map_err(|_| format!("`{s}` isn't a port number"))?;
+ if PORT_RANGE.contains(&port) {
+ Ok(u16::try_from(port).unwrap())
+ } else {
+ Err(format!(
+ "port not in range {}-{}",
+ PORT_RANGE.start(),
+ PORT_RANGE.end()
+ ))
+ }
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let args = Cli::parse();
+ env_logger::Builder::new()
+ .filter_level(args.verbosity.into())
+ .init();
+
+ if let Err(e) = listener::listen(args.listen_address, args.listen_port).await {
+ error!("Listener failed with error: `{e}`");
+ }
+
+ Ok(())
+}
+
+#[test]
+fn verify_cli() {
+ use clap::CommandFactory;
+ Cli::command().debug_assert();
+}