} pc_queue; // producer-consumer queue
#endif
+static int msg_indexes = 1;
+
typedef struct {
+ int index_number;
uint32_t referenceCount; // we might start using this...
unsigned int nheaders;
char *name[16];
}
void msg_retain(rtsp_message *msg) {
- if (msg) {
int rc = pthread_mutex_lock(&reference_counter_lock);
if (rc)
debug(1, "Error %d locking reference counter lock");
+ if (msg > (rtsp_message *) 0x00010000) {
msg->referenceCount++;
+ debug(1,"msg_retain -- item %d reference count %d.", msg->index_number, msg->referenceCount);
rc = pthread_mutex_unlock(&reference_counter_lock);
if (rc)
debug(1, "Error %d unlocking reference counter lock");
} else {
- debug(1, "null rtsp_message pointer passed to retain");
+ debug(1, "invalid rtsp_message pointer %d passed to retain", (int) msg);
}
}
if (msg) {
memset(msg, 0, sizeof(rtsp_message));
msg->referenceCount = 1; // from now on, any access to this must be protected with the lock
+ msg->index_number = msg_indexes++;
} else {
- die("can not allocate memory for an rtsp_message.");
+ die("msg_init -- can not allocate memory for rtsp_message %d.", msg_indexes);
}
+ debug(1,"msg_init -- create item %d.", msg->index_number);
return msg;
}
}
*/
+void msg_free(rtsp_message **msgh) {
+ debug_mutex_lock(&reference_counter_lock, 1000, 0);
+ if (*msgh > (rtsp_message *)0x00010000) {
+ rtsp_message *msg = *msgh;
+ msg->referenceCount--;
+ if (msg->referenceCount == 0) {
+ unsigned int i;
+ for (i = 0; i < msg->nheaders; i++) {
+ free(msg->name[i]);
+ free(msg->value[i]);
+ }
+ if (msg->content)
+ free(msg->content);
+ debug(1,"msg_free item %d -- free.",msg->index_number);
+ *msgh = (rtsp_message *)(msg->index_number & 0xFFFF); // put the index number of the freed message in here
+ free(msg);
+ } else {
+ debug(1,"msg_free item %d -- decrement reference to %d.",msg->index_number,msg->referenceCount);
+ }
+
+ // else {
+ // debug(1,"rtsp_message reference count non-zero:
+ // %d!",msg->referenceCount);
+ //}
+ } else if (*msgh != NULL) {
+ debug(1, "msg_free: error attempting to free an allocated but already-freed rtsp_message %d.",(int)*msgh);
+ }
+ debug_mutex_unlock(&reference_counter_lock, 0);
+}
+
+/*
void msg_free(rtsp_message *msg) {
if (msg) {
debug(1, "null rtsp_message pointer passed to msg_free()");
}
}
+*/
int msg_handle_line(rtsp_message **pmsg, char *line) {
rtsp_message *msg = *pmsg;
}
fail:
- *pmsg = NULL;
- msg_free(msg);
+ msg_free(pmsg);
return 0;
}
enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, rtsp_message **the_packet) {
+
+ *the_packet = NULL; // need this for erro handling
+
enum rtsp_read_request_response reply = rtsp_read_request_response_ok;
ssize_t buflen = 4096;
char *buf = malloc(buflen + 1); // add a NUL at the end
-
- rtsp_message *msg = NULL;
+ if (!buf) {
+ warn("rtsp_read_request: can't get a buffer.");
+ reply = rtsp_read_request_response_error;
+ goto shutdown;
+ }
ssize_t nread;
ssize_t inbuf = 0;
char *next;
while (msg_size < 0 && (next = nextline(buf, inbuf))) {
- msg_size = msg_handle_line(&msg, buf);
+ msg_size = msg_handle_line(the_packet, buf);
- if (!msg) {
+ if (!(*the_packet)) {
warn("no RTSP header received");
reply = rtsp_read_request_response_bad_packet;
goto shutdown;
inbuf += nread;
}
+ rtsp_message *msg = *the_packet;
msg->contentlength = inbuf;
msg->content = buf;
char *jp = inbuf + buf;
return reply;
shutdown:
- if (msg) {
- msg_free(msg); // which will free the content and everything else
- }
- // in case the message wasn't formed or wasn't fully initialised
- if ((msg && (msg->content == NULL)) || (!msg))
- free(buf);
- *the_packet = NULL;
+ msg_free(the_packet);
+ free(buf);
return reply;
}
// debug(1, "metadata_pack_cleanup_function called");
metadata_package *pack = (metadata_package *)arg;
if (pack->carrier)
- msg_free(pack->carrier); // release the message
+ msg_free(&pack->carrier); // release the message
else if (pack->data)
free(pack->data);
}
pack.code = code;
pack.data = data;
pack.length = length;
- if (carrier)
- msg_retain(carrier);
pack.carrier = carrier;
+ if (pack.carrier)
+ msg_retain(pack.carrier);
int rc = pc_queue_add_item(&metadata_queue, &pack, block);
if (rc == EBUSY) {
- if (carrier)
- msg_free(carrier);
+ if (pack.carrier)
+ msg_free(&pack.carrier);
else if (data)
free(data);
warn("Metadata queue is busy, discarding message of type 0x%08X, code 0x%08X.", type, code);
void msg_cleanup_function(void *arg) {
// debug(3, "msg_cleanup_function called.");
- msg_free((rtsp_message *)arg);
+ msg_free((rtsp_message **)arg);
}
static void *rtsp_conversation_thread_func(void *pconn) {
int debug_level = 3; // for printing the request and response
reply = rtsp_read_request(conn, &req);
if (reply == rtsp_read_request_response_ok) {
- pthread_cleanup_push(msg_cleanup_function, (void *)req);
+ pthread_cleanup_push(msg_cleanup_function, (void *)&req);
resp = msg_init();
- pthread_cleanup_push(msg_cleanup_function, (void *)resp);
+ pthread_cleanup_push(msg_cleanup_function, (void *)&resp);
resp->respcode = 400;
if (strcmp(req->method, "OPTIONS") !=