msg_iovec_t iov[],
size_t iovlen)
{
- size_t i, j, n, m, size = 0;
+ size_t i, j, m, size = 0;
ssize_t nerror;
tport_ws_t *wstp = (tport_ws_t *)self;
- enum { WSBUFSIZE = 2048 };
+ wstp->wstp_buflen = 0;
for (i = 0; i < iovlen; i = j) {
- char *buf = wstp->wstp_buffer;
- unsigned wsbufsize = WSBUFSIZE;
-
- if (i + 1 == iovlen) {
- buf = NULL; /* Don't bother copying single chunk */
- }
-
- if (buf &&
- (char *)iov[i].siv_base - buf < WSBUFSIZE &&
- (char *)iov[i].siv_base - buf >= 0) {
- wsbufsize = buf + WSBUFSIZE - (char *)iov[i].siv_base;
- assert(wsbufsize <= WSBUFSIZE);
- }
+ char *buf = NULL;
+ unsigned wsbufsize = sizeof(wstp->wstp_buffer);
for (j = i, m = 0; buf && j < iovlen; j++) {
if (m + iov[j].siv_len > wsbufsize) {
iov[j].siv_base = buf, iov[j].siv_len = m;
}
- nerror = ws_feed_buf(&wstp->ws, buf, m);
+ nerror = 0;
+ if (m + wstp->wstp_buflen >= wsbufsize) {
+ nerror = -1;
+ errno = ENOMEM;
+ } else {
+ if (memcpy(wstp->wstp_buffer + wstp->wstp_buflen, buf, m)) {
+ wstp->wstp_buflen += m;
+ } else {
+ nerror = -1;
+ errno = ENOMEM;
+ }
+ }
+
SU_DEBUG_9(("tport_ws_writevec: vec %p %p %lu ("MOD_ZD")\n",
(void *)&wstp->ws, (void *)iov[i].siv_base, (LU)iov[i].siv_len,
nerror));
SU_DEBUG_3(("ws_write: %s\n", strerror(err)));
return -1;
}
-
- n = (size_t)nerror;
- size += n;
-
- /* Return if the write buffer is full for now */
- if (n != m)
- break;
}
- ws_send_buf(&wstp->ws, WSOC_TEXT);
-
+ if (wstp->wstp_buflen) {
+ *(wstp->wstp_buffer + wstp->wstp_buflen) = '\0';
+ ws_write_frame(&wstp->ws, WSOC_TEXT, wstp->wstp_buffer, wstp->wstp_buflen);
+ size = wstp->wstp_buflen;
+ }
return size;
}
+#include <switch.h>
#include "ws.h"
#include <pthread.h>
} while((p = (strstr(p,"\n")+1))!=(char *)1);
- if (p != (char *)1 && *p!='\0') {
+ if (p && p != (char *)1 && *p!='\0') {
char *v, *e = 0;
v = strchr(p, ':');
}
}
- if (bytes > sizeof(wsh->buffer) -1) {
+ if (bytes > wsh->buflen -1) {
goto err;
}
}
}
- } while (r == -1 && err == SSL_ERROR_WANT_READ && wsh->x < 100);
+ } while (r == -1 && err == SSL_ERROR_WANT_READ && wsh->x < 1000);
goto end;
}
ms_sleep(10);
}
}
- } while (r == -1 && xp_is_blocking(xp_errno()) && wsh->x < 100);
+ } while (r == -1 && xp_is_blocking(xp_errno()) && wsh->x < 1000);
- if (wsh->x >= 1000 || (block && wsh->x >= 100)) {
+ if (wsh->x >= 10000 || (block && wsh->x >= 1000)) {
r = -1;
}
wsh->close_sock = 1;
}
- wsh->buflen = sizeof(wsh->buffer);
+ wsh->buflen = 1024 * 64;
+ wsh->bbuflen = wsh->buflen;
+
+ wsh->buffer = malloc(wsh->buflen);
+ wsh->bbuffer = malloc(wsh->bbuflen);
+ //printf("init %p %ld\n", (void *) wsh->bbuffer, wsh->bbuflen);
+ //memset(wsh->buffer, 0, wsh->buflen);
+ //memset(wsh->bbuffer, 0, wsh->bbuflen);
+
wsh->secure = ssl_ctx ? 1 : 0;
setup_socket(sock);
SSL_free(wsh->ssl);
wsh->ssl = NULL;
}
+
+ if (wsh->buffer) free(wsh->buffer);
+ if (wsh->bbuffer) free(wsh->bbuffer);
+
+ wsh->buffer = wsh->bbuffer = NULL;
+
}
ssize_t ws_close(wsh_t *wsh, int16_t reason)
}
+
+uint64_t hton64(uint64_t val)
+{
+ if (__BYTE_ORDER == __BIG_ENDIAN) return (val);
+ else return __bswap_64(val);
+}
+
+uint64_t ntoh64(uint64_t val)
+{
+ if (__BYTE_ORDER == __BIG_ENDIAN) return (val);
+ else return __bswap_64(val);
+}
+
+
ssize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data)
{
char *maskp;
int ll = 0;
int frag = 0;
+ int blen;
+
+ wsh->body = wsh->bbuffer;
+ wsh->packetlen = 0;
again:
need = 2;
int fin = (wsh->buffer[0] >> 7) & 1;
int mask = (wsh->buffer[1] >> 7) & 1;
- if (fin) {
- if (*oc == WSOC_CONTINUATION) {
- frag = 1;
- } else {
- frag = 0;
- }
+
+ if (!fin && *oc != WSOC_CONTINUATION) {
+ frag = 1;
+ } else if (fin && *oc == WSOC_CONTINUATION) {
+ frag = 0;
}
if (mask) {
wsh->plen = wsh->buffer[1] & 0x7f;
wsh->payload = &wsh->buffer[2];
-
+
if (wsh->plen == 127) {
uint64_t *u64;
+ int more = 0;
need += 8;
if (need > wsh->datalen) {
/* too small - protocol err */
- *oc = WSOC_CLOSE;
- return ws_close(wsh, WS_PROTO_ERR);
- }
+ //*oc = WSOC_CLOSE;
+ //return ws_close(wsh, WS_PROTO_ERR);
- u64 = (uint64_t *) wsh->payload;
- wsh->payload += 8;
+ more = ws_raw_read(wsh, wsh->buffer + wsh->datalen, need - wsh->datalen, WS_BLOCK);
+
+ if (more < need - wsh->datalen) {
+ *oc = WSOC_CLOSE;
+ return ws_close(wsh, WS_PROTO_ERR);
+ } else {
+ wsh->datalen += more;
+ }
- wsh->plen = ntohl((u_long)*u64);
+ }
+
+ u64 = (uint64_t *) wsh->payload;
+ wsh->payload += 8;
+ wsh->plen = ntoh64(*u64);
} else if (wsh->plen == 126) {
uint16_t *u16;
return ws_close(wsh, WS_PROTO_ERR);
}
- if ((need + wsh->datalen) > (ssize_t)wsh->buflen) {
- /* too big - Ain't nobody got time fo' dat */
- *oc = WSOC_CLOSE;
- return ws_close(wsh, WS_DATA_TOO_BIG);
+ blen = wsh->body - wsh->bbuffer;
+
+ if (need + blen > (ssize_t)wsh->bbuflen) {
+ void *tmp;
+
+ wsh->bbuflen = need + blen + wsh->rplen;
+
+ if ((tmp = realloc(wsh->bbuffer, wsh->bbuflen))) {
+ wsh->bbuffer = tmp;
+ } else {
+ abort();
+ }
+
+ wsh->body = wsh->bbuffer + blen;
}
wsh->rplen = wsh->plen - need;
-
+
+ if (wsh->rplen) {
+ memcpy(wsh->body, wsh->payload, wsh->rplen);
+ }
+
while(need) {
- ssize_t r = ws_raw_read(wsh, wsh->payload + wsh->rplen, need, WS_BLOCK);
+ ssize_t r = ws_raw_read(wsh, wsh->body + wsh->rplen, need, WS_BLOCK);
if (r < 1) {
/* invalid read - protocol err .. */
ssize_t i;
for (i = 0; i < wsh->datalen; i++) {
- wsh->payload[i] ^= maskp[i % 4];
+ wsh->body[i] ^= maskp[i % 4];
}
}
if (*oc == WSOC_PING) {
- ws_write_frame(wsh, WSOC_PONG, wsh->payload, wsh->rplen);
+ ws_write_frame(wsh, WSOC_PONG, wsh->body, wsh->rplen);
goto again;
}
+ *(wsh->body+wsh->rplen) = '\0';
+ wsh->packetlen += wsh->rplen;
+ wsh->body += wsh->rplen;
+
if (frag) {
goto again;
}
-
-
- *(wsh->payload+wsh->rplen) = '\0';
- *data = (uint8_t *)wsh->payload;
- //printf("READ[%ld][%d]-----------------------------:\n[%s]\n-------------------------------\n", wsh->rplen, *oc, (char *)*data);
+ *data = (uint8_t *)wsh->bbuffer;
+
+ //printf("READ[%ld][%d]-----------------------------:\n[%s]\n-------------------------------\n", wsh->packetlen, *oc, (char *)*data);
- return wsh->rplen;
+ return wsh->packetlen;
}
break;
default:
}
}
-ssize_t ws_feed_buf(wsh_t *wsh, void *data, size_t bytes)
-{
-
- if (bytes + wsh->wdatalen > wsh->buflen) {
- return -1;
- }
-
- memcpy(wsh->wbuffer + wsh->wdatalen, data, bytes);
-
- wsh->wdatalen += bytes;
-
- return bytes;
-}
-
-ssize_t ws_send_buf(wsh_t *wsh, ws_opcode_t oc)
-{
- ssize_t r = 0;
-
- if (!wsh->wdatalen) {
- return -1;
- }
-
- r = ws_write_frame(wsh, oc, wsh->wbuffer, wsh->wdatalen);
-
- wsh->wdatalen = 0;
-
- return r;
-}
-
-
ssize_t ws_write_frame(wsh_t *wsh, ws_opcode_t oc, void *data, size_t bytes)
{
uint8_t hdr[14] = { 0 };
hlen += 8;
u64 = (uint64_t *) &hdr[2];
- *u64 = htonl(bytes);
+ *u64 = hton64(bytes);
}
if (wsh->write_buffer_len < (hlen + bytes + 1)) {