]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
iterator module.
authorWouter Wijngaards <wouter@nlnetlabs.nl>
Fri, 11 May 2007 14:16:42 +0000 (14:16 +0000)
committerWouter Wijngaards <wouter@nlnetlabs.nl>
Fri, 11 May 2007 14:16:42 +0000 (14:16 +0000)
git-svn-id: file:///svn/unbound/trunk@311 be551aaa-1e26-0410-a405-d3ace91eadb9

14 files changed:
Makefile.in
daemon/daemon.c
daemon/daemon.h
daemon/worker.c
daemon/worker.h
doc/Changelog
iterator/iterator.c [new file with mode: 0644]
iterator/iterator.h [new file with mode: 0644]
services/outside_network.c
services/outside_network.h
testcode/fake_event.c
util/module.c
util/module.h
util/net_help.h

index 308280407250febc03ad37c8cbffa073333a76e5..5bff4f0bda1753efe1985e3145016c44f722f6c7 100644 (file)
@@ -50,7 +50,7 @@ LINTFLAGS+="-DBN_ULONG=unsigned long" -Dkrb5_int32=int "-Dkrb5_ui_4=unsigned int
 
 INSTALL=$(srcdir)/install-sh 
 
-COMMON_SRC=$(wildcard services/*.c util/*.c util/data/*.c util/storage/*.c) util/configparser.c util/configlexer.c testcode/checklocks.c
+COMMON_SRC=$(wildcard services/*.c util/*.c util/data/*.c util/storage/*.c iterator/*.c) util/configparser.c util/configlexer.c testcode/checklocks.c
 COMMON_OBJ=$(addprefix $(BUILD),$(COMMON_SRC:.c=.o))
 COMPAT_OBJ=$(addprefix $(BUILD)compat/,$(LIBOBJS))
 UNITTEST_SRC=$(wildcard testcode/unit*.c) testcode/readhex.c $(COMMON_SRC)
index 8c6742dd3dc7f116b19c2bbf066889f3f41ed712..611c04039df3a80518d59dc22012cd6df4c0d79c 100644 (file)
@@ -50,6 +50,8 @@
 #include "util/data/msgreply.h"
 #include "util/storage/slabhash.h"
 #include "services/listen_dnsport.h"
+#include "util/module.h"
+#include "iterator/iterator.h"
 #include <signal.h>
 
 /** How many quit requests happened. */
@@ -136,6 +138,11 @@ daemon_init()
                return NULL;
        }
        alloc_init(&daemon->superalloc, NULL, 0);
+       if(!(daemon->env = (struct module_env*)calloc(1, 
+               sizeof(*daemon->env)))) {
+               daemon_delete(daemon);
+               return NULL;
+       }
        return daemon;  
 }
 
@@ -152,6 +159,37 @@ daemon_open_shared_ports(struct daemon* daemon)
        return 1;
 }
 
+/**
+ * Setup modules. Assigns ids and calls module_init.
+ * @param daemon: the daemon
+ */
+static void daemon_setup_modules(struct daemon* daemon)
+{
+       int i;
+       /* fixed setup of the modules */
+       daemon->num_modules = 1;
+       daemon->modfunc = (struct module_func_block**)calloc((size_t)
+               daemon->num_modules, sizeof(struct module_func_block*));
+       if(!daemon->modfunc) {
+               fatal_exit("malloc failure allocating function callbacks");
+       }
+       daemon->modfunc[0] = iter_get_funcblock();
+       daemon->env->cfg = daemon->cfg;
+       daemon->env->msg_cache = daemon->msg_cache;
+       daemon->env->rrset_cache = daemon->rrset_cache;
+       daemon->env->alloc = &daemon->superalloc;
+       daemon->env->worker = NULL;
+       daemon->env->send_query = &worker_send_query;
+       for(i=0; i<daemon->num_modules; i++) {
+               log_info("init module %d: %s", i, daemon->modfunc[i]->name);
+               if(!(*daemon->modfunc[i]->init)(daemon->env, i)) {
+                       fatal_exit("module init for module %s failed",
+                               daemon->modfunc[i]->name);
+               }
+       }
+
+}
+
 /**
  * Allocate empty worker structures. With backptr and thread-number,
  * from 0..numthread initialised. Used as user arguments to new threads.
@@ -264,10 +302,30 @@ daemon_stop_others(struct daemon* daemon)
        }
 }
 
+/**
+ * Desetup the modules, deinit, delete.
+ * @param daemon: the daemon.
+ */
+static void
+daemon_desetup_modules(struct daemon* daemon)
+{
+       int i;
+       for(i=0; i<daemon->num_modules; i++) {
+               (*daemon->modfunc[i]->deinit)(daemon->env, i);
+       }
+       daemon->num_modules = 0;
+       free(daemon->modfunc);
+       daemon->modfunc = 0;
+}
+
 void 
 daemon_fork(struct daemon* daemon)
 {
        log_assert(daemon);
+
+       /* setup modules */
+       daemon_setup_modules(daemon);
+
        /* first create all the worker structures, so we can pass
         * them to the newly created threads. 
         */
@@ -293,6 +351,9 @@ daemon_fork(struct daemon* daemon)
        /* we exited! a signal happened! Stop other threads */
        daemon_stop_others(daemon);
 
+       /* de-setup modules */
+       daemon_desetup_modules(daemon);
+
        if(daemon->workers[0]->need_to_restart)
                daemon->need_to_exit = 0;
        else    daemon->need_to_exit = 1;
@@ -326,6 +387,7 @@ daemon_delete(struct daemon* daemon)
        alloc_clear(&daemon->superalloc);
        free(daemon->cwd);
        free(daemon->pidfile);
+       free(daemon->env);
        free(daemon);
        checklock_stop();
 }
index 2979f65f86ff2ebc0e09c39fa0447caa347d17f9..ef094cc241ef89bf2e0a3d03e54f5d9f4b24ad04 100644 (file)
@@ -48,6 +48,7 @@ struct config_file;
 struct worker;
 struct listen_port;
 struct slabhash;
+struct module_env;
 
 /**
  * Structure holding worker list.
@@ -76,6 +77,12 @@ struct daemon {
        struct slabhash* msg_cache;
        /** the rrset cache, content is struct ub_packed_rrset_key* */
        struct slabhash* rrset_cache;
