esl_mutex_create(&handle->mutex);
}
+ if (!handle->packet_buf) {
+ esl_buffer_create(&handle->packet_buf, BUF_CHUNK, BUF_START, 0);
+ }
+
handle->connected = 1;
esl_send_recv(handle, "connect\n\n");
if (!handle->mutex) {
esl_mutex_create(&handle->mutex);
}
+
+ if (!handle->packet_buf) {
+ esl_buffer_create(&handle->packet_buf, BUF_CHUNK, BUF_START, 0);
+ }
handle->sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
esl_mutex_destroy(&mutex);
}
+ if (handle->packet_buf) {
+ esl_buffer_destroy(&handle->packet_buf);
+ }
+
+
return status;
}
if (check_q) {
esl_mutex_lock(handle->mutex);
- if (handle->race_event) {
+ if (handle->race_event || esl_buffer_packet_count(handle->packet_buf)) {
esl_mutex_unlock(handle->mutex);
return esl_recv_event(handle, check_q, save_event);
}
}
+static esl_ssize_t handle_recv(esl_handle_t *handle, void *data, esl_size_t datalen)
+{
+ return recv(handle->sock, data, datalen, 0);
+}
ESL_DECLARE(esl_status_t) esl_recv_event(esl_handle_t *handle, int check_q, esl_event_t **save_event)
{
char *c;
esl_ssize_t rrval;
- int crc = 0;
esl_event_t *revent = NULL;
char *beg;
char *hname, *hval;
char *cl;
esl_ssize_t len;
int zc = 0;
- int bread = 0;
if (!handle || !handle->connected || handle->sock == ESL_SOCK_INVALID) {
return ESL_FAIL;
esl_mutex_lock(handle->mutex);
if (!handle->connected || handle->sock == ESL_SOCK_INVALID) {
- handle->connected = 0;
- esl_mutex_unlock(handle->mutex);
- return ESL_FAIL;
+ goto fail;
}
esl_event_safe_destroy(&handle->last_event);
goto parse_event;
}
- memset(handle->header_buf, 0, sizeof(handle->header_buf));
-
- c = handle->header_buf;
- beg = c;
+
+ while(!revent && handle->connected) {
+ esl_size_t len;
+
+ if ((len = esl_buffer_read_packet(handle->packet_buf, handle->socket_buf, sizeof(handle->socket_buf)))) {
+ char *data = (char *) handle->socket_buf;
+ char *p, *e;
+
+ esl_event_create(&revent, ESL_EVENT_CLONE);
+ revent->event_id = ESL_EVENT_SOCKET_DATA;
+ esl_event_add_header_string(revent, ESL_STACK_BOTTOM, "Event-Name", "SOCKET_DATA");
+
+ hname = p = data;
+ while(p) {
+ hname = p;
+ p = NULL;
+
+ if ((hval = strchr(hname, ':'))) {
+ *hval++ = '\0';
+ while(*hval == ' ' || *hval == '\t') hval++;
+
+ if ((e = strchr(hval, '\n'))) {
+ *e++ = '\0';
+ while(*e == '\n' || *e == '\r') e++;
+
+ if (hname && hval) {
+ esl_url_decode(hval);
+ esl_log(ESL_LOG_DEBUG, "RECV HEADER [%s] = [%s]\n", hname, hval);
+ esl_event_add_header_string(revent, ESL_STACK_BOTTOM, hname, hval);
+ }
+
+ p = e;
+ }
+ }
+ }
- while(handle->connected) {
- if (bread + 2 >= sizeof(handle->header_buf)) {
- esl_log(ESL_LOG_CRIT, "OUT OF BUFFER SPACE!\n");
- handle->connected = 0;
- esl_mutex_unlock(handle->mutex);
- return ESL_DISCONNECTED;
+ break;
}
- rrval = recv(handle->sock, c, 1, 0);
+ rrval = handle_recv(handle, handle->socket_buf, sizeof(handle->socket_buf));
+
if (rrval == 0) {
if (++zc >= 100) {
- handle->connected = 0;
- esl_mutex_unlock(handle->mutex);
- return ESL_DISCONNECTED;
+ goto fail;
}
+ continue;
} else if (rrval < 0) {
strerror_r(handle->errnum, handle->err, sizeof(handle->err));
goto fail;
- } else {
- zc = 0;
-
- if (*c == '\n') {
-
- *(c+1) = '\0';
-
- if (++crc == 2) {
- break;
- }
-
- if (!revent) {
- esl_event_create(&revent, ESL_EVENT_CLONE);
- revent->event_id = ESL_EVENT_SOCKET_DATA;
- esl_event_add_header_string(revent, ESL_STACK_BOTTOM, "Event-Name", "SOCKET_DATA");
-
- }
-
- hname = beg;
- hval = col = NULL;
-
- if (hname && (col = strchr(hname, ':'))) {
- hval = col + 1;
- *col = '\0';
- while(*hval == ' ') hval++;
- }
-
- *c = '\0';
-
- if (hname && hval) {
- esl_url_decode(hval);
- esl_log(ESL_LOG_DEBUG, "RECV HEADER [%s] = [%s]\n", hname, hval);
- esl_event_add_header_string(revent, ESL_STACK_BOTTOM, hname, hval);
- }
+ }
- c = beg;
- bread = 0;
- continue;
-
- } else {
- crc = 0;
- }
+ zc = 0;
- c++;
- }
+ esl_buffer_write(handle->packet_buf, handle->socket_buf, rrval);
}
-
+
if (!revent) {
goto fail;
}
*(body + len) = '\0';
do {
- esl_ssize_t r;
- if ((r = recv(handle->sock, body + sofar, len - sofar, 0)) < 0) {
- strerror_r(handle->errnum, handle->err, sizeof(handle->err));
- goto fail;
+ esl_ssize_t r,s = esl_buffer_inuse(handle->packet_buf);
+
+ if (s >= len) {
+ sofar = esl_buffer_read(handle->packet_buf, body, len);
+ } else {
+ r = handle_recv(handle, handle->socket_buf, sizeof(handle->socket_buf));
+
+ if (r < 0) {
+ strerror_r(handle->errnum, handle->err, sizeof(handle->err));
+ goto fail;
+ } else if (r == 0) {
+ if (++zc >= 100) {
+ goto fail;
+ }
+ continue;
+ }
+
+ zc = 0;
+
+ esl_buffer_write(handle->packet_buf, handle->socket_buf, r);
}
- sofar += r;
+
} while (sofar < len);
revent->body = body;
fail:
+ esl_mutex_unlock(handle->mutex);
+
handle->connected = 0;
return ESL_FAIL;
int main(void)
{
esl_handle_t handle = {{0}};
+ esl_buffer_t *buffer;
+ char doh[65536];
+
+ esl_buffer_create(&buffer, 32 * 1024, 32 * 1024, 0);
+
+ snprintf(doh, sizeof(doh), "TEST 1 FOO BAR 1234\n");
+ esl_buffer_write(buffer, doh, strlen(doh));
+ esl_buffer_write(buffer, doh, strlen(doh));
+ esl_buffer_write(buffer, doh, strlen(doh));
+ snprintf(doh, sizeof(doh), "TEST 1 END\n\n");
+ esl_buffer_write(buffer, doh, strlen(doh));
+
+
+
+ snprintf(doh, sizeof(doh), "TEST 2 BAR FOO 4321\n");
+ esl_buffer_write(buffer, doh, strlen(doh));
+ esl_buffer_write(buffer, doh, strlen(doh));
+ esl_buffer_write(buffer, doh, strlen(doh));
+ snprintf(doh, sizeof(doh), "TEST 2 END\n\n");
+ esl_buffer_write(buffer, doh, strlen(doh));
+
+ snprintf(doh, sizeof(doh), "TEST 2 BAR FOO 4321\n");
+ esl_buffer_write(buffer, doh, strlen(doh));
+ esl_buffer_write(buffer, doh, strlen(doh));
+ esl_buffer_write(buffer, doh, strlen(doh));
+ snprintf(doh, sizeof(doh), "TEST 2 END\n\n");
+ esl_buffer_write(buffer, doh, strlen(doh));
+
+ printf("COUNT %ld\n", esl_buffer_packet_count(buffer));
+
+ memset(doh, 0, sizeof(doh));
+ esl_buffer_read_packet(buffer, doh, sizeof(doh));
+ printf("TEST: [%s]\n", doh);
+
+ memset(doh, 0, sizeof(doh));
+
+
+ esl_buffer_read_packet(buffer, doh, sizeof(doh));
+ printf("TEST2: [%s]\n", doh);
+
+ return 0;
esl_connect(&handle, "localhost", 8021, NULL, "ClueCon");