From 7eb31c852dc50ab04fbdb0b56ef5a54fedcdd1e0 Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Sat, 7 Oct 2023 15:13:09 +0200 Subject: [PATCH] RTSP: improved RTP parser - fix HTTP header parsing to report incomplete lines it buffers as consumed! - re-implement the RTP parser for interleave RTP messages for robustness. It is now keeping its state at the connection - RTSP protocol handler "readwrite" implementation now tracks if the response is before/in/after header parsing or "in" a bod by calling "Curl_http_readwrite_headers()" itself. This allows it to know when non-RTP bytes are "junk" or HEADER or BODY. - tested with #12035 and various small receive sizes where current master fails Closes #12052 --- lib/http.c | 3 +- lib/rtsp.c | 352 ++++++++++++++++++++++++++++++++----------------- lib/rtsp.h | 9 ++ lib/transfer.c | 64 +++++---- 4 files changed, 274 insertions(+), 154 deletions(-) diff --git a/lib/http.c b/lib/http.c index d6b78cae79..d7e841fd9a 100644 --- a/lib/http.c +++ b/lib/http.c @@ -3986,6 +3986,7 @@ CURLcode Curl_http_readwrite_headers(struct Curl_easy *data, char *end_ptr; /* header line within buffer loop */ + *stop_reading = FALSE; do { size_t rest_length; size_t full_length; @@ -4023,7 +4024,7 @@ CURLcode Curl_http_readwrite_headers(struct Curl_easy *data, break; } } - + *nread = 0; break; /* read more and try again */ } diff --git a/lib/rtsp.c b/lib/rtsp.c index ccd7264b00..f963391062 100644 --- a/lib/rtsp.c +++ b/lib/rtsp.c @@ -45,8 +45,8 @@ #include "curl_memory.h" #include "memdebug.h" -#define RTP_PKT_LENGTH(p) ((((int)((unsigned char)((p)[2]))) << 8) | \ - ((int)((unsigned char)((p)[3])))) +#define RTP_PKT_LENGTH(p) ((((unsigned int)((unsigned char)((p)[2]))) << 8) | \ + ((unsigned int)((unsigned char)((p)[3])))) /* protocol-specific functions set up to be called by the main engine */ static CURLcode rtsp_do(struct Curl_easy *data, bool *done); @@ -88,7 +88,7 @@ static int rtsp_getsock_do(struct Curl_easy *data, struct connectdata *conn, } static -CURLcode rtp_client_write(struct Curl_easy *data, char *ptr, size_t len); +CURLcode rtp_client_write(struct Curl_easy *data, const char *ptr, size_t len); static CURLcode rtsp_parse_transport(struct Curl_easy *data, char *transport); @@ -585,153 +585,257 @@ static CURLcode rtsp_do(struct Curl_easy *data, bool *done) return result; } - -static CURLcode rtsp_rtp_readwrite(struct Curl_easy *data, - struct connectdata *conn, - ssize_t *nread, - bool *readmore) { - struct SingleRequest *k = &data->req; +static CURLcode rtsp_filter_rtp(struct Curl_easy *data, + struct connectdata *conn, + const char *buf, + size_t blen, + bool in_body, + size_t *pconsumed) +{ struct rtsp_conn *rtspc = &(conn->proto.rtspc); - unsigned char *rtp_channel_mask = data->state.rtp_channel_mask; + CURLcode result = CURLE_OK; - char *rtp; /* moving pointer to rtp data */ - ssize_t rtp_dataleft; /* how much data left to parse in this round */ - CURLcode result; - bool interleaved = false; - size_t skip_size = 0; + *pconsumed = 0; + while(blen) { + switch(rtspc->state) { - if(Curl_dyn_len(&rtspc->buf)) { - /* There was some leftover data the last time. Append new buffers */ - if(Curl_dyn_addn(&rtspc->buf, k->str, *nread)) - return CURLE_OUT_OF_MEMORY; - rtp = Curl_dyn_ptr(&rtspc->buf); - rtp_dataleft = Curl_dyn_len(&rtspc->buf); - } - else { - /* Just parse the request buffer directly */ - rtp = k->str; - rtp_dataleft = *nread; - } - - while(rtp_dataleft > 0) { - if(rtp[0] == '$') { - if(rtp_dataleft > 4) { - unsigned char rtp_channel; - int rtp_length; - int idx; - int off; - - /* Parse the header */ - /* The channel identifier immediately follows and is 1 byte */ - rtp_channel = (unsigned char)rtp[1]; - idx = rtp_channel / 8; - off = rtp_channel % 8; - if(!(rtp_channel_mask[idx] & (1 << off))) { - /* invalid channel number, maybe not an RTP packet */ - rtp++; - rtp_dataleft--; - skip_size++; - continue; + case RTP_PARSE_SKIP: { + DEBUGASSERT(Curl_dyn_len(&rtspc->buf) == 0); + if(in_body && buf[0] != '$') { + /* in BODY and no valid start, do not consume and return */ + goto out; + } + while(blen && buf[0] != '$') { + if(!in_body && buf[0] == 'R' && + data->set.rtspreq != RTSPREQ_RECEIVE) { + if(strncmp(buf, "RTSP/", (blen < 5) ? blen : 5) == 0) { + /* This could be the next response, no consume and return */ + if(*pconsumed) { + DEBUGF(infof(data, "RTP rtsp_filter_rtp[SKIP] RTSP/ prefix, " + "skipping %zd bytes of junk", *pconsumed)); + } + rtspc->state = RTP_PARSE_SKIP; + rtspc->in_header = TRUE; + goto out; + } } - if(skip_size > 0) { - DEBUGF(infof(data, "Skip the malformed interleaved data %lu " - "bytes", skip_size)); + /* junk, consume without buffering */ + *pconsumed += 1; + ++buf; + --blen; + } + if(blen && buf[0] == '$') { + /* possible start of an RTP message, buffer */ + if(Curl_dyn_addn(&rtspc->buf, buf, 1)) { + result = CURLE_OUT_OF_MEMORY; + goto out; } - skip_size = 0; - rtspc->rtp_channel = rtp_channel; - - /* The length is two bytes */ - rtp_length = RTP_PKT_LENGTH(rtp); + *pconsumed += 1; + ++buf; + --blen; + rtspc->state = RTP_PARSE_CHANNEL; + } + break; + } - if(rtp_dataleft < rtp_length + 4) { - /* Need more - incomplete payload */ - *readmore = TRUE; - break; + case RTP_PARSE_CHANNEL: { + int idx = ((unsigned char)buf[0]) / 8; + int off = ((unsigned char)buf[0]) % 8; + DEBUGASSERT(Curl_dyn_len(&rtspc->buf) == 1); + if(!(data->state.rtp_channel_mask[idx] & (1 << off))) { + /* invalid channel number, junk or BODY data */ + rtspc->state = RTP_PARSE_SKIP; + if(in_body) { + /* we do not consume this byte, it is BODY data */ + DEBUGF(infof(data, "RTSP: invalid RTP channel %d in BODY, " + "treating as BODY data", idx)); + if(*pconsumed == 0) { + /* We did not consume the initial '$' in our buffer, but had + * it from an earlier call. We cannot un-consume it and have + * to write it directly as BODY data */ + result = Curl_client_write(data, CLIENTWRITE_BODY, + Curl_dyn_ptr(&rtspc->buf), 1); + ++data->req.bytecount; + Curl_dyn_free(&rtspc->buf); + if(result) + goto out; + } + else { + /* un-consume the '$' and leave */ + Curl_dyn_free(&rtspc->buf); + *pconsumed -= 1; + --buf; + ++blen; + goto out; + } } - interleaved = true; - /* We have the full RTP interleaved packet - * Write out the header including the leading '$' */ - DEBUGF(infof(data, "RTP write channel %d rtp_length %d", - rtspc->rtp_channel, rtp_length)); - result = rtp_client_write(data, &rtp[0], rtp_length + 4); - if(result) { - *readmore = FALSE; - return result; + else { + /* not BODY, forget the junk '$'. Do not consume this byte, + * it might be a start */ + infof(data, "RTSP: invalid RTP channel %d, skipping", idx); + Curl_dyn_free(&rtspc->buf); } + break; + } + /* a valid channel, so we expect this to be a real RTP message */ + rtspc->rtp_channel = (unsigned char)buf[0]; + if(Curl_dyn_addn(&rtspc->buf, buf, 1)) { + result = CURLE_OUT_OF_MEMORY; + goto out; + } + *pconsumed += 1; + ++buf; + --blen; + rtspc->state = RTP_PARSE_LEN; + break; + } - /* Move forward in the buffer */ - rtp_dataleft -= rtp_length + 4; - rtp += rtp_length + 4; + case RTP_PARSE_LEN: { + size_t rtp_len = Curl_dyn_len(&rtspc->buf); + const char *rtp_buf; + DEBUGASSERT(rtp_len >= 2 && rtp_len < 4); + if(Curl_dyn_addn(&rtspc->buf, buf, 1)) { + result = CURLE_OUT_OF_MEMORY; + goto out; + } + *pconsumed += 1; + ++buf; + --blen; + if(rtp_len == 2) + break; + rtp_buf = Curl_dyn_ptr(&rtspc->buf); + rtspc->rtp_len = RTP_PKT_LENGTH(rtp_buf) + 4; + rtspc->state = RTP_PARSE_DATA; + break; + } - if(data->set.rtspreq == RTSPREQ_RECEIVE) { - /* If we are in a passive receive, give control back - * to the app as often as we can. - */ - k->keepon &= ~KEEP_RECV; + case RTP_PARSE_DATA: { + size_t rtp_len = Curl_dyn_len(&rtspc->buf); + size_t needed; + DEBUGASSERT(rtp_len < rtspc->rtp_len); + needed = rtspc->rtp_len - rtp_len; + if(needed <= blen) { + if(Curl_dyn_addn(&rtspc->buf, buf, needed)) { + result = CURLE_OUT_OF_MEMORY; + goto out; } + *pconsumed += needed; + buf += needed; + blen -= needed; + /* complete RTP message in buffer */ + DEBUGF(infof(data, "RTP write channel %d rtp_len %zu", + rtspc->rtp_channel, rtspc->rtp_len)); + result = rtp_client_write(data, Curl_dyn_ptr(&rtspc->buf), + rtspc->rtp_len); + Curl_dyn_free(&rtspc->buf); + rtspc->state = RTP_PARSE_SKIP; + if(result) + goto out; } else { - /* Need more - incomplete header */ - *readmore = TRUE; - break; - } - } - else { - /* If the following data begins with 'RTSP/', which might be an RTSP - message, we should stop skipping the data. */ - /* If `k-> headerline> 0 && !interleaved` is true, we are maybe in the - middle of an RTSP message. It is difficult to determine this, so we - stop skipping. */ - size_t prefix_len = (rtp_dataleft < 5) ? rtp_dataleft : 5; - if((k->headerline > 0 && !interleaved) || - strncmp(rtp, "RTSP/", prefix_len) == 0) { - if(skip_size > 0) { - DEBUGF(infof(data, "Skip the malformed interleaved data %lu " - "bytes", skip_size)); + if(Curl_dyn_addn(&rtspc->buf, buf, blen)) { + result = CURLE_OUT_OF_MEMORY; + goto out; } - break; /* maybe is an RTSP message */ + *pconsumed += blen; + buf += blen; + blen = 0; } - /* Skip incorrect data util the next RTP packet or RTSP message */ - do { - rtp++; - rtp_dataleft--; - skip_size++; - } while(rtp_dataleft > 0 && rtp[0] != '$' && rtp[0] != 'R'); + break; + } + + default: + DEBUGASSERT(0); + return CURLE_RECV_ERROR; } } +out: + return result; +} + +static CURLcode rtsp_rtp_readwrite(struct Curl_easy *data, + struct connectdata *conn, + ssize_t *nread, + bool *readmore) +{ + struct rtsp_conn *rtspc = &(conn->proto.rtspc); + CURLcode result = CURLE_OK; + size_t consumed = 0; + char *buf; + size_t blen; + bool in_body; - if(rtp_dataleft && rtp[0] == '$') { - DEBUGF(infof(data, "RTP Rewinding %zd %s", rtp_dataleft, - *readmore ? "(READMORE)" : "")); + if(!data->req.header) + rtspc->in_header = FALSE; + in_body = (data->req.headerline && !rtspc->in_header) && + (data->req.size >= 0) && + (data->req.bytecount < data->req.size); - /* Store the incomplete RTP packet for a "rewind" */ - if(!Curl_dyn_len(&rtspc->buf)) { - /* nothing was stored, add this data */ - if(Curl_dyn_addn(&rtspc->buf, rtp, rtp_dataleft)) - return CURLE_OUT_OF_MEMORY; - } - else { - /* keep the remainder */ - Curl_dyn_tail(&rtspc->buf, rtp_dataleft); - } + DEBUGASSERT(*nread >= 0); + blen = (size_t)(*nread); + buf = data->req.str; + *readmore = FALSE; - /* As far as the transfer is concerned, this data is consumed */ - *nread = 0; - return CURLE_OK; + if(!blen) { + goto out; } - /* Fix up k->str to point just after the last RTP packet */ - k->str += *nread - rtp_dataleft; - *nread = rtp_dataleft; + /* If header parsing is not onging, extract RTP messsages */ + if(!rtspc->in_header) { + result = rtsp_filter_rtp(data, conn, buf, blen, in_body, &consumed); + if(result) + goto out; + buf += consumed; + blen -= consumed; + } + + /* we want to parse headers, do so */ + if(data->req.header && blen) { + bool stop_reading; + rtspc->in_header = TRUE; + data->req.str = buf; + *nread = blen; + result = Curl_http_readwrite_headers(data, conn, nread, &stop_reading); + if(result) + goto out; + + DEBUGASSERT(*nread >= 0); + blen = (size_t)(*nread); + buf = data->req.str; + + if(!data->req.header) + rtspc->in_header = FALSE; + + if(!rtspc->in_header) { + /* If header parsing is done and data left, extract RTP messages */ + in_body = (data->req.headerline && !rtspc->in_header) && + (data->req.size >= 0) && + (data->req.bytecount < data->req.size); + result = rtsp_filter_rtp(data, conn, buf, blen, in_body, &consumed); + if(result) + goto out; + buf += consumed; + blen -= consumed; + } + } - /* If we get here, we have finished with the leftover/merge buffer */ - Curl_dyn_free(&rtspc->buf); + data->req.str = buf; + *nread = blen; + if(rtspc->state != RTP_PARSE_SKIP) + *readmore = TRUE; - return CURLE_OK; +out: + if(!*readmore && data->set.rtspreq == RTSPREQ_RECEIVE) { + /* In special mode RECEIVE, we just process one chunk of network + * data, so we stop the transfer here, if we have no incomplete + * RTP message pending. */ + data->req.keepon &= ~KEEP_RECV; + } + return result; } static -CURLcode rtp_client_write(struct Curl_easy *data, char *ptr, size_t len) +CURLcode rtp_client_write(struct Curl_easy *data, const char *ptr, size_t len) { size_t wrote; curl_write_callback writeit; @@ -756,7 +860,7 @@ CURLcode rtp_client_write(struct Curl_easy *data, char *ptr, size_t len) } Curl_set_in_callback(data, true); - wrote = writeit(ptr, 1, len, user_ptr); + wrote = writeit((char *)ptr, 1, len, user_ptr); Curl_set_in_callback(data, false); if(CURL_WRITEFUNC_PAUSE == wrote) { diff --git a/lib/rtsp.h b/lib/rtsp.h index 111bac2a61..237b80f809 100644 --- a/lib/rtsp.h +++ b/lib/rtsp.h @@ -39,6 +39,12 @@ CURLcode Curl_rtsp_parseheader(struct Curl_easy *data, char *header); #endif /* CURL_DISABLE_RTSP */ +typedef enum { + RTP_PARSE_SKIP, + RTP_PARSE_CHANNEL, + RTP_PARSE_LEN, + RTP_PARSE_DATA +} rtp_parse_st; /* * RTSP Connection data * @@ -47,6 +53,9 @@ CURLcode Curl_rtsp_parseheader(struct Curl_easy *data, char *header); struct rtsp_conn { struct dynbuf buf; int rtp_channel; + size_t rtp_len; + rtp_parse_st state; + BIT(in_header); }; /**************************************************************************** diff --git a/lib/transfer.c b/lib/transfer.c index 1407572603..3148e8a2ea 100644 --- a/lib/transfer.c +++ b/lib/transfer.c @@ -423,7 +423,6 @@ static CURLcode readwrite_data(struct Curl_easy *data, { CURLcode result = CURLE_OK; ssize_t nread; /* number of bytes read */ - size_t excess = 0; /* excess bytes read */ bool readmore = FALSE; /* used by RTP to signal for more data */ int maxloops = 100; curl_off_t max_recv = data->set.max_recv_speed? @@ -439,6 +438,7 @@ static CURLcode readwrite_data(struct Curl_easy *data, read or we get a CURLE_AGAIN */ do { bool is_empty_data = FALSE; + size_t excess = 0; /* excess bytes read */ size_t buffersize = data->set.buffer_size; size_t bytestoread = buffersize; /* For HTTP/2 and HTTP/3, read data without caring about the content @@ -555,9 +555,11 @@ static CURLcode readwrite_data(struct Curl_easy *data, is non-headers. */ if(!k->header && (nread > 0 || is_empty_data)) { - if(data->req.no_body) { + if(data->req.no_body && nread > 0) { /* data arrives although we want none, bail out */ streamclose(conn, "ignoring body"); + DEBUGF(infof(data, "did not want a BODY, but seeing %zd bytes", + nread)); *done = TRUE; result = CURLE_WEIRD_SERVER_REPLY; goto out; @@ -642,17 +644,6 @@ static CURLcode readwrite_data(struct Curl_easy *data, (k->bytecount + nread >= k->maxdownload)) { excess = (size_t)(k->bytecount + nread - k->maxdownload); - if(excess > 0 && !k->ignorebody) { - infof(data, - "Excess found in a read:" - " excess = %zu" - ", size = %" CURL_FORMAT_CURL_OFF_T - ", maxdownload = %" CURL_FORMAT_CURL_OFF_T - ", bytecount = %" CURL_FORMAT_CURL_OFF_T, - excess, k->size, k->maxdownload, k->bytecount); - connclose(conn, "excess found in a read"); - } - nread = (ssize_t) (k->maxdownload - k->bytecount); if(nread < 0) /* this should be unusual */ nread = 0; @@ -718,24 +709,39 @@ static CURLcode readwrite_data(struct Curl_easy *data, } /* if(!header and data to read) */ - if(conn->handler->readwrite && excess) { - /* Parse the excess data */ - k->str += nread; + if(excess > 0 && !k->ignorebody) { + if(conn->handler->readwrite) { + /* Give protocol handler a chance to do something with it */ + k->str += nread; + if(&k->str[excess] > &buf[data->set.buffer_size]) { + /* the excess amount was too excessive(!), make sure + it doesn't read out of buffer */ + excess = &buf[data->set.buffer_size] - k->str; + } + nread = (ssize_t)excess; + result = conn->handler->readwrite(data, conn, &nread, &readmore); + if(result) + goto out; - if(&k->str[excess] > &buf[data->set.buffer_size]) { - /* the excess amount was too excessive(!), make sure - it doesn't read out of buffer */ - excess = &buf[data->set.buffer_size] - k->str; + if(readmore) { + DEBUGASSERT(nread == 0); + k->keepon |= KEEP_RECV; /* we're not done reading */ + } + else if(nread == 0) + break; + /* protocol handler did not consume all excess data */ + excess = nread; + } + if(excess) { + infof(data, + "Excess found in a read:" + " excess = %zu" + ", size = %" CURL_FORMAT_CURL_OFF_T + ", maxdownload = %" CURL_FORMAT_CURL_OFF_T + ", bytecount = %" CURL_FORMAT_CURL_OFF_T, + excess, k->size, k->maxdownload, k->bytecount); + connclose(conn, "excess found in a read"); } - nread = (ssize_t)excess; - - result = conn->handler->readwrite(data, conn, &nread, &readmore); - if(result) - goto out; - - if(readmore) - k->keepon |= KEEP_RECV; /* we're not done reading */ - break; } if(is_empty_data) { -- 2.47.3