+       /** the module environment master value, copied and changed by threads*/
+       struct module_env* env;
+       /** number of modules active, ids from 0 to num-1. */
+       int num_modules;
+       /** the module callbacks, array of num_modules length */
+       struct module_func_block** modfunc;
 };
 
 /**
index 9fbd1e5bd90ff67b3464df20167a41cb0c806e38..0382197c3a3e0a2821ab238e7baa003b54d5c7ee 100644 (file)
 #include <netdb.h>
 #include <signal.h>
 
-/** size of ID+FLAGS in a DNS message */
-#define DNS_ID_AND_FLAGS 4
-/** timeout in seconds for UDP queries to auth servers. TODO: proper rtt */
-#define UDP_QUERY_TIMEOUT 4
-/** timeout in seconds for TCP queries to auth servers. TODO: proper rtt */
-#define TCP_QUERY_TIMEOUT 30 
-/** Advertised version of EDNS capabilities */
-#define EDNS_ADVERTISED_VERSION        0
-/** Advertised size of EDNS capabilities */
-#define EDNS_ADVERTISED_SIZE   4096
-
-
 void 
 worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
        enum worker_commands cmd)
@@ -91,14 +79,15 @@ worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
 static void
 req_release(struct work_query* w)
 {
-       if(w->worker->num_requests == w->worker->request_size)  {
+       struct worker* worker = w->state.env->worker;
+       if(worker->num_requests == worker->request_size)  {
                /* no longer at max, start accepting again. */
-               listen_resume(w->worker->front);
+               listen_resume(worker->front);
        }
-       log_assert(w->worker->num_requests >= 1);
-       w->worker->num_requests --;
-       w->next = w->worker->free_queries;
-       w->worker->free_queries = w;
+       log_assert(worker->num_requests >= 1);
+       worker->num_requests --;
+       w->next = worker->free_queries;
+       worker->free_queries = w;
 }
 
 /** create error and fill into buffer */
@@ -131,180 +120,80 @@ replyerror_fillbuf(int r, struct comm_reply* repinfo, uint16_t id,
 static void 
 replyerror(int r, struct work_query* w)
 {
-       w->edns.edns_version = EDNS_ADVERTISED_VERSION;
-       w->edns.udp_size = EDNS_ADVERTISED_SIZE;
-       w->edns.ext_rcode = 0;
-       w->edns.bits &= EDNS_DO;
-       replyerror_fillbuf(r, &w->query_reply, w->query_id, w->query_flags,
-               &w->qinfo);
-       attach_edns_record(w->query_reply.c->buffer, &w->edns);
+       w->state.edns.edns_version = EDNS_ADVERTISED_VERSION;
+       w->state.edns.udp_size = EDNS_ADVERTISED_SIZE;
+       w->state.edns.ext_rcode = 0;
+       w->state.edns.bits &= EDNS_DO;
+       replyerror_fillbuf(r, &w->query_reply, w->query_id, 
+               w->state.query_flags, &w->state.qinfo);
+       attach_edns_record(w->query_reply.c->buffer, &w->state.edns);
        comm_point_send_reply(&w->query_reply);
        req_release(w);
-       query_info_clear(&w->qinfo);
-}
-
-/** see if rrset needs to be updated in the cache */
-static int
-need_to_update_rrset(struct packed_rrset_data* newd, 
-       struct packed_rrset_data* cached)
-{
-       /*      o if current RRset is more trustworthy - insert it */
-       if( newd->trust > cached->trust )
-               return 1;
-       /*      o same trust, but different in data - insert it */
-       if( newd->trust == cached->trust &&
-               !rrsetdata_equal(newd, cached))
-               return 1;
-       /*      o see if TTL is better than TTL in cache. */
-       /*        if so, see if rrset+rdata is the same */
-       /*        if so, update TTL in cache, even if trust is worse. */
-       if( newd->ttl > cached->ttl &&
-               rrsetdata_equal(newd, cached))
-               return 1;
-       return 0;
+       query_info_clear(&w->state.qinfo);
 }
 
-/** store rrsets in the rrset cache. */
-static void
-worker_store_rrsets(struct worker* worker, struct reply_info* rep)
+/** process incoming request */
+static void 
+worker_process_query(struct worker* worker, struct work_query* w, 
+       enum module_ev event) 
 {
-       struct lruhash_entry* e;
-       size_t i;
-       /* see if rrset already exists in cache, if not insert it. */
-       /* if it does exist: check to insert it */
-       for(i=0; i<rep->rrset_count; i++) {
-               rep->ref[i].key = rep->rrsets[i];
-               rep->ref[i].id = rep->rrsets[i]->id;
-               /* looks up item with a readlock - no editing! */
-               if((e=slabhash_lookup(worker->daemon->rrset_cache,
-                       rep->rrsets[i]->entry.hash, rep->rrsets[i]->entry.key,
-                       0)) != 0) {
-                       struct packed_rrset_data* data = 
-                               (struct packed_rrset_data*)e->data;
-                       struct packed_rrset_data* rd = 
-                               (struct packed_rrset_data*)
-                               rep->rrsets[i]->entry.data;
-                       rep->ref[i].key = (struct ub_packed_rrset_key*)e->key;
-                       rep->ref[i].id = rep->rrsets[i]->id;
-                       /* found in cache, do checks above */
-                       if(!need_to_update_rrset(rd, data)) {
-                               lock_rw_unlock(&e->lock);
-                               ub_packed_rrset_parsedelete(rep->rrsets[i],
-                                       &worker->alloc);
-                               rep->rrsets[i] = rep->ref[i].key;
-                               continue; /* use cached item instead */
-                       }
-                       if(rd->trust < data->trust)
-                               rd->trust = data->trust;
-                       lock_rw_unlock(&e->lock);
-                       /* small gap here, where entry is not locked.
-                        * possibly entry is updated with something else.
-                        * this is just too bad, its cache anyway. */
-                       /* use insert to update entry to manage lruhash
-                        * cache size values nicely. */
-               }
-               slabhash_insert(worker->daemon->rrset_cache, 
-                       rep->rrsets[i]->entry.hash, &rep->rrsets[i]->entry,
-                       rep->rrsets[i]->entry.data, &worker->alloc);
-               if(e) rep->rrsets[i] = rep->ref[i].key;
+       int i;
+       if(event == module_event_new) {
+               w->state.curmod = 0;
+               for(i=0; i<worker->daemon->num_modules; i++)
+                       w->state.ext_state[i] = module_state_initial;
+       }
+       /* allow current module to run */
+       (*worker->daemon->modfunc[w->state.curmod]->operate)(&w->state, event,
+               w->state.curmod);
+       /* TODO examine results, start further modules, etc.
+        * assume it went to sleep
+        */
+       region_free_all(worker->scratchpad);
+       if(w->state.ext_state[w->state.curmod] == module_error) {
+               region_free_all(w->state.region);
+               replyerror(LDNS_RCODE_SERVFAIL, w);
+               return;
+       }
+       if(w->state.ext_state[w->state.curmod] == module_finished) {
+               memcpy(ldns_buffer_begin(w->query_reply.c->buffer),
+                       &w->query_id, sizeof(w->query_id));
+               comm_point_send_reply(&w->query_reply);
+               region_free_all(w->state.region);
+               req_release(w);
+               query_info_clear(&w->state.qinfo);
+               return;
        }
+       /* suspend, waits for wakeup callback */
 }
 
 /** process incoming replies from the network */
 static int 
 worker_handle_reply(struct comm_point* c, void* arg, int error, 
-       struct comm_reply* ATTR_UNUSED(reply_info))
+       struct comm_reply* reply_info)
 {
        struct work_query* w = (struct work_query*)arg;
-       struct query_info qinf;
-       struct reply_info* rep;
-       struct msgreply_entry* e;
-       struct edns_data svr_edns; /* unused server edns advertisement */
-       uint16_t us;
-       int r;
+       struct worker* worker = w->state.env->worker;
 
-       verbose(VERB_DETAIL, "reply to query with stored ID %d", 
-               ntohs(w->query_id)); /* byteswapped so same as dig prints */
+       w->state.reply = reply_info;
        if(error != 0) {
-               replyerror(LDNS_RCODE_SERVFAIL, w);
+               worker_process_query(worker, w, module_event_timeout);
                return 0;
        }
        /* sanity check. */
-       if(!LDNS_QR_WIRE(ldns_buffer_begin(c->buffer)))
-               return 0; /* not a reply. */
-       if(LDNS_OPCODE_WIRE(ldns_buffer_begin(c->buffer)) != LDNS_PACKET_QUERY)
-               return 0; /* not a reply to a query. */
-       if(LDNS_QDCOUNT(ldns_buffer_begin(c->buffer)) > 1)
-               return 0; /* too much in the query section */
-       /* see if it is truncated */
-       if(LDNS_TC_WIRE(ldns_buffer_begin(c->buffer)) && c->type == comm_udp) {
-               log_info("TC: truncated. retry in TCP mode.");
-               qinfo_query_encode(w->worker->back->udp_buff, &w->qinfo);
-               pending_tcp_query(w->worker->back, w->worker->back->udp_buff, 
-                       &w->worker->fwd_addr, w->worker->fwd_addrlen, 
-                       TCP_QUERY_TIMEOUT, worker_handle_reply, w, 
-                       w->worker->rndstate);
+       if(!LDNS_QR_WIRE(ldns_buffer_begin(c->buffer))
+               || LDNS_OPCODE_WIRE(ldns_buffer_begin(c->buffer)) != 
+                       LDNS_PACKET_QUERY
+               || LDNS_QDCOUNT(ldns_buffer_begin(c->buffer)) > 1) {
+               /* error becomes timeout for the module as if this reply
+                * never arrived. */
+               worker_process_query(worker, w, module_event_timeout);
                return 0;
        }
-       /* woohoo a reply! */
-       if((r=reply_info_parse(c->buffer, &w->worker->alloc, &qinf, &rep,
-               w->worker->scratchpad, &svr_edns))!=0) {
-               if(r == LDNS_RCODE_SERVFAIL)
-                       log_err("reply_info_parse: out of memory");
-               /* formerr on my parse gives servfail to my client */
-               replyerror(LDNS_RCODE_SERVFAIL, w);
-               region_free_all(w->worker->scratchpad);
-               return 0;
-       }
-       us = w->edns.udp_size;
-       w->edns.edns_version = EDNS_ADVERTISED_VERSION;
-       w->edns.udp_size = EDNS_ADVERTISED_SIZE;
-       w->edns.ext_rcode = 0;
-       w->edns.bits &= EDNS_DO;
-       if(!reply_info_answer_encode(&qinf, rep, w->query_id, w->query_flags,
-               w->query_reply.c->buffer, 0, 0, w->worker->scratchpad, us, 
-               &w->edns)) {
-               replyerror(LDNS_RCODE_SERVFAIL, w);
-               query_info_clear(&qinf);
-               reply_info_parsedelete(rep, &w->worker->alloc);
-               region_free_all(w->worker->scratchpad);
-               return 0;
-       }
-       comm_point_send_reply(&w->query_reply);
-       region_free_all(w->worker->scratchpad);
-       req_release(w);
-       query_info_clear(&w->qinfo);
-       if(rep->ttl == 0) {
-               log_info("TTL 0: dropped");
-               query_info_clear(&qinf);
-               reply_info_parsedelete(rep, &w->worker->alloc);
-               return 0;
-       }
-       reply_info_set_ttls(rep, time(0));
-       worker_store_rrsets(w->worker, rep);
-       reply_info_sortref(rep);
-       /* store msg in the cache */
-       if(!(e = query_info_entrysetup(&qinf, rep, w->query_hash))) {
-               query_info_clear(&qinf);
-               reply_info_parsedelete(rep, &w->worker->alloc);
-               return 0;
-       }
-       slabhash_insert(w->worker->daemon->msg_cache, w->query_hash, 
-               &e->entry, rep, &w->worker->alloc);
+       worker_process_query(worker, w, module_event_reply);
        return 0;
 }
 
-/** process incoming request */
-static void 
-worker_process_query(struct worker* worker, struct work_query* w) 
-{
-       /* query the forwarding address */
-       verbose(VERB_DETAIL, "process_query ID %d", ntohs(w->query_id));
-       pending_udp_query(worker->back, w->query_reply.c->buffer, 
-               &worker->fwd_addr, worker->fwd_addrlen, UDP_QUERY_TIMEOUT,
-               worker_handle_reply, w, worker->rndstate);
-}
-
 /** check request sanity. Returns error code, 0 OK, or -1 discard. 
  * @param pkt: the wire packet to examine for sanity.
 */
@@ -515,7 +404,7 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
                query_info_clear(&qinfo);
                return 0;
        }
-       w->edns = edns;
+       w->state.edns = edns;
        worker->free_queries = w->next;
        worker->num_requests ++;
        log_assert(worker->num_requests <= worker->request_size);
@@ -526,14 +415,15 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
 
        /* init request */
        w->next = NULL;
-       w->query_hash = h;
+       w->state.query_hash = h;
        memcpy(&w->query_reply, repinfo, sizeof(struct comm_reply));
-       memcpy(&w->qinfo, &qinfo, sizeof(struct query_info));
+       memcpy(&w->state.qinfo, &qinfo, sizeof(struct query_info));
        memcpy(&w->query_id, ldns_buffer_begin(c->buffer), sizeof(uint16_t));
-       w->query_flags = ldns_buffer_read_u16_at(c->buffer, 2);
+       w->state.query_flags = ldns_buffer_read_u16_at(c->buffer, 2);
 
        /* answer it */
-       worker_process_query(worker, w);
+       w->state.buf = c->buffer;
+       worker_process_query(worker, w, module_event_new);
        return 0;
 }
 
@@ -610,7 +500,16 @@ reqs_init(struct worker* worker)
                struct work_query* q = (struct work_query*)calloc(1,
                        sizeof(struct work_query));
                if(!q) return 0;
-               q->worker = worker;
+               q->state.buf = worker->front->udp_buff;
+               q->state.scratch = worker->scratchpad;
+               q->state.region = region_create_custom(malloc, free, 1024, 
+                       64, 16, 0);
+               if(!q->state.region) {
+                       free(q);
+                       return 0;
+               }
+               q->state.env = &worker->env;
+               q->state.work_info = q;
                q->next = worker->free_queries;
                worker->free_queries = q;
                q->all_next = worker->all_queries;
@@ -627,9 +526,10 @@ reqs_delete(struct worker* worker)
        struct work_query* n;
        while(q) {
                n = q->all_next;
-               log_assert(q->worker == worker);
+               log_assert(q->state.env->worker == worker);
                /* comm_reply closed in outside_network_delete */
-               query_info_clear(&q->qinfo);
+               query_info_clear(&q->state.qinfo);
+               region_destroy(q->state.region);
                free(q);
                q = n;
        }
@@ -710,24 +610,25 @@ worker_init(struct worker* worker, struct config_file *cfg,
                        return 0;
                }
        }
