enum obj_type obj_type; /* differentiates connection from applet context */
/* 3 bytes hole here */
unsigned int flags; /* CS_FL_* */
- enum obj_type *end; /* points to the end point (connection or appctx) */
+ void *end; /* points to the end point (MUX stream or appctx) */
enum obj_type *app; /* points to the applicative point (stream or check) */
struct stream_interface *si;
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */
- void *ctx; /* mux-specific context */
+ void *ctx; /* endpoint-specific context */
};
struct conn_stream *cs_new();
void cs_free(struct conn_stream *cs);
-void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx);
+void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx);
+void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx);
int cs_attach_app(struct conn_stream *cs, enum obj_type *app);
void cs_detach_endp(struct conn_stream *cs);
void cs_detach_app(struct conn_stream *cs);
cs->data_cb = NULL;
}
-/* Returns the connection from a cs if the endpoint is a connection. Otherwise
+/* Returns the connection from a cs if the endpoint is a mux stream. Otherwise
* NULL is returned. __cs_conn() returns the connection without any control
* while cs_conn() check the endpoint type.
*/
static inline struct connection *__cs_conn(const struct conn_stream *cs)
{
- return __objt_conn(cs->end);
+ return cs->ctx;
}
static inline struct connection *cs_conn(const struct conn_stream *cs)
{
return NULL;
}
-/* Returns the mux of the connection from a cs if the endpoint is a
- * connection. Otherwise NULL is returned.
+/* Returns the mux ops of the connection from a cs if the endpoint is a
+ * mux stream. Otherwise NULL is returned.
*/
static inline const struct mux_ops *cs_conn_mux(const struct conn_stream *cs)
{
*/
static inline struct appctx *__cs_appctx(const struct conn_stream *cs)
{
- return __objt_appctx(cs->end);
+ return cs->end;
}
static inline struct appctx *cs_appctx(const struct conn_stream *cs)
{
struct conn_stream *cs = cs_new();
if (!cs)
return NULL;
- cs_attach_endp(cs, &qcs->qcc->conn->obj_type, qcs);
-
+ cs_attach_endp_mux(cs, qcs, qcs->qcc->conn);
qcs->cs = cs;
- cs->ctx = qcs;
stream_new(qcs->qcc->conn->owner, cs, buf);
++qcs->qcc->nb_cs;
}
if (avail >= 1) {
- cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn);
if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) {
cs_detach_endp(s->csb);
srv_conn = NULL;
return SF_ERR_INTERNAL; /* how did we get there ? */
}
- cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn);
+ cs_attach_endp_mux(s->csb, NULL, srv_conn);
#if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation)
if (!srv ||
(srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) ||
}
-/* Attaches a conn_stream to an endpoint and sets the endpoint ctx */
-void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx)
+/* Attaches a conn_stream to an mux endpoint and sets the endpoint ctx */
+void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx)
{
- struct connection *conn;
- struct appctx *appctx;
+ struct connection *conn = ctx;
cs->end = endp;
cs->ctx = ctx;
- if ((conn = objt_conn(endp)) != NULL) {
- if (!conn->ctx)
- conn->ctx = cs;
- if (cs_strm(cs)) {
- cs->si->ops = &si_conn_ops;
- cs->data_cb = &si_conn_cb;
- }
- else if (cs_check(cs))
- cs->data_cb = &check_conn_cb;
- cs->flags |= CS_FL_ENDP_MUX;
+ if (!conn->ctx)
+ conn->ctx = cs;
+ if (cs_strm(cs)) {
+ cs->si->ops = &si_conn_ops;
+ cs->data_cb = &si_conn_cb;
}
- else if ((appctx = objt_appctx(endp)) != NULL) {
- appctx->owner = cs;
- if (cs->si) {
- cs->si->ops = &si_applet_ops;
- cs->data_cb = NULL;
- }
- cs->flags |= CS_FL_ENDP_APP;
+ else if (cs_check(cs))
+ cs->data_cb = &check_conn_cb;
+ cs->flags |= CS_FL_ENDP_MUX;
+}
+
+/* Attaches a conn_stream to an applet endpoint and sets the endpoint ctx */
+void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx)
+{
+ struct appctx *appctx = endp;
+
+ cs->end = endp;
+ cs->ctx = ctx;
+ appctx->owner = cs;
+ if (cs->si) {
+ cs->si->ops = &si_applet_ops;
+ cs->data_cb = NULL;
}
+ cs->flags |= CS_FL_ENDP_APP;
}
/* Attaches a conn_stream to a app layer and sets the relevant callbacks */
if (!sockaddr_alloc(&cs_si(s->csb)->dst, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr)))
goto out_free_strm;
- cs_attach_endp(cs, &appctx->obj_type, appctx);
+ cs_attach_endp_app(cs, appctx, appctx);
s->flags = SF_ASSIGNED|SF_ADDR_SET;
cs_si(s->csb)->flags |= SI_FL_NOLINGER;
if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL)
goto out_free_sess;
- cs_attach_endp(cs, &appctx->obj_type, appctx);
+ cs_attach_endp_app(cs, appctx, appctx);
stream_set_backend(strm, conf->agent->b.be);
/* applet is waiting for data */
size_t h3_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
size_t total = 0;
- struct qcs *qcs = cs->ctx;
+ struct qcs *qcs = cs->end;
struct htx *htx;
enum htx_blk_type btype;
struct htx_blk *blk;
goto out_fail_sess;
}
- cs_attach_endp(cs, &appctx->obj_type, appctx);
+ cs_attach_endp_app(cs, appctx, appctx);
/* Initialise cross reference between stream and Lua socket object. */
xref_create(&socket->xref, &appctx->ctx.hlua_cosocket.xref);
if (!cs)
return -1;
+
b_del(rxbuf, b_data(rxbuf));
b_free(&htx_buf);
static size_t hq_interop_snd_buf(struct conn_stream *cs, struct buffer *buf,
size_t count, int flags)
{
- struct qcs *qcs = cs->ctx;
+ struct qcs *qcs = cs->end;
struct htx *htx;
enum htx_blk_type btype;
struct htx_blk *blk;
break;
}
- cs_attach_endp(cs, &appctx->obj_type, appctx);
+ cs_attach_endp_app(cs, appctx, appctx);
s->flags |= SF_ASSIGNED|SF_ADDR_SET;
cs_si(s->csb)->flags |= SI_FL_NOLINGER;
s->res.flags |= CF_READ_DONTWAIT;
TRACE_ERROR("fstream allocation failure", FCGI_EV_FSTRM_NEW|FCGI_EV_FSTRM_END|FCGI_EV_FSTRM_ERR, fconn->conn);
goto out;
}
-
+ cs_attach_endp_mux(cs, fstrm, fconn->conn);
fstrm->cs = cs;
fstrm->sess = sess;
- cs->ctx = fstrm;
+ cs->end = fstrm;
fconn->nb_cs++;
TRACE_LEAVE(FCGI_EV_FSTRM_NEW, fconn->conn, fstrm);
*/
static void fcgi_detach(struct conn_stream *cs)
{
- struct fcgi_strm *fstrm = cs->ctx;
+ struct fcgi_strm *fstrm = cs->end;
struct fcgi_conn *fconn;
struct session *sess;
TRACE_ENTER(FCGI_EV_STRM_END, (fstrm ? fstrm->fconn->conn : NULL), fstrm);
+ cs->end = NULL;
cs->ctx = NULL;
if (!fstrm) {
TRACE_LEAVE(FCGI_EV_STRM_END);
/* shutr() called by the conn_stream (mux_ops.shutr) */
static void fcgi_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{
- struct fcgi_strm *fstrm = cs->ctx;
+ struct fcgi_strm *fstrm = cs->end;
TRACE_POINT(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm);
if (cs->flags & CS_FL_KILL_CONN)
/* shutw() called by the conn_stream (mux_ops.shutw) */
static void fcgi_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{
- struct fcgi_strm *fstrm = cs->ctx;
+ struct fcgi_strm *fstrm = cs->end;
TRACE_POINT(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm);
if (cs->flags & CS_FL_KILL_CONN)
*/
static int fcgi_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct fcgi_strm *fstrm = cs->ctx;
+ struct fcgi_strm *fstrm = cs->end;
struct fcgi_conn *fconn = fstrm->fconn;
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
*/
static int fcgi_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct fcgi_strm *fstrm = cs->ctx;
+ struct fcgi_strm *fstrm = cs->end;
struct fcgi_conn *fconn = fstrm->fconn;
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
*/
static size_t fcgi_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- struct fcgi_strm *fstrm = cs->ctx;
+ struct fcgi_strm *fstrm = cs->end;
struct fcgi_conn *fconn = fstrm->fconn;
size_t ret = 0;
*/
static size_t fcgi_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- struct fcgi_strm *fstrm = cs->ctx;
+ struct fcgi_strm *fstrm = cs->end;
struct fcgi_conn *fconn = fstrm->fconn;
size_t total = 0;
size_t ret;
TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
goto err;
}
- cs_attach_endp(cs, &h1c->conn->obj_type, h1s);
+ cs_attach_endp_mux(cs, h1s, h1c->conn);
h1s->cs = cs;
if (h1s->flags & H1S_F_NOT_FIRST)
if (!h1s)
goto fail;
+ cs_attach_endp_mux(cs, h1s, h1c->conn);
h1s->flags |= H1S_F_RX_BLK;
h1s->cs = cs;
h1s->sess = sess;
- cs->ctx = h1s;
h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY;
if (!h1c_frt_stream_new(h1c))
goto fail;
-
h1c->h1s->cs = cs;
- cs->ctx = h1c->h1s;
+ cs_attach_endp_mux(cs, h1c->h1s, conn);
/* Attach the CS but Not ready yet */
h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED;
*/
static void h1_detach(struct conn_stream *cs)
{
- struct h1s *h1s = cs->ctx;
+ struct h1s *h1s = cs->end;
struct h1c *h1c;
struct session *sess;
int is_not_first;
TRACE_ENTER(H1_EV_STRM_END, h1s ? h1s->h1c->conn : NULL, h1s);
+ cs->end = NULL;
cs->ctx = NULL;
if (!h1s) {
TRACE_LEAVE(H1_EV_STRM_END);
static void h1_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{
- struct h1s *h1s = cs->ctx;
+ struct h1s *h1s = cs->end;
struct h1c *h1c;
if (!h1s)
static void h1_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{
- struct h1s *h1s = cs->ctx;
+ struct h1s *h1s = cs->end;
struct h1c *h1c;
if (!h1s)
*/
static int h1_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct h1s *h1s = cs->ctx;
+ struct h1s *h1s = cs->end;
if (!h1s)
return 0;
*/
static int h1_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct h1s *h1s = cs->ctx;
+ struct h1s *h1s = cs->end;
struct h1c *h1c;
if (!h1s)
*/
static size_t h1_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- struct h1s *h1s = cs->ctx;
+ struct h1s *h1s = cs->end;
struct h1c *h1c = h1s->h1c;
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res);
size_t ret = 0;
/* Called from the upper layer, to send data */
static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- struct h1s *h1s = cs->ctx;
+ struct h1s *h1s = cs->end;
struct h1c *h1c;
size_t total = 0;
/* Send and get, using splicing */
static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count)
{
- struct h1s *h1s = cs->ctx;
+ struct h1s *h1s = cs->end;
struct h1c *h1c = h1s->h1c;
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res);
int ret = 0;
static int h1_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
{
- struct h1s *h1s = cs->ctx;
+ struct h1s *h1s = cs->end;
struct h1c *h1c = h1s->h1c;
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
int ret = 0;
if (!cs)
goto out_close;
cs->flags |= CS_FL_NOT_FIRST;
- cs_attach_endp(cs, &h2c->conn->obj_type, h2s);
+ cs_attach_endp_mux(cs, h2s, h2c->conn);
h2s->cs = cs;
h2c->nb_cs++;
if (!h2s)
goto out;
+ cs_attach_endp_mux(cs, h2s, h2c->conn);
h2s->cs = cs;
h2s->sess = sess;
- cs->ctx = h2s;
h2c->nb_cs++;
out:
*/
static void h2_detach(struct conn_stream *cs)
{
- struct h2s *h2s = cs->ctx;
+ struct h2s *h2s = cs->end;
struct h2c *h2c;
struct session *sess;
TRACE_ENTER(H2_EV_STRM_END, h2s ? h2s->h2c->conn : NULL, h2s);
+ cs->end = NULL;
cs->ctx = NULL;
if (!h2s) {
TRACE_LEAVE(H2_EV_STRM_END);
/* shutr() called by the conn_stream (mux_ops.shutr) */
static void h2_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{
- struct h2s *h2s = cs->ctx;
+ struct h2s *h2s = cs->end;
TRACE_ENTER(H2_EV_STRM_SHUT, h2s->h2c->conn, h2s);
if (cs->flags & CS_FL_KILL_CONN)
/* shutw() called by the conn_stream (mux_ops.shutw) */
static void h2_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{
- struct h2s *h2s = cs->ctx;
+ struct h2s *h2s = cs->end;
TRACE_ENTER(H2_EV_STRM_SHUT, h2s->h2c->conn, h2s);
if (cs->flags & CS_FL_KILL_CONN)
*/
static int h2_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct h2s *h2s = cs->ctx;
+ struct h2s *h2s = cs->end;
struct h2c *h2c = h2s->h2c;
TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2c->conn, h2s);
*/
static int h2_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct h2s *h2s = cs->ctx;
+ struct h2s *h2s = cs->end;
TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2s->h2c->conn, h2s);
*/
static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- struct h2s *h2s = cs->ctx;
+ struct h2s *h2s = cs->end;
struct h2c *h2c = h2s->h2c;
struct htx *h2s_htx = NULL;
struct htx *buf_htx = NULL;
*/
static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- struct h2s *h2s = cs->ctx;
+ struct h2s *h2s = cs->end;
size_t total = 0;
size_t ret;
struct htx *htx;
TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
goto fail_free_ctx;
}
- cs_attach_endp(cs, &conn->obj_type, NULL);
+ cs_attach_endp_mux(cs, ctx, conn);
if (!stream_new(conn->owner, cs, &BUF_NULL)) {
TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs);
TRACE_ENTER(PT_EV_STRM_NEW, conn);
if (ctx->wait_event.events)
conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
+ cs_attach_endp_mux(cs, ctx, conn);
ctx->cs = cs;
cs->flags |= CS_FL_RCV_MORE;
TRACE_ENTER(PT_EV_STRM_END, conn, cs);
+ cs->end = NULL;
+ cs->ctx = NULL;
+
/* Subscribe, to know if we got disconnected */
if (!conn_is_back(conn) && conn->owner != NULL &&
!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
static void qc_detach(struct conn_stream *cs)
{
- struct qcs *qcs = cs->ctx;
+ struct qcs *qcs = cs->end;
struct qcc *qcc = qcs->qcc;
TRACE_ENTER(QMUX_EV_STRM_END, qcc->conn, qcs);
static size_t qc_rcv_buf(struct conn_stream *cs, struct buffer *buf,
size_t count, int flags)
{
- struct qcs *qcs = cs->ctx;
+ struct qcs *qcs = cs->end;
struct htx *qcs_htx = NULL;
struct htx *cs_htx = NULL;
size_t ret = 0;
static size_t qc_snd_buf(struct conn_stream *cs, struct buffer *buf,
size_t count, int flags)
{
- struct qcs *qcs = cs->ctx;
+ struct qcs *qcs = cs->end;
size_t ret;
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
static int qc_subscribe(struct conn_stream *cs, int event_type,
struct wait_event *es)
{
- return qcs_subscribe(cs->ctx, event_type, es);
+ return qcs_subscribe(cs->end, event_type, es);
}
/* Called from the upper layer, to unsubscribe <es> from events <event_type>.
*/
static int qc_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct qcs *qcs = cs->ctx;
+ struct qcs *qcs = cs->end;
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
BUG_ON(qcs->subs && qcs->subs != es);
if (!sockaddr_alloc(&(cs_si(s->csb)->dst), &peer->addr, sizeof(peer->addr)))
goto out_free_strm;
- cs_attach_endp(cs, &appctx->obj_type, appctx);
+ cs_attach_endp_app(cs, appctx, appctx);
s->flags = SF_ASSIGNED|SF_ADDR_SET;
cs_si(s->csb)->flags |= SI_FL_NOLINGER;
if (!sockaddr_alloc(&cs_si(s->csb)->dst, &sft->srv->addr, sizeof(sft->srv->addr)))
goto out_free_strm;
- cs_attach_endp(cs, &appctx->obj_type, appctx);
+ cs_attach_endp_app(cs, appctx, appctx);
s->flags = SF_ASSIGNED|SF_ADDR_SET;
cs_si(s->csb)->flags |= SI_FL_NOLINGER;
if (unlikely(!appctx))
return ACT_RET_ERR;
- /* Initialise the context. */
+ /* Finish initialisation of the context. */
memset(&appctx->ctx, 0, sizeof(appctx->ctx));
appctx->rule = rule;
if (appctx->applet->init && !appctx->applet->init(appctx))
strm->csb->si->err_type, strm->csb->si->wait_event.events);
cs = strm->csf;
- chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx);
+ chunk_appendf(&trash, " cs=%p csf=0x%08x endp=%p\n", cs, cs->flags, cs->end);
if ((conn = cs_conn(cs)) != NULL) {
chunk_appendf(&trash,
}
cs = strm->csb;
- chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx);
+ chunk_appendf(&trash, " cs=%p csf=0x%08x end=%p\n", cs, cs->flags, cs->end);
if ((conn = cs_conn(cs)) != NULL) {
chunk_appendf(&trash,
" co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
appctx = appctx_new(app, si->cs);
if (!appctx)
return NULL;
- cs_attach_endp(si->cs, &appctx->obj_type, appctx);
+ cs_attach_endp_app(si->cs, appctx, appctx);
appctx->t->nice = si_strm(si)->task->nice;
si_cant_get(si);
appctx_wakeup(appctx);
TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
goto out;
}
- cs_attach_endp(check->cs, &conn->obj_type, conn);
+ cs_attach_endp_mux(check->cs, NULL, conn);
tasklet_set_tid(check->wait_list.tasklet, tid);
conn_set_owner(conn, check->sess, NULL);