return;
}
- union session_or_handle out = { 0 };
- int res = io_create(master->loop, &out, SOCK_STREAM, AF_UNSPEC, grp,
- NULL, 0, false);
- if (res) {
- if (res == UV_EMFILE) {
- the_worker->too_many_open = true;
- the_worker->rconcurrent_highwatermark = the_worker->stats.rconcurrent;
- }
- if (out.handle != NULL) {
+ uv_handle_t *client = { 0 };
+ if (io_create(master->loop, &client, SOCK_STREAM, AF_UNSPEC)) {
+ return;
+ }
+
+ struct session2 *s = session2_new_io(client, grp, NULL, 0, false);
+ int res = uv_accept(master, (uv_stream_t *)client);
+ if (s == NULL || res) {
+ if (s == NULL) {
/* Since res isn't OK struct session wasn't
* allocated \ borrowed. We must release client handle
* only. But first accept the connection, as it has
* already been established by the kernel and
- * it is required for proper termination.
- */
- if (uv_accept(master, (uv_stream_t *)out.handle) == 0) {
- uv_close(out.handle, (uv_close_cb)free);
- }
+ * it is required for proper termination. */
+ uv_close(client, (uv_close_cb)free);
+ } else {
+ /* close session, close underlying uv handles and
+ * deallocate (or return to memory pool) memory. */
+ session2_close(s);
}
return;
}
- struct session2 *s = out.session;
- kr_require(s->outgoing == false);
-
- uv_tcp_t *client = (uv_tcp_t *)session2_get_handle(s);
- if (uv_accept(master, (uv_stream_t *)client) != 0) {
- /* close session, close underlying uv handles and
- * deallocate (or return to memory pool) memory. */
- session2_close(s);
- return;
- }
-
/* Get peer's and our address. We apparently get specific sockname here
* even if we listened on a wildcard address. */
struct sockaddr *sa = session2_get_peer(s);
int sa_len = sizeof(struct sockaddr_in6);
- int ret = uv_tcp_getpeername(client, sa, &sa_len);
+ int ret = uv_tcp_getpeername((uv_tcp_t *)client, sa, &sa_len);
if (ret || sa->sa_family == AF_UNSPEC) {
session2_close(s);
return;
}
sa = session2_get_sockname(s);
sa_len = sizeof(struct sockaddr_in6);
- ret = uv_tcp_getsockname(client, sa, &sa_len);
+ ret = uv_tcp_getsockname((uv_tcp_t *)client, sa, &sa_len);
if (ret || sa->sa_family == AF_UNSPEC) {
session2_close(s);
return;
session2_event(s, PROTOLAYER_EVENT_CONNECT, NULL);
session2_timer_start(s, PROTOLAYER_EVENT_GENERAL_TIMEOUT,
timeout, idle_in_timeout);
- io_start_read((uv_handle_t *)client);
+ io_start_read(client);
}
static void tcp_accept(uv_stream_t *master, int status)
}
#endif
-int io_create(uv_loop_t *loop, union session_or_handle *out,
- int type, unsigned family, enum kr_proto grp,
- struct protolayer_data_param *layer_param,
- size_t layer_param_count, bool outgoing)
+int io_create(uv_loop_t *loop, uv_handle_t **handle,
+ int type, unsigned family)
{
- out->session = NULL;
+ *handle = NULL;
int ret = -1;
- uv_handle_t *handle;
if (type == SOCK_DGRAM) {
uv_udp_t *udp = malloc(sizeof(uv_udp_t));
kr_require(udp);
ret = uv_udp_init(loop, udp);
- handle = (uv_handle_t *)udp;
+ *handle = (uv_handle_t *)udp;
} else if (type == SOCK_STREAM) {
uv_tcp_t *tcp = malloc(sizeof(uv_tcp_t));
kr_require(tcp);
ret = uv_tcp_init_ex(loop, tcp, family);
uv_tcp_nodelay(tcp, 1);
- handle = (uv_handle_t *)tcp;
+ *handle = (uv_handle_t *)tcp;
} else {
kr_require(false && "io_create: invalid socket type");
}
- if (ret != 0) {
- return ret;
+ if (ret != 0 && *handle) {
+ free(*handle);
}
- struct session2 *s = session2_new_io(handle, grp, layer_param,
- layer_param_count, outgoing);
- if (unlikely(s == NULL)) {
- out->handle = handle;
- return -1;
+ if (ret == UV_EMFILE) {
+ the_worker->too_many_open = true;
+ the_worker->rconcurrent_highwatermark = the_worker->stats.rconcurrent;
}
- out->session = s;
return ret;
}
struct tls_ctx;
struct tls_client_ctx;
struct io_stream_data;
-/* union used for io_create. handle will be used only if session init fails,
- * allowing us to terminated the uv_handle gracefully */
-union session_or_handle {
- struct session2 *session;
- uv_handle_t *handle;
-};
/** Bind address into a file-descriptor (only, no libuv). type is e.g. SOCK_DGRAM */
int io_bind(const struct sockaddr *addr, int type, const endpoint_flags_t *flags);
void tcp_timeout_trigger(uv_timer_t *timer);
-/** Initialize the handle, incl. ->data = struct session * instance.
+/** Initialize the handle
* \param type = SOCK_*
* \param family = AF_*
* \param has_tls has meanings only when type is SOCK_STREAM */
-int io_create(uv_loop_t *loop, union session_or_handle *out, int type,
- unsigned family, enum kr_proto grp,
- struct protolayer_data_param *layer_param,
- size_t layer_param_count, bool outgoing);
+int io_create(uv_loop_t *loop, uv_handle_t **handle, int type, unsigned family);
void io_free(uv_handle_t *handle);
int io_start_read(uv_handle_t *handle);
}
/* Create connection for iterative query */
- union session_or_handle out;
- int ret = io_create(the_worker->loop, &out, socktype, family, grp,
- layer_param, layer_param_count, true);
- if (ret) {
- if (ret == UV_EMFILE) {
- the_worker->too_many_open = true;
- the_worker->rconcurrent_highwatermark = the_worker->stats.rconcurrent;
- }
+ uv_handle_t *handle = { 0 };
+ if (io_create(the_worker->loop, &handle, socktype, family)) {
+ return NULL;
+ }
+ struct session2 *s = session2_new_io(handle, grp, layer_param,
+ layer_param_count, true);
+ if (!s) {
return NULL;
}
- struct session2 *s = out.session;
/* Bind to outgoing address, according to IP v4/v6. */
union kr_sockaddr *addr;
} else {
addr = (union kr_sockaddr *)&the_worker->out_addr6;
}
+ int ret = 0;
if (addr->ip.sa_family != AF_UNSPEC) {
if (kr_fails_assert(addr->ip.sa_family == family)) {
session2_force_close(s);