#include "ws.h"
#include "easyif.h"
#include "transfer.h"
+#include "select.h"
#include "nonblock.h"
/* The last 3 #include files should be in this order */
}
}
+static CURLcode ws_send_raw_blocking(CURL *data, struct websocket *ws,
+ const char *buffer, size_t buflen);
+
typedef ssize_t ws_write_payload(const unsigned char *buf, size_t buflen,
int frame_age, int frame_flags,
curl_off_t payload_offset,
}
}
#endif
- DEBUGF(infof(data, "WS, using chunk size %zu", chunk_size));
+ CURL_TRC_WS(data, "WS, using chunk size %zu", chunk_size);
Curl_bufq_init2(&ws->recvbuf, chunk_size, WS_CHUNK_COUNT,
BUFQ_OPT_SOFT_LIMIT);
Curl_bufq_init2(&ws->sendbuf, chunk_size, WS_CHUNK_COUNT,
infof(data, "connection expectedly closed?");
return CURLE_GOT_NOTHING;
}
- DEBUGF(infof(data, "curl_ws_recv, added %zu bytes from network",
- Curl_bufq_len(&ws->recvbuf)));
+ CURL_TRC_WS(data, "curl_ws_recv, added %zu bytes from network",
+ Curl_bufq_len(&ws->recvbuf));
}
result = ws_dec_pass(&ws->dec, data, &ws->recvbuf,
ctx.payload_len, ctx.bufidx);
*metap = &ws->frame;
*nread = ws->frame.len;
- /* infof(data, "curl_ws_recv(len=%zu) -> %zu bytes (frame at %"
- CURL_FORMAT_CURL_OFF_T ", %" CURL_FORMAT_CURL_OFF_T " left)",
- buflen, *nread, ws->frame.offset, ws->frame.bytesleft); */
+ CURL_TRC_WS(data, "curl_ws_recv(len=%zu) -> %zu bytes (frame at %"
+ CURL_FORMAT_CURL_OFF_T ", %" CURL_FORMAT_CURL_OFF_T " left)",
+ buflen, *nread, ws->frame.offset, ws->frame.bytesleft);
return CURLE_OK;
}
static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws,
- bool complete)
+ bool blocking)
{
if(!Curl_bufq_is_empty(&ws->sendbuf)) {
CURLcode result;
size_t outlen, n;
while(Curl_bufq_peek(&ws->sendbuf, &out, &outlen)) {
- if(data->set.connect_only)
+ if(blocking) {
+ result = ws_send_raw_blocking(data, ws, (char *)out, outlen);
+ n = result? 0 : outlen;
+ }
+ else if(data->set.connect_only || Curl_is_in_callback(data))
result = Curl_senddata(data, out, outlen, &n);
else {
result = Curl_xfer_send(data, out, outlen, FALSE, &n);
result = CURLE_AGAIN;
}
- if(result) {
- if(result == CURLE_AGAIN) {
- if(!complete) {
- infof(data, "WS: flush EAGAIN, %zu bytes remain in buffer",
- Curl_bufq_len(&ws->sendbuf));
- return result;
- }
- /* TODO: the current design does not allow for buffered writes.
- * We need to flush the buffer now. There is no ws_flush() later */
- n = 0;
- continue;
- }
- else if(result) {
- failf(data, "WS: flush, write error %d", result);
- return result;
- }
+ if(result == CURLE_AGAIN) {
+ CURL_TRC_WS(data, "flush EAGAIN, %zu bytes remain in buffer",
+ Curl_bufq_len(&ws->sendbuf));
+ return result;
+ }
+ else if(result) {
+ failf(data, "WS: flush, write error %d", result);
+ return result;
}
else {
infof(data, "WS: flushed %zu bytes", n);
return CURLE_OK;
}
+static CURLcode ws_send_raw_blocking(CURL *data, struct websocket *ws,
+ const char *buffer, size_t buflen)
+{
+ CURLcode result = CURLE_OK;
+ size_t nwritten;
+
+ (void)ws;
+ while(buflen) {
+ result = Curl_xfer_send(data, buffer, buflen, FALSE, &nwritten);
+ if(result)
+ return result;
+ DEBUGASSERT(nwritten <= buflen);
+ buffer += nwritten;
+ buflen -= nwritten;
+ if(buflen) {
+ curl_socket_t sock = data->conn->sock[FIRSTSOCKET];
+ timediff_t left_ms;
+ int ev;
+
+ CURL_TRC_WS(data, "ws_send_raw_blocking() partial, %zu left to send",
+ buflen);
+ left_ms = Curl_timeleft(data, NULL, FALSE);
+ if(left_ms < 0) {
+ failf(data, "Timeout waiting for socket becoming writable");
+ return CURLE_SEND_ERROR;
+ }
+
+ /* POLLOUT socket */
+ if(sock == CURL_SOCKET_BAD)
+ return CURLE_SEND_ERROR;
+ ev = Curl_socket_check(CURL_SOCKET_BAD, CURL_SOCKET_BAD, sock,
+ left_ms? left_ms : 500);
+ if(ev < 0) {
+ failf(data, "Error while waiting for socket becoming writable");
+ return CURLE_SEND_ERROR;
+ }
+ }
+ }
+ return result;
+}
+
+static CURLcode ws_send_raw(CURL *data, const void *buffer,
+ size_t buflen, size_t *pnwritten)
+{
+ struct websocket *ws = data->conn->proto.ws;
+ CURLcode result;
+
+ if(!ws) {
+ failf(data, "Not a websocket transfer");
+ return CURLE_SEND_ERROR;
+ }
+ if(!buflen)
+ return CURLE_OK;
+
+ if(Curl_is_in_callback(data)) {
+ /* When invoked from inside callbacks, we do a blocking send as the
+ * callback will probably not implement partial writes that may then
+ * mess up the ws framing subsequently.
+ * We need any pending data to be flushed before sending. */
+ result = ws_flush(data, ws, TRUE);
+ if(result)
+ return result;
+ result = ws_send_raw_blocking(data, ws, buffer, buflen);
+ }
+ else {
+ /* We need any pending data to be sent or EAGAIN this call. */
+ result = ws_flush(data, ws, FALSE);
+ if(result)
+ return result;
+ result = Curl_senddata(data, buffer, buflen, pnwritten);
+ }
+
+ CURL_TRC_WS(data, "ws_send_raw(len=%zu) -> %d, %zu",
+ buflen, result, *pnwritten);
+ return result;
+}
+
CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer,
size_t buflen, size_t *sent,
curl_off_t fragsize,
{
struct websocket *ws;
ssize_t n;
- size_t nwritten, space;
+ size_t space, payload_added;
CURLcode result;
+ CURL_TRC_WS(data, "curl_ws_send(len=%zu, fragsize=%" CURL_FORMAT_CURL_OFF_T
+ ", flags=%x), raw=%d",
+ buflen, fragsize, flags, data->set.ws_raw_mode);
*sent = 0;
if(!data->conn && data->set.connect_only) {
result = Curl_connect_only_attach(data);
if(result)
- return result;
+ goto out;
}
if(!data->conn) {
failf(data, "No associated connection");
- return CURLE_SEND_ERROR;
+ result = CURLE_SEND_ERROR;
+ goto out;
}
if(!data->conn->proto.ws) {
failf(data, "Not a websocket transfer");
- return CURLE_SEND_ERROR;
+ result = CURLE_SEND_ERROR;
+ goto out;
}
ws = data->conn->proto.ws;
+ /* try flushing any content still waiting to be sent. */
+ result = ws_flush(data, ws, FALSE);
+ if(result)
+ goto out;
+
if(data->set.ws_raw_mode) {
+ /* In raw mode, we write directly to the connection */
if(fragsize || flags) {
- DEBUGF(infof(data, "ws_send: "
- "fragsize and flags cannot be non-zero in raw mode"));
+ failf(data, "ws_send, raw mode: fragsize and flags cannot be non-zero");
return CURLE_BAD_FUNCTION_ARGUMENT;
}
- if(!buflen)
- /* nothing to do */
- return CURLE_OK;
- /* raw mode sends exactly what was requested, and this is from within
- the write callback */
- if(Curl_is_in_callback(data)) {
- result = Curl_xfer_send(data, buffer, buflen, FALSE, &nwritten);
- }
- else
- result = Curl_senddata(data, buffer, buflen, &nwritten);
-
- infof(data, "WS: wanted to send %zu bytes, sent %zu bytes",
- buflen, nwritten);
- *sent = nwritten;
- return result;
+ result = ws_send_raw(data, buffer, buflen, sent);
+ goto out;
}
/* Not RAW mode, buf we do the frame encoding */
- result = ws_flush(data, ws, FALSE);
- if(result)
- return result;
-
- /* TODO: the current design does not allow partial writes, afaict.
- * It is not clear how the application is supposed to react. */
space = Curl_bufq_space(&ws->sendbuf);
- DEBUGF(infof(data, "curl_ws_send(len=%zu), sendbuf len=%zu space %zu",
- buflen, Curl_bufq_len(&ws->sendbuf), space));
- if(space < 14)
- return CURLE_AGAIN;
+ CURL_TRC_WS(data, "curl_ws_send(len=%zu), sendbuf=%zu space_left=%zu",
+ buflen, Curl_bufq_len(&ws->sendbuf), space);
+ if(space < 14) {
+ result = CURLE_AGAIN;
+ goto out;
+ }
if(flags & CURLWS_OFFSET) {
if(fragsize) {
n = ws_enc_write_head(data, &ws->enc, flags, fragsize,
&ws->sendbuf, &result);
if(n < 0)
- return result;
+ goto out;
}
else {
if((curl_off_t)buflen > ws->enc.payload_remain) {
n = ws_enc_write_head(data, &ws->enc, flags, (curl_off_t)buflen,
&ws->sendbuf, &result);
if(n < 0)
- return result;
+ goto out;
}
n = ws_enc_write_payload(&ws->enc, data,
buffer, buflen, &ws->sendbuf, &result);
if(n < 0)
- return result;
+ goto out;
+ payload_added = (size_t)n;
+
+ while(!result && (buflen || !Curl_bufq_is_empty(&ws->sendbuf))) {
+ /* flush, blocking when in callback */
+ result = ws_flush(data, ws, Curl_is_in_callback(data));
+ if(!result) {
+ DEBUGASSERT(payload_added <= buflen);
+ /* all buffered data sent. Try sending the rest if there is any. */
+ *sent += payload_added;
+ buffer = (const char *)buffer + payload_added;
+ buflen -= payload_added;
+ payload_added = 0;
+ if(buflen) {
+ n = ws_enc_write_payload(&ws->enc, data,
+ buffer, buflen, &ws->sendbuf, &result);
+ if(n < 0)
+ goto out;
+ payload_added = Curl_bufq_len(&ws->sendbuf);
+ }
+ }
+ else if(result == CURLE_AGAIN) {
+ /* partially sent. how much of the call data has been part of it? what
+ * should we report to out caller so it can retry/send the rest? */
+ if(payload_added < buflen) {
+ /* We did not add everything the caller wanted. Return just
+ * the partial write to our buffer. */
+ *sent = payload_added;
+ result = CURLE_OK;
+ goto out;
+ }
+ else if(!buflen) {
+ /* We have no payload to report a partial write. EAGAIN would make
+ * the caller repeat this and add the frame again.
+ * Flush blocking seems the only way out of this. */
+ *sent = (size_t)n;
+ result = ws_flush(data, ws, TRUE);
+ goto out;
+ }
+ /* We added the complete data to our sendbuf. Report one byte less as
+ * sent. This parital success should make the caller invoke us again
+ * with the last byte. */
+ *sent = payload_added - 1;
+ result = Curl_bufq_unwrite(&ws->sendbuf, 1);
+ if(!result)
+ result = CURLE_AGAIN;
+ }
+ }
- *sent = (size_t)n;
- return ws_flush(data, ws, TRUE);
+out:
+ CURL_TRC_WS(data, "curl_ws_send(len=%zu, fragsize=%" CURL_FORMAT_CURL_OFF_T
+ ", flags=%x, raw=%d) -> %d, %zu",
+ buflen, fragsize, flags, data->set.ws_raw_mode, result, *sent);
+ return result;
}
static void ws_free(struct connectdata *conn)