#include "util/config_file.h"
#include "util/alloc.h"
#include "util/module.h"
+#include "util/regional.h"
#include "util/log.h"
#include "services/modstack.h"
#include "services/localzone.h"
return ctx;
}
-/** delete q */
+/** delete context query */
static void
-delq(rbnode_t* n, void* ATTR_UNUSED(arg))
+context_query_delete(struct ctx_query* q)
{
- struct ctx_query* q = (struct ctx_query*)n;
if(!q) return;
ub_val_result_free(q->res);
+ free(q->msg);
free(q);
}
+/** delete q */
+static void
+delq(rbnode_t* n, void* ATTR_UNUSED(arg))
+{
+ struct ctx_query* q = (struct ctx_query*)n;
+ context_query_delete(q);
+}
+
void
ub_val_ctx_delete(struct ub_val_ctx* ctx)
{
int
ub_val_ctx_wait(struct ub_val_ctx* ctx)
{
+ int r;
lock_basic_lock(&ctx->cfglock);
while(ctx->num_async > 0) {
- lock_basic_lock(&ctx->rrpipe_lock);
lock_basic_unlock(&ctx->cfglock);
- (void)pollit(ctx, NULL);
+ lock_basic_lock(&ctx->rrpipe_lock);
+ r = pollit(ctx, NULL);
lock_basic_unlock(&ctx->rrpipe_lock);
- ub_val_ctx_process(ctx);
+ if(r)
+ ub_val_ctx_process(ctx);
lock_basic_lock(&ctx->cfglock);
}
lock_basic_unlock(&ctx->cfglock);
int
ub_val_ctx_fd(struct ub_val_ctx* ctx)
{
- return ctx->rrpipe[0];
+ int fd;
+ lock_basic_lock(&ctx->rrpipe_lock);
+ fd = ctx->rrpipe[0];
+ lock_basic_unlock(&ctx->rrpipe_lock);
+ return fd;
+}
+
+/** process answer from bg worker */
+static int
+process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
+{
+ int err;
+ struct ctx_query* q;
+ ub_val_callback_t cb;
+ void* cbarg;
+ struct ub_val_result* res;
+ if(context_serial_getcmd(msg, len) != UB_LIBCMD_ANSWER) {
+ log_err("error: bad data from bg worker %d",
+ (int)context_serial_getcmd(msg, len));
+ return 0;
+ }
+
+ lock_basic_lock(&ctx->cfglock);
+ q = context_deserialize_answer(ctx, msg, len, &err);
+ if(!q) {
+ lock_basic_unlock(&ctx->cfglock);
+ return 0;
+ }
+ log_assert(q->async);
+
+ /* grab cb while locked */
+ cb = q->cb;
+ cbarg = q->cb_arg;
+ if(err) {
+ res = NULL;
+ ub_val_result_free(q->res);
+ } else {
+ /* parse the message, extract rcode, fill result */
+ ldns_buffer* buf = ldns_buffer_new(q->msg_len);
+ struct regional* region = regional_create();
+ res = q->res;
+ res->rcode = LDNS_RCODE_SERVFAIL;
+ if(region && buf) {
+ ldns_buffer_clear(buf);
+ ldns_buffer_write(buf, q->msg, q->msg_len);
+ ldns_buffer_flip(buf);
+ libworker_enter_result(res, buf, region,
+ q->msg_security);
+ }
+ ldns_buffer_free(buf);
+ regional_destroy(region);
+ }
+ q->res = NULL;
+ /* delete the q from list */
+ (void)rbtree_delete(&ctx->queries, q->node.key);
+ ctx->num_async--;
+ context_query_delete(q);
+ lock_basic_unlock(&ctx->cfglock);
+
+ /* no locks held while calling callback, so that library is
+ * re-entrant. */
+ (*cb)(cbarg, err, res);
+
+ return 1;
}
int
ub_val_ctx_process(struct ub_val_ctx* ctx)
{
- /* TODO */
- /* ctx->num_asynx-- when handled; */
- return UB_NOMEM;
+ int r;
+ uint8_t* msg = NULL;
+ uint32_t len = 0;
+ while(1) {
+ lock_basic_lock(&ctx->rrpipe_lock);
+ r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
+ lock_basic_unlock(&ctx->rrpipe_lock);
+ if(r == 0)
+ return UB_PIPE;
+ else if(r == -1)
+ return UB_NOERROR;
+ if(!process_answer(ctx, msg, len)) {
+ free(msg);
+ return UB_PIPE;
+ }
+ free(msg);
+ }
+ return UB_NOERROR;
}
int
r = libworker_fg(ctx, q);
if(r) {
- ub_val_result_free(q->res);
- free(q);
+ context_query_delete(q);
return r;
}
*result = q->res;
+ q->res = NULL;
(void)rbtree_delete(&ctx->queries, q->node.key);
- free(q->msg);
- free(q);
+ context_query_delete(q);
return UB_NOERROR;
}
int rrclass, void* mydata, ub_val_callback_t callback, int* async_id)
{
struct ctx_query* q;
+ uint8_t* msg = NULL;
+ uint32_t len = 0;
+ *async_id = 0;
lock_basic_lock(&ctx->cfglock);
if(!ctx->finalized) {
int r = context_finalize(ctx);
q = context_new(ctx, name, rrtype, rrclass, callback, mydata);
if(!q)
return UB_NOMEM;
- /* TODO write over pipe to background worker */
+
+ /* write over pipe to background worker */
+ lock_basic_lock(&ctx->cfglock);
+ msg = context_serialize_new_query(q, &len);
+ if(!msg) {
+ (void)rbtree_delete(&ctx->queries, q->node.key);
+ ctx->num_async--;
+ context_query_delete(q);
+ lock_basic_unlock(&ctx->cfglock);
+ return UB_NOMEM;
+ }
*async_id = q->querynum;
- return UB_NOMEM;
+ lock_basic_unlock(&ctx->cfglock);
+
+ lock_basic_lock(&ctx->qqpipe_lock);
+ libworker_write_msg(ctx->qqpipe[1], msg, len, 0);
+ lock_basic_unlock(&ctx->qqpipe_lock);
+ free(msg);
+ return UB_NOERROR;
}
int
ub_val_cancel(struct ub_val_ctx* ctx, int async_id)
{
struct ctx_query* q;
+ uint8_t* msg = NULL;
+ uint32_t len = 0;
lock_basic_lock(&ctx->cfglock);
q = (struct ctx_query*)rbtree_search(&ctx->queries, &async_id);
- lock_basic_unlock(&ctx->cfglock);
- if(!q || !q->async) /* it is not there, so nothing to do */
+ if(!q || !q->async) {
+ /* it is not there, so nothing to do */
+ lock_basic_unlock(&ctx->cfglock);
return UB_NOERROR;
- /* TODO ; send cancel to background worker */
-
- lock_basic_lock(&ctx->cfglock);
- (void)rbtree_delete(&ctx->queries, &async_id);
+ }
+ log_assert(q->async);
+ msg = context_serialize_cancel(q, &len);
+ if(!msg) {
+ lock_basic_unlock(&ctx->cfglock);
+ return UB_NOMEM;
+ }
+
+ /* delete it */
+ (void)rbtree_delete(&ctx->queries, q->node.key);
ctx->num_async--;
+ context_query_delete(q);
lock_basic_unlock(&ctx->cfglock);
- return UB_NOMEM;
+
+ /* send cancel to background worker */
+ lock_basic_lock(&ctx->qqpipe_lock);
+ libworker_write_msg(ctx->qqpipe[1], msg, len, 0);
+ lock_basic_unlock(&ctx->qqpipe_lock);
+ free(msg);
+ return UB_NOERROR;
}
void
case UB_FORKFAIL: return "could not fork";
case UB_INITFAIL: return "initialization failure";
case UB_AFTERFINAL: return "setting change after finalize";
+ case UB_PIPE: return "error in pipe communication with async";
default: return "unknown error";
}
}
/**
* Callback for results of async queries.
* The readable function definition looks like:
- * void my_callback(void* my_arg, int err, int secure, int havedata,
- * struct ub_val_result* result);
+ * void my_callback(void* my_arg, int err, struct ub_val_result* result);
* It is called with
* void* my_arg: your pointer to a (struct of) data of your choice,
* or NULL.
/**
* Perform resolution and validation of the target name.
* Asynchronous, after a while, the callback will be called with your
- * data and the result + secure status.
+ * data and the result.
* @param ctx: context.
* If no thread or process has been created yet to perform the
* work in the background, it is created now.
* and is passed on to the callback function.
* @param callback: this is called on completion of the resolution.
* It is called as:
- * void callback(void* mydata, int err, int secure, int havedata,
- * struct ub_val_result* result)
+ * void callback(void* mydata, int err, struct ub_val_result* result)
* with mydata: the same as passed here, you may pass NULL,
* with err: is 0 when a result has been found.
* with result: a newly allocated result structure.
return 1;
}
+/** fill result from parsed message, on error fills servfail */
+void
+libworker_enter_result(struct ub_val_result* res, ldns_buffer* buf,
+ struct regional* temp, enum sec_status msg_security)
+{
+ struct query_info rq;
+ struct reply_info* rep;
+ res->rcode = LDNS_RCODE_SERVFAIL;
+ rep = parse_reply(buf, temp, &rq);
+ if(!rep) {
+ return; /* error parsing buf, or out of memory */
+ }
+ if(!fill_res(res, reply_find_answer_rrset(&rq, rep),
+ reply_find_final_cname_target(&rq, rep), &rq))
+ return; /* out of memory */
+ /* rcode, havedata, nxdomain, secure, bogus */
+ res->rcode = (int)FLAGS_GET_RCODE(rep->flags);
+ if(res->data && res->data[0])
+ res->havedata = 1;
+ if(res->rcode == LDNS_RCODE_NXDOMAIN)
+ res->nxdomain = 1;
+ if(msg_security == sec_status_secure)
+ res->secure = 1;
+ if(msg_security == sec_status_bogus)
+ res->bogus = 1;
+}
+
/** callback with fg results */
static void
libworker_fg_done_cb(void* arg, int rcode, ldns_buffer* buf, enum sec_status s)
{
- struct query_info rq; /* replied query */
- struct reply_info* rep;
struct libworker_fg_data* d = (struct libworker_fg_data*)arg;
/* fg query is done; exit comm base */
comm_base_exit(d->w->base);
d->q->msg = memdup(ldns_buffer_begin(buf), ldns_buffer_limit(buf));
d->q->msg_len = ldns_buffer_limit(buf);
if(!d->q->msg) {
- return; /* error in rcode */
+ return; /* the error is in the rcode */
}
/* canonname and results */
- rep = parse_reply(buf, d->w->env->scratch, &rq);
- if(!rep) {
- return; /* error parsing buf, or out of memory */
- }
- if(!fill_res(d->q->res, reply_find_answer_rrset(&rq, rep),
- reply_find_final_cname_target(&rq, rep), &rq))
- return; /* out of memory */
- /* rcode, havedata, nxdomain, secure, bogus */
- d->q->res->rcode = (int)LDNS_RCODE_WIRE(d->q->msg);
- if(d->q->res->data && d->q->res->data[0])
- d->q->res->havedata = 1;
- if(d->q->res->rcode == LDNS_RCODE_NXDOMAIN)
- d->q->res->nxdomain = 1;
- if(s == sec_status_secure)
- d->q->res->secure = 1;
- if(s == sec_status_bogus)
- d->q->res->bogus = 1;
-
d->q->msg_security = s;
+
+ libworker_enter_result(d->q->res, buf, d->w->env->scratch, s);
}
/** setup qinfo and edns */