p_ancil("receive_udp on interface", &rep);
#endif /* S_SPLINT_S */
- if(rep.c->pp2_enabled && !consume_pp2_header(rep.c->buffer,
- &rep, 0)) {
- log_err("proxy_protocol: could not consume PROXYv2 header");
- return;
- }
- if(!rep.is_proxied) {
- rep.client_addrlen = rep.remote_addrlen;
- memmove(&rep.client_addr, &rep.remote_addr,
- rep.remote_addrlen);
- }
+ if(rep.c->pp2_enabled && !consume_pp2_header(rep.c->buffer,
+ &rep, 0)) {
+ log_err("proxy_protocol: could not consume PROXYv2 header");
+ return;
+ }
+ if(!rep.is_proxied) {
+ rep.client_addrlen = rep.remote_addrlen;
+ memmove(&rep.client_addr, &rep.remote_addr,
+ rep.remote_addrlen);
+ }
+
+ fptr_ok(fptr_whitelist_comm_point(rep.c->callback));
+ if((*rep.c->callback)(rep.c, rep.c->cb_arg, NETEVENT_NOERROR, &rep)) {
+ /* send back immediate reply */
- (void)comm_point_send_udp_msg_if(rep.c, rep.c->buffer,
++ struct sldns_buffer *buffer;
++#ifdef USE_DNSCRYPT
++ buffer = rep.c->dnscrypt_buffer;
++#else
++ buffer = rep.c->buffer;
++#endif
++ (void)comm_point_send_udp_msg_if(rep.c, buffer,
+ (struct sockaddr*)&rep.remote_addr,
+ rep.remote_addrlen, &rep);
+ }
+ if(!rep.c || rep.c->fd == -1) /* commpoint closed */
+ break;
+ }
+}
+#endif /* AF_INET6 && IPV6_PKTINFO && HAVE_RECVMSG */
+
+void
+comm_point_udp_callback(int fd, short event, void* arg)
+{
+ struct comm_reply rep;
+ ssize_t rcv;
+ int i;
+ struct sldns_buffer *buffer;
+
+ rep.c = (struct comm_point*)arg;
+ log_assert(rep.c->type == comm_udp);
+
+ if(!(event&UB_EV_READ))
+ return;
+ log_assert(rep.c && rep.c->buffer && rep.c->fd == fd);
+ ub_comm_base_now(rep.c->ev->base);
+ for(i=0; i<NUM_UDP_PER_SELECT; i++) {
+ sldns_buffer_clear(rep.c->buffer);
+ rep.remote_addrlen = (socklen_t)sizeof(rep.remote_addr);
+ log_assert(fd != -1);
+ log_assert(sldns_buffer_remaining(rep.c->buffer) > 0);
+ rcv = recvfrom(fd, (void*)sldns_buffer_begin(rep.c->buffer),
+ sldns_buffer_remaining(rep.c->buffer), MSG_DONTWAIT,
+ (struct sockaddr*)&rep.remote_addr, &rep.remote_addrlen);
+ if(rcv == -1) {
+#ifndef USE_WINSOCK
+ if(errno != EAGAIN && errno != EINTR
+ && udp_recv_needs_log(errno))
+ log_err("recvfrom %d failed: %s",
+ fd, strerror(errno));
+#else
+ if(WSAGetLastError() != WSAEINPROGRESS &&
+ WSAGetLastError() != WSAECONNRESET &&
+ WSAGetLastError()!= WSAEWOULDBLOCK &&
+ udp_recv_needs_log(WSAGetLastError()))
+ log_err("recvfrom failed: %s",
+ wsa_strerror(WSAGetLastError()));
+#endif
+ return;
+ }
+ sldns_buffer_skip(rep.c->buffer, rcv);
+ sldns_buffer_flip(rep.c->buffer);
+ rep.srctype = 0;
+ rep.is_proxied = 0;
+
+ if(rep.c->pp2_enabled && !consume_pp2_header(rep.c->buffer,
+ &rep, 0)) {
+ log_err("proxy_protocol: could not consume PROXYv2 header");
+ return;
+ }
+ if(!rep.is_proxied) {
+ rep.client_addrlen = rep.remote_addrlen;
+ memmove(&rep.client_addr, &rep.remote_addr,
+ rep.remote_addrlen);
+ }
+
+ fptr_ok(fptr_whitelist_comm_point(rep.c->callback));
+ if((*rep.c->callback)(rep.c, rep.c->cb_arg, NETEVENT_NOERROR, &rep)) {
+ /* send back immediate reply */
+#ifdef USE_DNSCRYPT
+ buffer = rep.c->dnscrypt_buffer;
+#else
+ buffer = rep.c->buffer;
+#endif
+ (void)comm_point_send_udp_msg(rep.c, buffer,
+ (struct sockaddr*)&rep.remote_addr,
+ rep.remote_addrlen, 0);
+ }
+ if(!rep.c || rep.c->fd != fd) /* commpoint closed to -1 or reused for
+ another UDP port. Note rep.c cannot be reused with TCP fd. */
+ break;
+ }
+}
+
+#ifdef HAVE_NGTCP2
+void
+doq_pkt_addr_init(struct doq_pkt_addr* paddr)
+{
+ paddr->addrlen = (socklen_t)sizeof(paddr->addr);
+ paddr->localaddrlen = (socklen_t)sizeof(paddr->localaddr);
+ paddr->ifindex = 0;
+}
+
+/** set the ecn on the transmission */
+static void
+doq_set_ecn(int fd, int family, uint32_t ecn)
+{
+ unsigned int val = ecn;
+ if(family == AF_INET6) {
+ if(setsockopt(fd, IPPROTO_IPV6, IPV6_TCLASS, &val,
+ (socklen_t)sizeof(val)) == -1) {
+ log_err("setsockopt(.. IPV6_TCLASS ..): %s",
+ strerror(errno));
+ }
+ return;
+ }
+ if(setsockopt(fd, IPPROTO_IP, IP_TOS, &val,
+ (socklen_t)sizeof(val)) == -1) {
+ log_err("setsockopt(.. IP_TOS ..): %s",
+ strerror(errno));
+ }
+}
+
+/** set the local address in the control ancillary data */
+static void
+doq_set_localaddr_cmsg(struct msghdr* msg, size_t control_size,
+ struct doq_addr_storage* localaddr, socklen_t localaddrlen,
+ int ifindex)
+{
+#ifndef S_SPLINT_S
+ struct cmsghdr* cmsg;
+#endif /* S_SPLINT_S */
+#ifndef S_SPLINT_S
+ cmsg = CMSG_FIRSTHDR(msg);
+ if(localaddr->sockaddr.in.sin_family == AF_INET) {
+#ifdef IP_PKTINFO
+ struct sockaddr_in* sa = (struct sockaddr_in*)localaddr;
+ struct in_pktinfo v4info;
+ log_assert(localaddrlen >= sizeof(struct sockaddr_in));
+ msg->msg_controllen = CMSG_SPACE(sizeof(struct in_pktinfo));
+ memset(msg->msg_control, 0, msg->msg_controllen);
+ log_assert(msg->msg_controllen <= control_size);
+ cmsg->cmsg_level = IPPROTO_IP;
+ cmsg->cmsg_type = IP_PKTINFO;
+ memset(&v4info, 0, sizeof(v4info));
+# ifdef HAVE_STRUCT_IN_PKTINFO_IPI_SPEC_DST
+ memmove(&v4info.ipi_spec_dst, &sa->sin_addr,
+ sizeof(struct in_addr));
+# else
+ memmove(&v4info.ipi_addr, &sa->sin_addr,
+ sizeof(struct in_addr));
+# endif
+ v4info.ipi_ifindex = ifindex;
+ memmove(CMSG_DATA(cmsg), &v4info, sizeof(struct in_pktinfo));
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+#elif defined(IP_SENDSRCADDR)
+ struct sockaddr_in* sa= (struct sockaddr_in*)localaddr;
+ log_assert(localaddrlen >= sizeof(struct sockaddr_in));
+ msg->msg_controllen = CMSG_SPACE(sizeof(struct in_addr));
+ memset(msg->msg_control, 0, msg->msg_controllen);
+ log_assert(msg->msg_controllen <= control_size);
+ cmsg->cmsg_level = IPPROTO_IP;
+ cmsg->cmsg_type = IP_SENDSRCADDR;
+ memmove(CMSG_DATA(cmsg), &sa->sin_addr,
+ sizeof(struct in_addr));
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_addr));
+#endif
+ } else {
+ struct sockaddr_in6* sa6 = (struct sockaddr_in6*)localaddr;
+ struct in6_pktinfo v6info;
+ log_assert(localaddrlen >= sizeof(struct sockaddr_in6));
+ msg->msg_controllen = CMSG_SPACE(sizeof(struct in6_pktinfo));
+ memset(msg->msg_control, 0, msg->msg_controllen);
+ log_assert(msg->msg_controllen <= control_size);
+ cmsg->cmsg_level = IPPROTO_IPV6;
+ cmsg->cmsg_type = IPV6_PKTINFO;
+ memset(&v6info, 0, sizeof(v6info));
+ memmove(&v6info.ipi6_addr, &sa6->sin6_addr,
+ sizeof(struct in6_addr));
+ v6info.ipi6_ifindex = ifindex;
+ memmove(CMSG_DATA(cmsg), &v6info, sizeof(struct in6_pktinfo));
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct in6_pktinfo));
+ }
+#endif /* S_SPLINT_S */
+ /* Ignore unused variables, if no assertions are compiled. */
+ (void)localaddrlen;
+ (void)control_size;
+}
+
+/** write address and port into strings */
+static int
+doq_print_addr_port(struct doq_addr_storage* addr, socklen_t addrlen,
+ char* host, size_t hostlen, char* port, size_t portlen)
+{
+ if(addr->sockaddr.in.sin_family == AF_INET) {
+ struct sockaddr_in* sa = (struct sockaddr_in*)addr;
+ log_assert(addrlen >= sizeof(*sa));
+ if(inet_ntop(sa->sin_family, &sa->sin_addr, host,
+ (socklen_t)hostlen) == 0) {
+ log_err("doq_send_retry failed: inet_ntop error");
+ log_hex("inet ntop address", &sa->sin_addr,
+ sizeof(sa->sin_addr));
+ return 0;
+ }
+ snprintf(port, portlen, "%u", (unsigned)ntohs(sa->sin_port));
+ } else if(addr->sockaddr.in.sin_family == AF_INET6) {
+ struct sockaddr_in6* sa6 = (struct sockaddr_in6*)addr;
+ log_assert(addrlen >= sizeof(*sa6));
+ if(inet_ntop(sa6->sin6_family, &sa6->sin6_addr, host,
+ (socklen_t)hostlen) == 0) {
+ log_err("doq_send_retry failed: inet_ntop error");
+ log_hex("inet ntop address", &sa6->sin6_addr,
+ sizeof(sa6->sin6_addr));
+ return 0;
+ }
+ snprintf(port, portlen, "%u", (unsigned)ntohs(sa6->sin6_port));
+ }
+ return 1;
+}
+
+/** doq store the blocked packet when write has blocked */
+static void
+doq_store_blocked_pkt(struct comm_point* c, struct doq_pkt_addr* paddr,
+ uint32_t ecn)
+{
+ if(c->doq_socket->have_blocked_pkt)
+ return; /* should not happen that we write when there is
+ already a blocked write, but if so, drop it. */
+ if(sldns_buffer_limit(c->doq_socket->pkt_buf) >
+ sldns_buffer_capacity(c->doq_socket->blocked_pkt))
+ return; /* impossibly large, drop packet. impossible because
+ pkt_buf and blocked_pkt are the same size. */
+ c->doq_socket->have_blocked_pkt = 1;
+ c->doq_socket->blocked_pkt_pi.ecn = ecn;
+ memcpy(c->doq_socket->blocked_paddr, paddr,
+ sizeof(*c->doq_socket->blocked_paddr));
+ sldns_buffer_clear(c->doq_socket->blocked_pkt);
+ sldns_buffer_write(c->doq_socket->blocked_pkt,
+ sldns_buffer_begin(c->doq_socket->pkt_buf),
+ sldns_buffer_limit(c->doq_socket->pkt_buf));
+ sldns_buffer_flip(c->doq_socket->blocked_pkt);
+}
+
+void
+doq_send_pkt(struct comm_point* c, struct doq_pkt_addr* paddr, uint32_t ecn)
+{
+ struct msghdr msg;
+ struct iovec iov[1];
+ union {
+ struct cmsghdr hdr;
+ char buf[256];
+ } control;
+ ssize_t ret;
+ iov[0].iov_base = sldns_buffer_begin(c->doq_socket->pkt_buf);
+ iov[0].iov_len = sldns_buffer_limit(c->doq_socket->pkt_buf);
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_name = (void*)&paddr->addr;
+ msg.msg_namelen = paddr->addrlen;
+ msg.msg_iov = iov;
+ msg.msg_iovlen = 1;
+ msg.msg_control = control.buf;
+#ifndef S_SPLINT_S
+ msg.msg_controllen = sizeof(control.buf);
+#endif /* S_SPLINT_S */
+ msg.msg_flags = 0;
+
+ doq_set_localaddr_cmsg(&msg, sizeof(control.buf), &paddr->localaddr,
+ paddr->localaddrlen, paddr->ifindex);
+ doq_set_ecn(c->fd, paddr->addr.sockaddr.in.sin_family, ecn);
+
+ for(;;) {
+ ret = sendmsg(c->fd, &msg, MSG_DONTWAIT);
+ if(ret == -1 && errno == EINTR)
+ continue;
+ break;
+ }
+ if(ret == -1) {
+#ifndef USE_WINSOCK
+ if(errno == EAGAIN ||
+# ifdef EWOULDBLOCK
+ errno == EWOULDBLOCK ||
+# endif
+ errno == ENOBUFS)
+#else
+ if(WSAGetLastError() == WSAEINPROGRESS ||
+ WSAGetLastError() == WSAENOBUFS ||
+ WSAGetLastError() == WSAEWOULDBLOCK)
+#endif
+ {
+ /* udp send has blocked */
+ doq_store_blocked_pkt(c, paddr, ecn);
+ return;
+ }
+ if(!udp_send_errno_needs_log((void*)&paddr->addr,
+ paddr->addrlen))
+ return;
+ if(verbosity >= VERB_OPS) {
+ char host[256], port[32];
+ if(doq_print_addr_port(&paddr->addr, paddr->addrlen,
+ host, sizeof(host), port, sizeof(port))) {
+ verbose(VERB_OPS, "doq sendmsg to %s %s "
+ "failed: %s", host, port,
+ strerror(errno));
+ } else {
+ verbose(VERB_OPS, "doq sendmsg failed: %s",
+ strerror(errno));
+ }
+ }
+ return;
+ } else if(ret != (ssize_t)sldns_buffer_limit(c->doq_socket->pkt_buf)) {
+ char host[256], port[32];
+ if(doq_print_addr_port(&paddr->addr, paddr->addrlen, host,
+ sizeof(host), port, sizeof(port))) {
+ log_err("doq sendmsg to %s %s failed: "
+ "sent %d in place of %d bytes",
+ host, port, (int)ret,
+ (int)sldns_buffer_limit(c->doq_socket->pkt_buf));
+ } else {
+ log_err("doq sendmsg failed: "
+ "sent %d in place of %d bytes",
+ (int)ret, (int)sldns_buffer_limit(c->doq_socket->pkt_buf));
+ }
+ return;
+ }
+}
+
+/** fetch port number */
+static int
+doq_sockaddr_get_port(struct doq_addr_storage* addr)
+{
+ if(addr->sockaddr.in.sin_family == AF_INET) {
+ struct sockaddr_in* sa = (struct sockaddr_in*)addr;
+ return ntohs(sa->sin_port);
+ } else if(addr->sockaddr.in.sin_family == AF_INET6) {
+ struct sockaddr_in6* sa6 = (struct sockaddr_in6*)addr;
+ return ntohs(sa6->sin6_port);
+ }
+ return 0;
+}
+
+/** get local address from ancillary data headers */
+static int
+doq_get_localaddr_cmsg(struct comm_point* c, struct doq_pkt_addr* paddr,
+ int* pkt_continue, struct msghdr* msg)
+{
+#ifndef S_SPLINT_S
+ struct cmsghdr* cmsg;
+#endif /* S_SPLINT_S */
+
+ memset(&paddr->localaddr, 0, sizeof(paddr->localaddr));
+#ifndef S_SPLINT_S
+ for(cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL;
+ cmsg = CMSG_NXTHDR(msg, cmsg)) {
+ if( cmsg->cmsg_level == IPPROTO_IPV6 &&
+ cmsg->cmsg_type == IPV6_PKTINFO) {
+ struct in6_pktinfo* v6info =
+ (struct in6_pktinfo*)CMSG_DATA(cmsg);
+ struct sockaddr_in6* sa= (struct sockaddr_in6*)
+ &paddr->localaddr;
+ struct sockaddr_in6* rema = (struct sockaddr_in6*)
+ &paddr->addr;
+ if(rema->sin6_family != AF_INET6) {
+ log_err("doq cmsg family mismatch cmsg is ip6");
+ *pkt_continue = 1;
+ return 0;
+ }
+ sa->sin6_family = AF_INET6;
+ sa->sin6_port = htons(doq_sockaddr_get_port(
+ (void*)c->socket->addr->ai_addr));
+ paddr->ifindex = v6info->ipi6_ifindex;
+ memmove(&sa->sin6_addr, &v6info->ipi6_addr,
+ sizeof(struct in6_addr));
+ paddr->localaddrlen = sizeof(struct sockaddr_in6);
+ break;
+#ifdef IP_PKTINFO
+ } else if( cmsg->cmsg_level == IPPROTO_IP &&
+ cmsg->cmsg_type == IP_PKTINFO) {
+ struct in_pktinfo* v4info =
+ (struct in_pktinfo*)CMSG_DATA(cmsg);
+ struct sockaddr_in* sa= (struct sockaddr_in*)
+ &paddr->localaddr;
+ struct sockaddr_in* rema = (struct sockaddr_in*)
+ &paddr->addr;
+ if(rema->sin_family != AF_INET) {
+ log_err("doq cmsg family mismatch cmsg is ip4");
+ *pkt_continue = 1;
+ return 0;
+ }
+ sa->sin_family = AF_INET;
+ sa->sin_port = htons(doq_sockaddr_get_port(
+ (void*)c->socket->addr->ai_addr));
+ paddr->ifindex = v4info->ipi_ifindex;
+ memmove(&sa->sin_addr, &v4info->ipi_addr,
+ sizeof(struct in_addr));
+ paddr->localaddrlen = sizeof(struct sockaddr_in);
+ break;
+#elif defined(IP_RECVDSTADDR)
+ } else if( cmsg->cmsg_level == IPPROTO_IP &&
+ cmsg->cmsg_type == IP_RECVDSTADDR) {
+ struct sockaddr_in* sa= (struct sockaddr_in*)
+ &paddr->localaddr;
+ struct sockaddr_in* rema = (struct sockaddr_in*)
+ &paddr->addr;
+ if(rema->sin_family != AF_INET) {
+ log_err("doq cmsg family mismatch cmsg is ip4");
+ *pkt_continue = 1;
+ return 0;
+ }
+ sa->sin_family = AF_INET;
+ sa->sin_port = htons(doq_sockaddr_get_port(
+ (struct sockaddr_storage*)c->socket->
+ addr->ai_addr));
+ paddr->ifindex = 0;
+ memmove(&sa.sin_addr, CMSG_DATA(cmsg),
+ sizeof(struct in_addr));
+ paddr->localaddrlen = sizeof(struct sockaddr_in);
+ break;
+#endif /* IP_PKTINFO or IP_RECVDSTADDR */
+ }
+ }
+#endif /* S_SPLINT_S */
+
+return 1;
+}
+
+/** get packet ecn information */
+static uint32_t
+msghdr_get_ecn(struct msghdr* msg, int family)
+{
+#ifndef S_SPLINT_S
+ struct cmsghdr* cmsg;
+ if(family == AF_INET6) {
+ for(cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL;
+ cmsg = CMSG_NXTHDR(msg, cmsg)) {
+ if(cmsg->cmsg_level == IPPROTO_IPV6 &&
+ cmsg->cmsg_type == IPV6_TCLASS &&
+ cmsg->cmsg_len != 0) {
+ uint8_t* ecn = (uint8_t*)CMSG_DATA(cmsg);
+ return *ecn;
+ }
+ }
+ return 0;
+ }
+ for(cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL;
+ cmsg = CMSG_NXTHDR(msg, cmsg)) {
+ if(cmsg->cmsg_level == IPPROTO_IP &&
+ cmsg->cmsg_type == IP_TOS &&
+ cmsg->cmsg_len != 0) {
+ uint8_t* ecn = (uint8_t*)CMSG_DATA(cmsg);
+ return *ecn;
+ }
+ }
+ return 0;
+#endif /* S_SPLINT_S */
+}
+
+/** receive packet for DoQ on UDP. get ancillary data for addresses,
+ * return false if failed and the callback can stop receiving UDP packets
+ * if pkt_continue is false. */
+static int
+doq_recv(struct comm_point* c, struct doq_pkt_addr* paddr, int* pkt_continue,
+ struct ngtcp2_pkt_info* pi)
+{
+ struct msghdr msg;
+ struct iovec iov[1];
+ ssize_t rcv;
+ union {
+ struct cmsghdr hdr;
+ char buf[256];
+ } ancil;
+
+ msg.msg_name = &paddr->addr;
+ msg.msg_namelen = (socklen_t)sizeof(paddr->addr);
+ iov[0].iov_base = sldns_buffer_begin(c->doq_socket->pkt_buf);
+ iov[0].iov_len = sldns_buffer_remaining(c->doq_socket->pkt_buf);
+ msg.msg_iov = iov;
+ msg.msg_iovlen = 1;
+ msg.msg_control = ancil.buf;
+#ifndef S_SPLINT_S
+ msg.msg_controllen = sizeof(ancil.buf);
+#endif /* S_SPLINT_S */
+ msg.msg_flags = 0;
+
+ rcv = recvmsg(c->fd, &msg, MSG_DONTWAIT);
+ if(rcv == -1) {
+ if(errno != EAGAIN && errno != EINTR
+ && udp_recv_needs_log(errno)) {
+ log_err("recvmsg failed for doq: %s", strerror(errno));
+ }
+ *pkt_continue = 0;
+ return 0;
+ }
+
+ paddr->addrlen = msg.msg_namelen;
+ sldns_buffer_skip(c->doq_socket->pkt_buf, rcv);
+ sldns_buffer_flip(c->doq_socket->pkt_buf);
+ if(!doq_get_localaddr_cmsg(c, paddr, pkt_continue, &msg))
+ return 0;
+ pi->ecn = msghdr_get_ecn(&msg, paddr->addr.sockaddr.in.sin_family);
+ return 1;
+}
+
+/** send the version negotiation for doq. scid and dcid are flipped around
+ * to send back to the client. */
+static void
+doq_send_version_negotiation(struct comm_point* c, struct doq_pkt_addr* paddr,
+ const uint8_t* dcid, size_t dcidlen, const uint8_t* scid,
+ size_t scidlen)
+{
+ uint32_t versions[2];
+ size_t versions_len = 0;
+ ngtcp2_ssize ret;
+ uint8_t unused_random;
+
+ /* fill the array with supported versions */
+ versions[0] = NGTCP2_PROTO_VER_V1;
+ versions_len = 1;
+ unused_random = ub_random_max(c->doq_socket->rnd, 256);
+ sldns_buffer_clear(c->doq_socket->pkt_buf);
+ ret = ngtcp2_pkt_write_version_negotiation(
+ sldns_buffer_begin(c->doq_socket->pkt_buf),
+ sldns_buffer_capacity(c->doq_socket->pkt_buf), unused_random,
+ dcid, dcidlen, scid, scidlen, versions, versions_len);
+ if(ret < 0) {
+ log_err("ngtcp2_pkt_write_version_negotiation failed: %s",
+ ngtcp2_strerror(ret));
+ return;
+ }
+ sldns_buffer_set_position(c->doq_socket->pkt_buf, ret);
+ sldns_buffer_flip(c->doq_socket->pkt_buf);
+ doq_send_pkt(c, paddr, 0);
+}
+
+/** Find the doq_conn object by remote address and dcid */
+static struct doq_conn*
+doq_conn_find(struct doq_table* table, struct doq_addr_storage* addr,
+ socklen_t addrlen, struct doq_addr_storage* localaddr,
+ socklen_t localaddrlen, int ifindex, const uint8_t* dcid,
+ size_t dcidlen)
+{
+ struct rbnode_type* node;
+ struct doq_conn key;
+ memset(&key.node, 0, sizeof(key.node));
+ key.node.key = &key;
+ memmove(&key.key.paddr.addr, addr, addrlen);
+ key.key.paddr.addrlen = addrlen;
+ memmove(&key.key.paddr.localaddr, localaddr, localaddrlen);
+ key.key.paddr.localaddrlen = localaddrlen;
+ key.key.paddr.ifindex = ifindex;
+ key.key.dcid = (void*)dcid;
+ key.key.dcidlen = dcidlen;
+ node = rbtree_search(table->conn_tree, &key);
+ if(node)
+ return (struct doq_conn*)node->key;
+ return NULL;
+}
+
+/** find the doq_con by the connection id */
+static struct doq_conn*
+doq_conn_find_by_id(struct doq_table* table, const uint8_t* dcid,
+ size_t dcidlen)
+{
+ struct doq_conid* conid;
+ lock_rw_rdlock(&table->conid_lock);
+ conid = doq_conid_find(table, dcid, dcidlen);
+ if(conid) {
+ /* make a copy of the key */
+ struct doq_conn* conn;
+ struct doq_conn_key key = conid->key;
+ uint8_t cid[NGTCP2_MAX_CIDLEN];
+ log_assert(conid->key.dcidlen <= NGTCP2_MAX_CIDLEN);
+ memcpy(cid, conid->key.dcid, conid->key.dcidlen);
+ key.dcid = cid;
+ lock_rw_unlock(&table->conid_lock);
+
+ /* now that the conid lock is released, look up the conn */
+ lock_rw_rdlock(&table->lock);
+ conn = doq_conn_find(table, &key.paddr.addr,
+ key.paddr.addrlen, &key.paddr.localaddr,
+ key.paddr.localaddrlen, key.paddr.ifindex, key.dcid,
+ key.dcidlen);
+ if(!conn) {
+ /* The connection got deleted between the conid lookup
+ * and the connection lock grab, it no longer exists,
+ * so return null. */
+ lock_rw_unlock(&table->lock);
+ return NULL;
+ }
+ lock_basic_lock(&conn->lock);
+ if(conn->is_deleted) {
+ lock_rw_unlock(&table->lock);
+ lock_basic_unlock(&conn->lock);
+ return NULL;
+ }
+ lock_rw_unlock(&table->lock);
+ return conn;
+ }
+ lock_rw_unlock(&table->conid_lock);
+ return NULL;
+}
+
+/** Find the doq_conn, by addr or by connection id */
+static struct doq_conn*
+doq_conn_find_by_addr_or_cid(struct doq_table* table,
+ struct doq_pkt_addr* paddr, const uint8_t* dcid, size_t dcidlen)
+{
+ struct doq_conn* conn;
+ lock_rw_rdlock(&table->lock);
+ conn = doq_conn_find(table, &paddr->addr, paddr->addrlen,
+ &paddr->localaddr, paddr->localaddrlen, paddr->ifindex,
+ dcid, dcidlen);
+ if(conn && conn->is_deleted) {
+ conn = NULL;
+ }
+ if(conn) {
+ lock_basic_lock(&conn->lock);
+ lock_rw_unlock(&table->lock);
+ verbose(VERB_ALGO, "doq: found connection by address, dcid");
+ } else {
+ lock_rw_unlock(&table->lock);
+ conn = doq_conn_find_by_id(table, dcid, dcidlen);
+ if(conn) {
+ verbose(VERB_ALGO, "doq: found connection by dcid");
+ }
+ }
+ return conn;
+}
+
+/** decode doq packet header, false on handled or failure, true to continue
+ * to process the packet */
+static int
+doq_decode_pkt_header_negotiate(struct comm_point* c,
+ struct doq_pkt_addr* paddr, struct doq_conn** conn)
+{
+#ifdef HAVE_STRUCT_NGTCP2_VERSION_CID
+ struct ngtcp2_version_cid vc;
+#else
+ uint32_t version;
+ const uint8_t *dcid, *scid;
+ size_t dcidlen, scidlen;
+#endif
+ int rv;
+
+#ifdef HAVE_STRUCT_NGTCP2_VERSION_CID
+ rv = ngtcp2_pkt_decode_version_cid(&vc,
+ sldns_buffer_begin(c->doq_socket->pkt_buf),
+ sldns_buffer_limit(c->doq_socket->pkt_buf),
+ c->doq_socket->sv_scidlen);
+#else
+ rv = ngtcp2_pkt_decode_version_cid(&version, &dcid, &dcidlen,
+ &scid, &scidlen, sldns_buffer_begin(c->doq_socket->pkt_buf),
+ sldns_buffer_limit(c->doq_socket->pkt_buf), c->doq_socket->sv_scidlen);
+#endif
+ if(rv != 0) {
+ if(rv == NGTCP2_ERR_VERSION_NEGOTIATION) {
+ /* send the version negotiation */
+ doq_send_version_negotiation(c, paddr,
+#ifdef HAVE_STRUCT_NGTCP2_VERSION_CID
+ vc.scid, vc.scidlen, vc.dcid, vc.dcidlen
+#else
+ scid, scidlen, dcid, dcidlen
+#endif
+ );
+ return 0;
+ }
+ verbose(VERB_ALGO, "doq: could not decode version "
+ "and CID from QUIC packet header: %s",
+ ngtcp2_strerror(rv));
+ return 0;
+ }
+
+ if(verbosity >= VERB_ALGO) {
+ verbose(VERB_ALGO, "ngtcp2_pkt_decode_version_cid packet has "
+ "QUIC protocol version %u", (unsigned)
+#ifdef HAVE_STRUCT_NGTCP2_VERSION_CID
+ vc.
+#endif
+ version
+ );
+ log_hex("dcid",
+#ifdef HAVE_STRUCT_NGTCP2_VERSION_CID
+ (void*)vc.dcid, vc.dcidlen
+#else
+ (void*)dcid, dcidlen
+#endif
+ );
+ log_hex("scid",
+#ifdef HAVE_STRUCT_NGTCP2_VERSION_CID
+ (void*)vc.scid, vc.scidlen
+#else
+ (void*)scid, scidlen
+#endif
+ );
+ }
+ *conn = doq_conn_find_by_addr_or_cid(c->doq_socket->table, paddr,
+#ifdef HAVE_STRUCT_NGTCP2_VERSION_CID
+ vc.dcid, vc.dcidlen
+#else
+ dcid, dcidlen
+#endif
+ );
+ if(*conn)
+ (*conn)->doq_socket = c->doq_socket;
+ return 1;
+}
+
+/** fill cid structure with random data */
+static void doq_cid_randfill(struct ngtcp2_cid* cid, size_t datalen,
+ struct ub_randstate* rnd)
+{
+ uint8_t buf[32];
+ if(datalen > sizeof(buf))
+ datalen = sizeof(buf);
+ doq_fill_rand(rnd, buf, datalen);
+ ngtcp2_cid_init(cid, buf, datalen);
+}
+
+/** send retry packet for doq connection. */
+static void
+doq_send_retry(struct comm_point* c, struct doq_pkt_addr* paddr,
+ struct ngtcp2_pkt_hd* hd)
+{
+ char host[256], port[32];
+ struct ngtcp2_cid scid;
+ uint8_t token[NGTCP2_CRYPTO_MAX_RETRY_TOKENLEN];
+ ngtcp2_tstamp ts;
+ ngtcp2_ssize tokenlen, ret;
+
+ if(!doq_print_addr_port(&paddr->addr, paddr->addrlen, host,
+ sizeof(host), port, sizeof(port))) {
+ return;
+ }
+ verbose(VERB_ALGO, "doq: sending retry packet to %s %s", host, port);
+
+ /* the server chosen source connection ID */
+ scid.datalen = c->doq_socket->sv_scidlen;
+ doq_cid_randfill(&scid, scid.datalen, c->doq_socket->rnd);
+
+ ts = doq_get_timestamp_nanosec();
+
+ tokenlen = ngtcp2_crypto_generate_retry_token(token,
+ c->doq_socket->static_secret, c->doq_socket->static_secret_len,
+ hd->version, (void*)&paddr->addr, paddr->addrlen, &scid,
+ &hd->dcid, ts);
+ if(tokenlen < 0) {
+ log_err("ngtcp2_crypto_generate_retry_token failed: %s",
+ ngtcp2_strerror(tokenlen));
+ return;
+ }
+
+ sldns_buffer_clear(c->doq_socket->pkt_buf);
+ ret = ngtcp2_crypto_write_retry(sldns_buffer_begin(c->doq_socket->pkt_buf),
+ sldns_buffer_capacity(c->doq_socket->pkt_buf), hd->version,
+ &hd->scid, &scid, &hd->dcid, token, tokenlen);
+ if(ret < 0) {
+ log_err("ngtcp2_crypto_write_retry failed: %s",
+ ngtcp2_strerror(ret));
+ return;
+ }
+ sldns_buffer_set_position(c->doq_socket->pkt_buf, ret);
+ sldns_buffer_flip(c->doq_socket->pkt_buf);
+ doq_send_pkt(c, paddr, 0);
+}
+
+/** doq send stateless connection close */
+static void
+doq_send_stateless_connection_close(struct comm_point* c,
+ struct doq_pkt_addr* paddr, struct ngtcp2_pkt_hd* hd,
+ uint64_t error_code)
+{
+ ngtcp2_ssize ret;
+ sldns_buffer_clear(c->doq_socket->pkt_buf);
+ ret = ngtcp2_crypto_write_connection_close(
+ sldns_buffer_begin(c->doq_socket->pkt_buf),
+ sldns_buffer_capacity(c->doq_socket->pkt_buf), hd->version, &hd->scid,
+ &hd->dcid, error_code, NULL, 0);
+ if(ret < 0) {
+ log_err("ngtcp2_crypto_write_connection_close failed: %s",
+ ngtcp2_strerror(ret));
+ return;
+ }
+ sldns_buffer_set_position(c->doq_socket->pkt_buf, ret);
+ sldns_buffer_flip(c->doq_socket->pkt_buf);
+ doq_send_pkt(c, paddr, 0);
+}
+
+/** doq verify retry token, false on failure */
+static int
+doq_verify_retry_token(struct comm_point* c, struct doq_pkt_addr* paddr,
+ struct ngtcp2_cid* ocid, struct ngtcp2_pkt_hd* hd)
+{
+ char host[256], port[32];
+ ngtcp2_tstamp ts;
+ if(!doq_print_addr_port(&paddr->addr, paddr->addrlen, host,
+ sizeof(host), port, sizeof(port)))
+ return 0;
+ ts = doq_get_timestamp_nanosec();
+ verbose(VERB_ALGO, "doq: verifying retry token from %s %s", host,
+ port);
+ if(ngtcp2_crypto_verify_retry_token(ocid,
+#ifdef HAVE_STRUCT_NGTCP2_PKT_HD_TOKENLEN
+ hd->token, hd->tokenlen,
+#else
+ hd->token.base, hd->token.len,
+#endif
+ c->doq_socket->static_secret,
+ c->doq_socket->static_secret_len, hd->version,
+ (void*)&paddr->addr, paddr->addrlen, &hd->dcid,
+ 10*NGTCP2_SECONDS, ts) != 0) {
+ verbose(VERB_ALGO, "doq: could not verify retry token "
+ "from %s %s", host, port);
+ return 0;
+ }
+ verbose(VERB_ALGO, "doq: verified retry token from %s %s", host, port);
+ return 1;
+}
+
+/** doq verify token, false on failure */
+static int
+doq_verify_token(struct comm_point* c, struct doq_pkt_addr* paddr,
+ struct ngtcp2_pkt_hd* hd)
+{
+ char host[256], port[32];
+ ngtcp2_tstamp ts;
+ if(!doq_print_addr_port(&paddr->addr, paddr->addrlen, host,
+ sizeof(host), port, sizeof(port)))
+ return 0;
+ ts = doq_get_timestamp_nanosec();
+ verbose(VERB_ALGO, "doq: verifying token from %s %s", host, port);
+ if(ngtcp2_crypto_verify_regular_token(
+#ifdef HAVE_STRUCT_NGTCP2_PKT_HD_TOKENLEN
+ hd->token, hd->tokenlen,
+#else
+ hd->token.base, hd->token.len,
+#endif
+ c->doq_socket->static_secret, c->doq_socket->static_secret_len,
+ (void*)&paddr->addr, paddr->addrlen, 3600*NGTCP2_SECONDS,
+ ts) != 0) {
+ verbose(VERB_ALGO, "doq: could not verify token from %s %s",
+ host, port);
+ return 0;
+ }
+ verbose(VERB_ALGO, "doq: verified token from %s %s", host, port);
+ return 1;
+}
+
+/** delete and remove from the lookup tree the doq_conn connection */
+static void
+doq_delete_connection(struct comm_point* c, struct doq_conn* conn)
+{
+ struct doq_conn copy;
+ uint8_t cid[NGTCP2_MAX_CIDLEN];
+ rbnode_type* node;
+ if(!conn)
+ return;
+ /* Copy the key and set it deleted. */
+ conn->is_deleted = 1;
+ doq_conn_write_disable(conn);
+ copy.key = conn->key;
+ log_assert(conn->key.dcidlen <= NGTCP2_MAX_CIDLEN);
+ memcpy(cid, conn->key.dcid, conn->key.dcidlen);
+ copy.key.dcid = cid;
+ copy.node.key = ©
+ lock_basic_unlock(&conn->lock);
+
+ /* Now get the table lock to delete it from the tree */
+ lock_rw_wrlock(&c->doq_socket->table->lock);
+ node = rbtree_delete(c->doq_socket->table->conn_tree, copy.node.key);
+ if(node) {
+ conn = (struct doq_conn*)node->key;
+ lock_basic_lock(&conn->lock);
+ doq_conn_write_list_remove(c->doq_socket->table, conn);
+ if(conn->timer.timer_in_list) {
+ /* Remove timer from list first, because finding the
+ * rbnode element of the setlist of same timeouts
+ * needs tree lookup. Edit the tree structure after
+ * that lookup. */
+ doq_timer_list_remove(c->doq_socket->table,
+ &conn->timer);
+ }
+ if(conn->timer.timer_in_tree)
+ doq_timer_tree_remove(c->doq_socket->table,
+ &conn->timer);
+ }
+ lock_rw_unlock(&c->doq_socket->table->lock);
+ if(node) {
+ lock_basic_unlock(&conn->lock);
+ doq_table_quic_size_subtract(c->doq_socket->table,
+ sizeof(*conn)+conn->key.dcidlen);
+ doq_conn_delete(conn, c->doq_socket->table);
+ }
+}
+
+/** create and setup a new doq connection, to a new destination, or with
+ * a new dcid. It has a new set of streams. It is inserted in the lookup tree.
+ * Returns NULL on failure. */
+static struct doq_conn*
+doq_setup_new_conn(struct comm_point* c, struct doq_pkt_addr* paddr,
+ struct ngtcp2_pkt_hd* hd, struct ngtcp2_cid* ocid)
+{
+ struct doq_conn* conn;
+ if(!doq_table_quic_size_available(c->doq_socket->table,
+ c->doq_socket->cfg, sizeof(*conn)+hd->dcid.datalen
+ + sizeof(struct doq_stream)
+ + 100 /* estimated input query */
+ + 1200 /* estimated output query */)) {
+ verbose(VERB_ALGO, "doq: no mem available for new connection");
+ doq_send_stateless_connection_close(c, paddr, hd,
+ NGTCP2_CONNECTION_REFUSED);
+ return NULL;
+ }
+ conn = doq_conn_create(c, paddr, hd->dcid.data, hd->dcid.datalen,
+ hd->version);
+ if(!conn) {
+ log_err("doq: could not allocate doq_conn");
+ return NULL;
+ }
+ lock_rw_wrlock(&c->doq_socket->table->lock);
+ lock_basic_lock(&conn->lock);
+ if(!rbtree_insert(c->doq_socket->table->conn_tree, &conn->node)) {
+ lock_rw_unlock(&c->doq_socket->table->lock);
+ log_err("doq: duplicate connection");
+ /* conn has no entry in writelist, and no timer yet. */
+ lock_basic_unlock(&conn->lock);
+ doq_conn_delete(conn, c->doq_socket->table);
+ return NULL;
+ }
+ lock_rw_unlock(&c->doq_socket->table->lock);
+ doq_table_quic_size_add(c->doq_socket->table,
+ sizeof(*conn)+conn->key.dcidlen);
+ verbose(VERB_ALGO, "doq: created new connection");
+
+ /* the scid and dcid switch meaning from the accepted client
+ * connection to the server connection. The 'source' and 'destination'
+ * meaning is reversed. */
+ if(!doq_conn_setup(conn, hd->scid.data, hd->scid.datalen,
+ (ocid?ocid->data:NULL), (ocid?ocid->datalen:0),
+#ifdef HAVE_STRUCT_NGTCP2_PKT_HD_TOKENLEN
+ hd->token, hd->tokenlen
+#else
+ hd->token.base, hd->token.len
+#endif
+ )) {
+ log_err("doq: could not set up connection");
+ doq_delete_connection(c, conn);
+ return NULL;
+ }
+ return conn;
+}
+
+/** perform doq address validation */
+static int
+doq_address_validation(struct comm_point* c, struct doq_pkt_addr* paddr,
+ struct ngtcp2_pkt_hd* hd, struct ngtcp2_cid* ocid,
+ struct ngtcp2_cid** pocid)
+{
+#ifdef HAVE_STRUCT_NGTCP2_PKT_HD_TOKENLEN
+ const uint8_t* token = hd->token;
+ size_t tokenlen = hd->tokenlen;
+#else
+ const uint8_t* token = hd->token.base;
+ size_t tokenlen = hd->token.len;
+#endif
+ verbose(VERB_ALGO, "doq stateless address validation");
+
+ if(tokenlen == 0 || token == NULL) {
+ doq_send_retry(c, paddr, hd);
+ return 0;
+ }
+ if(token[0] != NGTCP2_CRYPTO_TOKEN_MAGIC_RETRY &&
+ hd->dcid.datalen < NGTCP2_MIN_INITIAL_DCIDLEN) {
+ doq_send_stateless_connection_close(c, paddr, hd,
+ NGTCP2_INVALID_TOKEN);
+ return 0;
+ }
+ if(token[0] == NGTCP2_CRYPTO_TOKEN_MAGIC_RETRY) {
+ if(!doq_verify_retry_token(c, paddr, ocid, hd)) {
+ doq_send_stateless_connection_close(c, paddr, hd,
+ NGTCP2_INVALID_TOKEN);
+ return 0;
+ }
+ *pocid = ocid;
+ } else if(token[0] == NGTCP2_CRYPTO_TOKEN_MAGIC_REGULAR) {
+ if(!doq_verify_token(c, paddr, hd)) {
+ doq_send_retry(c, paddr, hd);
+ return 0;
+ }
+#ifdef HAVE_STRUCT_NGTCP2_PKT_HD_TOKENLEN
+ hd->token = NULL;
+ hd->tokenlen = 0;
+#else
+ hd->token.base = NULL;
+ hd->token.len = 0;
+#endif
+ } else {
+ verbose(VERB_ALGO, "doq address validation: unrecognised "
+ "token in hd.token.base with magic byte 0x%2.2x",
+ (int)token[0]);
+ if(c->doq_socket->validate_addr) {
+ doq_send_retry(c, paddr, hd);
+ return 0;
+ }
+#ifdef HAVE_STRUCT_NGTCP2_PKT_HD_TOKENLEN
+ hd->token = NULL;
+ hd->tokenlen = 0;
+#else
+ hd->token.base = NULL;
+ hd->token.len = 0;
+#endif
+ }
+ return 1;
+}
+
+/** the doq accept, returns false if no further processing of content */
+static int
+doq_accept(struct comm_point* c, struct doq_pkt_addr* paddr,
+ struct doq_conn** conn, struct ngtcp2_pkt_info* pi)
+{
+ int rv;
+ struct ngtcp2_pkt_hd hd;
+ struct ngtcp2_cid ocid, *pocid=NULL;
+ int err_retry;
+ memset(&hd, 0, sizeof(hd));
+ rv = ngtcp2_accept(&hd, sldns_buffer_begin(c->doq_socket->pkt_buf),
+ sldns_buffer_limit(c->doq_socket->pkt_buf));
+ if(rv != 0) {
+ if(rv == NGTCP2_ERR_RETRY) {
+ doq_send_retry(c, paddr, &hd);
+ return 0;
+ }
+ log_err("doq: initial packet failed, ngtcp2_accept failed: %s",
+ ngtcp2_strerror(rv));
+ return 0;
+ }
+ if(c->doq_socket->validate_addr ||
+#ifdef HAVE_STRUCT_NGTCP2_PKT_HD_TOKENLEN
+ hd.tokenlen
+#else
+ hd.token.len
+#endif
+ ) {
+ if(!doq_address_validation(c, paddr, &hd, &ocid, &pocid))
+ return 0;
+ }
+ *conn = doq_setup_new_conn(c, paddr, &hd, pocid);
+ if(!*conn)
+ return 0;
+ (*conn)->doq_socket = c->doq_socket;
+ if(!doq_conn_recv(c, paddr, *conn, pi, &err_retry, NULL)) {
+ if(err_retry)
+ doq_send_retry(c, paddr, &hd);
+ doq_delete_connection(c, *conn);
+ *conn = NULL;
+ return 0;
+ }
+ return 1;
+}
+
+/** doq pickup a timer to wait for for the worker. If any timer exists. */
+static void
+doq_pickup_timer(struct comm_point* c)
+{
+ struct doq_timer* t;
+ struct timeval tv;
+ int have_time = 0;
+ memset(&tv, 0, sizeof(tv));
+
+ lock_rw_wrlock(&c->doq_socket->table->lock);
+ RBTREE_FOR(t, struct doq_timer*, c->doq_socket->table->timer_tree) {
+ if(t->worker_doq_socket == NULL ||
+ t->worker_doq_socket == c->doq_socket) {
+ /* pick up this element */
+ t->worker_doq_socket = c->doq_socket;
+ have_time = 1;
+ memcpy(&tv, &t->time, sizeof(tv));
+ break;
+ }
+ }
+ lock_rw_unlock(&c->doq_socket->table->lock);
+
+ if(have_time) {
+ struct timeval rel;
+ timeval_subtract(&rel, &tv, c->doq_socket->now_tv);
+ comm_timer_set(c->doq_socket->timer, &rel);
+ memcpy(&c->doq_socket->marked_time, &tv,
+ sizeof(c->doq_socket->marked_time));
+ verbose(VERB_ALGO, "doq pickup timer at %d.%6.6d in %d.%6.6d",
+ (int)tv.tv_sec, (int)tv.tv_usec, (int)rel.tv_sec,
+ (int)rel.tv_usec);
+ } else {
+ if(comm_timer_is_set(c->doq_socket->timer))
+ comm_timer_disable(c->doq_socket->timer);
+ memset(&c->doq_socket->marked_time, 0,
+ sizeof(c->doq_socket->marked_time));
+ verbose(VERB_ALGO, "doq timer disabled");
+ }
+}
+
+/** doq done with connection, release locks and setup timer and write */
+static void
+doq_done_setup_timer_and_write(struct comm_point* c, struct doq_conn* conn)
+{
+ struct doq_conn copy;
+ uint8_t cid[NGTCP2_MAX_CIDLEN];
+ rbnode_type* node;
+ struct timeval new_tv;
+ int write_change = 0, timer_change = 0;
+
+ /* No longer in callbacks, so the pointer to doq_socket is back
+ * to NULL. */
+ conn->doq_socket = NULL;
+
+ if(doq_conn_check_timer(conn, &new_tv))
+ timer_change = 1;
+ if( (conn->write_interest && !conn->on_write_list) ||
+ (!conn->write_interest && conn->on_write_list))
+ write_change = 1;
+
+ if(!timer_change && !write_change) {
+ /* Nothing to do. */
+ lock_basic_unlock(&conn->lock);
+ return;
+ }
+
+ /* The table lock is needed to change the write list and timer tree.
+ * So the connection lock is release and then the connection is
+ * looked up again. */
+ copy.key = conn->key;
+ log_assert(conn->key.dcidlen <= NGTCP2_MAX_CIDLEN);
+ memcpy(cid, conn->key.dcid, conn->key.dcidlen);
+ copy.key.dcid = cid;
+ copy.node.key = ©
+ lock_basic_unlock(&conn->lock);
+
+ lock_rw_wrlock(&c->doq_socket->table->lock);
+ node = rbtree_search(c->doq_socket->table->conn_tree, copy.node.key);
+ if(!node) {
+ lock_rw_unlock(&c->doq_socket->table->lock);
+ /* Must have been deleted in the mean time. */
+ return;
+ }
+ conn = (struct doq_conn*)node->key;
+ lock_basic_lock(&conn->lock);
+ if(conn->is_deleted) {
+ /* It is deleted now. */
+ lock_rw_unlock(&c->doq_socket->table->lock);
+ lock_basic_unlock(&conn->lock);
+ return;
+ }
+
+ if(write_change) {
+ /* Edit the write lists, we are holding the table.lock and can
+ * edit the list first,last and also prev,next and on_list
+ * elements in the doq_conn structures. */
+ doq_conn_set_write_list(c->doq_socket->table, conn);
+ }
+ if(timer_change) {
+ doq_timer_set(c->doq_socket->table, &conn->timer,
+ c->doq_socket, &new_tv);
+ }
+ lock_rw_unlock(&c->doq_socket->table->lock);
+ lock_basic_unlock(&conn->lock);
+}
+
+/** doq done with connection callbacks, release locks and setup write */
+static void
+doq_done_with_conn_cb(struct comm_point* c, struct doq_conn* conn)
+{
+ struct doq_conn copy;
+ uint8_t cid[NGTCP2_MAX_CIDLEN];
+ rbnode_type* node;
+
+ /* no longer in callbacks, so the pointer to doq_socket is back
+ * to NULL. */
+ conn->doq_socket = NULL;
+
+ if( (conn->write_interest && conn->on_write_list) ||
+ (!conn->write_interest && !conn->on_write_list)) {
+ /* The connection already has the required write list
+ * status. */
+ lock_basic_unlock(&conn->lock);
+ return;
+ }
+
+ /* To edit the write list of connections we have to hold the table
+ * lock, so we release the connection and then look it up again. */
+ copy.key = conn->key;
+ log_assert(conn->key.dcidlen <= NGTCP2_MAX_CIDLEN);
+ memcpy(cid, conn->key.dcid, conn->key.dcidlen);
+ copy.key.dcid = cid;
+ copy.node.key = ©
+ lock_basic_unlock(&conn->lock);
+
+ lock_rw_wrlock(&c->doq_socket->table->lock);
+ node = rbtree_search(c->doq_socket->table->conn_tree, copy.node.key);
+ if(!node) {
+ lock_rw_unlock(&c->doq_socket->table->lock);
+ /* must have been deleted in the mean time */
+ return;
+ }
+ conn = (struct doq_conn*)node->key;
+ lock_basic_lock(&conn->lock);
+ if(conn->is_deleted) {
+ /* it is deleted now. */
+ lock_rw_unlock(&c->doq_socket->table->lock);
+ lock_basic_unlock(&conn->lock);
+ return;
+ }
+
+ /* edit the write lists, we are holding the table.lock and can
+ * edit the list first,last and also prev,next and on_list elements
+ * in the doq_conn structures. */
+ doq_conn_set_write_list(c->doq_socket->table, conn);
+ lock_rw_unlock(&c->doq_socket->table->lock);
+ lock_basic_unlock(&conn->lock);
+}
+
+/** doq count the length of the write list */
+static size_t
+doq_write_list_length(struct comm_point* c)
+{
+ size_t count = 0;
+ struct doq_conn* conn;
+ lock_rw_rdlock(&c->doq_socket->table->lock);
+ conn = c->doq_socket->table->write_list_first;
+ while(conn) {
+ count++;
+ conn = conn->write_next;
+ }
+ lock_rw_unlock(&c->doq_socket->table->lock);
+ return count;
+}
+
+/** doq pop the first element from the write list to have write events */
+static struct doq_conn*
+doq_pop_write_conn(struct comm_point* c)
+{
+ struct doq_conn* conn;
+ lock_rw_wrlock(&c->doq_socket->table->lock);
+ conn = doq_table_pop_first(c->doq_socket->table);
+ while(conn && conn->is_deleted) {
+ lock_basic_unlock(&conn->lock);
+ conn = doq_table_pop_first(c->doq_socket->table);
+ }
+ lock_rw_unlock(&c->doq_socket->table->lock);
+ if(conn)
+ conn->doq_socket = c->doq_socket;
+ return conn;
+}
+
+/** doq the connection is done with write callbacks, release it. */
+static void
+doq_done_with_write_cb(struct comm_point* c, struct doq_conn* conn,
+ int delete_it)
+{
+ if(delete_it) {
+ doq_delete_connection(c, conn);
+ return;
+ }
+ doq_done_setup_timer_and_write(c, conn);
+}
+
+/** see if the doq socket wants to write packets */
+static int
+doq_socket_want_write(struct comm_point* c)
+{
+ int want_write = 0;
+ if(c->doq_socket->have_blocked_pkt)
+ return 1;
+ lock_rw_rdlock(&c->doq_socket->table->lock);
+ if(c->doq_socket->table->write_list_first)
+ want_write = 1;
+ lock_rw_unlock(&c->doq_socket->table->lock);
+ return want_write;
+}
+
+/** enable write event for the doq server socket fd */
+static void
+doq_socket_write_enable(struct comm_point* c)
+{
+ verbose(VERB_ALGO, "doq socket want write");
+ if(c->doq_socket->event_has_write)
+ return;
+ comm_point_listen_for_rw(c, 1, 1);
+ c->doq_socket->event_has_write = 1;
+}
+
+/** disable write event for the doq server socket fd */
+static void
+doq_socket_write_disable(struct comm_point* c)
+{
+ verbose(VERB_ALGO, "doq socket want no write");
+ if(!c->doq_socket->event_has_write)
+ return;
+ comm_point_listen_for_rw(c, 1, 0);
+ c->doq_socket->event_has_write = 0;
+}
+
+/** write blocked packet, if possible. returns false if failed, again. */
+static int
+doq_write_blocked_pkt(struct comm_point* c)
+{
+ struct doq_pkt_addr paddr;
+ if(!c->doq_socket->have_blocked_pkt)
+ return 1;
+ c->doq_socket->have_blocked_pkt = 0;
+ if(sldns_buffer_limit(c->doq_socket->blocked_pkt) >
+ sldns_buffer_remaining(c->doq_socket->pkt_buf))
+ return 1; /* impossibly large, drop it.
+ impossible since pkt_buf is same size as blocked_pkt buf. */
+ sldns_buffer_clear(c->doq_socket->pkt_buf);
+ sldns_buffer_write(c->doq_socket->pkt_buf,
+ sldns_buffer_begin(c->doq_socket->blocked_pkt),
+ sldns_buffer_limit(c->doq_socket->blocked_pkt));
+ sldns_buffer_flip(c->doq_socket->pkt_buf);
+ memcpy(&paddr, c->doq_socket->blocked_paddr, sizeof(paddr));
+ doq_send_pkt(c, &paddr, c->doq_socket->blocked_pkt_pi.ecn);
+ if(c->doq_socket->have_blocked_pkt)
+ return 0;
+ return 1;
+}
+
+/** doq find a timer that timeouted and return the conn, locked. */
+static struct doq_conn*
+doq_timer_timeout_conn(struct doq_server_socket* doq_socket)
+{
+ struct doq_conn* conn = NULL;
+ struct rbnode_type* node;
+ lock_rw_wrlock(&doq_socket->table->lock);
+ node = rbtree_first(doq_socket->table->timer_tree);
+ if(node && node != RBTREE_NULL) {
+ struct doq_timer* t = (struct doq_timer*)node;
+ conn = t->conn;
+
+ /* If now < timer then no further timeouts in tree. */
+ if(timeval_smaller(doq_socket->now_tv, &t->time)) {
+ lock_rw_unlock(&doq_socket->table->lock);
+ return NULL;
+ }
+
+ lock_basic_lock(&conn->lock);
+ conn->doq_socket = doq_socket;
+
+ /* Now that the timer is fired, remove it. */
+ doq_timer_unset(doq_socket->table, t);
+ lock_rw_unlock(&doq_socket->table->lock);
+ return conn;
+ }
+ lock_rw_unlock(&doq_socket->table->lock);
+ return NULL;
+}
+
+/** doq timer erase the marker that said which timer the worker uses. */
+static void
+doq_timer_erase_marker(struct doq_server_socket* doq_socket)
+{
+ struct doq_timer* t;
+ lock_rw_wrlock(&doq_socket->table->lock);
+ t = doq_timer_find_time(doq_socket->table, &doq_socket->marked_time);
+ if(t && t->worker_doq_socket == doq_socket)
+ t->worker_doq_socket = NULL;
+ lock_rw_unlock(&doq_socket->table->lock);
+ memset(&doq_socket->marked_time, 0, sizeof(doq_socket->marked_time));
+}
+
+void
+doq_timer_cb(void* arg)
+{
+ struct doq_server_socket* doq_socket = (struct doq_server_socket*)arg;
+ struct doq_conn* conn;
+ verbose(VERB_ALGO, "doq timer callback");
+
+ doq_timer_erase_marker(doq_socket);
- fptr_ok(fptr_whitelist_comm_point(rep.c->callback));
- if((*rep.c->callback)(rep.c, rep.c->cb_arg, NETEVENT_NOERROR, &rep)) {
- /* send back immediate reply */
- struct sldns_buffer *buffer;
-#ifdef USE_DNSCRYPT
- buffer = rep.c->dnscrypt_buffer;
+ while((conn = doq_timer_timeout_conn(doq_socket)) != NULL) {
+ if(conn->is_deleted ||
+#ifdef HAVE_NGTCP2_CONN_IN_CLOSING_PERIOD
+ ngtcp2_conn_in_closing_period(conn->conn) ||
#else
- buffer = rep.c->buffer;
+ ngtcp2_conn_is_in_closing_period(conn->conn) ||
#endif
- (void)comm_point_send_udp_msg_if(rep.c, buffer,
- (struct sockaddr*)&rep.remote_addr,
- rep.remote_addrlen, &rep);
+#ifdef HAVE_NGTCP2_CONN_IN_DRAINING_PERIOD
+ ngtcp2_conn_in_draining_period(conn->conn)
+#else
+ ngtcp2_conn_is_in_draining_period(conn->conn)
+#endif
+ ) {
+ if(verbosity >= VERB_ALGO) {
+ char remotestr[256];
+ addr_to_str((void*)&conn->key.paddr.addr,
+ conn->key.paddr.addrlen, remotestr,
+ sizeof(remotestr));
+ verbose(VERB_ALGO, "doq conn %s is deleted "
+ "after timeout", remotestr);
+ }
+ doq_delete_connection(doq_socket->cp, conn);
+ continue;
}
- if(!rep.c || rep.c->fd == -1) /* commpoint closed */
- break;
+ if(!doq_conn_handle_timeout(conn))
+ doq_delete_connection(doq_socket->cp, conn);
+ else doq_done_setup_timer_and_write(doq_socket->cp, conn);
}
+
+ if(doq_socket_want_write(doq_socket->cp))
+ doq_socket_write_enable(doq_socket->cp);
+ else doq_socket_write_disable(doq_socket->cp);
+ doq_pickup_timer(doq_socket->cp);
}
-#endif /* AF_INET6 && IPV6_PKTINFO && HAVE_RECVMSG */
void
-comm_point_udp_callback(int fd, short event, void* arg)
+comm_point_doq_callback(int fd, short event, void* arg)
{
- struct comm_reply rep;
- ssize_t rcv;
- int i;
- struct sldns_buffer *buffer;
+ struct comm_point* c;
+ struct doq_pkt_addr paddr;
+ int i, pkt_continue, err_drop;
+ struct doq_conn* conn;
+ struct ngtcp2_pkt_info pi;
+ size_t count, num_len;
- rep.c = (struct comm_point*)arg;
- log_assert(rep.c->type == comm_udp);
+ c = (struct comm_point*)arg;
+ log_assert(c->type == comm_doq);
- if(!(event&UB_EV_READ))
- return;
- log_assert(rep.c && rep.c->buffer && rep.c->fd == fd);
- ub_comm_base_now(rep.c->ev->base);
- for(i=0; i<NUM_UDP_PER_SELECT; i++) {
- sldns_buffer_clear(rep.c->buffer);
- rep.remote_addrlen = (socklen_t)sizeof(rep.remote_addr);
- log_assert(fd != -1);
- log_assert(sldns_buffer_remaining(rep.c->buffer) > 0);
- rcv = recvfrom(fd, (void*)sldns_buffer_begin(rep.c->buffer),
- sldns_buffer_remaining(rep.c->buffer), MSG_DONTWAIT,
- (struct sockaddr*)&rep.remote_addr, &rep.remote_addrlen);
- if(rcv == -1) {
-#ifndef USE_WINSOCK
- if(errno != EAGAIN && errno != EINTR
- && udp_recv_needs_log(errno))
- log_err("recvfrom %d failed: %s",
- fd, strerror(errno));
+ log_assert(c && c->doq_socket->pkt_buf && c->fd == fd);
+ ub_comm_base_now(c->ev->base);
+
+ /* see if there is a blocked packet, and send that if possible.
+ * do not attempt to read yet, even if possible, that would just
+ * push more answers in reply to those read packets onto the list
+ * of written replies. First attempt to clear the write content out.
+ * That keeps the memory usage from bloating up. */
+ if(c->doq_socket->have_blocked_pkt) {
+ if(!doq_write_blocked_pkt(c)) {
+ /* this write has also blocked, attempt to write
+ * later. Make sure the event listens to write
+ * events. */
+ if(!c->doq_socket->event_has_write)
+ doq_socket_write_enable(c);
+ doq_pickup_timer(c);
+ return;
+ }
+ }
+
+ /* see if there is write interest */
+ count = 0;
+ num_len = doq_write_list_length(c);
+ while((conn = doq_pop_write_conn(c)) != NULL) {
+ if(conn->is_deleted ||
+#ifdef HAVE_NGTCP2_CONN_IN_CLOSING_PERIOD
+ ngtcp2_conn_in_closing_period(conn->conn) ||
#else
- if(WSAGetLastError() != WSAEINPROGRESS &&
- WSAGetLastError() != WSAECONNRESET &&
- WSAGetLastError()!= WSAEWOULDBLOCK &&
- udp_recv_needs_log(WSAGetLastError()))
- log_err("recvfrom failed: %s",
- wsa_strerror(WSAGetLastError()));
+ ngtcp2_conn_is_in_closing_period(conn->conn) ||
#endif
+#ifdef HAVE_NGTCP2_CONN_IN_DRAINING_PERIOD
+ ngtcp2_conn_in_draining_period(conn->conn)
+#else
+ ngtcp2_conn_is_in_draining_period(conn->conn)
+#endif
+ ) {
+ conn->doq_socket = NULL;
+ lock_basic_unlock(&conn->lock);
+ if(c->doq_socket->have_blocked_pkt) {
+ if(!c->doq_socket->event_has_write)
+ doq_socket_write_enable(c);
+ doq_pickup_timer(c);
+ return;
+ }
+ if(++count > num_len*2)
+ break;
+ continue;
+ }
+ if(verbosity >= VERB_ALGO) {
+ char remotestr[256];
+ addr_to_str((void*)&conn->key.paddr.addr,
+ conn->key.paddr.addrlen, remotestr,
+ sizeof(remotestr));
+ verbose(VERB_ALGO, "doq write connection %s %d",
+ remotestr, doq_sockaddr_get_port(
+ &conn->key.paddr.addr));
+ }
+ if(doq_conn_write_streams(c, conn, &err_drop))
+ err_drop = 0;
+ doq_done_with_write_cb(c, conn, err_drop);
+ if(c->doq_socket->have_blocked_pkt) {
+ if(!c->doq_socket->event_has_write)
+ doq_socket_write_enable(c);
+ doq_pickup_timer(c);
return;
}
- sldns_buffer_skip(rep.c->buffer, rcv);
- sldns_buffer_flip(rep.c->buffer);
- rep.srctype = 0;
- rep.is_proxied = 0;
+ /* Stop overly long write lists that are created
+ * while we are processing. Do those next time there
+ * is a write callback. Stops long loops, and keeps
+ * fair for other events. */
+ if(++count > num_len*2)
+ break;
+ }
- if(rep.c->pp2_enabled && !consume_pp2_header(rep.c->buffer,
- &rep, 0)) {
- log_err("proxy_protocol: could not consume PROXYv2 header");
+ /* check for data to read */
+ if((event&UB_EV_READ)!=0)
+ for(i=0; i<NUM_UDP_PER_SELECT; i++) {
+ /* there may be a blocked write packet and if so, stop
+ * reading because the reply cannot get written. The
+ * blocked packet could be written during the conn_recv
+ * handling of replies, or for a connection close. */
+ if(c->doq_socket->have_blocked_pkt) {
+ if(!c->doq_socket->event_has_write)
+ doq_socket_write_enable(c);
+ doq_pickup_timer(c);
return;
}
- if(!rep.is_proxied) {
- rep.client_addrlen = rep.remote_addrlen;
- memmove(&rep.client_addr, &rep.remote_addr,
- rep.remote_addrlen);
+ sldns_buffer_clear(c->doq_socket->pkt_buf);
+ doq_pkt_addr_init(&paddr);
+ log_assert(fd != -1);
+ log_assert(sldns_buffer_remaining(c->doq_socket->pkt_buf) > 0);
+ if(!doq_recv(c, &paddr, &pkt_continue, &pi)) {
+ if(pkt_continue)
+ continue;
+ break;
}
- fptr_ok(fptr_whitelist_comm_point(rep.c->callback));
- if((*rep.c->callback)(rep.c, rep.c->cb_arg, NETEVENT_NOERROR, &rep)) {
- /* send back immediate reply */
-#ifdef USE_DNSCRYPT
- buffer = rep.c->dnscrypt_buffer;
+ /* handle incoming packet from remote addr to localaddr */
+ if(verbosity >= VERB_ALGO) {
+ char remotestr[256], localstr[256];
+ addr_to_str((void*)&paddr.addr, paddr.addrlen,
+ remotestr, sizeof(remotestr));
+ addr_to_str((void*)&paddr.localaddr,
+ paddr.localaddrlen, localstr,
+ sizeof(localstr));
+ log_info("incoming doq packet from %s port %d on "
+ "%s port %d ifindex %d",
+ remotestr, doq_sockaddr_get_port(&paddr.addr),
+ localstr,
+ doq_sockaddr_get_port(&paddr.localaddr),
+ paddr.ifindex);
+ log_info("doq_recv length %d ecn 0x%x",
+ (int)sldns_buffer_limit(c->doq_socket->pkt_buf),
+ (int)pi.ecn);
+ }
+
+ if(sldns_buffer_limit(c->doq_socket->pkt_buf) == 0)
+ continue;
+
+ conn = NULL;
+ if(!doq_decode_pkt_header_negotiate(c, &paddr, &conn))
+ continue;
+ if(!conn) {
+ if(!doq_accept(c, &paddr, &conn, &pi))
+ continue;
+ if(!doq_conn_write_streams(c, conn, NULL)) {
+ doq_delete_connection(c, conn);
+ continue;
+ }
+ doq_done_setup_timer_and_write(c, conn);
+ continue;
+ }
+ if(
+#ifdef HAVE_NGTCP2_CONN_IN_CLOSING_PERIOD
+ ngtcp2_conn_in_closing_period(conn->conn)
#else
- buffer = rep.c->buffer;
+ ngtcp2_conn_is_in_closing_period(conn->conn)
#endif
- (void)comm_point_send_udp_msg(rep.c, buffer,
- (struct sockaddr*)&rep.remote_addr,
- rep.remote_addrlen, 0);
+ ) {
+ if(!doq_conn_send_close(c, conn)) {
+ doq_delete_connection(c, conn);
+ } else {
+ doq_done_setup_timer_and_write(c, conn);
+ }
+ continue;
}
- if(!rep.c || rep.c->fd != fd) /* commpoint closed to -1 or reused for
- another UDP port. Note rep.c cannot be reused with TCP fd. */
- break;
+ if(
+#ifdef HAVE_NGTCP2_CONN_IN_DRAINING_PERIOD
+ ngtcp2_conn_in_draining_period(conn->conn)
+#else
+ ngtcp2_conn_is_in_draining_period(conn->conn)
+#endif
+ ) {
+ doq_done_setup_timer_and_write(c, conn);
+ continue;
+ }
+ if(!doq_conn_recv(c, &paddr, conn, &pi, NULL, &err_drop)) {
+ /* The receive failed, and if it also failed to send
+ * a close, drop the connection. That means it is not
+ * in the closing period. */
+ if(err_drop) {
+ doq_delete_connection(c, conn);
+ } else {
+ doq_done_setup_timer_and_write(c, conn);
+ }
+ continue;
+ }
+ if(!doq_conn_write_streams(c, conn, &err_drop)) {
+ if(err_drop) {
+ doq_delete_connection(c, conn);
+ } else {
+ doq_done_setup_timer_and_write(c, conn);
+ }
+ continue;
+ }
+ doq_done_setup_timer_and_write(c, conn);
+ }
+
+ /* see if we want to have more write events */
+ verbose(VERB_ALGO, "doq check write enable");
+ if(doq_socket_want_write(c))
+ doq_socket_write_enable(c);
+ else doq_socket_write_disable(c);
+ doq_pickup_timer(c);
+}
+
+/** create new doq server socket structure */
+static struct doq_server_socket*
+doq_server_socket_create(struct doq_table* table, struct ub_randstate* rnd,
+ const char* ssl_service_key, const char* ssl_service_pem,
+ struct comm_point* c, struct comm_base* base, struct config_file* cfg)
+{
+ size_t doq_buffer_size = 4096; /* bytes buffer size, for one packet. */
+ struct doq_server_socket* doq_socket;
+ doq_socket = calloc(1, sizeof(*doq_socket));
+ if(!doq_socket) {
+ return NULL;
+ }
+ doq_socket->table = table;
+ doq_socket->rnd = rnd;
+ doq_socket->validate_addr = 1;
+ if(ssl_service_key == NULL || ssl_service_key[0]==0) {
+ log_err("doq server socket create: no tls-service-key");
+ free(doq_socket);
+ return NULL;
+ }
+ if(ssl_service_pem == NULL || ssl_service_pem[0]==0) {
+ log_err("doq server socket create: no tls-service-pem");
+ free(doq_socket);
+ return NULL;
+ }
+ doq_socket->ssl_service_key = strdup(ssl_service_key);
+ if(!doq_socket->ssl_service_key) {
+ free(doq_socket);
+ return NULL;
+ }
+ doq_socket->ssl_service_pem = strdup(ssl_service_pem);
+ if(!doq_socket->ssl_service_pem) {
+ free(doq_socket->ssl_service_key);
+ free(doq_socket);
+ return NULL;
+ }
+ doq_socket->ssl_verify_pem = NULL;
+ /* the doq_socket has its own copy of the static secret, as
+ * well as other config values, so that they do not need table.lock */
+ doq_socket->static_secret_len = table->static_secret_len;
+ doq_socket->static_secret = memdup(table->static_secret,
+ table->static_secret_len);
+ if(!doq_socket->static_secret) {
+ free(doq_socket->ssl_service_key);
+ free(doq_socket->ssl_service_pem);
+ free(doq_socket->ssl_verify_pem);
+ free(doq_socket);
+ return NULL;
+ }
+ if(!doq_socket_setup_ctx(doq_socket)) {
+ free(doq_socket->ssl_service_key);
+ free(doq_socket->ssl_service_pem);
+ free(doq_socket->ssl_verify_pem);
+ free(doq_socket->static_secret);
+ free(doq_socket);
+ return NULL;
+ }
+ doq_socket->idle_timeout = table->idle_timeout;
+ doq_socket->sv_scidlen = table->sv_scidlen;
+ doq_socket->cp = c;
+ doq_socket->pkt_buf = sldns_buffer_new(doq_buffer_size);
+ if(!doq_socket->pkt_buf) {
+ free(doq_socket->ssl_service_key);
+ free(doq_socket->ssl_service_pem);
+ free(doq_socket->ssl_verify_pem);
+ free(doq_socket->static_secret);
+ SSL_CTX_free(doq_socket->ctx);
+ free(doq_socket);
+ return NULL;
+ }
+ doq_socket->blocked_pkt = sldns_buffer_new(
+ sldns_buffer_capacity(doq_socket->pkt_buf));
+ if(!doq_socket->pkt_buf) {
+ free(doq_socket->ssl_service_key);
+ free(doq_socket->ssl_service_pem);
+ free(doq_socket->ssl_verify_pem);
+ free(doq_socket->static_secret);
+ SSL_CTX_free(doq_socket->ctx);
+ sldns_buffer_free(doq_socket->pkt_buf);
+ free(doq_socket);
+ return NULL;
+ }
+ doq_socket->blocked_paddr = calloc(1,
+ sizeof(*doq_socket->blocked_paddr));
+ if(!doq_socket->blocked_paddr) {
+ free(doq_socket->ssl_service_key);
+ free(doq_socket->ssl_service_pem);
+ free(doq_socket->ssl_verify_pem);
+ free(doq_socket->static_secret);
+ SSL_CTX_free(doq_socket->ctx);
+ sldns_buffer_free(doq_socket->pkt_buf);
+ sldns_buffer_free(doq_socket->blocked_pkt);
+ free(doq_socket);
+ return NULL;
+ }
+ doq_socket->timer = comm_timer_create(base, doq_timer_cb, doq_socket);
+ if(!doq_socket->timer) {
+ free(doq_socket->ssl_service_key);
+ free(doq_socket->ssl_service_pem);
+ free(doq_socket->ssl_verify_pem);
+ free(doq_socket->static_secret);
+ SSL_CTX_free(doq_socket->ctx);
+ sldns_buffer_free(doq_socket->pkt_buf);
+ sldns_buffer_free(doq_socket->blocked_pkt);
+ free(doq_socket->blocked_paddr);
+ free(doq_socket);
+ return NULL;
+ }
+ memset(&doq_socket->marked_time, 0, sizeof(doq_socket->marked_time));
+ comm_base_timept(base, &doq_socket->now_tt, &doq_socket->now_tv);
+ doq_socket->cfg = cfg;
+ return doq_socket;
+}
+
+/** delete doq server socket structure */
+static void
+doq_server_socket_delete(struct doq_server_socket* doq_socket)
+{
+ if(!doq_socket)
+ return;
+ free(doq_socket->static_secret);
+ SSL_CTX_free(doq_socket->ctx);
+#ifndef HAVE_NGTCP2_CRYPTO_QUICTLS_CONFIGURE_SERVER_CONTEXT
+ free(doq_socket->quic_method);
+#endif
+ free(doq_socket->ssl_service_key);
+ free(doq_socket->ssl_service_pem);
+ free(doq_socket->ssl_verify_pem);
+ sldns_buffer_free(doq_socket->pkt_buf);
+ sldns_buffer_free(doq_socket->blocked_pkt);
+ free(doq_socket->blocked_paddr);
+ comm_timer_delete(doq_socket->timer);
+ free(doq_socket);
+}
+
+/** find repinfo in the doq table */
+static struct doq_conn*
+doq_lookup_repinfo(struct doq_table* table, struct comm_reply* repinfo)
+{
+ struct doq_conn* conn;
+ struct doq_conn_key key;
+ doq_conn_key_from_repinfo(&key, repinfo);
+ lock_rw_rdlock(&table->lock);
+ conn = doq_conn_find(table, &key.paddr.addr,
+ key.paddr.addrlen, &key.paddr.localaddr,
+ key.paddr.localaddrlen, key.paddr.ifindex, key.dcid,
+ key.dcidlen);
+ if(conn) {
+ lock_basic_lock(&conn->lock);
+ lock_rw_unlock(&table->lock);
+ return conn;
+ }
+ lock_rw_unlock(&table->lock);
+ return NULL;
+}
+
+/** doq find connection and stream. From inside callbacks from worker. */
+static int
+doq_lookup_conn_stream(struct comm_reply* repinfo, struct comm_point* c,
+ struct doq_conn** conn, struct doq_stream** stream)
+{
+ if(c->doq_socket->current_conn) {
+ *conn = c->doq_socket->current_conn;
+ } else {
+ *conn = doq_lookup_repinfo(c->doq_socket->table, repinfo);
+ if((*conn) && (*conn)->is_deleted) {
+ lock_basic_unlock(&(*conn)->lock);
+ *conn = NULL;
+ }
+ if(*conn) {
+ (*conn)->doq_socket = c->doq_socket;
+ }
+ }
+ if(!*conn) {
+ *stream = NULL;
+ return 0;
+ }
+ *stream = doq_stream_find(*conn, repinfo->doq_streamid);
+ if(!*stream) {
+ if(!c->doq_socket->current_conn) {
+ /* Not inside callbacks, we have our own lock on conn.
+ * Release it. */
+ lock_basic_unlock(&(*conn)->lock);
+ }
+ return 0;
+ }
+ if((*stream)->is_closed) {
+ /* stream is closed, ignore reply or drop */
+ if(!c->doq_socket->current_conn) {
+ /* Not inside callbacks, we have our own lock on conn.
+ * Release it. */
+ lock_basic_unlock(&(*conn)->lock);
+ }
+ return 0;
+ }
+ return 1;
+}
+
+/** doq send a reply from a comm reply */
+static void
+doq_socket_send_reply(struct comm_reply* repinfo)
+{
+ struct doq_conn* conn;
+ struct doq_stream* stream;
+ log_assert(repinfo->c->type == comm_doq);
+ if(!doq_lookup_conn_stream(repinfo, repinfo->c, &conn, &stream)) {
+ verbose(VERB_ALGO, "doq: send_reply but %s is gone",
+ (conn?"stream":"connection"));
+ /* No stream, it may have been closed. */
+ /* Drop the reply, it cannot be sent. */
+ return;
+ }
+ if(!doq_stream_send_reply(conn, stream, repinfo->c->buffer))
+ doq_stream_close(conn, stream, 1);
+ if(!repinfo->c->doq_socket->current_conn) {
+ /* Not inside callbacks, we have our own lock on conn.
+ * Release it. */
+ doq_done_with_conn_cb(repinfo->c, conn);
+ /* since we sent a reply, or closed it, the assumption is
+ * that there is something to write, so enable write event.
+ * It waits until the write event happens to write the
+ * streams with answers, this allows some answers to be
+ * answered before the event loop reaches the doq fd, in
+ * repinfo->c->fd, and that collates answers. That would
+ * not happen if we write doq packets right now. */
+ doq_socket_write_enable(repinfo->c);
+ }
+}
+
+/** doq drop a reply from a comm reply */
+static void
+doq_socket_drop_reply(struct comm_reply* repinfo)
+{
+ struct doq_conn* conn;
+ struct doq_stream* stream;
+ log_assert(repinfo->c->type == comm_doq);
+ if(!doq_lookup_conn_stream(repinfo, repinfo->c, &conn, &stream)) {
+ verbose(VERB_ALGO, "doq: drop_reply but %s is gone",
+ (conn?"stream":"connection"));
+ /* The connection or stream is already gone. */
+ return;
+ }
+ doq_stream_close(conn, stream, 1);
+ if(!repinfo->c->doq_socket->current_conn) {
+ /* Not inside callbacks, we have our own lock on conn.
+ * Release it. */
+ doq_done_with_conn_cb(repinfo->c, conn);
+ doq_socket_write_enable(repinfo->c);
}
}
+#endif /* HAVE_NGTCP2 */
int adjusted_tcp_timeout(struct comm_point* c)
{