From: Willy Tarreau Date: Fri, 13 Mar 2015 13:00:47 +0000 (+0100) Subject: MEDIUM: channel: don't always set CF_WAKE_WRITE on bi_put* X-Git-Tag: v1.6-dev2~332 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bc18da17aaba8db4dbc08471d4cc333cea08b41a;p=thirdparty%2Fhaproxy.git MEDIUM: channel: don't always set CF_WAKE_WRITE on bi_put* It was inappropriate to put this flag on every failed write into an input buffer because it depends where it happens. When it's in the context of an analyser (eg: hlua) it makes sense. When it's in the context of an applet (eg: dumpstats), it does not make sense, and it only happens to work because currently applets are scheduled by the sessions. The proper solution for applets would be to add the flag SI_FL_WAIT_ROOM on the stream interface. Thus, we now don't set any flag anymore in bi_put* and it's up to the caller to either set CF_WAKE_WRITE on the channel or SI_FL_WAIT_ROOM on the stream interface. Changes were applied to hlua, peers and dumpstats. --- diff --git a/src/channel.c b/src/channel.c index 231d99f352..755d2d9cb0 100644 --- a/src/channel.c +++ b/src/channel.c @@ -103,18 +103,14 @@ int bo_inject(struct channel *chn, const char *msg, int len) * input is closed, -2 is returned. If there is not enough room left in the * buffer, -1 is returned. Otherwise the number of bytes copied is returned * (1). Channel flag READ_PARTIAL is updated if some data can be transferred. - * Channel flag CF_WAKE_WRITE is set if the write fails because the buffer is - * full. */ int bi_putchr(struct channel *chn, char c) { if (unlikely(channel_input_closed(chn))) return -2; - if (!channel_may_recv(chn)) { - chn->flags |= CF_WAKE_WRITE; + if (!channel_may_recv(chn)) return -1; - } *bi_end(chn->buf) = c; @@ -137,8 +133,7 @@ int bi_putchr(struct channel *chn, char c) * -3 is returned. If there is not enough room left in the buffer, -1 is * returned. Otherwise the number of bytes copied is returned (0 being a valid * number). Channel flag READ_PARTIAL is updated if some data can be - * transferred. Channel flag CF_WAKE_WRITE is set if the write fails because - * the buffer is full. + * transferred. */ int bi_putblk(struct channel *chn, const char *blk, int len) { @@ -156,7 +151,6 @@ int bi_putblk(struct channel *chn, const char *blk, int len) if (len > max) return -3; - chn->flags |= CF_WAKE_WRITE; return -1; } @@ -203,10 +197,8 @@ struct buffer *bi_swpbuf(struct channel *chn, struct buffer *buf) if (unlikely(channel_input_closed(chn))) return NULL; - if (!chn->buf->size || !buffer_empty(chn->buf)) { - chn->flags |= CF_WAKE_WRITE; + if (!chn->buf->size || !buffer_empty(chn->buf)) return buf; - } old = chn->buf; chn->buf = buf; diff --git a/src/dumpstats.c b/src/dumpstats.c index 0e87f09e7f..8dd9459ba0 100644 --- a/src/dumpstats.c +++ b/src/dumpstats.c @@ -579,8 +579,10 @@ static int stats_dump_table_head_to_buffer(struct chunk *msg, struct stream_inte if (target && s->listener->bind_conf->level < ACCESS_LVL_OPER) chunk_appendf(msg, "# contents not dumped due to insufficient privileges\n"); - if (bi_putchk(si_ic(si), msg) == -1) + if (bi_putchk(si_ic(si), msg) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } return 1; } @@ -650,8 +652,10 @@ static int stats_dump_table_entry_to_buffer(struct chunk *msg, struct stream_int } chunk_appendf(msg, "\n"); - if (bi_putchk(si_ic(si), msg) == -1) + if (bi_putchk(si_ic(si), msg) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } return 1; } @@ -1329,7 +1333,9 @@ static int stats_sock_parse_request(struct stream_interface *si, char *line) /* return server's effective weight at the moment */ snprintf(trash.str, trash.size, "%d (initial %d)\n", sv->uweight, sv->iweight); - bi_putstr(si_ic(si), trash.str); + if (bi_putstr(si_ic(si), trash.str) == -1) + si->flags |= SI_FL_WAIT_ROOM; + return 1; } else if (strcmp(args[1], "map") == 0 || strcmp(args[1], "acl") == 0) { @@ -2264,7 +2270,7 @@ static void cli_io_handler(struct stream_interface *si) * would want to return some info right after parsing. */ if (buffer_almost_full(si_ib(si))) { - si_ic(si)->flags |= CF_WAKE_WRITE; + si->flags |= SI_FL_WAIT_ROOM; break; } @@ -2342,12 +2348,16 @@ static void cli_io_handler(struct stream_interface *si) case STAT_CLI_PRINT: if (bi_putstr(si_ic(si), appctx->ctx.cli.msg) != -1) appctx->st0 = STAT_CLI_PROMPT; + else + si->flags |= SI_FL_WAIT_ROOM; break; case STAT_CLI_PRINT_FREE: if (bi_putstr(si_ic(si), appctx->ctx.cli.err) != -1) { free(appctx->ctx.cli.err); appctx->st0 = STAT_CLI_PROMPT; } + else + si->flags |= SI_FL_WAIT_ROOM; break; case STAT_CLI_O_INFO: if (stats_dump_info_to_buffer(si)) @@ -2396,6 +2406,8 @@ static void cli_io_handler(struct stream_interface *si) if (appctx->st0 == STAT_CLI_PROMPT) { if (bi_putstr(si_ic(si), appctx->st1 ? "\n> " : "\n") != -1) appctx->st0 = STAT_CLI_GETREQ; + else + si->flags |= SI_FL_WAIT_ROOM; } /* If the output functions are still there, it means they require more room. */ @@ -2563,8 +2575,10 @@ static int stats_dump_info_to_buffer(struct stream_interface *si) global.node, global.desc ? global.desc : "" ); - if (bi_putchk(si_ic(si), &trash) == -1) + if (bi_putchk(si_ic(si), &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } return 1; } @@ -2576,8 +2590,10 @@ static int stats_dump_info_to_buffer(struct stream_interface *si) static int stats_dump_pools_to_buffer(struct stream_interface *si) { dump_pools_to_trash(); - if (bi_putchk(si_ic(si), &trash) == -1) + if (bi_putchk(si_ic(si), &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } return 1; } @@ -3788,8 +3804,10 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy case STAT_PX_ST_TH: if (appctx->ctx.stats.flags & STAT_FMT_HTML) { stats_dump_html_px_hdr(si, px, uri); - if (bi_putchk(rep, &trash) == -1) + if (bi_putchk(rep, &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } } appctx->ctx.stats.px_st = STAT_PX_ST_FE; @@ -3797,9 +3815,12 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy case STAT_PX_ST_FE: /* print the frontend */ - if (stats_dump_fe_stats(si, px)) - if (bi_putchk(rep, &trash) == -1) + if (stats_dump_fe_stats(si, px)) { + if (bi_putchk(rep, &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } + } appctx->ctx.stats.l = px->conf.listeners.n; appctx->ctx.stats.px_st = STAT_PX_ST_LI; @@ -3809,7 +3830,7 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy /* stats.l has been initialized above */ for (; appctx->ctx.stats.l != &px->conf.listeners; appctx->ctx.stats.l = l->by_fe.n) { if (buffer_almost_full(rep->buf)) { - rep->flags |= CF_WAKE_WRITE; + si->flags |= SI_FL_WAIT_ROOM; return 0; } @@ -3826,9 +3847,12 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy } /* print the frontend */ - if (stats_dump_li_stats(si, px, l, uri ? uri->flags : 0)) - if (bi_putchk(rep, &trash) == -1) + if (stats_dump_li_stats(si, px, l, uri ? uri->flags : 0)) { + if (bi_putchk(rep, &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } + } } appctx->ctx.stats.sv = px->srv; /* may be NULL */ @@ -3841,7 +3865,7 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy int sv_state; if (buffer_almost_full(rep->buf)) { - rep->flags |= CF_WAKE_WRITE; + si->flags |= SI_FL_WAIT_ROOM; return 0; } @@ -3904,9 +3928,12 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy continue; } - if (stats_dump_sv_stats(si, px, uri ? uri->flags : 0, sv, sv_state)) - if (bi_putchk(rep, &trash) == -1) + if (stats_dump_sv_stats(si, px, uri ? uri->flags : 0, sv, sv_state)) { + if (bi_putchk(rep, &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } + } } /* for sv */ appctx->ctx.stats.px_st = STAT_PX_ST_BE; @@ -3914,9 +3941,12 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy case STAT_PX_ST_BE: /* print the backend */ - if (stats_dump_be_stats(si, px, uri ? uri->flags : 0)) - if (bi_putchk(rep, &trash) == -1) + if (stats_dump_be_stats(si, px, uri ? uri->flags : 0)) { + if (bi_putchk(rep, &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } + } appctx->ctx.stats.px_st = STAT_PX_ST_END; /* fall through */ @@ -3924,8 +3954,10 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy case STAT_PX_ST_END: if (appctx->ctx.stats.flags & STAT_FMT_HTML) { stats_dump_html_px_end(si, px); - if (bi_putchk(rep, &trash) == -1) + if (bi_putchk(rep, &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } } appctx->ctx.stats.px_st = STAT_PX_ST_FIN; @@ -4321,8 +4353,10 @@ static int stats_dump_stat_to_buffer(struct stream_interface *si, struct uri_aut else stats_dump_csv_header(); - if (bi_putchk(rep, &trash) == -1) + if (bi_putchk(rep, &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } appctx->st2 = STAT_ST_INFO; /* fall through */ @@ -4330,8 +4364,10 @@ static int stats_dump_stat_to_buffer(struct stream_interface *si, struct uri_aut case STAT_ST_INFO: if (appctx->ctx.stats.flags & STAT_FMT_HTML) { stats_dump_html_info(si, uri); - if (bi_putchk(rep, &trash) == -1) + if (bi_putchk(rep, &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } } appctx->ctx.stats.px = proxy; @@ -4343,7 +4379,7 @@ static int stats_dump_stat_to_buffer(struct stream_interface *si, struct uri_aut /* dump proxies */ while (appctx->ctx.stats.px) { if (buffer_almost_full(rep->buf)) { - rep->flags |= CF_WAKE_WRITE; + si->flags |= SI_FL_WAIT_ROOM; return 0; } @@ -4364,8 +4400,10 @@ static int stats_dump_stat_to_buffer(struct stream_interface *si, struct uri_aut case STAT_ST_END: if (appctx->ctx.stats.flags & STAT_FMT_HTML) { stats_dump_html_end(); - if (bi_putchk(rep, &trash) == -1) + if (bi_putchk(rep, &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } } appctx->st2 = STAT_ST_FIN; @@ -4732,8 +4770,10 @@ static int stats_send_http_headers(struct stream_interface *si) s->txn.status = 200; s->logs.tv_request = now; - if (bi_putchk(si_ic(si), &trash) == -1) + if (bi_putchk(si_ic(si), &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } return 1; } @@ -4777,8 +4817,10 @@ static int stats_send_http_redirect(struct stream_interface *si) s->txn.status = 303; s->logs.tv_request = now; - if (bi_putchk(si_ic(si), &trash) == -1) + if (bi_putchk(si_ic(si), &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } return 1; } @@ -4828,6 +4870,7 @@ static void http_stats_io_handler(struct stream_interface *si) si_ic(si)->to_forward = 0; chunk_printf(&trash, "\r\n000000\r\n"); if (bi_putchk(si_ic(si), &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; si_ic(si)->to_forward = last_fwd; goto fail; } @@ -4852,7 +4895,8 @@ static void http_stats_io_handler(struct stream_interface *si) if (last_len != data_len) { chunk_printf(&trash, "\r\n%06x\r\n", (last_len - data_len)); - bi_putchk(si_ic(si), &trash); + if (bi_putchk(si_ic(si), &trash) == -1) + si->flags |= SI_FL_WAIT_ROOM; si_ic(si)->total += (last_len - data_len); si_ib(si)->i += (last_len - data_len); @@ -4877,8 +4921,10 @@ static void http_stats_io_handler(struct stream_interface *si) if (appctx->st0 == STAT_HTTP_DONE) { if (appctx->ctx.stats.flags & STAT_CHUNKED) { chunk_printf(&trash, "\r\n0\r\n\r\n"); - if (bi_putchk(si_ic(si), &trash) == -1) + if (bi_putchk(si_ic(si), &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; goto fail; + } } /* eat the whole request */ bo_skip(si_oc(si), si_ob(si)->o); @@ -4976,8 +5022,10 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se if (appctx->ctx.sess.section > 0 && appctx->ctx.sess.uid != sess->uniq_id) { /* session changed, no need to go any further */ chunk_appendf(&trash, " *** session terminated while we were watching it ***\n"); - if (bi_putchk(si_ic(si), &trash) == -1) + if (bi_putchk(si_ic(si), &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } appctx->ctx.sess.uid = 0; appctx->ctx.sess.section = 0; return 1; @@ -5256,8 +5304,10 @@ static int stats_dump_full_sess_to_buffer(struct stream_interface *si, struct se sess->txn.rsp.next, sess->res.buf->i, sess->res.buf->size); - if (bi_putchk(si_ic(si), &trash) == -1) + if (bi_putchk(si_ic(si), &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } /* use other states to dump the contents */ } @@ -5279,8 +5329,10 @@ static int stats_pats_list(struct stream_interface *si) */ chunk_reset(&trash); chunk_appendf(&trash, "# id (file) description\n"); - if (bi_putchk(si_ic(si), &trash) == -1) + if (bi_putchk(si_ic(si), &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } /* Now, we start the browsing of the references lists. * Note that the following call to LIST_ELEM return bad pointer. The only @@ -5308,6 +5360,7 @@ static int stats_pats_list(struct stream_interface *si) /* let's try again later from this session. We add ourselves into * this session's users so that it can remove us upon termination. */ + si->flags |= SI_FL_WAIT_ROOM; return 0; } @@ -5426,6 +5479,7 @@ static int stats_map_lookup(struct stream_interface *si) /* let's try again later from this session. We add ourselves into * this session's users so that it can remove us upon termination. */ + si->flags |= SI_FL_WAIT_ROOM; return 0; } @@ -5476,6 +5530,7 @@ static int stats_pat_list(struct stream_interface *si) /* let's try again later from this session. We add ourselves into * this session's users so that it can remove us upon termination. */ + si->flags |= SI_FL_WAIT_ROOM; return 0; } @@ -5680,6 +5735,7 @@ static int stats_dump_sess_to_buffer(struct stream_interface *si) /* let's try again later from this session. We add ourselves into * this session's users so that it can remove us upon termination. */ + si->flags |= SI_FL_WAIT_ROOM; LIST_ADDQ(&curr_sess->back_refs, &appctx->ctx.sess.bref.users); return 0; } @@ -5695,8 +5751,10 @@ static int stats_dump_sess_to_buffer(struct stream_interface *si) else chunk_appendf(&trash, "Session not found.\n"); - if (bi_putchk(si_ic(si), &trash) == -1) + if (bi_putchk(si_ic(si), &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } appctx->ctx.sess.target = NULL; appctx->ctx.sess.uid = 0; @@ -5977,6 +6035,7 @@ static int stats_dump_errors_to_buffer(struct stream_interface *si) if (bi_putchk(si_ic(si), &trash) == -1) { /* Socket buffer full. Let's try again later from the same point */ + si->flags |= SI_FL_WAIT_ROOM; return 0; } @@ -6061,6 +6120,7 @@ static int stats_dump_errors_to_buffer(struct stream_interface *si) if (bi_putchk(si_ic(si), &trash) == -1) { /* Socket buffer full. Let's try again later from the same point */ + si->flags |= SI_FL_WAIT_ROOM; return 0; } appctx->ctx.errors.ptr = 0; @@ -6071,8 +6131,10 @@ static int stats_dump_errors_to_buffer(struct stream_interface *si) /* the snapshot changed while we were dumping it */ chunk_appendf(&trash, " WARNING! update detected on this snapshot, dump interrupted. Please re-check!\n"); - if (bi_putchk(si_ic(si), &trash) == -1) + if (bi_putchk(si_ic(si), &trash) == -1) { + si->flags |= SI_FL_WAIT_ROOM; return 0; + } goto next; } @@ -6088,6 +6150,7 @@ static int stats_dump_errors_to_buffer(struct stream_interface *si) if (bi_putchk(si_ic(si), &trash) == -1) { /* Socket buffer full. Let's try again later from the same point */ + si->flags |= SI_FL_WAIT_ROOM; return 0; } appctx->ctx.errors.ptr = newptr; diff --git a/src/hlua.c b/src/hlua.c index 86fe28283a..b23eb02eea 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -1462,6 +1462,9 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext * Other unknown error are also not expected. */ if (len <= 0) { + if (len == -1) + si_ic(&socket->s->si[0])->flags |= CF_WAKE_WRITE; + MAY_LJMP(hlua_socket_close(L)); lua_pop(L, 1); lua_pushinteger(L, -1); @@ -2237,8 +2240,10 @@ __LJMP static int hlua_channel_append_yield(lua_State *L, int status, lua_KConte lua_pushinteger(L, -1); return 1; } - if (ret == -1) + if (ret == -1) { + chn->flags |= CF_WAKE_WRITE; WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_append_yield, TICK_ETERNITY, 0)); + } l += ret; lua_pop(L, 1); lua_pushinteger(L, l); diff --git a/src/peers.c b/src/peers.c index 53e5035873..792260f553 100644 --- a/src/peers.c +++ b/src/peers.c @@ -450,7 +450,7 @@ switchstate: repl = bi_putblk(si_ic(si), trash.str, repl); if (repl <= 0) { if (repl == -1) - goto out; + goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } @@ -514,7 +514,7 @@ switchstate: repl = bi_putblk(si_ic(si), trash.str, repl); if (repl <= 0) { if (repl == -1) - goto out; + goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } @@ -848,7 +848,7 @@ incomplete: if (repl <= 0) { /* no more write possible */ if (repl == -1) - goto out; + goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } @@ -865,7 +865,7 @@ incomplete: if (repl <= 0) { /* no more write possible */ if (repl == -1) - goto out; + goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } @@ -884,7 +884,7 @@ incomplete: if (repl <= 0) { /* no more write possible */ if (repl == -1) - goto out; + goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } @@ -920,7 +920,7 @@ incomplete: if (repl <= 0) { /* no more write possible */ if (repl == -1) - goto out; + goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } @@ -954,7 +954,7 @@ incomplete: if (repl <= 0) { /* no more write possible */ if (repl == -1) - goto out; + goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } @@ -970,7 +970,7 @@ incomplete: if (repl <= 0) { /* no more write possible */ if (repl == -1) - goto out; + goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } @@ -1012,7 +1012,7 @@ incomplete: if (repl <= 0) { /* no more write possible */ if (repl == -1) - goto out; + goto full; appctx->st0 = PEER_SESS_ST_END; goto switchstate; } @@ -1028,7 +1028,7 @@ incomplete: repl = snprintf(trash.str, trash.size, "%d\n", appctx->st1); if (bi_putblk(si_ic(si), trash.str, repl) == -1) - goto out; + goto full; appctx->st0 = PEER_SESS_ST_END; /* fall through */ case PEER_SESS_ST_END: { @@ -1047,6 +1047,9 @@ out: si_oc(si)->wex = TICK_ETERNITY; quit: return; +full: + si->flags |= SI_FL_WAIT_ROOM; + goto out; } static struct si_applet peer_applet = {