mod proto;
pub use proto::*;
+use dbg::DBG_ERR;
use ntstatus_gen::*;
use param::LoadParm;
use serde_json::{from_slice as json_from_slice, to_vec as json_to_vec};
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
pub struct ClientStream {
stream: UnixStream,
impl ClientStream {
pub fn new(path: &str) -> Result<Self, Box<dyn Error>> {
Ok(ClientStream {
- stream: UnixStream::connect(path)?,
+ stream: UnixStream::connect(path)
+ .map_err(|e| {
+ DBG_ERR!("Unix socket stream setup error while connecting to {}: {:?}",
+ path, e
+ );
+ e
+ })?,
})
}
req: &Request,
timeout: u64,
) -> Result<Response, Box<dyn Error>> {
+ // Set the timeout
let timeout = Duration::from_secs(timeout);
self.stream.set_read_timeout(Some(timeout))?;
self.stream.set_write_timeout(Some(timeout))?;
+
+ // Encode the request as bytes
let req_bytes = json_to_vec(req)?;
+
+ // Send the request
self.stream.write_all(&req_bytes)?;
- let mut buf = Vec::new();
- self.stream.read_to_end(&mut buf)?;
- let resp: Response = json_from_slice(&buf)?;
+
+ // Now wait on the response
+ let start = SystemTime::now();
+ let mut read_started = false;
+ let mut data = Vec::with_capacity(1024);
+ let mut counter = 0;
+
+ loop {
+ let mut buffer = [0; 1024];
+ let durr =
+ SystemTime::now().duration_since(start).map_err(Box::new)?;
+ if durr > timeout {
+ DBG_ERR!("Socket timeout");
+ break;
+ }
+ match self.stream.read(&mut buffer) {
+ Ok(0) => {
+ if read_started {
+ break;
+ } else {
+ continue;
+ }
+ }
+ Ok(count) => {
+ data.extend_from_slice(&buffer);
+ counter += count;
+ if count == 1024 {
+ read_started = true;
+ continue;
+ } else {
+ break;
+ }
+ }
+ Err(e) => {
+ DBG_ERR!(
+ "Stream read failure from {:?}: {:?}",
+ &self.stream,
+ e
+ );
+ return Err(Box::new(e));
+ }
+ }
+ }
+
+ data.truncate(counter);
+
+ // Now decode the response
+ let resp: Response = json_from_slice(data.as_slice())?;
Ok(resp)
}
}