*/
static void dns_session_io_handler(struct appctx *appctx)
{
- struct stconn *sc = appctx_sc(appctx);
struct dns_session *ds = appctx->svcctx;
struct dns_ring *ring = &ds->ring;
struct buffer *buf = &ring->buf;
size_t len, cnt, ofs;
int ret = 0;
- if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
- co_skip(sc_oc(sc), co_data(sc_oc(sc)));
+ if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR))) {
+ applet_reset_input(appctx);
goto out;
}
/* if the connection is not established, inform the stream that we want
* to be notified whenever the connection completes.
*/
- if (sc_opposite(sc)->state < SC_ST_EST) {
+ if (se_fl_test(appctx->sedesc, SE_FL_APPLET_NEED_CONN)) {
applet_need_more_data(appctx);
- se_need_remote_conn(appctx->sedesc);
+ applet_have_more_data(appctx);
+ goto out;
+ }
+
+ if (applet_get_outbuf(appctx) == NULL) {
applet_have_more_data(appctx);
goto out;
}
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
/* retrieve available room on output channel */
- available_room = channel_recv_max(sc_ic(sc));
+ available_room = applet_output_room(appctx);
/* tx_msg_offset null means we are at the start of a new message */
if (!ds->tx_msg_offset) {
/* check if there is enough room to put message len and query id */
if (available_room < sizeof(slen) + sizeof(new_qid)) {
- sc_need_room(sc, sizeof(slen) + sizeof(new_qid));
+ applet_have_more_data(appctx);
ret = 0;
break;
}
/* check if it remains available room on output chan */
if (unlikely(!available_room)) {
- sc_need_room(sc, 1);
+ applet_have_more_data(appctx);
ret = 0;
break;
}
if (ds->tx_msg_offset) {
/* msg was not fully processed, we must be awake to drain pending data */
- sc_need_room(sc, 0);
+ applet_have_more_data(appctx);
ret = 0;
break;
}
*/
__ha_barrier_load();
if (!LIST_INLIST_ATOMIC(&ds->waiter)) {
+ if (applet_get_inbuf(appctx) == NULL) {
+ applet_need_more_data(appctx);
+ goto out;
+ }
+
while (1) {
uint16_t query_id;
struct eb32_node *eb;
if (!ds->rx_msg.len) {
/* retrieve message len */
- ret = co_getblk(sc_oc(sc), (char *)&msg_len, 2, 0);
+ ret = applet_getblk(appctx, (char *)&msg_len, 2, 0);
if (ret <= 0) {
if (ret == -1)
goto error;
}
/* mark as consumed */
- co_skip(sc_oc(sc), 2);
+ applet_skip_input(appctx, 2);
/* store message len */
ds->rx_msg.len = ntohs(msg_len);
continue;
}
- if (co_data(sc_oc(sc)) + ds->rx_msg.offset < ds->rx_msg.len) {
+ if (applet_input_data(appctx) + ds->rx_msg.offset < ds->rx_msg.len) {
/* message only partially available */
/* read available data */
- ret = co_getblk(sc_oc(sc), ds->rx_msg.area + ds->rx_msg.offset, co_data(sc_oc(sc)), 0);
+ ret = applet_getblk(appctx, ds->rx_msg.area + ds->rx_msg.offset, applet_input_data(appctx), 0);
if (ret <= 0) {
if (ret == -1)
goto error;
}
/* update message offset */
- ds->rx_msg.offset += co_data(sc_oc(sc));
+ ds->rx_msg.offset += applet_input_data(appctx);
/* consume all pending data from the channel */
- co_skip(sc_oc(sc), co_data(sc_oc(sc)));
+ applet_skip_input(appctx, applet_input_data(appctx));
/* we need to wait for more data */
applet_need_more_data(appctx);
/* enough data is available into the channel to read the message until the end */
/* read from the channel until the end of the message */
- ret = co_getblk(sc_oc(sc), ds->rx_msg.area + ds->rx_msg.offset, ds->rx_msg.len - ds->rx_msg.offset, 0);
+ ret = applet_getblk(appctx, ds->rx_msg.area + ds->rx_msg.offset, ds->rx_msg.len - ds->rx_msg.offset, 0);
if (ret <= 0) {
if (ret == -1)
goto error;
}
/* consume all data until the end of the message from the channel */
- co_skip(sc_oc(sc), ds->rx_msg.len - ds->rx_msg.offset);
+ applet_skip_input(appctx, ds->rx_msg.len - ds->rx_msg.offset);
/* reset reader offset to 0 for next message reand */
ds->rx_msg.offset = 0;
return;
close:
- se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI);
+ applet_set_eos(appctx);
goto out;
error:
- se_fl_set(appctx->sedesc, SE_FL_ERROR);
+ applet_set_eos(appctx);
+ applet_set_error(appctx);
goto out;
}
s->do_log = NULL;
s->uniq_id = 0;
+ se_need_remote_conn(appctx->sedesc);
applet_expect_no_data(appctx);
ds->appctx = appctx;
appctx->t->expire = TICK_ETERNITY;
.obj_type = OBJ_TYPE_APPLET,
.name = "<STRMDNS>", /* used for logging */
.fct = dns_session_io_handler,
+ .rcv_buf = appctx_raw_rcv_buf,
+ .snd_buf = appctx_raw_snd_buf,
.init = dns_session_init,
.release = dns_session_release,
};