}
if !tx.responses.is_empty() {
+ SCLogDebug!("Responses length: {}", tx.responses.len());
js.set_object("response", &log_response_object(tx)?)?;
}
js.close()?;
PgsqlBEMessage::AuthenticationOk(_)
| PgsqlBEMessage::AuthenticationCleartextPassword(_)
| PgsqlBEMessage::AuthenticationSASL(_)
- | PgsqlBEMessage::AuthenticationSASLContinue(_) => {
+ | PgsqlBEMessage::AuthenticationSASLContinue(_)
+ | PgsqlBEMessage::CopyDone(_) => {
jb.set_string("message", res.to_str())?;
}
PgsqlBEMessage::ParameterStatus(ParameterStatusMessage {
}) => {
// We take care of these elsewhere
}
+ PgsqlBEMessage::CopyOutResponse(CopyOutResponse {
+ identifier: _,
+ length: _,
+ column_cnt,
+ }) => {
+ jb.open_object(res.to_str())?;
+ jb.set_uint("copy_column_count", *column_cnt)?;
+ jb.close()?;
+ }
PgsqlBEMessage::BackendKeyData(BackendKeyDataMessage {
identifier: _,
length: _,
}) => {
// We don't want to log this one
}
+ PgsqlBEMessage::ConsolidatedCopyDataOut(ConsolidatedDataRowPacket {
+ identifier: _,
+ row_cnt,
+ data_size,
+ }) => {
+ jb.open_object(res.to_str())?;
+ jb.set_uint("row_count", *row_cnt)?;
+ jb.set_uint("data_size", *data_size)?;
+ jb.close()?;
+ }
PgsqlBEMessage::RowDescription(RowDescriptionMessage {
identifier: _,
length: _,
use nom7::number::streaming::{be_i16, be_i32};
use nom7::number::streaming::{be_u16, be_u32, be_u8};
use nom7::sequence::terminated;
-use nom7::{Err, IResult};
+use nom7::{Err, IResult, ToUsize};
pub const PGSQL_LENGTH_FIELD: u32 = 4;
#[derive(Debug, PartialEq, Eq)]
pub struct ConsolidatedDataRowPacket {
pub identifier: u8,
- pub row_cnt: u64,
+ pub row_cnt: u64, // row or msg cnt
pub data_size: u64,
}
pub payload: Vec<u8>,
}
+#[derive(Debug, PartialEq, Eq)]
+pub struct CopyOutResponse {
+ pub identifier: u8,
+ pub length: u32,
+ pub column_cnt: u16,
+ // for each column, there are column_cnt u16 format codes received
+ // for now, we're not storing those
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub struct TerminationMessage {
+ pub identifier: u8,
+ pub length: u32,
+}
+
#[derive(Debug, PartialEq, Eq)]
pub enum PgsqlBEMessage {
SSLResponse(SSLResponseMessage),
ParameterStatus(ParameterStatusMessage),
BackendKeyData(BackendKeyDataMessage),
CommandComplete(RegularPacket),
+ CopyOutResponse(CopyOutResponse),
+ ConsolidatedCopyDataOut(ConsolidatedDataRowPacket),
+ CopyDone(TerminationMessage),
ReadyForQuery(ReadyForQueryMessage),
RowDescription(RowDescriptionMessage),
ConsolidatedDataRow(ConsolidatedDataRowPacket),
PgsqlBEMessage::ParameterStatus(_) => "parameter_status",
PgsqlBEMessage::BackendKeyData(_) => "backend_key_data",
PgsqlBEMessage::CommandComplete(_) => "command_completed",
+ PgsqlBEMessage::CopyOutResponse(_) => "copy_out_response",
+ PgsqlBEMessage::ConsolidatedCopyDataOut(_) => "copy_data_out",
+ PgsqlBEMessage::CopyDone(_) => "copy_done",
PgsqlBEMessage::ReadyForQuery(_) => "ready_for_query",
PgsqlBEMessage::RowDescription(_) => "row_description",
PgsqlBEMessage::SSLResponse(SSLResponseMessage::InvalidResponse) => {
type SASLInitialResponse = (SASLAuthenticationMechanism, u32, Vec<u8>);
-#[derive(Debug, PartialEq, Eq)]
-pub struct TerminationMessage {
- pub identifier: u8,
- pub length: u32,
-}
-
#[derive(Debug, PartialEq, Eq)]
pub struct CancelRequestMessage {
pub pid: u32,
data_size
}
+pub fn parse_copy_out_response(i: &[u8]) -> IResult<&[u8], PgsqlBEMessage, PgsqlParseError<&[u8]>> {
+ let (i, identifier) = verify(be_u8, |&x| x == b'H')(i)?;
+ // copy out message : identifier (u8), length (u32), format (u8), cols (u16), formats (u16*cols)
+ let (i, length) = parse_gte_length(i, 8)?;
+ let (i, _format) = be_u8(i)?;
+ let (i, columns) = be_u16(i)?;
+ let (i, _formats) = many_m_n(0, columns.to_usize(), be_u16)(i)?;
+ Ok((
+ i,
+ PgsqlBEMessage::CopyOutResponse(CopyOutResponse {
+ identifier,
+ length,
+ column_cnt: columns,
+ })
+ ))
+}
+
+pub fn parse_consolidated_copy_data_out(i: &[u8]) -> IResult<&[u8], PgsqlBEMessage, PgsqlParseError<&[u8]>> {
+ let (i, identifier) = verify(be_u8, |&x| x == b'd')(i)?;
+ let (i, length) = parse_gte_length(i, 5)?;
+ let (i, _data) = take(length - PGSQL_LENGTH_FIELD)(i)?;
+ SCLogDebug!("data_size is {:?}", _data);
+ Ok((
+ i, PgsqlBEMessage::ConsolidatedCopyDataOut(ConsolidatedDataRowPacket {
+ identifier,
+ row_cnt: 1,
+ data_size: (length - PGSQL_LENGTH_FIELD) as u64 })
+ ))
+}
+
+fn parse_copy_done(i: &[u8]) -> IResult<&[u8], PgsqlBEMessage, PgsqlParseError<&[u8]>> {
+ let (i, identifier) = verify(be_u8, |&x| x == b'c')(i)?;
+ let (i, length) = parse_exact_length(i, PGSQL_LENGTH_FIELD)?;
+ Ok((
+ i, PgsqlBEMessage::CopyDone(TerminationMessage {
+ identifier,
+ length
+ })
+ ))
+}
+
// Currently, we don't store the actual DataRow messages, as those could easily become a burden, memory-wise
// We use ConsolidatedDataRow to store info we still want to log: message size.
// Later on, we calculate the number of lines the command actually returned by counting ConsolidatedDataRow messages
b'R' => pgsql_parse_authentication_message(i)?,
b'S' => parse_parameter_status_message(i)?,
b'C' => parse_command_complete(i)?,
+ b'c' => parse_copy_done(i)?,
b'Z' => parse_ready_for_query(i)?,
b'T' => parse_row_description(i)?,
b'A' => parse_notification_response(i)?,
b'D' => parse_consolidated_data_row(i)?,
+ b'd' => parse_consolidated_copy_data_out(i)?,
+ b'H' => parse_copy_out_response(i)?,
_ => {
let (i, identifier) = be_u8(i)?;
let (i, length) = parse_gte_length(i, PGSQL_LENGTH_FIELD)?;
CancelRequestReceived,
ConnectionTerminated,
// Related to Backend-received messages //
+ CopyOutResponseReceived,
+ CopyDataOutReceived,
+ CopyDoneReceived,
SSLRejectedReceived,
// SSPIAuthenticationReceived, // TODO implement
SASLAuthenticationReceived,
}
PgsqlBEMessage::ReadyForQuery(_) => Some(PgsqlStateProgress::ReadyForQueryReceived),
// TODO should we store any Parameter Status in PgsqlState?
+ // TODO -- For CopyBoth mode, parameterstatus may be important (replication parameter)
PgsqlBEMessage::AuthenticationMD5Password(_)
| PgsqlBEMessage::AuthenticationCleartextPassword(_) => {
Some(PgsqlStateProgress::SimpleAuthenticationReceived)
}
PgsqlBEMessage::RowDescription(_) => Some(PgsqlStateProgress::RowDescriptionReceived),
+ PgsqlBEMessage::CopyOutResponse(_) => Some(PgsqlStateProgress::CopyOutResponseReceived),
PgsqlBEMessage::ConsolidatedDataRow(msg) => {
// Increment tx.data_size here, since we know msg type, so that we can later on log that info
self.transactions.back_mut()?.sum_data_size(msg.data_size);
Some(PgsqlStateProgress::DataRowReceived)
}
+ PgsqlBEMessage::ConsolidatedCopyDataOut(msg) => {
+ // Increment tx.data_size here, since we know msg type, so that we can later on log that info
+ self.transactions.back_mut()?.sum_data_size(msg.data_size);
+ Some(PgsqlStateProgress::CopyDataOutReceived)
+ }
+ PgsqlBEMessage::CopyDone(_) => Some(PgsqlStateProgress::CopyDoneReceived),
PgsqlBEMessage::CommandComplete(_) => {
// TODO Do we want to compare the command that was stored when
// query was sent with what we received here?
PgsqlBEMessage::ErrorResponse(_) => Some(PgsqlStateProgress::ErrorMessageReceived),
_ => {
// We don't always have to change current state when we see a response...
+ // NotificationResponse and NoticeResponse fall here
None
}
}
);
tx.responses.push(dummy_resp);
tx.responses.push(response);
+ // reset values
+ tx.data_row_cnt = 0;
+ tx.data_size = 0;
+ } else if state == PgsqlStateProgress::CopyDataOutReceived {
+ tx.incr_row_cnt();
+ } else if state == PgsqlStateProgress::CopyDoneReceived && tx.get_row_cnt() > 0 {
+ // let's summarize the info from the data_rows in one response
+ let dummy_resp = PgsqlBEMessage::ConsolidatedCopyDataOut(
+ ConsolidatedDataRowPacket {
+ identifier: b'd',
+ row_cnt: tx.get_row_cnt(),
+ data_size: tx.data_size, // total byte count of all data_row messages combined
+ },
+ );
+ tx.responses.push(dummy_resp);
+ tx.responses.push(response);
+ // reset values
+ tx.data_row_cnt = 0;
+ tx.data_size = 0;
} else {
tx.responses.push(response);
if Self::response_is_complete(state) {