From 0bcc209d949f1449da9035a1ee5a2a18339f0574 Mon Sep 17 00:00:00 2001 From: David Mulder Date: Fri, 23 Aug 2024 14:51:24 -0600 Subject: [PATCH] Properly handle read/write from the client socket Signed-off-by: David Mulder Reviewed-by: Alexander Bokovoy --- rust/Cargo.lock | 2 ++ rust/sock/Cargo.toml | 2 ++ rust/sock/src/lib.rs | 67 ++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 66 insertions(+), 5 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 393707f07e7..9a2af4b9f87 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2016,6 +2016,8 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" name = "sock" version = "4.21.0" dependencies = [ + "chelps", + "dbg", "libc", "libnss", "ntstatus_gen", diff --git a/rust/sock/Cargo.toml b/rust/sock/Cargo.toml index 7f00c71a0f9..be649bee9b4 100644 --- a/rust/sock/Cargo.toml +++ b/rust/sock/Cargo.toml @@ -6,6 +6,8 @@ homepage.workspace = true version.workspace = true [dependencies] +chelps.workspace = true +dbg.workspace = true libc.workspace = true libnss = "0.8.0" ntstatus_gen.workspace = true diff --git a/rust/sock/src/lib.rs b/rust/sock/src/lib.rs index ea07c5b064e..9ccddde5027 100644 --- a/rust/sock/src/lib.rs +++ b/rust/sock/src/lib.rs @@ -22,6 +22,7 @@ mod proto; pub use proto::*; +use dbg::DBG_ERR; use ntstatus_gen::*; use param::LoadParm; use serde_json::{from_slice as json_from_slice, to_vec as json_to_vec}; @@ -29,7 +30,7 @@ use std::error::Error; use std::io::{Read, Write}; use std::os::unix::net::UnixStream; use std::path::{Path, PathBuf}; -use std::time::Duration; +use std::time::{Duration, SystemTime}; pub struct ClientStream { stream: UnixStream, @@ -38,7 +39,13 @@ pub struct ClientStream { impl ClientStream { pub fn new(path: &str) -> Result> { Ok(ClientStream { - stream: UnixStream::connect(path)?, + stream: UnixStream::connect(path) + .map_err(|e| { + DBG_ERR!("Unix socket stream setup error while connecting to {}: {:?}", + path, e + ); + e + })?, }) } @@ -47,14 +54,64 @@ impl ClientStream { req: &Request, timeout: u64, ) -> Result> { + // Set the timeout let timeout = Duration::from_secs(timeout); self.stream.set_read_timeout(Some(timeout))?; self.stream.set_write_timeout(Some(timeout))?; + + // Encode the request as bytes let req_bytes = json_to_vec(req)?; + + // Send the request self.stream.write_all(&req_bytes)?; - let mut buf = Vec::new(); - self.stream.read_to_end(&mut buf)?; - let resp: Response = json_from_slice(&buf)?; + + // Now wait on the response + let start = SystemTime::now(); + let mut read_started = false; + let mut data = Vec::with_capacity(1024); + let mut counter = 0; + + loop { + let mut buffer = [0; 1024]; + let durr = + SystemTime::now().duration_since(start).map_err(Box::new)?; + if durr > timeout { + DBG_ERR!("Socket timeout"); + break; + } + match self.stream.read(&mut buffer) { + Ok(0) => { + if read_started { + break; + } else { + continue; + } + } + Ok(count) => { + data.extend_from_slice(&buffer); + counter += count; + if count == 1024 { + read_started = true; + continue; + } else { + break; + } + } + Err(e) => { + DBG_ERR!( + "Stream read failure from {:?}: {:?}", + &self.stream, + e + ); + return Err(Box::new(e)); + } + } + } + + data.truncate(counter); + + // Now decode the response + let resp: Response = json_from_slice(data.as_slice())?; Ok(resp) } } -- 2.47.3