ub_randfree(w->env->rnd);
free(w->env);
}
+ free(w->cmd_msg);
outside_network_delete(w->back);
comm_point_delete(w->cmd_com);
comm_point_delete(w->res_com);
free(buf);
}
+/** do control command coming into bg server */
+static void
+libworker_do_cmd(struct libworker* w, uint8_t* msg, uint32_t len)
+{
+ switch(context_serial_getcmd(msg, len)) {
+ default:
+ case UB_LIBCMD_ANSWER:
+ log_err("unknown command for bg worker %d",
+ (int)context_serial_getcmd(msg, len));
+ /* and fall through to quit */
+ case UB_LIBCMD_QUIT:
+ free(msg);
+ comm_base_exit(w->base);
+ break;
+ case UB_LIBCMD_NEWQUERY:
+ handle_newq(w, msg, len);
+ break;
+ case UB_LIBCMD_CANCEL:
+ handle_cancel(w, msg, len);
+ break;
+ }
+}
+
/** handle control command coming into server */
int
libworker_handle_control_cmd(struct comm_point* c, void* arg,
int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
{
struct libworker* w = (struct libworker*)arg;
- uint32_t len = 0;
- uint8_t* buf = NULL;
- int r = libworker_read_msg(c->fd, &buf, &len, 1);
+ ssize_t r;
+
+ if(w->cmd_read < sizeof(w->cmd_len)) {
+ /* complete reading the length of control msg */
+ r = read(c->fd, ((uint8_t*)&w->cmd_len) + w->cmd_read,
+ sizeof(w->cmd_len) - w->cmd_read);
+ if(r==0) {
+ /* error has happened or */
+ /* parent closed pipe, must have exited somehow */
+ /* it is of no use to go on, exit */
+ comm_base_exit(w->base);
+ return 0;
+ }
+ if(r==-1) {
+ if(errno != EAGAIN && errno != EINTR) {
+ log_err("rpipe error: %s", strerror(errno));
+ }
+ /* nothing to read now, try later */
+ return 0;
+ }
+ w->cmd_read += r;
+ if(w->cmd_read < sizeof(w->cmd_len)) {
+ /* not complete, try later */
+ return 0;
+ }
+ w->cmd_msg = (uint8_t*)calloc(1, w->cmd_len);
+ if(!w->cmd_msg) {
+ log_err("malloc failure");
+ w->cmd_read = 0;
+ return 0;
+ }
+ }
+ /* cmd_len has been read, read remainder */
+ r = read(c->fd, w->cmd_msg + w->cmd_read - sizeof(w->cmd_len),
+ w->cmd_len - (w->cmd_read - sizeof(w->cmd_len)));
if(r==0) {
/* error has happened or */
/* parent closed pipe, must have exited somehow */
comm_base_exit(w->base);
return 0;
}
- if(r==-1) /* nothing to read now, try later */
+ if(r==-1) {
+ /* nothing to read now, try later */
+ if(errno != EAGAIN && errno != EINTR) {
+ log_err("rpipe error: %s", strerror(errno));
+ }
return 0;
-
- switch(context_serial_getcmd(buf, len)) {
- default:
- case UB_LIBCMD_ANSWER:
- log_err("unknown command for bg worker %d",
- (int)context_serial_getcmd(buf, len));
- /* and fall through to quit */
- case UB_LIBCMD_QUIT:
- free(buf);
- comm_base_exit(w->base);
- break;
- case UB_LIBCMD_NEWQUERY:
- handle_newq(w, buf, len);
- break;
- case UB_LIBCMD_CANCEL:
- handle_cancel(w, buf, len);
- break;
}
+ w->cmd_read += r;
+ if(w->cmd_read < sizeof(w->cmd_len) + w->cmd_len) {
+ /* not complete, try later */
+ return 0;
+ }
+ w->cmd_read = 0;
+ libworker_do_cmd(w, w->cmd_msg, w->cmd_len); /* also frees the buf */
+ w->cmd_msg = NULL;
return 0;
}
{
struct libworker* w = (struct libworker*)arg;
struct libworker_res_list* item = w->res_list;
- int r;
+ ssize_t r;
if(!item) {
comm_point_stop_listening(c);
return 0;
}
- r = libworker_write_msg(c->fd, item->buf, item->len, 1);
- if(r == -1)
+ if(w->res_write < sizeof(item->len)) {
+ r = write(c->fd, ((uint8_t*)&item->len) + w->res_write,
+ sizeof(item->len) - w->res_write);
+ if(r == -1) {
+ if(errno != EAGAIN && errno != EINTR) {
+ log_err("wpipe error: %s", strerror(errno));
+ }
+ return 0; /* try again later */
+ }
+ if(r == 0) {
+ /* error on pipe, must have exited somehow */
+ /* it is of no use to go on, exit */
+ comm_base_exit(w->base);
+ return 0;
+ }
+ w->res_write += r;
+ if(w->res_write < sizeof(item->len))
+ return 0;
+ }
+ r = write(c->fd, item->buf + w->res_write - sizeof(item->len),
+ item->len - (w->res_write - sizeof(item->len)));
+ if(r == -1) {
+ if(errno != EAGAIN && errno != EINTR) {
+ log_err("wpipe error: %s", strerror(errno));
+ }
return 0; /* try again later */
+ }
if(r == 0) {
/* error on pipe, must have exited somehow */
/* it is of no use to go on, exit */
comm_base_exit(w->base);
return 0;
}
+ w->res_write += r;
+ if(w->res_write < sizeof(item->len) + item->len)
+ return 0;
/* done this result, remove it */
free(item->buf);
item->buf = NULL;
w->res_last = NULL;
comm_point_stop_listening(c);
}
+ w->res_write = 0;
return 0;
}
item->buf = msg;
item->len = len;
item->next = NULL;
- /* add at back of list */
+ /* add at back of list, since the first one may be partially written */
if(w->res_last)
w->res_last->next = item;
else w->res_list = item;