+       worker->scratchpad = region_create_custom(malloc, free, 
+               65536, 8192, 32, 1);
+       if(!worker->scratchpad) {
+               log_err("malloc failure");
+               worker_delete(worker);
+               return 0;
+       }
        worker->request_size = cfg->num_queries_per_thread;
        if(!reqs_init(worker)) {
                worker_delete(worker);
                return 0;
        }
 
-       /* set forwarder address */
-       if(cfg->fwd_address && cfg->fwd_address[0]) {
-               if(!worker_set_fwd(worker, cfg->fwd_address, cfg->fwd_port)) {
-                       worker_delete(worker);
-                       fatal_exit("could not set forwarder address");
-               }
-       }
        server_stats_init(&worker->stats);
        alloc_init(&worker->alloc, &worker->daemon->superalloc, 
                worker->thread_num);
-       worker->scratchpad = region_create_custom(malloc, free, 
-               65536, 8192, 32, 1);
+       worker->env = *worker->daemon->env;
+       worker->env.worker = worker;
+       worker->env.alloc = &worker->alloc;
        return 1;
 }
 
@@ -765,34 +666,15 @@ worker_delete(struct worker* worker)
 }
 
 int 
-worker_set_fwd(struct worker* worker, const char* ip, int port)
+worker_send_query(ldns_buffer* pkt, struct sockaddr_storage* addr,
+        socklen_t addrlen, int timeout, struct module_qstate* q, int use_tcp)
 {
-       uint16_t p;
-       log_assert(worker && ip);
-       p = (uint16_t) port;
-       if(str_is_ip6(ip)) {
-               struct sockaddr_in6* sa = 
-                       (struct sockaddr_in6*)&worker->fwd_addr;
-               worker->fwd_addrlen = (socklen_t)sizeof(struct sockaddr_in6);
-               memset(sa, 0, worker->fwd_addrlen);
-               sa->sin6_family = AF_INET6;
-               sa->sin6_port = (in_port_t)htons(p);
-               if(inet_pton((int)sa->sin6_family, ip, &sa->sin6_addr) <= 0) {
-                       log_err("Bad ip6 address %s", ip);
-                       return 0;
-               }
-       } else { /* ip4 */
-               struct sockaddr_in* sa = 
-                       (struct sockaddr_in*)&worker->fwd_addr;
-               worker->fwd_addrlen = (socklen_t)sizeof(struct sockaddr_in);
-               memset(sa, 0, worker->fwd_addrlen);
-               sa->sin_family = AF_INET;
-               sa->sin_port = (in_port_t)htons(p);
-               if(inet_pton((int)sa->sin_family, ip, &sa->sin_addr) <= 0) {
-                       log_err("Bad ip4 address %s", ip);
-                       return 0;
-               }
-       }
-       verbose(VERB_ALGO, "fwd queries to: %s %d", ip, p);
-       return 1;
+       struct worker* worker = q->env->worker;
+       if(use_tcp) {
+               return pending_tcp_query(worker->back, pkt, addr, addrlen, 
+                       timeout, worker_handle_reply, q->work_info, 
+                       worker->rndstate);
+       }
+       return pending_udp_query(worker->back, pkt, addr, addrlen, timeout,
+               worker_handle_reply, q->work_info, worker->rndstate);
 }
