/// config: log flags
pub config: AppLayerTxConfig,
+ /// The tx has been updated and needs to be processed : detection, logging, cleaning
+ /// It can then be skipped until new data arrives.
+ /// There is a boolean for both directions : to server and to client
+ pub updated_tc: bool,
+ pub updated_ts: bool,
+
/// logger flags for tx logging api
logged: LoggerFlags,
file_flags: 0,
file_tx: 0,
stream_logged: 0,
+ updated_tc: true,
+ updated_ts: true,
detect_flags_ts: 0,
detect_flags_tc: 0,
de_state: std::ptr::null_mut(),
/// Create new AppLayerTxData for a transaction in a single
/// direction.
pub fn for_direction(direction: Direction) -> Self {
- let (detect_flags_ts, detect_flags_tc) = match direction {
- Direction::ToServer => (0, APP_LAYER_TX_SKIP_INSPECT_FLAG),
- Direction::ToClient => (APP_LAYER_TX_SKIP_INSPECT_FLAG, 0),
+ let (detect_flags_ts, detect_flags_tc, updated_ts, updated_tc) = match direction {
+ Direction::ToServer => (0, APP_LAYER_TX_SKIP_INSPECT_FLAG, true, false),
+ Direction::ToClient => (APP_LAYER_TX_SKIP_INSPECT_FLAG, 0, false, true),
};
Self {
config: AppLayerTxConfig::new(),
file_flags: 0,
file_tx: 0,
stream_logged: 0,
+ updated_tc,
+ updated_ts,
detect_flags_ts,
detect_flags_tc,
de_state: std::ptr::null_mut(),
start = rem;
if let Some(tx) = self.find_request() {
+ tx.tx_data.updated_tc = true;
tx.response = Some(response);
SCLogNotice!("Found response for request:");
SCLogNotice!("- Request: {:?}", tx.request);
for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
index += 1;
if !tx_old.req_done || !tx_old.resp_done {
+ tx_old.tx_data.updated_tc = true;
+ tx_old.tx_data.updated_ts = true;
tx_old.req_done = true;
tx_old.resp_done = true;
break;
}
}
}
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
index += 1;
if !tx_old.req_done || !tx_old.resp_done {
+ tx_old.tx_data.updated_tc = true;
+ tx_old.tx_data.updated_ts = true;
tx_old.req_done = true;
tx_old.resp_done = true;
break;
}
if let Some(tx) = otx {
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
let done = (hdr.flags1 & PFCL1_FRAG) == 0 || (hdr.flags1 & PFCL1_LASTFRAG) != 0;
match hdr.pkt_type {
fn purge_tx_flood(&mut self) {
let mut event_set = false;
for tx in self.transactions.iter_mut() {
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
tx.done = true;
if !event_set {
tx.tx_data.set_event(EnipEvent::TooManyTransactions as u8);
if let Some(req) = &tx.request {
if tx.response.is_none() {
tx.done = true;
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
if response_matches_request(req, pdu) {
return Some(tx);
}
let tx = &mut self.transactions[index - 1];
tx.tx_data.update_file_flags(self.state_data.file_flags);
tx.update_file_flags(tx.tx_data.file_flags);
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
} else {
// do not use SETTINGS_MAX_CONCURRENT_STREAMS as it can grow too much
tx_old.set_event(HTTP2Event::TooManyStreams);
// use a distinct state, even if we do not log it
tx_old.state = HTTP2TransactionState::HTTP2StateTodrop;
+ tx_old.tx_data.updated_tc = true;
+ tx_old.tx_data.updated_ts = true;
}
return None;
}
for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
index += 1;
if !tx_old.complete {
+ tx_old.tx_data.updated_tc = true;
+ tx_old.tx_data.updated_ts = true;
tx_old.complete = true;
tx_old
.tx_data
if let Some(tx) = self.find_request(response.message_id) {
tx.complete = tx_is_complete(&response.protocol_op, Direction::ToClient);
let tx_id = tx.id();
+ tx.tx_data.updated_tc = true;
tx.responses.push_back(response);
let consumed = start.len() - rem.len();
self.set_frame_tc(flow, tx_id, consumed as i64);
for tx in &mut self.transactions {
if let Some(req) = &tx.request {
if tx.response.is_none() && resp.matches(req) {
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
for tx in &mut self.transactions {
if let Some(resp) = &tx.response {
if tx.request.is_none() && req.matches(resp) {
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
match self.find_response_and_validate(&mut msg) {
Some(tx) => {
tx.set_events_from_flags(&msg.error_flags);
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
tx.request = Some(msg);
}
None => {
} else {
tx.set_events_from_flags(&msg.error_flags);
}
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
tx.response = Some(msg);
}
None => {
if !tx.complete {
if let Some(mpktid) = tx.pkt_id {
if mpktid == pkt_id {
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
index += 1;
if !tx_old.complete {
+ tx_old.tx_data.updated_tc = true;
+ tx_old.tx_data.updated_ts = true;
tx_old.complete = true;
MQTTState::set_event(tx_old, MQTTEvent::TooManyTransactions);
break;
// set at least one another transaction to the drop state
for tx_old in &mut self.transactions {
if !tx_old.request_done || !tx_old.response_done {
+ tx_old.tx_data.updated_tc = true;
+ tx_old.tx_data.updated_ts = true;
tx_old.request_done = true;
tx_old.response_done = true;
tx_old.is_file_closed = true;
pub fn mark_response_tx_done(&mut self, xid: u32, rpc_status: u32, nfs_status: u32, resp_handle: &[u8])
{
if let Some(mytx) = self.get_tx_by_xid(xid) {
+ mytx.tx_data.updated_tc = true;
+ mytx.tx_data.updated_ts = true;
mytx.response_done = true;
mytx.rpc_response_status = rpc_status;
mytx.nfs_response_status = nfs_status;
tx.tx_data.update_file_flags(self.state_data.file_flags);
d.update_file_flags(tx.tx_data.file_flags);
SCLogDebug!("Found NFS file TX with ID {} XID {:04X}", tx.id, tx.xid);
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
index += 1;
if tx_old.tx_res_state < PgsqlTxProgress::TxDone {
+ tx_old.tx_data.updated_tc = true;
+ tx_old.tx_data.updated_ts = true;
// we don't check for TxReqDone for the majority of requests are basically completed
// when they're parsed, as of now
tx_old.tx_req_state = PgsqlTxProgress::TxFlushedOut;
// A simplified finite state machine for PostgreSQL v3 can be found at:
// https://samadhiweb.com/blog/2013.04.28.graphviz.postgresv3.html
if let Some(tx) = self.find_or_create_tx() {
+ tx.tx_data.updated_ts = true;
tx.request = Some(request);
if let Some(state) = new_state {
if Self::request_is_complete(state) {
self.state_progress = state;
}
if let Some(tx) = self.find_or_create_tx() {
+ tx.tx_data.updated_tc = true;
if tx.tx_res_state == PgsqlTxProgress::TxInit {
tx.tx_res_state = PgsqlTxProgress::TxReceived;
}
fn get_current_tx(&mut self) -> Option<&mut RFBTransaction> {
let tx_id = self.tx_id;
- self.transactions.iter_mut().find(|tx| tx.tx_id == tx_id)
+ let r = self.transactions.iter_mut().find(|tx| tx.tx_id == tx_id);
+ if let Some(tx) = r {
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
+ return Some(tx);
+ }
+ return None;
}
fn parse_request(&mut self, flow: *const Flow, stream_slice: StreamSlice) -> AppLayerResult {
_ => { false },
};
if found {
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
tx.tx_data.update_file_flags(self.state_data.file_flags);
d.update_file_flags(tx.tx_data.file_flags);
}
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
tx.tx_data.update_file_flags(self.state_data.file_flags);
d.update_file_flags(tx.tx_data.file_flags);
}
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
_ => { false },
};
if hit {
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
index += 1;
if !tx_old.request_done || !tx_old.response_done {
+ tx_old.tx_data.updated_tc = true;
+ tx_old.tx_data.updated_ts = true;
tx_old.request_done = true;
tx_old.response_done = true;
tx_old.set_event(SMBEvent::TooManyTransactions);
false
};
if found {
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
false
};
if found {
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
_ => { false },
};
if found {
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
_ => { false },
};
if hit {
+ tx.tx_data.updated_tc = true;
+ tx.tx_data.updated_ts = true;
return Some(tx);
}
}
let state = &mut cast_pointer!(state, SSHState);
let buf = stream_slice.as_slice();
let hdr = &mut state.transaction.cli_hdr;
+ state.transaction.tx_data.updated_ts = true;
if hdr.flags < SSHConnectionState::SshStateBannerDone {
return state.parse_banner(buf, false, pstate, flow, &stream_slice);
} else {
let state = &mut cast_pointer!(state, SSHState);
let buf = stream_slice.as_slice();
let hdr = &mut state.transaction.srv_hdr;
+ state.transaction.tx_data.updated_tc = true;
if hdr.flags < SSHConnectionState::SshStateBannerDone {
return state.parse_banner(buf, true, pstate, flow, &stream_slice);
} else {
/* Update the saved transport header so subsequent segments
* will be matched to this sequence number. */
tx->th = th;
+ tx->tx_data.updated_ts = true;
}
else {
ah = (DNP3ApplicationHeader *)(input + sizeof(DNP3LinkHeader) +
/* Replace the transport header in the transaction with this
* one in case there are more frames. */
tx->th = th;
+ tx->tx_data.updated_tc = true;
}
else {
ah = (DNP3ApplicationHeader *)(input + offset);
SCReturnStruct(APP_LAYER_ERROR);
}
lasttx = tx;
+ tx->tx_data.updated_tc = true;
if (state->command == FTP_COMMAND_UNKNOWN || tx->command_descriptor == NULL) {
/* unknown */
tx->command_descriptor = &FtpCommands[FTP_COMMAND_MAX - 1];
SCTxDataUpdateFileFlags(&ftpdata_state->tx_data, ftpdata_state->state_data.file_flags);
if (ftpdata_state->tx_data.file_tx == 0)
ftpdata_state->tx_data.file_tx = direction & (STREAM_TOSERVER | STREAM_TOCLIENT);
-
+ if (direction & STREAM_TOSERVER) {
+ ftpdata_state->tx_data.updated_ts = true;
+ } else {
+ ftpdata_state->tx_data.updated_tc = true;
+ }
/* we depend on detection engine for file pruning */
const uint16_t flags = FileFlowFlagsToFlags(ftpdata_state->tx_data.file_flags, direction);
int ret = 0;
if (tx_ud == NULL) {
SCReturnInt(HTP_OK);
}
+ tx_ud->tx_data.updated_ts = true;
SCTxDataUpdateFileFlags(&tx_ud->tx_data, hstate->state_data.file_flags);
if (!tx_ud->response_body_init) {
if (tx_ud == NULL) {
SCReturnInt(HTP_OK);
}
+ tx_ud->tx_data.updated_tc = true;
SCTxDataUpdateFileFlags(&tx_ud->tx_data, hstate->state_data.file_flags);
if (!tx_ud->request_body_init) {
tx_ud->request_body_init = 1;
{
HtpTxUserData *htud = (HtpTxUserData *)htp_tx_get_user_data(tx);
if (htud != NULL) {
+ htud->tx_data.updated_ts = true;
htud->request_has_trailers = 1;
}
return HTP_OK;
{
HtpTxUserData *htud = (HtpTxUserData *)htp_tx_get_user_data(tx);
if (htud != NULL) {
+ htud->tx_data.updated_tc = true;
htud->response_has_trailers = 1;
}
return HTP_OK;
}
tx_ud->tx_data.file_tx = STREAM_TOSERVER | STREAM_TOCLIENT; // each http tx may xfer files
htp_tx_set_user_data(tx, tx_ud);
+ } else {
+ tx_ud->tx_data.updated_ts = true;
}
SCReturnInt(HTP_OK);
}
tx_ud->tx_data.file_tx =
STREAM_TOCLIENT; // each http tx may xfer files. Toserver already missed.
htp_tx_set_user_data(tx, tx_ud);
+ } else {
+ tx_ud->tx_data.updated_tc = true;
}
SCReturnInt(HTP_OK);
}
HtpTxUserData *htud = (HtpTxUserData *)htp_tx_get_user_data(tx);
if (htud != NULL) {
+ htud->tx_data.updated_ts = true;
if (htud->tsflags & HTP_FILENAME_SET) {
SCLogDebug("closing file that was being stored");
(void)HTPFileClose(htud, NULL, 0, 0, STREAM_TOSERVER);
HtpTxUserData *htud = (HtpTxUserData *) htp_tx_get_user_data(tx);
if (htud != NULL) {
+ htud->tx_data.updated_tc = true;
if (htud->tcflags & HTP_FILENAME_SET) {
SCLogDebug("closing file that was being stored");
(void)HTPFileClose(htud, NULL, 0, 0, STREAM_TOCLIENT);
return HTP_OK;
}
tx_ud->request_headers_raw = ptmp;
+ tx_ud->tx_data.updated_ts = true;
memcpy(tx_ud->request_headers_raw + tx_ud->request_headers_raw_len,
tx_data->data, tx_data->len);
if (tx_ud == NULL) {
return HTP_OK;
}
+ tx_ud->tx_data.updated_tc = true;
ptmp = HTPRealloc(tx_ud->response_headers_raw,
tx_ud->response_headers_raw_len,
tx_ud->response_headers_raw_len + tx_data->len);
(pkt_dir == STREAM_TOSERVER) ? ts_disrupt_flags : tc_disrupt_flags);
AppLayerParserFileTxHousekeeping(f, tx, pkt_dir, (bool)pkt_dir_trunc);
}
-
+ if (txd) {
+ // should be reset by parser next time it updates the tx
+ if (pkt_dir & STREAM_TOSERVER) {
+ txd->updated_ts = false;
+ } else {
+ txd->updated_tc = false;
+ }
+ }
const int tx_progress_tc =
AppLayerParserGetStateProgress(ipproto, alproto, tx, tc_disrupt_flags);
if (tx_progress_tc < tx_end_state_tc) {
return 0; // to continue processing further
}
+ if (state->curr_tx) {
+ state->curr_tx->tx_data.updated_tc = true;
+ }
/* the reply code has to contain at least 3 bytes, to hold the 3 digit
* reply code */
if (line->len < 3) {
if (frame != NULL && state->curr_tx) {
AppLayerFrameSetTxId(frame, state->curr_tx->tx_id);
}
+ tx->tx_data.updated_ts = true;
state->toserver_data_count += (line->len + line->delim_len);
AppLayerParserState *pstate, StreamSlice stream_slice)
{
SSLState *ssl_state = (SSLState *)alstate;
+ ssl_state->tx_data.updated_tc = true;
+ ssl_state->tx_data.updated_ts = true;
uint32_t counter = 0;
ssl_state->f = f;
const uint8_t *input = StreamSliceGetData(&stream_slice);
DetectTransaction no_tx = NO_TX;
return no_tx;
}
+ const int tx_progress = AppLayerParserGetStateProgress(ipproto, alproto, tx_ptr, flow_flags);
+ bool updated = (flow_flags & STREAM_TOSERVER) ? txd->updated_ts : txd->updated_tc;
+ if (!updated && tx_progress < tx_end_state && ((flow_flags & STREAM_EOF) == 0)) {
+ DetectTransaction no_tx = NO_TX;
+ return no_tx;
+ }
uint64_t detect_flags =
(flow_flags & STREAM_TOSERVER) ? txd->detect_flags_ts : txd->detect_flags_tc;
if (detect_flags & APP_LAYER_TX_INSPECTED_FLAG) {
return no_tx;
}
- const int tx_progress = AppLayerParserGetStateProgress(ipproto, alproto, tx_ptr, flow_flags);
const int dir_int = (flow_flags & STREAM_TOSERVER) ? 0 : 1;
DetectEngineState *tx_de_state = txd->de_state;
DetectEngineStateDirection *tx_dir_state = tx_de_state ? &tx_de_state->dir_state[dir_int] : NULL;
uint64_t tx_id = AppLayerParserGetTransactionLogId(f->alparser);
uint64_t max_id = tx_id;
int logged = 0;
- int gap = 0;
+ bool gap = false;
const bool support_files = AppLayerParserSupportsFiles(ipproto, alproto);
const uint8_t pkt_dir = STREAM_FLAGS_FOR_PACKET(p);
tx_id = ires.tx_id;
SCLogDebug("STARTING tx_id %" PRIu64 ", tx %p", tx_id, tx);
- const int tx_progress_ts =
- AppLayerParserGetStateProgress(ipproto, alproto, tx, ts_disrupt_flags);
- const int tx_progress_tc =
- AppLayerParserGetStateProgress(ipproto, alproto, tx, tc_disrupt_flags);
- const bool tx_complete = (tx_progress_ts == complete_ts && tx_progress_tc == complete_tc);
-
- SCLogDebug("file_thread_data %p filedata_thread_data %p", op_thread_data->file,
- op_thread_data->filedata);
-
AppLayerTxData *txd = AppLayerParserGetTxData(ipproto, alproto, tx);
if (unlikely(txd == NULL)) {
SCLogDebug("NO TXD");
goto next_tx;
}
+ const int tx_progress_ts =
+ AppLayerParserGetStateProgress(ipproto, alproto, tx, ts_disrupt_flags);
+ const int tx_progress_tc =
+ AppLayerParserGetStateProgress(ipproto, alproto, tx, tc_disrupt_flags);
+ const bool tx_complete = (tx_progress_ts == complete_ts && tx_progress_tc == complete_tc);
+
+ SCLogDebug("file_thread_data %p filedata_thread_data %p", op_thread_data->file,
+ op_thread_data->filedata);
+
if (file_logging_active) {
if (AppLayerParserIsFileTx(txd)) { // need to process each tx that might be a file tx,
// even if there are not files (yet)
}
}
SCLogDebug("logger: expect %08x, have %08x", logger_expectation, txd->logged.flags);
+ if (!txd->updated_tc && !txd->updated_ts && !(tx_progress_ts == complete_ts) &&
+ !(tx_progress_tc == complete_tc) && !ts_eof && !tc_eof) {
+ gap = true;
+ goto next_tx;
+ }
if (list[ALPROTO_UNKNOWN] != 0) {
OutputTxLogList0(tv, op_thread_data, p, f, tx, tx_id);
max_id = tx_id;
SCLogDebug("max_id %" PRIu64, max_id);
} else {
- gap = 1;
+ gap = true;
}
next_tx:
if (!ires.has_next)