/* post-IO notification callback */
static void cs_notify(struct conn_stream *cs);
-struct data_cb si_conn_cb = {
- .wake = si_cs_process,
+
+struct data_cb cs_data_conn_cb = {
+ .wake = cs_conn_process,
.name = "STRM",
};
-
struct data_cb cs_data_applet_cb = {
.wake = cs_applet_process,
.name = "STRM",
};
+
struct stream_interface *si_new(struct conn_stream *cs)
{
struct stream_interface *si;
* connection's polling based on the channels and stream interface's final
* states. The function always returns 0.
*/
-int si_cs_process(struct conn_stream *cs)
+int cs_conn_process(struct conn_stream *cs)
{
struct connection *conn = __cs_conn(cs);
- struct stream_interface *si = cs_si(cs);
- struct channel *ic = si_ic(si);
- struct channel *oc = si_oc(si);
+ struct channel *ic = cs_ic(cs);
+ struct channel *oc = cs_oc(cs);
BUG_ON(!conn);
/* If we have data to send, try it now */
- if (!channel_is_empty(oc) && !(si->cs->wait_event.events & SUB_RETRY_SEND))
- si_cs_send(cs);
+ if (!channel_is_empty(oc) && !(cs->wait_event.events & SUB_RETRY_SEND))
+ cs_conn_send(cs);
/* First step, report to the conn-stream what was detected at the
* connection layer : errors and connection establishment.
* to retry to connect, the connection may still have CO_FL_ERROR,
* and we don't want to add CS_EP_ERROR back
*
- * Note: This test is only required because si_cs_process is also the SI
- * wake callback. Otherwise si_cs_recv()/si_cs_send() already take
+ * Note: This test is only required because cs_conn_process is also the SI
+ * wake callback. Otherwise cs_conn_recv()/cs_conn_send() already take
* care of it.
*/
- if (si->cs->state >= CS_ST_CON) {
- if (si_is_conn_error(si))
+ if (cs->state >= CS_ST_CON) {
+ if (si_is_conn_error(cs->si))
cs->endp->flags |= CS_EP_ERROR;
}
if (!(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) &&
(cs->endp->flags & CS_EP_WAIT_FOR_HS)) {
cs->endp->flags &= ~CS_EP_WAIT_FOR_HS;
- task_wakeup(si_task(si), TASK_WOKEN_MSG);
+ task_wakeup(cs_strm_task(cs), TASK_WOKEN_MSG);
}
- if (!cs_state_in(si->cs->state, CS_SB_EST|CS_SB_DIS|CS_SB_CLO) &&
+ if (!cs_state_in(cs->state, CS_SB_EST|CS_SB_DIS|CS_SB_CLO) &&
(conn->flags & CO_FL_WAIT_XPRT) == 0) {
__cs_strm(cs)->conn_exp = TICK_ETERNITY;
oc->flags |= CF_WRITE_NULL;
- if (si->cs->state == CS_ST_CON)
- si->cs->state = CS_ST_RDY;
+ if (cs->state == CS_ST_CON)
+ cs->state = CS_ST_RDY;
}
/* Report EOS on the channel if it was reached from the mux point of
* view.
*
- * Note: This test is only required because si_cs_process is also the SI
- * wake callback. Otherwise si_cs_recv()/si_cs_send() already take
+ * Note: This test is only required because cs_conn_process is also the SI
+ * wake callback. Otherwise cs_conn_recv()/cs_conn_send() already take
* care of it.
*/
if (cs->endp->flags & CS_EP_EOS && !(ic->flags & CF_SHUTR)) {
/* Report EOI on the channel if it was reached from the mux point of
* view.
*
- * Note: This test is only required because si_cs_process is also the SI
- * wake callback. Otherwise si_cs_recv()/si_cs_send() already take
+ * Note: This test is only required because cs_conn_process is also the SI
+ * wake callback. Otherwise cs_conn_recv()/cs_conn_send() already take
* care of it.
*/
if ((cs->endp->flags & CS_EP_EOI) && !(ic->flags & CF_EOI))
* stream-int status.
*/
cs_notify(cs);
- stream_release_buffers(si_strm(si));
+ stream_release_buffers(__cs_strm(cs));
return 0;
}
* caller to commit polling changes. The caller should check conn->flags
* for errors.
*/
-int si_cs_send(struct conn_stream *cs)
+int cs_conn_send(struct conn_stream *cs)
{
struct connection *conn = __cs_conn(cs);
- struct stream_interface *si = cs_si(cs);
- struct stream *s = si_strm(si);
- struct channel *oc = si_oc(si);
+ struct stream *s = __cs_strm(cs);
+ struct channel *oc = cs_oc(cs);
int ret;
int did_send = 0;
- if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(si)) {
+ if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) {
/* We're probably there because the tasklet was woken up,
* but process_stream() ran before, detected there were an
* error and put the si back to CS_ST_TAR. There's still
* CO_FL_ERROR on the connection but we don't want to add
* CS_EP_ERROR back, so give up
*/
- if (si->cs->state < CS_ST_CON)
+ if (cs->state < CS_ST_CON)
return 0;
cs->endp->flags |= CS_EP_ERROR;
return 1;
}
/* We're already waiting to be able to send, give up */
- if (si->cs->wait_event.events & SUB_RETRY_SEND)
+ if (cs->wait_event.events & SUB_RETRY_SEND)
return 0;
/* we might have been called just after an asynchronous shutw */
if ((!(oc->flags & (CF_NEVER_WAIT|CF_SEND_DONTWAIT)) &&
((oc->to_forward && oc->to_forward != CHN_INFINITE_FORWARD) ||
(oc->flags & CF_EXPECT_MORE) ||
- (IS_HTX_STRM(si_strm(si)) &&
+ (IS_HTX_STRM(s) &&
(!(oc->flags & (CF_EOI|CF_SHUTR)) && htx_expect_more(htxbuf(&oc->buf)))))) ||
((oc->flags & CF_ISRESP) &&
((oc->flags & (CF_AUTO_CLOSE|CF_SHUTW_NOW)) == (CF_AUTO_CLOSE|CF_SHUTW_NOW))))
end:
if (did_send) {
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA;
- if (si->cs->state == CS_ST_CON)
- si->cs->state = CS_ST_RDY;
+ if (cs->state == CS_ST_CON)
+ cs->state = CS_ST_RDY;
- si_rx_room_rdy(si_opposite(si));
+ si_rx_room_rdy(cs_opposite(cs)->si);
}
if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING)) {
/* We couldn't send all of our data, let the mux know we'd like to send more */
if (!channel_is_empty(oc))
- conn->mux->subscribe(cs, SUB_RETRY_SEND, &si->cs->wait_event);
+ conn->mux->subscribe(cs, SUB_RETRY_SEND, &cs->wait_event);
return did_send;
}
return t;
if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs)))
- ret = si_cs_send(cs);
+ ret = cs_conn_send(cs);
if (!(cs->wait_event.events & SUB_RETRY_RECV))
- ret |= si_cs_recv(cs);
+ ret |= cs_conn_recv(cs);
if (ret != 0)
- si_cs_process(cs);
+ cs_conn_process(cs);
stream_release_buffers(__cs_strm(cs));
return t;
if (!si_rx_endp_ready(cs->si) || si_rx_blocked(cs->si))
return 0; // already failed
- return si_cs_recv(cs);
+ return cs_conn_recv(cs);
}
/* perform a synchronous send() for the stream interface. The CF_WRITE_NULL and
if (!cs_conn_mux(cs))
return;
- si_cs_send(cs);
+ cs_conn_send(cs);
}
/*
* into the buffer from the connection. It iterates over the mux layer's
* rcv_buf function.
*/
-int si_cs_recv(struct conn_stream *cs)
+int cs_conn_recv(struct conn_stream *cs)
{
struct connection *conn = __cs_conn(cs);
- struct stream_interface *si = cs_si(cs);
- struct channel *ic = si_ic(si);
+ struct channel *ic = cs_ic(cs);
int ret, max, cur_read = 0;
int read_poll = MAX_READ_POLL_LOOPS;
int flags = 0;
if (cs->state != CS_ST_EST)
return 0;
- /* If another call to si_cs_recv() failed, and we subscribed to
+ /* If another call to cs_conn_recv() failed, and we subscribed to
* recv events already, give up now.
*/
- if (si->cs->wait_event.events & SUB_RETRY_RECV)
+ if (cs->wait_event.events & SUB_RETRY_RECV)
return 0;
/* maybe we were called immediately after an asynchronous shutr */
/* the pipe is full or we have read enough data that it
* could soon be full. Let's stop before needing to poll.
*/
- si_rx_room_blk(si);
+ si_rx_room_blk(cs->si);
goto done_recv;
}
}
/* now we'll need a input buffer for the stream */
- if (!si_alloc_ibuf(si, &(si_strm(si)->buffer_wait)))
+ if (!si_alloc_ibuf(cs->si, &(__cs_strm(cs)->buffer_wait)))
goto end_recv;
/* For an HTX stream, if the buffer is stuck (no output data with some
* NOTE: A possible optim may be to let the mux decides if defrag is
* required or not, depending on amount of data to be xferred.
*/
- if (IS_HTX_STRM(si_strm(si)) && !co_data(ic)) {
+ if (IS_HTX_STRM(__cs_strm(cs)) && !co_data(ic)) {
struct htx *htx = htxbuf(&ic->buf);
if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx)))
- htx_defrag(htxbuf(&ic->buf), NULL, 0);
+ htx_defrag(htx, NULL, 0);
}
/* Instruct the mux it must subscribed for read events */
- flags |= ((!conn_is_back(conn) && (si_strm(si)->be->options & PR_O_ABRT_CLOSE)) ? CO_RFL_KEEP_RECV : 0);
+ flags |= ((!conn_is_back(conn) && (__cs_strm(cs)->be->options & PR_O_ABRT_CLOSE)) ? CO_RFL_KEEP_RECV : 0);
/* Important note : if we're called with POLL_IN|POLL_HUP, it means the read polling
* was enabled, which implies that the recv buffer was not full. So we have a guarantee
*/
BUG_ON(c_empty(ic));
- si_rx_room_blk(si);
+ si_rx_room_blk(cs->si);
/* Add READ_PARTIAL because some data are pending but
* cannot be xferred to the channel
*/
* here to proceed.
*/
if (flags & CO_RFL_BUF_FLUSH)
- si_rx_room_blk(si);
+ si_rx_room_blk(cs->si);
break;
}
if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
/* we're stopped by the channel's policy */
- si_rx_chan_blk(si);
+ si_rx_chan_blk(cs->si);
break;
}
*/
if (ic->flags & CF_STREAMER) {
/* we're stopped by the channel's policy */
- si_rx_chan_blk(si);
+ si_rx_chan_blk(cs->si);
break;
}
*/
if (ret >= global.tune.recv_enough) {
/* we're stopped by the channel's policy */
- si_rx_chan_blk(si);
+ si_rx_chan_blk(cs->si);
break;
}
}
/* if we are waiting for more space, don't try to read more data
* right now.
*/
- if (si_rx_blocked(si))
+ if (si_rx_blocked(cs->si))
break;
} /* while !flags */
cs_conn_read0(cs);
ret = 1;
}
- else if (!si_rx_blocked(si)) {
+ else if (!si_rx_blocked(cs->si)) {
/* Subscribe to receive events if we're blocking on I/O */
- conn->mux->subscribe(cs, SUB_RETRY_RECV, &si->cs->wait_event);
- si_rx_endp_done(si);
+ conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event);
+ si_rx_endp_done(cs->si);
} else {
- si_rx_endp_more(si);
+ si_rx_endp_more(cs->si);
ret = 1;
}
return ret;