index d278aec6aa212cf443088935138fe3bbbb560543..0d21b3e7c13918bc50d8b8d1aafd68a149dbb8bb 100644 (file)
@@ -50,6 +50,7 @@
 #include "util/data/msgreply.h"
 #include "util/data/msgparse.h"
 #include "daemon/stats.h"
+#include "util/module.h"
 struct listen_dnsport;
 struct outside_network;
 struct config_file;
@@ -71,22 +72,14 @@ enum worker_commands {
 struct work_query {
        /** next query in freelist */
        struct work_query* next;
-       /** the worker for this query */
-       struct worker* worker;
+       /** query state */
+       struct module_qstate state;
        /** the query reply destination, packet buffer and where to send. */
        struct comm_reply query_reply;
-       /** the query_info structure from the query */
-       struct query_info qinfo;
-       /** hash value of the query qinfo */
-       hashvalue_t query_hash;
-       /** next query in all-list */
-       struct work_query* all_next;
        /** id of query, in network byteorder. */
        uint16_t query_id;
-       /** flags uint16 from query */
-       uint16_t query_flags;
-       /** edns data from the query */
-       struct edns_data edns;
+       /** next query in all-list */
+       struct work_query* all_next;
 };
 
 /**
@@ -124,11 +117,6 @@ struct worker {
        /** list of all working queries */
        struct work_query* all_queries;
 
-       /** address to forward to */
-       struct sockaddr_storage fwd_addr;
-       /** length of fwd_addr */
-       socklen_t fwd_addrlen;
-
        /** random() table for this worker. */
        struct ub_randstate* rndstate;
        /** do we need to restart (instead of exit) ? */
@@ -139,6 +127,9 @@ struct worker {
        struct server_stats stats;
        /** thread scratch region */
        struct region* scratchpad;
+
+       /** module environment passed to modules, changed for this thread */
+       struct module_env env;
 };
 
 /**
@@ -173,15 +164,6 @@ void worker_work(struct worker* worker);
  */
 void worker_delete(struct worker* worker);
 
-/**
- * Set forwarder
- * @param worker: the worker to modify.
- * @param ip: the server name.
- * @param port: port on server or NULL for default 53.
- * @return: false on error.
- */
-int worker_set_fwd(struct worker* worker, const char* ip, int port);
-
 /**
  * Send a command to a worker. Uses blocking writes.
  * @param worker: worker to send command to.
@@ -198,4 +180,18 @@ void worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
  */
 void worker_sighandler(int sig, void* arg);
 
+/**
+ * Worker service routine to send udp messages for modules.
+ * @param pkt: packet to send.
+ * @param addr: where to.
+ * @param addrlen: length of addr.
+ * @param timeout: seconds to wait until timeout.
+ * @param q: wich query state to reactivate upon return.
+ * @param use_tcp: true to use TCP, false for UDP.
+ * return: false on failure (memory or socket related). no query was
+ *      sent.
+ */
+int worker_send_query(ldns_buffer* pkt, struct sockaddr_storage* addr,
+       socklen_t addrlen, int timeout, struct module_qstate* q, int use_tcp);
+
 #endif /* DAEMON_WORKER_H */
index 7ffad134f0196ac5074516b4369de8acdcac5757..91a6f8303689182c9ac5a729e93b64432c79e2ea 100644 (file)
@@ -1,3 +1,6 @@
+11 May 2007: Wouter
+       - iterator/iterator.c module.
+
 10 May 2007: Wouter
        - created release-0.3 svn tag.
        - util/module.h
diff --git a/iterator/iterator.c b/iterator/iterator.c
new file mode 100644 (file)
index 0000000..2853eb8
--- /dev/null
@@ -0,0 +1,309 @@
+/*
+ * iterator/iterator.c - iterative resolver DNS query response module
+ *
+ * Copyright (c) 2007, NLnet Labs. All rights reserved.
+ *
+ * This software is open source.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 
+ * Neither the name of the NLNET LABS nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * \file
+ *
+ * This file contains a module that performs recusive iterative DNS query
+ * processing.
+ */
+
+#include "config.h"
+#include "iterator/iterator.h"
+#include "util/module.h"
+#include "util/netevent.h"
+#include "util/config_file.h"
+#include "util/net_help.h"
+#include "util/storage/slabhash.h"
+
+/** 
+ * Set forwarder address 
+ * @param ie: iterator global state.
+ * @param ip: the server name.
+ * @param port: port on server or NULL for default 53.
+ * @return: false on error.
+ */
+static int
+iter_set_fwd(struct iter_env* ie, const char* ip, int port)
+{
+        uint16_t p;
+        log_assert(ie && ip);
+        p = (uint16_t) port;
+        if(str_is_ip6(ip)) {
+                struct sockaddr_in6* sa =
+                        (struct sockaddr_in6*)&ie->fwd_addr;
+                ie->fwd_addrlen = (socklen_t)sizeof(struct sockaddr_in6);
+                memset(sa, 0, ie->fwd_addrlen);
+                sa->sin6_family = AF_INET6;
+                sa->sin6_port = (in_port_t)htons(p);
+                if(inet_pton((int)sa->sin6_family, ip, &sa->sin6_addr) <= 0) {
+                        log_err("Bad ip6 address %s", ip);
+                        return 0;
+                }
+        } else { /* ip4 */
+                struct sockaddr_in* sa =
+                        (struct sockaddr_in*)&ie->fwd_addr;
+                ie->fwd_addrlen = (socklen_t)sizeof(struct sockaddr_in);
+                memset(sa, 0, ie->fwd_addrlen);
+                sa->sin_family = AF_INET;
+                sa->sin_port = (in_port_t)htons(p);
+                if(inet_pton((int)sa->sin_family, ip, &sa->sin_addr) <= 0) {
+                        log_err("Bad ip4 address %s", ip);
+                        return 0;
+                }
+        }
+        verbose(VERB_ALGO, "iterator: fwd queries to: %s %d", ip, p);
+        return 1;
+}
+
+/** iterator init */
+static int 
+iter_init(struct module_env* env, int id)
+{
+       struct iter_env* iter_env = (struct iter_env*)calloc(1,
+               sizeof(struct iter_env));
+       if(!iter_env) {
+               log_err("malloc failure");
+               return 0;
+       }
+       env->modinfo[id] = (void*)iter_env;
+       /* set forwarder address */
+       if(env->cfg->fwd_address && env->cfg->fwd_address[0]) {
+               if(!iter_set_fwd(iter_env, env->cfg->fwd_address, 
+                       env->cfg->fwd_port)) {
+                       log_err("iterator: could not set forwarder address");
+                       return 0;
+               }
+       }
+       return 1;
+}
+
+/** iterator deinit */
+static void 
+iter_deinit(struct module_env* env, int id)
+{
+       struct iter_env* iter_env;
+       if(!env || !env->modinfo)
+               return;
+       iter_env = (struct iter_env*)env->modinfo[id];
+       if(iter_env)
+               free(iter_env);
+}
+
+/** see if rrset needs to be updated in the cache */
+static int
+need_to_update_rrset(struct packed_rrset_data* newd,
+        struct packed_rrset_data* cached)
+{
+        /*      o if current RRset is more trustworthy - insert it */
+        if( newd->trust > cached->trust )
+                return 1;
+        /*      o same trust, but different in data - insert it */
+        if( newd->trust == cached->trust &&
+                !rrsetdata_equal(newd, cached))
+                return 1;
+        /*      o see if TTL is better than TTL in cache. */
+        /*        if so, see if rrset+rdata is the same */
+        /*        if so, update TTL in cache, even if trust is worse. */
+        if( newd->ttl > cached->ttl &&
+                rrsetdata_equal(newd, cached))
+                return 1;
+        return 0;
+}
+
+/** store rrsets in the rrset cache. */
+static void
+worker_store_rrsets(struct module_env* env, struct reply_info* rep)
+{
+        struct lruhash_entry* e;
+        size_t i;
+        /* see if rrset already exists in cache, if not insert it. */
+        /* if it does exist: check to insert it */
+        for(i=0; i<rep->rrset_count; i++) {
+                rep->ref[i].key = rep->rrsets[i];
+                rep->ref[i].id = rep->rrsets[i]->id;
+                /* looks up item with a readlock - no editing! */
+                if((e=slabhash_lookup(env->rrset_cache,
+                        rep->rrsets[i]->entry.hash, rep->rrsets[i]->entry.key,
+                        0)) != 0) {
+                        struct packed_rrset_data* data =
+                                (struct packed_rrset_data*)e->data;
+                        struct packed_rrset_data* rd =
+                                (struct packed_rrset_data*)
+                                rep->rrsets[i]->entry.data;
+                        rep->ref[i].key = (struct ub_packed_rrset_key*)e->key;
+                        rep->ref[i].id = rep->rrsets[i]->id;
+                        /* found in cache, do checks above */
+                        if(!need_to_update_rrset(rd, data)) {
+                                lock_rw_unlock(&e->lock);
+                                ub_packed_rrset_parsedelete(rep->rrsets[i],
+                                        env->alloc);
+                                rep->rrsets[i] = rep->ref[i].key;
+                                continue; /* use cached item instead */
+                        }
+                        if(rd->trust < data->trust)
+                                rd->trust = data->trust;
+                        lock_rw_unlock(&e->lock);
+                        /* small gap here, where entry is not locked.
+                         * possibly entry is updated with something else.
+                         * this is just too bad, its cache anyway. */
+                        /* use insert to update entry to manage lruhash
+                         * cache size values nicely. */
+                }
+                slabhash_insert(env->rrset_cache,
+                        rep->rrsets[i]->entry.hash, &rep->rrsets[i]->entry,
+                        rep->rrsets[i]->entry.data, env->alloc);
+                if(e) rep->rrsets[i] = rep->ref[i].key;
+        }
+}
+
+
+/** store message in the cache */
+static void
+store_msg(struct module_qstate* qstate, struct query_info* qinfo,
+       struct reply_info* rep)
+{
+       struct msgreply_entry* e;
+       reply_info_set_ttls(rep, time(0));
+       worker_store_rrsets(qstate->env, rep);
+       if(rep->ttl == 0) {
+               log_info("TTL 0: dropped msg from cache");
+               return;
+       }
+       reply_info_sortref(rep);
+       /* store msg in the cache */
+       if(!(e = query_info_entrysetup(qinfo, rep, qstate->query_hash))) {
+               log_err("store_msg: malloc failed");
+               return;
+       }
+       slabhash_insert(qstate->env->msg_cache, qstate->query_hash,
+               &e->entry, rep, &qstate->env->alloc);
+}
+
+/** iterator operate on a query */
+static void 
+iter_operate(struct module_qstate* qstate, enum module_ev event, int id)
+{
+       struct module_env* env = qstate->env;
+       struct iter_env* ie = (struct iter_env*)env->modinfo[id];
+       verbose(VERB_ALGO, "iterator[module %d] operate: extstate:%s event:%s", 
+               id, strextstate(qstate->ext_state[id]), strmodulevent(event));
+       if(event == module_event_error) {
+               qstate->ext_state[id] = module_error;
+               return;
+       }
+       if(event == module_event_new) {
+               /* send UDP query to forwarder address */
+               (*env->send_query)(qstate->buf, &ie->fwd_addr, 
+                       ie->fwd_addrlen, UDP_QUERY_TIMEOUT, qstate, 0);
+               qstate->ext_state[id] = module_wait_reply;
+               qstate->minfo[id] = NULL;
+               return;
+       }
+       if(event == module_event_timeout) {
+               /* try TCP if UDP fails */
+               if(qstate->reply->c->type == comm_udp) {
+                       qinfo_query_encode(qstate->buf, &qstate->qinfo);
+                       (*env->send_query)(qstate->buf, &ie->fwd_addr, 
+                               ie->fwd_addrlen, TCP_QUERY_TIMEOUT, qstate, 1);
+                       return;
+               }
+               qstate->ext_state[id] = module_error;
+               return;
+       }
+       if(event == module_event_reply) {
+               uint16_t us = qstate->edns.udp_size;
+               struct query_info reply_qinfo;
+               struct reply_info* reply_msg;
+               struct edns_data reply_edns;
+               int r;
+               /* see if it is truncated */
+               if(LDNS_TC_WIRE(ldns_buffer_begin(qstate->reply->c->buffer)) 
+                       && qstate->reply->c->type == comm_udp) {
+                       log_info("TC: truncated. retry in TCP mode.");
+                       qinfo_query_encode(qstate->buf, &qstate->qinfo);
+                       (*env->send_query)(qstate->buf, &ie->fwd_addr, 
+                               ie->fwd_addrlen, TCP_QUERY_TIMEOUT, qstate, 1);
+                       /* stay in wait_reply state */
+                       return;
+               }
+               if((r=reply_info_parse(qstate->reply->c->buffer, env->alloc, 
+                       &reply_qinfo, &reply_msg, qstate->scratch, 
+                       &reply_edns))!=0) {
+                       qstate->ext_state[id] = module_error;
+                       return;
+               }
+
+               qstate->edns.edns_version = EDNS_ADVERTISED_VERSION;
+               qstate->edns.udp_size = EDNS_ADVERTISED_SIZE;
+               qstate->edns.ext_rcode = 0;
+               qstate->edns.bits &= EDNS_DO;
+               if(!reply_info_answer_encode(&reply_qinfo, reply_msg, 0, 
+                       qstate->query_flags, qstate->buf, 0, 0, 
+                       qstate->scratch, us, &qstate->edns)) {
+                       qstate->ext_state[id] = module_error;
+                       return;
+               }
+               store_msg(qstate, &reply_qinfo, reply_msg);
+               qstate->ext_state[id] = module_finished;
+               return;
+       }
+       log_err("bad event for iterator");
+       qstate->ext_state[id] = module_error;
+}
+
+/** iterator cleanup query state */
+static void 
+iter_clear(struct module_qstate* qstate, int id)
+{
+       if(!qstate)
+               return;
+       /* allocated in region, so nothing to do */
+       qstate->minfo[id] = NULL;
+}
+
+/**
+ * The iterator function block 
+ */
+static struct module_func_block iter_block = {
+       "iterator",
+       &iter_init, &iter_deinit, &iter_operate, &iter_clear
+};
+
+struct module_func_block* 
+iter_get_funcblock()
+{
+       return &iter_block;
+}
diff --git a/iterator/iterator.h b/iterator/iterator.h
new file mode 100644 (file)
index 0000000..0fc8c26
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * iterator/iterator.h - iterative resolver DNS query response module
+ *
+ * Copyright (c) 2007, NLnet Labs. All rights reserved.
+ *
+ * This software is open source.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 
+ * Neither the name of the NLNET LABS nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * \file
+ *
+ * This file contains a module that performs recusive iterative DNS query
+ * processing.
+ */
+
+#ifndef ITERATOR_ITERATOR_H
+#define ITERATOR_ITERATOR_H
+struct module_func_block;
+
+/**
+ * Global state for the iterator. 
+ */
+struct iter_env {
+       /** address to forward to */
+       struct sockaddr_storage fwd_addr;
+       /** length of fwd_addr */
+       socklen_t fwd_addrlen;
+};
+
+/**
+ * Per query state for the iterator module.
+ */
+struct iter_qstate {
+};
+
+/**
+ * Get the iterator function block.
+ */
+struct module_func_block* iter_get_funcblock();
+
+#endif /* ITERATOR_ITERATOR_H */
index bd39b7daa484169a11880d5f54734e4062cb2f6d..e43d74ca0775c79e18fd7cd2b92f9ef09a279c50 100644 (file)
@@ -122,7 +122,7 @@ waiting_tcp_delete(struct waiting_tcp* w)
 }
 
 /** use next free buffer to service a tcp query */
