log_err("write socket: %s", strerror(errno));
}
+/** release workrequest back to the freelist,
+ * note that the w->qinfo still needs to be cleared after this.
+ */
+static void
+req_release(struct work_query* w)
+{
+ if(w->worker->num_requests == w->worker->request_size) {
+ /* no longer at max, start accepting again. */
+ listen_resume(w->worker->front);
+ }
+ w->worker->num_requests --;
+ log_assert(w->worker->num_requests >= 0);
+ w->next = w->worker->free_queries;
+ w->worker->free_queries = w;
+}
+
/** reply to query with given error code */
static void
-replyerror(int r, struct worker* worker)
+replyerror(int r, struct work_query* w)
{
- ldns_buffer* buf = worker->query_reply.c->buffer;
+ ldns_buffer* buf = w->query_reply.c->buffer;
uint16_t flags;
verbose(VERB_DETAIL, "reply with error");
ldns_buffer_clear(buf);
- ldns_buffer_write_u16(buf, worker->query_id);
+ ldns_buffer_write_u16(buf, w->query_id);
flags = (uint16_t)(0x8000 | r); /* QR and retcode*/
- flags |= (worker->query_flags & 0x0100); /* copy RD bit */
+ flags |= (w->query_flags & 0x0100); /* copy RD bit */
ldns_buffer_write_u16(buf, flags);
flags = 1;
ldns_buffer_write_u16(buf, flags);
ldns_buffer_write(buf, &flags, sizeof(uint16_t));
ldns_buffer_write(buf, &flags, sizeof(uint16_t));
ldns_buffer_write(buf, &flags, sizeof(uint16_t));
- ldns_buffer_write(buf, worker->qinfo.qname, worker->qinfo.qnamesize);
- ldns_buffer_write_u16(buf, worker->qinfo.qtype);
- ldns_buffer_write_u16(buf, worker->qinfo.qclass);
+ ldns_buffer_write(buf, w->qinfo.qname, w->qinfo.qnamesize);
+ ldns_buffer_write_u16(buf, w->qinfo.qtype);
+ ldns_buffer_write_u16(buf, w->qinfo.qclass);
ldns_buffer_flip(buf);
- comm_point_send_reply(&worker->query_reply);
- if(worker->num_requests == 1) {
- /* no longer at max, start accepting again. */
- listen_resume(worker->front);
- }
- worker->num_requests --;
- query_info_clear(&worker->qinfo);
+ comm_point_send_reply(&w->query_reply);
+ req_release(w);
+ query_info_clear(&w->qinfo);
}
/** process incoming replies from the network */
worker_handle_reply(struct comm_point* c, void* arg, int error,
struct comm_reply* ATTR_UNUSED(reply_info))
{
- struct worker* worker = (struct worker*)arg;
+ struct work_query* w = (struct work_query*)arg;
struct reply_info* rep;
struct msgreply_entry* e;
- verbose(VERB_DETAIL, "reply to query with stored ID %d",
- worker->query_id);
+ verbose(VERB_DETAIL, "reply to query with stored ID %d", w->query_id);
if(error != 0) {
- replyerror(LDNS_RCODE_SERVFAIL, worker);
+ replyerror(LDNS_RCODE_SERVFAIL, w);
return 0;
}
/* sanity check. */
rep = (struct reply_info*)malloc(sizeof(struct reply_info));
if(!rep) {
log_err("out of memory");
- replyerror(LDNS_RCODE_SERVFAIL, worker);
+ replyerror(LDNS_RCODE_SERVFAIL, w);
return 0;
}
rep->replysize = ldns_buffer_limit(c->buffer) - 2; /* minus ID */
if(!rep->reply) {
free(rep);
log_err("out of memory");
- replyerror(LDNS_RCODE_SERVFAIL, worker);
+ replyerror(LDNS_RCODE_SERVFAIL, w);
return 0;
}
memmove(rep->reply, ldns_buffer_at(c->buffer, 2), rep->replysize);
- ldns_buffer_write_u16_at(worker->query_reply.c->buffer, 0,
- worker->query_id);
- reply_info_answer(rep, worker->query_flags, worker->query_reply.c->
- buffer);
- comm_point_send_reply(&worker->query_reply);
- if(worker->num_requests == 1) {
- /* no longer at max, start accepting again. */
- listen_resume(worker->front);
- }
- worker->num_requests --;
+ ldns_buffer_write_u16_at(w->query_reply.c->buffer, 0, w->query_id);
+ reply_info_answer(rep, w->query_flags, w->query_reply.c->buffer);
+ comm_point_send_reply(&w->query_reply);
+ req_release(w);
/* store or update reply in the cache */
- if(!(e = query_info_entrysetup(&worker->qinfo, rep,
- worker->query_hash))) {
+ if(!(e = query_info_entrysetup(&w->qinfo, rep, w->query_hash))) {
log_err("out of memory");
return 0;
}
- slabhash_insert(worker->daemon->msg_cache, worker->query_hash,
+ slabhash_insert(w->worker->daemon->msg_cache, w->query_hash,
&e->entry, rep);
return 0;
}
/** process incoming request */
static void
-worker_process_query(struct worker* worker)
+worker_process_query(struct worker* worker, struct work_query* w)
{
/* query the forwarding address */
- worker->query_id = LDNS_ID_WIRE(ldns_buffer_begin(
- worker->query_reply.c->buffer));
- worker->query_flags = ldns_buffer_read_u16_at(worker->
- query_reply.c->buffer, 2);
- verbose(VERB_DETAIL, "process_query ID %d", worker->query_id);
- pending_udp_query(worker->back, worker->query_reply.c->buffer,
+ verbose(VERB_DETAIL, "process_query ID %d", w->query_id);
+ pending_udp_query(worker->back, w->query_reply.c->buffer,
&worker->fwd_addr, worker->fwd_addrlen, UDP_QUERY_TIMEOUT,
- worker_handle_reply, worker, worker->rndstate);
+ worker_handle_reply, w, worker->rndstate);
}
/** check request sanity. Returns error code, 0 OK, or -1 discard.
int ret;
hashvalue_t h;
struct lruhash_entry* e;
+ struct query_info qinfo;
+ struct work_query* w;
+
verbose(VERB_DETAIL, "worker handle request");
if(error != NETEVENT_NOERROR) {
log_err("called with err=%d", error);
comm_point_drop_reply(repinfo);
return 0;
}
- if(worker->num_requests > 0) {
- /* we could get this due to a slow tcp incoming query,
- that started before we performed listen_pushback */
- verbose(VERB_DETAIL, "worker: too many incoming requests "
- "active. dropping incoming query.");
- comm_point_drop_reply(repinfo);
- return 0;
- }
/* see if query is in the cache */
- if(!query_info_parse(&worker->qinfo, c->buffer)) {
+ if(!query_info_parse(&qinfo, c->buffer)) {
LDNS_QR_SET(ldns_buffer_begin(c->buffer));
LDNS_RCODE_SET(ldns_buffer_begin(c->buffer),
LDNS_RCODE_FORMERR);
return 1;
}
- h = query_info_hash(&worker->qinfo);
- if((e=slabhash_lookup(worker->daemon->msg_cache, h, &worker->qinfo,
- 0))) {
+ h = query_info_hash(&qinfo);
+ if((e=slabhash_lookup(worker->daemon->msg_cache, h, &qinfo, 0))) {
/* answer from cache */
log_info("answer from the cache");
- query_info_clear(&worker->qinfo);
/* id is still in the buffer, no need to touch it */
reply_info_answer((struct reply_info*)e->data,
ldns_buffer_read_u16_at(c->buffer, 2), c->buffer);
return 1;
}
ldns_buffer_rewind(c->buffer);
+ /* perform memory allocation(s) */
+ if(!query_info_allocqname(&qinfo)) {
+ comm_point_drop_reply(repinfo);
+ return 0;
+ }
- /* answer it */
- worker->query_hash = h;
+ /* grab a work request structure for this new request */
+ if(!worker->free_queries) {
+ /* we could get this due to a slow tcp incoming query,
+ that started before we performed listen_pushback */
+ verbose(VERB_DETAIL, "worker: too many incoming requests "
+ "active. dropping incoming query.");
+ comm_point_drop_reply(repinfo);
+ return 0;
+ }
+ w = worker->free_queries;
+ worker->free_queries = w->next;
worker->num_requests ++;
- if(worker->num_requests >= 1) {
+ log_assert(worker->num_requests <= worker->request_size);
+ if(worker->num_requests == worker->request_size) {
/* the max request number has been reached, stop accepting */
listen_pushback(worker->front);
}
- memcpy(&worker->query_reply, repinfo, sizeof(struct comm_reply));
- worker_process_query(worker);
+
+ /* init request */
+ w->next = NULL;
+ w->query_hash = h;
+ memcpy(&w->query_reply, repinfo, sizeof(struct comm_reply));
+ memcpy(&w->qinfo, &qinfo, sizeof(struct query_info));
+ w->query_id = LDNS_ID_WIRE(ldns_buffer_begin(c->buffer));
+ w->query_flags = ldns_buffer_read_u16_at(c->buffer, 2);
+
+ /* answer it */
+ worker_process_query(worker, w);
return 0;
}
return worker;
}
+/** create request handling structures */
+static int
+reqs_init(struct worker* worker)
+{
+ int i;
+ for(i=0; i<worker->request_size; i++) {
+ struct work_query* q = (struct work_query*)calloc(1,
+ sizeof(struct work_query));
+ if(!q) return 0;
+ q->worker = worker;
+ q->next = worker->free_queries;
+ worker->free_queries = q;
+ }
+ return 1;
+}
+
int
worker_init(struct worker* worker, struct config_file *cfg,
struct listen_port* ports, size_t buffer_size, int do_sigs)
return 0;
}
}
+ worker->request_size = 1;
+ if(!reqs_init(worker)) {
+ worker_delete(worker);
+ return 0;
+ }
/* set forwarder address */
if(cfg->fwd_address && cfg->fwd_address[0]) {