return 1;
}
+/** remove a reply without accounting, rollback the add reply. */
+static void
+mesh_remove_reply_without_accounting(struct mesh_state* s,
+ struct mesh_reply* todel)
+{
+ struct mesh_reply* r, *prev = NULL;
+ for(r = s->reply_list; r; r = r->next) {
+ if(r == todel) {
+ if(prev)
+ prev->next = r->next;
+ else s->reply_list = r->next;
+ r->next = NULL;
+ /* todel is allocated in region */
+ return;
+ }
+ prev = r;
+ }
+}
+
+/** remove a callback without accounting, rollback the add reply. */
+static void
+mesh_remove_callback_without_accounting(struct mesh_state* s,
+ struct mesh_cb* todel)
+{
+ struct mesh_cb* r, *prev = NULL;
+ for(r = s->cb_list; r; r = r->next) {
+ if(r == todel) {
+ if(prev)
+ prev->next = r->next;
+ else s->cb_list = r->next;
+ r->next = NULL;
+ /* todel is allocated in region */
+ return;
+ }
+ prev = r;
+ }
+}
+
void mesh_new_client(struct mesh_area* mesh, struct query_info* qinfo,
struct respip_client_info* cinfo, uint16_t qflags,
struct edns_data* edns, struct comm_reply* rep, uint16_t qid,
int unique = unique_mesh_state(edns->opt_list_in, mesh->env);
int was_detached = 0;
int was_noreply = 0;
- int added = 0;
+ int added = 0, added_reply_without_accounting = 0, added_tcp = 0;
+ struct mesh_reply* repadded = NULL;
int timeout = mesh->env->cfg->serve_expired?
mesh->env->cfg->serve_expired_client_timeout:0;
struct sldns_buffer* r_buffer = rep->c->buffer;
}
}
/* add reply to s */
- if(!mesh_state_add_reply(s, edns, rep, qid, qflags, qinfo)) {
+ if(!mesh_state_add_reply(s, edns, rep, qid, qflags, qinfo, &repadded)) {
log_err("mesh_new_client: out of memory; SERVFAIL");
goto servfail_mem;
}
+ added_reply_without_accounting = 1;
if(rep->c->tcp_req_info) {
if(!tcp_req_info_add_meshstate(rep->c->tcp_req_info, mesh, s)) {
log_err("mesh_new_client: out of memory add tcpreqinfo");
goto servfail_mem;
}
}
+ added_tcp = 1;
if(rep->c->use_h2) {
http2_stream_add_meshstate(rep->c->h2_stream, mesh, s);
}
}
}
#endif
+ added_reply_without_accounting = 0;
infra_wait_limit_inc(mesh->env->infra_cache, rep, *mesh->env->now,
mesh->env->cfg);
/* update statistics */
if(rep->c->use_h2)
http2_stream_remove_mesh_state(rep->c->h2_stream);
comm_point_send_reply(rep);
+ if(added_reply_without_accounting) {
+ mesh_remove_reply_without_accounting(s, repadded);
+ if(added_tcp && rep->c->tcp_req_info)
+ tcp_req_info_remove_mesh_state(rep->c->tcp_req_info, s);
+ }
if(added)
mesh_state_delete(&s->s);
return;
int was_detached = 0;
int was_noreply = 0;
int added = 0;
+ struct mesh_cb* add_cb = NULL;
uint16_t mesh_flags = qflags&(BIT_RD|BIT_CD);
if(!unique)
s = mesh_area_find(mesh, NULL, qinfo, mesh_flags, 0, 0);
}
}
/* add reply to s */
- if(!mesh_state_add_cb(s, edns, buf, cb, cb_arg, qid, qflags)) {
+ if(!mesh_state_add_cb(s, edns, buf, cb, cb_arg, qid, qflags, &add_cb)) {
if(added)
mesh_state_delete(&s->s);
return 0;
}
/* add serve expired timer if not already there */
if(timeout && !mesh_serve_expired_init(s, timeout)) {
+ mesh_remove_callback_without_accounting(s, add_cb);
if(added)
mesh_state_delete(&s->s);
return 0;
(mesh->env->cachedb_enabled &&
mesh->env->cfg->cachedb_check_when_serve_expired)) {
if(!mesh_serve_expired_init(s, -1)) {
+ mesh_remove_callback_without_accounting(s, add_cb);
if(added)
mesh_state_delete(&s->s);
return 0;
int mesh_state_add_cb(struct mesh_state* s, struct edns_data* edns,
sldns_buffer* buf, mesh_cb_func_type cb, void* cb_arg,
- uint16_t qid, uint16_t qflags)
+ uint16_t qid, uint16_t qflags, struct mesh_cb** result)
{
struct mesh_cb* r = regional_alloc(s->s.region,
sizeof(struct mesh_cb));
r->qflags = qflags;
r->next = s->cb_list;
s->cb_list = r;
+ *result = r;
return 1;
}
int mesh_state_add_reply(struct mesh_state* s, struct edns_data* edns,
struct comm_reply* rep, uint16_t qid, uint16_t qflags,
- const struct query_info* qinfo)
+ const struct query_info* qinfo, struct mesh_reply** result)
{
struct mesh_reply* r = regional_alloc(s->s.region,
sizeof(struct mesh_reply));
r->local_alias = NULL;
s->reply_list = r;
+ *result = r;
return 1;
}
* @param qid: ID of reply.
* @param qflags: original query flags.
* @param qinfo: original query info.
+ * @param result: the allocated reply structure, for rollback.
* @return: 0 on alloc error.
*/
int mesh_state_add_reply(struct mesh_state* s, struct edns_data* edns,
struct comm_reply* rep, uint16_t qid, uint16_t qflags,
- const struct query_info* qinfo);
+ const struct query_info* qinfo, struct mesh_reply** result);
/**
* Create new callback structure and attach it to a mesh state.
* @param cb_arg: callback user arg.
* @param qid: ID of reply.
* @param qflags: original query flags.
+ * @param result: the allocated callback structure, for rollback.
* @return: 0 on alloc error.
*/
int mesh_state_add_cb(struct mesh_state* s, struct edns_data* edns,
struct sldns_buffer* buf, mesh_cb_func_type cb, void* cb_arg,
- uint16_t qid, uint16_t qflags);
+ uint16_t qid, uint16_t qflags, struct mesh_cb** result);
/**
* Run the mesh. Run all runnable mesh states. Which can create new