-static void
+static int
 outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt)
 {
        struct pending_tcp* pend = w->outnet->tcp_free;
@@ -139,9 +139,7 @@ outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt)
        if(s == -1) {
                log_err("outgoing tcp: socket: %s", strerror(errno));
                log_addr(&w->addr, w->addrlen);
-               (void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL);
-               waiting_tcp_delete(w);
-               return;
+               return 0;
        }
        fd_set_nonblock(s);
        if(connect(s, (struct sockaddr*)&w->addr, w->addrlen) == -1) {
@@ -149,9 +147,7 @@ outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt)
                        log_err("outgoing tcp: connect: %s", strerror(errno));
                        log_addr(&w->addr, w->addrlen);
                        close(s);
-                       (void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL);
-                       waiting_tcp_delete(w);
-                       return;
+                       return 0;
                }
        }
        w->pkt = NULL;
@@ -166,7 +162,7 @@ outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt)
        pend->c->tcp_is_reading = 0;
        pend->c->tcp_byte_count = 0;
        comm_point_start_listening(pend->c, s, -1);
-       return;
+       return 1;
 }
 
 /** see if buffers can be used to service TCP queries. */
@@ -179,7 +175,10 @@ use_free_buffer(struct outside_network* outnet)
                outnet->tcp_wait_first = w->next_waiting;
                if(outnet->tcp_wait_last == w)
                        outnet->tcp_wait_last = NULL;
