}
}
-/* recompute the mux polling flags after updating the current conn_stream and
- * propagate the result down the transport layer.
- */
-static inline void cs_update_mux_polling(struct conn_stream *cs)
-{
- struct connection *conn = cs->conn;
-
- if (conn->mux && conn->mux->update_poll)
- conn->mux->update_poll(cs);
-}
-
/***** Event manipulation primitives for use by DATA I/O callbacks *****/
/* The __conn_* versions do not propagate to lower layers and are only meant
* to be used by handlers called by the connection handler. The other ones
c->flags &= ~CO_FL_XPRT_RD_ENA;
}
-static inline void __cs_want_recv(struct conn_stream *cs)
-{
- cs->flags |= CS_FL_DATA_RD_ENA;
-}
-
-static inline void __cs_stop_recv(struct conn_stream *cs)
-{
- cs->flags &= ~CS_FL_DATA_RD_ENA;
-}
-
-static inline void cs_want_recv(struct conn_stream *cs)
-{
- __cs_want_recv(cs);
- cs_update_mux_polling(cs);
-}
-
-static inline void cs_stop_recv(struct conn_stream *cs)
-{
- __cs_stop_recv(cs);
- cs_update_mux_polling(cs);
-}
-
/* this one is used only to stop speculative recv(). It doesn't stop it if the
* fd is already polled in order to avoid expensive polling status changes.
* Since it might require the upper layer to re-enable reading, we'll return 1
c->flags &= ~(CO_FL_XPRT_WR_ENA | CO_FL_XPRT_RD_ENA);
}
-static inline void __cs_want_send(struct conn_stream *cs)
-{
- cs->flags |= CS_FL_DATA_WR_ENA;
-}
-
-static inline void __cs_stop_send(struct conn_stream *cs)
-{
- cs->flags &= ~CS_FL_DATA_WR_ENA;
-}
-
-static inline void cs_stop_send(struct conn_stream *cs)
-{
- __cs_stop_send(cs);
- cs_update_mux_polling(cs);
-}
-
-static inline void cs_want_send(struct conn_stream *cs)
-{
- __cs_want_send(cs);
- cs_update_mux_polling(cs);
-}
-
-static inline void __cs_stop_both(struct conn_stream *cs)
-{
- cs->flags &= ~(CS_FL_DATA_WR_ENA | CS_FL_DATA_RD_ENA);
-}
-
-static inline void cs_stop_both(struct conn_stream *cs)
-{
- __cs_stop_both(cs);
- cs_update_mux_polling(cs);
-}
-
-
static inline void conn_xprt_want_recv(struct connection *c)
{
__conn_xprt_want_recv(c);
/* shut read */
static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{
- __cs_stop_recv(cs);
/* clean data-layer shutdown */
if (cs->conn->mux && cs->conn->mux->shutr)
/* shut write */
static inline void cs_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{
- __cs_stop_send(cs);
/* clean data-layer shutdown */
if (cs->conn->mux && cs->conn->mux->shutw)
LIST_ADD(pool, &conn->list);
cs_attach(cs, si, &si_idle_conn_cb);
- cs_want_recv(cs);
}
/* Attach conn_stream <cs> to the stream interface <si>. The stream interface
/* Calls chk_rcv on the connection using the data layer */
static inline void si_chk_rcv(struct stream_interface *si)
{
- si->ops->chk_rcv(si);
+ if (si->ops->chk_rcv)
+ si->ops->chk_rcv(si);
}
/* Calls chk_snd on the connection using the data layer */
static inline void si_chk_snd(struct stream_interface *si)
{
- si->ops->chk_snd(si);
+ if (si->ops->chk_snd)
+ si->ops->chk_snd(si);
}
/* Calls chk_snd on the connection using the ctrl layer */
}
else {
/* reuse the existing connection */
- if (!channel_is_empty(si_oc(si))) {
- /* we'll have to send a request there. */
- cs_want_send(cs);
- }
/* the connection is established */
si->state = SI_ST_EST;
/* conn_stream flags */
enum {
CS_FL_NONE = 0x00000000, /* Just for initialization purposes */
- CS_FL_DATA_RD_ENA = 0x00000001, /* receiving data is allowed */
- CS_FL_DATA_WR_ENA = 0x00000002, /* sending data is desired */
-
CS_FL_SHRD = 0x00000010, /* read shut, draining extra data */
CS_FL_SHRR = 0x00000020, /* read shut, resetting extra data */
CS_FL_SHR = CS_FL_SHRD | CS_FL_SHRR, /* read shut status */
struct mux_ops {
int (*init)(struct connection *conn, struct proxy *prx); /* early initialization */
int (*wake)(struct connection *conn); /* mux-layer callback to report activity, mandatory */
- void (*update_poll)(struct conn_stream *cs); /* commit cs flags to mux/conn */
size_t (*rcv_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */
size_t (*snd_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to send data */
int (*rcv_pipe)(struct conn_stream *cs, struct pipe *pipe, unsigned int count); /* recv-to-pipe callback */
if (retrieve_errno_from_socket(conn)) {
chk_report_conn_err(check, errno, 0);
- __cs_stop_both(cs);
goto out_wakeup;
}
b_realign_if_empty(&check->bo);
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) {
chk_report_conn_err(check, errno, 0);
- __cs_stop_both(cs);
goto out_wakeup;
}
if (b_data(&check->bo)) {
t->expire = tick_add_ifset(now_ms, s->proxy->timeout.check);
task_queue(t);
}
- goto out_nowake;
+ goto out;
out_wakeup:
task_wakeup(t, TASK_WOKEN_IO);
- out_nowake:
- __cs_stop_send(cs); /* nothing more to write */
out:
return;
}
* range quickly. To avoid sending RSTs all the time, we first try to
* drain pending data.
*/
- __cs_stop_both(cs);
cs_shutw(cs, CS_SHW_NORMAL);
/* OK, let's not stay here forever */
return;
wait_more_data:
- __cs_want_recv(cs);
cs->conn->mux->subscribe(cs, SUB_CAN_RECV, &check->wait_list);
goto out;
}
* we expect errno to still be valid.
*/
chk_report_conn_err(check, errno, 0);
- __cs_stop_both(cs);
task_wakeup(check->task, TASK_WOKEN_IO);
}
- else if (!(conn->flags & CO_FL_HANDSHAKE) && !(cs->flags & (CS_FL_DATA_RD_ENA|CS_FL_DATA_WR_ENA))) {
+ else if (!(conn->flags & CO_FL_HANDSHAKE) && !check->type) {
/* we may get here if only a connection probe was required : we
* don't have any data to send nor anything expected in response,
* so the completion of the connection establishment is enough.
if (proto && proto->connect)
ret = proto->connect(conn, check->type, quickack ? 2 : 0);
- if (check->type)
- cs_want_send(cs);
#ifdef USE_OPENSSL
if (s->check.sni)
t->expire = tick_first(t->expire, t_con);
}
- if (check->type) {
- cs_want_recv(cs); /* prepare for reading a possible reply */
+ if (check->type)
__event_srv_chk_r(cs);
- }
task_set_affinity(t, tid_bit);
goto reschedule;
t->expire = tick_add_ifset(now_ms, s->proxy->timeout.check);
}
- /* It's only the rules which will enable send/recv */
- if (cs)
- cs_stop_both(cs);
-
while (1) {
/* We have to try to flush the output buffer before reading, at
* the end, or if we're about to send a string that does not fit
check->current_step->string_len >= b_room(&check->bo))) {
int ret;
- __cs_want_send(cs);
ret = cs->conn->mux->snd_buf(cs, &check->bo, b_data(&check->bo), 0);
b_realign_if_empty(&check->bo);
if (ret <= 0) {
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) {
chk_report_conn_err(check, errno, 0);
- __cs_stop_both(cs);
goto out_end_tcpcheck;
}
break;
if (unlikely(check->result == CHK_RES_FAILED))
goto out_end_tcpcheck;
- __cs_want_recv(cs);
if (cs->conn->mux->rcv_buf(cs, &check->bi, b_size(&check->bi), 0) <= 0) {
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH) || cs->flags & CS_FL_ERROR) {
done = 1;
if (check->current_step->action == TCPCHK_ACT_EXPECT)
goto tcpcheck_expect;
- __cs_stop_recv(cs);
}
}
else {
if (check->current_step->action == TCPCHK_ACT_EXPECT)
goto tcpcheck_expect;
- __cs_stop_recv(cs);
}
/* not matched but was supposed to => ERROR */
else {
goto out_end_tcpcheck;
}
- /* warning, current_step may now point to the head */
- if (b_data(&check->bo))
- __cs_want_send(cs);
-
if (&check->current_step->list != head &&
- check->current_step->action == TCPCHK_ACT_EXPECT) {
- __cs_want_recv(cs);
+ check->current_step->action == TCPCHK_ACT_EXPECT)
__event_srv_chk_r(cs);
- }
goto out;
out_end_tcpcheck:
if (check->result == CHK_RES_FAILED)
conn->flags |= CO_FL_ERROR;
- __cs_stop_both(cs);
-
out:
return retcode;
}
conn->send_wait = NULL;
} else
io_available = 1;
+ __conn_xprt_stop_send(conn);
}
/* The data transfer starts here and stops on error and handshakes. Note
conn->recv_wait = NULL;
} else
io_available = 1;
+ __conn_xprt_stop_recv(conn);
}
/* It may happen during the data phase that a handshake is
conn->recv_wait = NULL;
sw->wait_reason &= ~SUB_CAN_RECV;
}
+ __conn_xprt_stop_recv(conn);
}
if (event_type & SUB_CAN_SEND) {
sw = param;
conn->send_wait = NULL;
sw->wait_reason &= ~SUB_CAN_SEND;
}
+ __conn_xprt_stop_send(conn);
}
+ conn_update_xprt_polling(conn);
return 0;
}
conn->recv_wait = sw;
}
event_type &= ~SUB_CAN_RECV;
+ __conn_xprt_want_recv(conn);
}
if (event_type & SUB_CAN_SEND) {
sw = param;
conn->send_wait = sw;
}
event_type &= ~SUB_CAN_SEND;
+ __conn_xprt_want_send(conn);
}
if (event_type != 0)
return (-1);
+ conn_update_xprt_polling(conn);
return 0;
}
}
line++;
}
+ __conn_xprt_stop_recv(conn);
if (!dst_s || !sport_s || !dport_s)
goto bad_header;
if ((h2c->flags & H2_CF_DEM_DALLOC) && b_alloc_margin(&h2c->dbuf, 0)) {
h2c->flags &= ~H2_CF_DEM_DALLOC;
- if (h2_recv_allowed(h2c)) {
- conn_xprt_want_recv(h2c->conn);
+ if (h2_recv_allowed(h2c))
tasklet_wakeup(h2c->wait_event.task);
- }
return 1;
}
if ((h2c->flags & H2_CF_MUX_MALLOC) && b_alloc_margin(&h2c->mbuf, 0)) {
h2c->flags &= ~H2_CF_MUX_MALLOC;
- if (!(h2c->flags & H2_CF_MUX_BLOCK_ANY))
- conn_xprt_want_send(h2c->conn);
if (h2c->flags & H2_CF_DEM_MROOM) {
h2c->flags &= ~H2_CF_DEM_MROOM;
- if (h2_recv_allowed(h2c)) {
- conn_xprt_want_recv(h2c->conn);
+ if (h2_recv_allowed(h2c))
tasklet_wakeup(h2c->wait_event.task);
- }
}
return 1;
}
(h2s = h2c_st_by_id(h2c, h2c->dsi)) && h2s->cs &&
b_alloc_margin(&h2s->rxbuf, 0)) {
h2c->flags &= ~H2_CF_DEM_SALLOC;
- if (h2_recv_allowed(h2c)) {
- conn_xprt_want_recv(h2c->conn);
+ if (h2_recv_allowed(h2c))
tasklet_wakeup(h2c->wait_event.task);
- }
return 1;
}
task_queue(t);
/* prepare to read something */
- conn_xprt_want_recv(conn);
tasklet_wakeup(h2c->wait_event.task);
return 0;
fail:
ret = 0;
} while (ret > 0);
- if (h2_recv_allowed(h2c) && (b_data(buf) < buf->size)) {
- conn_xprt_want_recv(conn);
+ if (h2_recv_allowed(h2c) && (b_data(buf) < buf->size))
conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_event);
- }
if (!b_data(buf)) {
h2_release_buf(h2c, &h2c->dbuf);
if (!b_data(&h2c->dbuf))
h2_release_buf(h2c, &h2c->dbuf);
- /* stop being notified of incoming data if we can't process them */
- if (!h2_recv_allowed(h2c))
- __conn_xprt_stop_recv(conn);
- else
- __conn_xprt_want_recv(conn);
-
- /* adjust output polling */
- if (!(conn->flags & CO_FL_SOCK_WR_SH) &&
- h2c->st0 != H2_CS_ERROR2 && !(h2c->flags & H2_CF_GOAWAY_FAILED) &&
- (h2c->st0 == H2_CS_ERROR ||
- b_data(&h2c->mbuf) ||
- (h2c->mws > 0 && !LIST_ISEMPTY(&h2c->fctl_list)) ||
- (!(h2c->flags & H2_CF_MUX_BLOCK_ANY) && !LIST_ISEMPTY(&h2c->send_list)))) {
- __conn_xprt_want_send(conn);
- }
- else {
+ if ((conn->flags & CO_FL_SOCK_WR_SH) ||
+ h2c->st0 == H2_CS_ERROR2 || (h2c->flags & H2_CF_GOAWAY_FAILED) ||
+ (h2c->st0 != H2_CS_ERROR &&
+ !b_data(&h2c->mbuf) &&
+ (h2c->mws <= 0 || LIST_ISEMPTY(&h2c->fctl_list)) &&
+ ((h2c->flags & H2_CF_MUX_BLOCK_ANY) || LIST_ISEMPTY(&h2c->send_list))))
h2_release_buf(h2c, &h2c->mbuf);
- __conn_xprt_stop_send(conn);
- }
if (h2c->task) {
if (eb_is_empty(&h2c->streams_by_id) || b_data(&h2c->mbuf)) {
return NULL;
}
-/* callback used to update the mux's polling flags after changing a cs' status.
- * The caller (cs_update_mux_polling) will take care of propagating any changes
- * to the transport layer.
- */
-static void h2_update_poll(struct conn_stream *cs)
-{
- struct h2s *h2s = cs->ctx;
-
- if (!h2s)
- return;
-
- /* we may unblock a blocked read */
-
- if (cs->flags & CS_FL_DATA_RD_ENA) {
- /* the stream indicates it's willing to read */
- h2s->h2c->flags &= ~H2_CF_DEM_SFULL;
- if (h2s->h2c->dsi == h2s->id) {
- conn_xprt_want_recv(cs->conn);
- tasklet_wakeup(h2s->h2c->wait_event.task);
- conn_xprt_want_send(cs->conn);
- }
- }
-
- /* Note: the stream and stream-int code doesn't allow us to perform a
- * synchronous send() here unfortunately, because this code is called
- * as si_update() from the process_stream() context. This means that
- * we have to queue the current cs and defer its processing after the
- * connection's cs list is processed anyway.
- */
-
- if (cs->flags & CS_FL_DATA_WR_ENA) {
- if (!b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_SOCK_WR_SH))
- conn_xprt_want_send(cs->conn);
- tasklet_wakeup(h2s->h2c->wait_event.task);
- }
- /* We don't support unsubscribing from here, it shouldn't be a problem */
-
- /* this can happen from within si_chk_snd() */
- if (b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_XPRT_WR_ENA))
- conn_xprt_want_send(cs->conn);
-}
-
/*
* Detach the stream from the connection and possibly release the connection.
*/
if (h2c->flags & H2_CF_DEM_TOOMANY &&
!h2_has_too_many_cs(h2c)) {
h2c->flags &= ~H2_CF_DEM_TOOMANY;
- if (h2_recv_allowed(h2c)) {
- __conn_xprt_want_recv(h2c->conn);
+ if (h2_recv_allowed(h2c))
tasklet_wakeup(h2c->wait_event.task);
- conn_xprt_want_send(h2c->conn);
- }
}
/* this stream may be blocked waiting for some data to leave (possibly
*/
h2c->flags &= ~H2_CF_DEM_BLOCK_ANY;
h2c->flags &= ~H2_CF_MUX_BLOCK_ANY;
- conn_xprt_want_recv(cs->conn);
tasklet_wakeup(h2c->wait_event.task);
- conn_xprt_want_send(cs->conn);
}
h2s_destroy(h2s);
!(h2s->h2c->flags & (H2_CF_GOAWAY_SENT|H2_CF_GOAWAY_FAILED)) &&
h2c_send_goaway_error(h2c, h2s) <= 0)
return;
- if (b_data(&h2c->mbuf) && !(h2c->conn->flags & CO_FL_XPRT_WR_ENA))
- conn_xprt_want_send(h2c->conn);
h2s_close(h2s);
h2s_close(h2s);
}
- if (b_data(&h2s->h2c->mbuf) && !(h2c->conn->flags & CO_FL_XPRT_WR_ENA))
- conn_xprt_want_send(h2c->conn);
add_to_list:
if (LIST_ISEMPTY(&h2s->list)) {
b_del(buf, total);
if (total > 0) {
- conn_xprt_want_send(h2s->h2c->conn);
if (!(h2s->h2c->wait_event.wait_reason & SUB_CAN_SEND))
tasklet_wakeup(h2s->h2c->wait_event.task);
}
const struct mux_ops h2_ops = {
.init = h2_init,
.wake = h2_wake,
- .update_poll = h2_update_poll,
.snd_buf = h2_snd_buf,
.rcv_buf = h2_rcv_buf,
.subscribe = h2_subscribe,
if ((conn->flags & (CO_FL_EARLY_DATA | CO_FL_EARLY_SSL_HS | CO_FL_HANDSHAKE)) ==
CO_FL_EARLY_DATA)
conn->flags &= ~CO_FL_EARLY_DATA;
- if (ret >= 0)
- cs_update_mux_polling(cs);
return ret;
}
-/* callback used to update the mux's polling flags after changing a cs' status.
- * The caller (cs_mux_update_poll) will take care of propagating any changes to
- * the transport layer.
- */
-static void mux_pt_update_poll(struct conn_stream *cs)
-{
- struct connection *conn = cs->conn;
- int flags = 0;
-
- conn_refresh_polling_flags(conn);
-
- if (cs->flags & CS_FL_DATA_RD_ENA)
- flags |= CO_FL_XPRT_RD_ENA;
- if (cs->flags & CS_FL_DATA_WR_ENA)
- flags |= CO_FL_XPRT_WR_ENA;
-
- conn->flags = (conn->flags & ~(CO_FL_XPRT_RD_ENA | CO_FL_XPRT_WR_ENA)) | flags;
- conn_cond_update_xprt_polling(conn);
-}
-
/*
* Attach a new stream to a connection
* (Used for outgoing connections)
const struct mux_ops mux_pt_ops = {
.init = mux_pt_init,
.wake = mux_pt_wake,
- .update_poll = mux_pt_update_poll,
.rcv_buf = mux_pt_rcv_buf,
.snd_buf = mux_pt_snd_buf,
.subscribe = mux_pt_subscribe,
goto out_fail_accept;
/* finish initialization of the accepted file descriptor */
- if (cs)
- cs_want_recv(cs);
- else if (appctx)
+ if (appctx)
si_applet_want_get(&s->si[0]);
if (sess->fe->accept && sess->fe->accept(s) < 0)
si_cs_send(cs);
si_cs_recv(cs);
}
+redo:
//DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
// si_f->state, si_b->state, si_b->err_type, req->flags, res->flags);
if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED))
stream_process_counters(s);
+ cs = objt_cs(si_f->end);
+ ret = 0;
+ if (cs && !(cs->conn->flags & CO_FL_ERROR) &&
+ !(cs->flags & CS_FL_ERROR) && !(si_oc(si_f)->flags & CF_SHUTW))
+ ret = si_cs_send(cs);
+ cs = objt_cs(si_b->end);
+ if (cs && !(cs->conn->flags & CO_FL_ERROR) &&
+ !(cs->flags & CS_FL_ERROR) && !(si_oc(si_b)->flags & CF_SHUTW))
+ ret |= si_cs_send(cs);
+
+ if (ret)
+ goto redo;
+
if (si_f->state == SI_ST_EST)
si_update(si_f);
s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
stream_release_buffers(s);
/* We may have free'd some space in buffers, or have more to send/recv, try again */
- cs = objt_cs(si_f->end);
- ret = 0;
- if (cs && !(cs->conn->flags & CO_FL_ERROR)) {
- ret |= si_cs_send(cs);
- si_cs_recv(cs);
- ret |= (si_ic(si_f)->flags & CF_READ_PARTIAL) | (cs->conn->flags & CO_FL_ERROR);
- }
- cs = objt_cs(si_b->end);
- if (cs && !(cs->conn->flags & CO_FL_ERROR)) {
- ret |= si_cs_send(cs);
- si_cs_recv(cs);
- ret |= (si_ic(si_b)->flags & CF_READ_PARTIAL) | (cs->conn->flags & CO_FL_ERROR);
-
- }
- if (ret)
- task_wakeup(t, TASK_WOKEN_IO);
return t; /* nothing more to do */
}
/* stream-interface operations for connections */
struct si_ops si_conn_ops = {
- .update = stream_int_update_conn,
.chk_rcv = stream_int_chk_rcv_conn,
.chk_snd = stream_int_chk_snd_conn,
.shutr = stream_int_shutr_conn,
else {
/* (re)start reading */
si->flags &= ~SI_FL_WAIT_ROOM;
+ tasklet_wakeup(si->wait_event.task);
if (!(si->flags & SI_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO);
}
/* check if the consumer has freed some space either in the
* buffer or in the pipe.
*/
- if (channel_may_recv(ic) && new_len < last_len)
+ if (channel_may_recv(ic) && new_len < last_len) {
+ tasklet_wakeup(si->wait_event.task);
si->flags &= ~SI_FL_WAIT_ROOM;
+ }
}
if (si->flags & SI_FL_WAIT_ROOM) {
struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
+ int wait_room = si->flags & SI_FL_WAIT_ROOM;
/* If we have data to send, try it now */
if (!channel_is_empty(oc) && objt_cs(si->end))
stream_int_notify(si);
channel_release_buffer(ic, &(si_strm(si)->buffer_wait));
- /* Third step : update the connection's polling status based on what
- * was done above (eg: maybe some buffers got emptied).
- */
- if (channel_is_empty(oc))
- __cs_stop_send(cs);
-
-
- if (si->flags & SI_FL_WAIT_ROOM) {
- __cs_stop_recv(cs);
- }
- else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
- channel_may_recv(ic)) {
- __cs_want_recv(cs);
- }
+ /* Try to recv() again if we free'd some room in the process */
+ if (wait_room && !(si->flags & SI_FL_WAIT_ROOM))
+ si_cs_recv(cs);
return 0;
}
}
}
/* We couldn't send all of our data, let the mux know we'd like to send more */
- if (co_data(oc)) {
- cs_want_send(cs);
+ if (co_data(oc))
conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_event);
- }
return did_send;
}
* have updated it if there has been a completed I/O.
*/
si->flags &= ~SI_FL_WAIT_ROOM;
+ tasklet_wakeup(si->wait_event.task);
if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
}
}
-/* Updates the polling status of a connection outside of the connection handler
- * based on the channel's flags and the stream interface's flags. It needs to be
- * called once after the channels' flags have settled down and the stream has
- * been updated. It is not designed to be called from within the connection
- * handler itself.
- */
-void stream_int_update_conn(struct stream_interface *si)
-{
- struct channel *ic = si_ic(si);
- struct channel *oc = si_oc(si);
- struct conn_stream *cs = __objt_cs(si->end);
-
- if (!(ic->flags & CF_SHUTR)) {
- /* Read not closed */
- if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic))
- __cs_stop_recv(cs);
- else
- __cs_want_recv(cs);
- }
-
- if (!(oc->flags & CF_SHUTW)) {
- /* Write not closed */
- if (channel_is_empty(oc))
- __cs_stop_send(cs);
- else
- __cs_want_send(cs);
- }
-
- cs_update_mux_polling(cs);
-}
-
/*
* This function performs a shutdown-read on a stream interface attached to
* a connection in a connected or init state (it does nothing for other
static void stream_int_shutr_conn(struct stream_interface *si)
{
struct conn_stream *cs = __objt_cs(si->end);
- struct connection *conn = cs->conn;
struct channel *ic = si_ic(si);
ic->flags &= ~CF_SHUTR_NOW;
/* we want to immediately forward this close to the write side */
return stream_int_shutw_conn(si);
}
- else if (conn->ctrl) {
- /* we want the caller to disable polling on this FD */
- cs_stop_recv(cs);
- }
}
/*
static void stream_int_chk_rcv_conn(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
- struct conn_stream *cs = __objt_cs(si->end);
if (unlikely(si->state > SI_ST_EST || (ic->flags & CF_SHUTR)))
return;
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
/* stop reading */
- if (!(ic->flags & CF_DONT_READ)) /* full */ {
- si->flags |= SI_FL_WAIT_ROOM;
- }
- __cs_stop_recv(cs);
+ si->flags |= SI_FL_WAIT_ROOM;
}
else {
+ struct conn_stream *cs = objt_cs(si->end);
/* (re)start reading */
si->flags &= ~SI_FL_WAIT_ROOM;
- __cs_want_recv(cs);
+ if (cs) {
+ si_cs_recv(cs);
+ tasklet_wakeup(si->wait_event.task);
+ }
}
- cs_update_mux_polling(cs);
}
!(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
return;
- if (cs->flags & CS_FL_DATA_WR_ENA) {
- /* already subscribed to write notifications, will be called
- * anyway, so let's avoid calling it especially if the reader
- * is not ready.
- */
- return;
- }
-
- __cs_want_send(cs);
-
si_cs_send(cs);
+ tasklet_wakeup(si->wait_event.task);
if (cs->flags & CS_FL_ERROR || cs->conn->flags & CO_FL_ERROR) {
/* Write error on the file descriptor */
- __cs_stop_both(cs);
si->flags |= SI_FL_ERR;
goto out_wakeup;
}
* ->o limit was reached. Maybe we just wrote the last
* chunk and need to close.
*/
- __cs_stop_send(cs);
if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
(CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
(si->state == SI_ST_EST)) {
/* Otherwise there are remaining data to be sent in the buffer,
* which means we have to poll before doing so.
*/
- __cs_want_send(cs);
si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (!(si->flags & SI_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO);
}
-
- /* commit possible polling changes */
- cs_update_mux_polling(cs);
}
/*
* could soon be full. Let's stop before needing to poll.
*/
si->flags |= SI_FL_WAIT_ROOM;
- __cs_stop_recv(cs);
}
/* splice not possible (anymore), let's go on on standard copy */
* This was changed to accomodate with the mux code,
* but we may have lost a worthwhile optimization.
*/
- __cs_stop_recv(cs);
si->flags |= SI_FL_WAIT_ROOM;
break;
}
goto out_shutdown_r;
/* Subscribe to receive events */
- conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event);
+ if (!(si->flags & SI_FL_WAIT_ROOM))
+ conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event);
- return cur_read != 0;
+ return (cur_read != 0 || (si->flags & SI_FL_WAIT_ROOM));
out_shutdown_r:
/* we received a shutdown */
}
/* otherwise that's just a normal read shutdown */
- __cs_stop_recv(cs);
return;
do_close: