worker->too_many_open = true;
worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
}
+ /* Since res isn't OK, struct session wasn't allocated / borrowed.
+ * We must release client handle only.
+ */
worker_iohandle_release(worker, client);
return;
}
+
+ /* struct session was allocated / borrowed from the memory pool. */
+ struct session *session = client->data;
+ assert(session->outgoing == false);
+
if (uv_accept(master, client) != 0) {
- uv_close((uv_handle_t *)client, io_release);
+ /* close session, close underlying uv handles and
+ * deallocate (or return to memory pool) memory. */
+ worker_session_close(session);
return;
}
/* Set deadlines for TCP connection and start reading.
* It will re-check every half of a request time limit if the connection
* is idle and should be terminated, this is an educated guess. */
- struct session *session = client->data;
- assert(session->outgoing == false);
struct sockaddr *addr = &(session->peer.ip);
int addr_len = sizeof(union inaddr);
int ret = uv_tcp_getpeername((uv_tcp_t *)client, addr, &addr_len);
if (ret || addr->sa_family == AF_UNSPEC) {
- worker_iohandle_release(worker, client);
+ /* close session, close underlying uv handles and
+ * deallocate (or return to memory pool) memory. */
worker_session_close(session);
return;
}
int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family)
{
- int ret = 0;
+ int ret = -1;
if (type == SOCK_DGRAM) {
ret = uv_udp_init(loop, (uv_udp_t *)handle);
} else if (type == SOCK_STREAM) {
> 0: number of bytes written (can be less than the supplied buffer size).
< 0: negative error code (UV_EAGAIN is returned if no data can be sent immediately).
*/
- if (ret != UV_EAGAIN) {
- /* Either we have successful write here or
- * error code other then UV_EAGAIN.
+ if ((ret == total_len) || (ret < 0 && ret != UV_EAGAIN)) {
+ /* Either all the data were buffered by libuv or
+ * uv_try_write() has returned an error code other than UV_EAGAIN.
* Return. */
return ret;
}
+ /* Since we are here, the expression below is true:
+ * (ret != total_len) && (ret >= 0 || ret == UV_EAGAIN)
+ * or, equivalently,
+ * (ret != total_len && ret >= 0) || (ret != total_len && ret == UV_EAGAIN)
+ * i.e. either a partial write occurred or UV_EAGAIN was returned.
+ * Proceed to copy the remaining data into owned memory and perform an async write.
+ */
+ if (ret == UV_EAGAIN) {
+ /* No data was buffered, so we must buffer all of the data. */
+ ret = 0;
+ }
}
/* Fallback when the queue is full, and it's not possible to do an immediate write */
- char *buf = malloc(total_len);
+ char *buf = malloc(total_len - ret);
if (buf != NULL) {
+ /* Skip the data already written by the partial uv_try_write() */
+ int to_skip = ret;
/* Copy the buffer into owned memory */
size_t off = 0;
for (int i = 0; i < iovcnt; ++i) {
+ if (to_skip > 0) {
+ /* Ignore current buffer if it's all skipped */
+ if (to_skip >= uv_buf[i].len) {
+ to_skip -= uv_buf[i].len;
+ continue;
+ }
+ /* Skip only part of the buffer */
+ uv_buf[i].base += to_skip;
+ uv_buf[i].len -= to_skip;
+ to_skip = 0;
+ }
memcpy(buf + off, uv_buf[i].base, uv_buf[i].len);
off += uv_buf[i].len;
}
uv_buf[0].base = buf;
- uv_buf[0].len = total_len;
+ uv_buf[0].len = off;
/* Create an asynchronous write request */
uv_write_t *write_req = calloc(1, sizeof(uv_write_t));