-               outnet_tcp_take_into_use(w, w->pkt);
+               if(!outnet_tcp_take_into_use(w, w->pkt)) {
+                       (void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL);
+                       waiting_tcp_delete(w);
+               }
        }
 }
 
@@ -649,7 +648,7 @@ select_port(struct outside_network* outnet, struct pending* pend,
 }
 
 
-void 
+int 
 pending_udp_query(struct outside_network* outnet, ldns_buffer* packet, 
        struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
        comm_point_callback_t* cb, void* cb_arg, struct ub_randstate* rnd)
@@ -660,19 +659,15 @@ pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
        /* create pending struct and change ID to be unique */
        if(!(pend=new_pending(outnet, packet, addr, addrlen, cb, cb_arg, 
                rnd))) {
-               /* callback user for the error */
-               (void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL);
-               return;
+               return 0;
        }
        select_port(outnet, pend, rnd);
 
        /* send it over the commlink */
        if(!comm_point_send_udp_msg(pend->c, packet, (struct sockaddr*)addr, 
                addrlen)) {
-               /* callback user for the error */
-               (void)(*pend->cb)(pend->c, pend->cb_arg, NETEVENT_CLOSED, NULL);
                pending_delete(outnet, pend);
-               return;
+               return 0;
        }
 
        /* system calls to set timeout after sending UDP to make roundtrip
@@ -680,6 +675,7 @@ pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
        tv.tv_sec = timeout;
        tv.tv_usec = 0;
        comm_timer_set(pend->timer, &tv);
+       return 1;
 }
 
 /** callback for outgoing TCP timer event */
