* 'streamdns_send_req_t' object.
*
* To understand how sending is done, start by looking at
- * 'isc__nm_async_streamdnssend()'. Additionally also take a look at
+ * 'isc__nm_streamdns_send()'. Additionally also take a look at
* 'streamdns_get_send_req()' and 'streamdns_put_send_req()' which are
* responsible for send requests allocation/reuse and initialisation.
*
void *cbarg) {
isc_nmsocket_t *sock = NULL;
bool closing = false;
- bool worker_thread;
REQUIRE(VALID_NMHANDLE(handle));
sock = handle->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_streamdnssocket);
REQUIRE(sock->recv_handle == NULL);
+ REQUIRE(sock->tid == isc_tid());
closing = streamdns_closing(sock);
- worker_thread = sock->tid == isc_tid();
sock->recv_cb = cb;
sock->recv_cbarg = cbarg;
* asynchronous as we just want to start reading from the
* underlying transport.
*/
- if (worker_thread && !closing &&
- isc_dnsstream_assembler_result(sock->streamdns.input) ==
- ISC_R_UNSET)
+ if (!closing && isc_dnsstream_assembler_result(sock->streamdns.input) ==
+ ISC_R_UNSET)
{
isc__netievent_streamdnsread_t event = { .sock = sock };
isc__nm_async_streamdnsread(sock->worker,
(isc__netievent_t *)&event);
- } else {
- isc__netievent_streamdnsread_t *ievent = NULL;
- /*
- * We want the read operation to be asynchronous in most cases
- * because:
- *
- * 1. A read operation might be initiated from within the read
- * callback itself.
- *
- * 2. Due to the above, we need to make the operation
- * asynchronous to keep the socket state consistent.
- */
- ievent = isc__nm_get_netievent_streamdnsread(sock->worker,
- sock);
- isc__nm_enqueue_ievent(sock->worker,
- (isc__netievent_t *)ievent);
- }
-}
-
-void
-isc__nm_async_streamdnssend(isc__networker_t *worker, isc__netievent_t *ev0) {
- isc__netievent_streamdnssend_t *ievent =
- (isc__netievent_streamdnssend_t *)ev0;
- isc_nmsocket_t *sock = ievent->sock;
- isc__nm_uvreq_t *req = ievent->req;
- streamdns_send_req_t *send_req;
- isc_mem_t *mctx;
- isc_region_t data = { 0 };
-
- REQUIRE(VALID_UVREQ(req));
- REQUIRE(sock->tid == isc_tid());
-
- UNUSED(worker);
-
- ievent->req = NULL;
-
- if (streamdns_closing(sock)) {
- isc__nm_failed_send_cb(sock, req, ISC_R_CANCELED, true);
return;
}
- mctx = sock->worker->mctx;
-
- send_req = streamdns_get_send_req(sock, mctx, req);
- data.base = (unsigned char *)req->uvbuf.base;
- data.length = req->uvbuf.len;
- isc__nm_senddns(sock->outerhandle, &data, streamdns_writecb,
- (void *)send_req);
-
- isc__nm_uvreq_put(&req, sock);
- return;
+ /*
+ * We want the read operation to be asynchronous in most cases
+ * because:
+ *
+ * 1. A read operation might be initiated from within the read
+ * callback itself.
+ *
+ * 2. Due to the above, we need to make the operation
+ * asynchronous to keep the socket state consistent.
+ */
+ isc__netievent_streamdnsread_t *ievent =
+ isc__nm_get_netievent_streamdnsread(sock->worker, sock);
+ isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
}
void
isc_nm_cb_t cb, void *cbarg) {
isc__nm_uvreq_t *uvreq = NULL;
isc_nmsocket_t *sock = NULL;
+ streamdns_send_req_t *send_req;
+ isc_mem_t *mctx;
+ isc_region_t data = { 0 };
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
sock = handle->sock;
REQUIRE(sock->type == isc_nm_streamdnssocket);
+ REQUIRE(sock->tid == isc_tid());
uvreq = isc__nm_uvreq_get(sock->worker, sock);
isc_nmhandle_attach(handle, &uvreq->handle);
uvreq->uvbuf.base = (char *)region->base;
uvreq->uvbuf.len = region->length;
+ if (streamdns_closing(sock)) {
+ isc__nm_failed_send_cb(sock, uvreq, ISC_R_CANCELED, true);
+ return;
+ }
+
/*
* As when sending, we, basically, handing data to the underlying
* transport, we can treat the operation synchronously, as the
* transport code will take care of the asynchronicity if required.
*/
- if (sock->tid == isc_tid()) {
- isc__netievent_streamdnssend_t event = { .sock = sock,
- .req = uvreq };
- isc__nm_async_streamdnssend(sock->worker,
- (isc__netievent_t *)&event);
- } else {
- isc__netievent_streamdnssend_t *ievent =
- isc__nm_get_netievent_streamdnssend(sock->worker, sock,
- uvreq);
- isc__nm_enqueue_ievent(sock->worker,
- (isc__netievent_t *)ievent);
- }
+ mctx = sock->worker->mctx;
+ send_req = streamdns_get_send_req(sock, mctx, uvreq);
+ data.base = (unsigned char *)uvreq->uvbuf.base;
+ data.length = uvreq->uvbuf.len;
+ isc__nm_senddns(sock->outerhandle, &data, streamdns_writecb,
+ (void *)send_req);
+
+ isc__nm_uvreq_put(&uvreq, sock);
}
static void
atomic_store(&sock->active, false);
}
-void
-isc__nm_async_streamdnsclose(isc__networker_t *worker, isc__netievent_t *ev0) {
- isc__netievent_streamdnsclose_t *ievent =
- (isc__netievent_streamdnsclose_t *)ev0;
- isc_nmsocket_t *sock = ievent->sock;
-
- UNUSED(worker);
-
- streamdns_close_direct(sock);
-}
-
void
isc__nm_streamdns_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_streamdnssocket);
+ REQUIRE(sock->tid == isc_tid());
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true))
return;
}
- if (sock->tid == isc_tid()) {
- streamdns_close_direct(sock);
- } else {
- isc__netievent_streamdnsclose_t *ievent =
- isc__nm_get_netievent_streamdnsclose(sock->worker,
- sock);
- isc__nm_enqueue_ievent(sock->worker,
- (isc__netievent_t *)ievent);
- }
+ streamdns_close_direct(sock);
}
void