@@ -714,7 +710,7 @@ outnet_tcptimer(void* arg)
        use_free_buffer(outnet);
 }
 
-void 
+int 
 pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet, 
        struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
        comm_point_callback_t* callback, void* callback_arg,
@@ -728,14 +724,11 @@ pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
        w = (struct waiting_tcp*)malloc(sizeof(struct waiting_tcp) 
                + (pend?0:ldns_buffer_limit(packet)));
        if(!w) {
-               /* callback user for the error */
-               (void)(*callback)(NULL, callback_arg, NETEVENT_CLOSED, NULL);
-               return;
+               return 0;
        }
        if(!(w->timer = comm_timer_create(outnet->base, outnet_tcptimer, w))) {
                free(w);
-               (void)(*callback)(NULL, callback_arg, NETEVENT_CLOSED, NULL);
-               return;
+               return 0;
        }
        w->pkt = NULL;
        w->pkt_len = ldns_buffer_limit(packet);
@@ -752,7 +745,10 @@ pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
        comm_timer_set(w->timer, &tv);
        if(pend) {
                /* we have a buffer available right now */
-               outnet_tcp_take_into_use(w, ldns_buffer_begin(packet));
+               if(!outnet_tcp_take_into_use(w, ldns_buffer_begin(packet))) {
+                       waiting_tcp_delete(w);
+                       return 0;
+               }
        } else {
                /* queue up */
                w->pkt = (uint8_t*)w + sizeof(struct waiting_tcp);
@@ -763,4 +759,5 @@ pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
                else    outnet->tcp_wait_first = w;
                outnet->tcp_wait_last = w;
        }
+       return 1;
 }
index 1b15cd625aab3bd9dea057269234a969553e2328..f5d64c4fc9f520bca25b055720c43b0b78d46c65 100644 (file)
@@ -204,12 +204,11 @@ void outside_network_delete(struct outside_network* outnet);
  * @param addrlen: length of addr.
  * @param timeout: in seconds from now.
  * @param callback: function to call on error, timeout or reply.
- *    The routine does not return an error, instead it calls the callback,
- *    with an error code if an error happens.
  * @param callback_arg: user argument for callback function.
  * @param rnd: random state for generating ID and port.
+ * @return: false on error for malloc or socket.
  */
-void pending_udp_query(struct outside_network* outnet, ldns_buffer* packet, 
+int pending_udp_query(struct outside_network* outnet, ldns_buffer* packet, 
        struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
        comm_point_callback_t* callback, void* callback_arg,
        struct ub_randstate* rnd);
@@ -225,12 +224,11 @@ void pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
  *    Timer starts running now. Timer may expire if all buffers are used,
  *    without any query been sent to the server yet.
  * @param callback: function to call on error, timeout or reply.
- *    The routine does not return an error, instead it calls the callback,
- *    with an error code if an error happens.
  * @param callback_arg: user argument for callback function.
  * @param rnd: random state for generating ID.
+ * @return: false on error for malloc or socket.
  */
-void pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet, 
+int pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet, 
        struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
        comm_point_callback_t* callback, void* callback_arg,
        struct ub_randstate* rnd);
index 97f8f00a38ae6465fdda7aec88cbc15a62dc8815..ca545078d5c73f2052ac4483a6d3b48955426644 100644 (file)
@@ -656,7 +656,7 @@ outside_network_delete(struct outside_network* outnet)
        free(outnet);
 }
 
-void 
+int 
 pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
        struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
        comm_point_callback_t* callback, void* callback_arg,
@@ -701,9 +701,10 @@ pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
        /* add to list */
        pend->next = runtime->pending_list;
        runtime->pending_list = pend;
+       return 1;
 }
 
-void 
+int 
 pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
        struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
        comm_point_callback_t* callback, void* callback_arg,
@@ -748,6 +749,7 @@ pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
        /* add to list */
        pend->next = runtime->pending_list;
        runtime->pending_list = pend;
+       return 1;
 }
 
 struct listen_port* listening_ports_open(struct config_file* ATTR_UNUSED(cfg))
index f8182beb324ddd98947f49e66154822e69691e48..52f6bdc111032cccd4f4e1bf91972ce497bb3257 100644 (file)
 
 #include "config.h"
 #include "util/module.h"
+
+const char* 
+strextstate(enum module_ext_state s)
+{
+       switch(s) {
+       case module_state_initial: return "module_state_initial";
+       case module_wait_reply: return "module_wait_reply";
+       case module_wait_module: return "module_wait_module";
+       case module_wait_subquery: return "module_wait_subquery";
+       case module_error: return "module_error";
+       case module_finished: return "module_finished";
+       }
+       return "bad_extstate_value";
+}
+
+const char* 
+strmodulevent(enum module_ev e)
+{
+       switch(e) {
+       case module_event_new: return "module_event_new";
+       case module_event_pass: return "module_event_pass";
+       case module_event_reply: return "module_event_reply";
+       case module_event_timeout: return "module_event_timeout";
+       case module_event_mod_done: return "module_event_mod_done";
+       case module_event_subq_done: return "module_event_subq_done";
+       case module_event_error: return "module_event_error";
+       }
+       return "bad_event_value";
+}
index b2cf9fa3d0ac5c215fc932b6dade1ae6cb2801af..e80557faa92a21e53c695f50a7541584d5a2c9c9 100644 (file)
 #ifndef UTIL_MODULE_H
 #define UTIL_MODULE_H
 #include "util/storage/lruhash.h"
+#include "util/data/msgreply.h"
+#include "util/data/msgparse.h"
+struct alloc_cache;
 struct config_file;
 struct slabhash;
 struct query_info;
 struct edns_data;
 struct region;
 struct worker;
+struct module_qstate;
 
 /** Maximum number of modules in operation */
 #define MAX_MODULE 2
@@ -66,11 +70,27 @@ struct module_env {
        struct slabhash* rrset_cache;
 
        /* --- services --- */
-       /** send DNS query to server. operate() should return with wait_reply */
+       /** 
+        * Send DNS query to server. operate() should return with wait_reply.
+        * Later on a callback will cause operate() to be called with event
+        * timeout or reply.
+        * @param pkt: packet to send.
+        * @param addr: where to.
+        * @param addrlen: length of addr.
+        * @param timeout: seconds to wait until timeout.
+        * @param q: wich query state to reactivate upon return.
+        * @param use_tcp: set to true to send over TCP. 0 for UDP.
+        * return: false on failure (memory or socket related). no query was
+        *      sent.
+        */
+       int (*send_query)(ldns_buffer* pkt, struct sockaddr_storage* addr,
+               socklen_t addrlen, int timeout, struct module_qstate* q,
+               int use_tcp);
+
        /** create a subquery. operate should then return with wait_subq */
 
        /** allocation service */
-       struct alloc* alloc;
+       struct alloc_cache* alloc;
        /** internal data for daemon - worker thread. */
        struct worker* worker;
        /** module specific data. indexed by module id. */
@@ -122,30 +142,34 @@ enum module_ev {
  */
 struct module_qstate {
        /** which query is being answered: name, type, class */
-       struct query_info* qinfo;
+       struct query_info qinfo;
        /** hash value of the query qinfo */
        hashvalue_t query_hash;
        /** flags uint16 from query */
        uint16_t query_flags;
        /** edns data from the query */
-       struct edns_data* edns;
+       struct edns_data edns;
 
-       /** buffer, contains server replies, store resulting reply here. 
+       /** buffer, store resulting reply here. 
         * May be cleared when module blocks. */
        ldns_buffer* buf;
-       /** parsed message from server */
-       struct msg_parse* msg_parse;
+       /** comm_reply contains server replies */
+       struct comm_reply* reply;
        /** region for temporary usage. May be cleared when module blocks. */
        struct region* scratch;
        /** region for this query. Cleared when query process finishes. */
        struct region* region;
 
+       /** which module is executing */
+       int curmod;
        /** module states */
        enum module_ext_state ext_state[MAX_MODULE];
        /** module specific data for query. indexed by module id. */
        void* minfo[MAX_MODULE];
        /** environment for this query */
-       struct module_env* module_env;
+       struct module_env* env;
+       /** worker related state for this query */
+       struct work_query* work_info;
 
        /** parent query, only nonNULL for subqueries */
        struct module_qstate* parent;
@@ -163,14 +187,15 @@ struct module_func_block {
        char* name;
 
        /** 
-        * init the module
+        * init the module. Called once for the global state.
+        * This is the place to apply settings from the config file.
         * @param env: module environment.
         * @param id: module id number.
         * return: 0 on error
         */
        int (*init)(struct module_env* env, int id);
        /**
-        * de-init, delete, the module.
+        * de-init, delete, the module. Called once for the global state.
         * @param env: module environment.
         * @param id: module id number.
         */
@@ -197,4 +222,18 @@ struct module_func_block {
        void (*clear)(struct module_qstate* qstate, int id);
 };
 
+/** 
+ * Debug utility: module external qstate to string 
+ * @param s: the state value.
+ * @return descriptive string.
+ */
+const char* strextstate(enum module_ext_state s);
+
+/** 
+ * Debug utility: module event to string 
+ * @param e: the module event value.
+ * @return descriptive string.
+ */
+const char* strmodulevent(enum module_ev e);
+
 #endif /* UTIL_MODULE_H */
index 557c43d3960c7e5c976544574e4da4028a0e9dad..e18a24afd23255843c73773c50fa0528f8646cba 100644 (file)
 /** QR flag */
 #define BIT_QR 0x8000
 
+/** timeout in seconds for UDP queries to auth servers. TODO: proper rtt */
+#define UDP_QUERY_TIMEOUT 4
+/** timeout in seconds for TCP queries to auth servers. TODO: proper rtt */
+#define TCP_QUERY_TIMEOUT 30
+/** Advertised version of EDNS capabilities */
+#define EDNS_ADVERTISED_VERSION         0
+/** Advertised size of EDNS capabilities */
+#define EDNS_ADVERTISED_SIZE    4096
+
 /**
  * See if string is ip4 or ip6.
  * @param str: IP specification.