#include <haproxy/channel-t.h>
#include <haproxy/connection-t.h>
+#include <haproxy/conn_stream-t.h>
#include <haproxy/http_ana-t.h>
#include <haproxy/stream-t.h>
#include <haproxy/stream_interface-t.h>
#define SHOW_AS_STRM 0x00000040
#define SHOW_AS_TASK 0x00000080
#define SHOW_AS_TXN 0x00000100
+#define SHOW_AS_ENDP 0x00000200
// command line names, must be in exact same order as the SHOW_AS_* flags above
// so that show_as_words[i] matches flag 1U<<i.
-const char *show_as_words[] = { "ana", "chn", "conn", "cs", "si", "siet", "strm", "task", "txn", };
+const char *show_as_words[] = { "ana", "chn", "conn", "cs", "si", "siet", "strm", "task", "txn", "endp", };
#define SHOW_FLAG(f,n) \
do { \
}
putchar('\n');
}
+
+void show_endp_flags(unsigned int f)
+{
+ printf("endp->flags = ");
+ if (!f) {
+ printf("0\n");
+ return;
+ }
+ SHOW_FLAG(f, CS_EP_T_APPLET);
+ SHOW_FLAG(f, CS_EP_T_MUX);
+
+ if (f) {
+ printf("EXTRA(0x%08x)", f);
+ }
+ putchar('\n');
+}
void show_cs_flags(unsigned int f)
{
printf("cs->flags = ");
printf("0\n");
return;
}
- SHOW_FLAG(f, CS_FL_ENDP_APP);
- SHOW_FLAG(f, CS_FL_ENDP_MUX);
SHOW_FLAG(f, CS_FL_WEBSOCKET);
SHOW_FLAG(f, CS_FL_NOT_FIRST);
SHOW_FLAG(f, CS_FL_KILL_CONN);
if (show_as & SHOW_AS_CHN) show_chn_flags(flags);
if (show_as & SHOW_AS_CONN) show_conn_flags(flags);
if (show_as & SHOW_AS_CS) show_cs_flags(flags);
+ if (show_as & SHOW_AS_ENDP) show_endp_flags(flags);
if (show_as & SHOW_AS_SI) show_si_flags(flags);
if (show_as & SHOW_AS_SIET) show_si_et(flags);
if (show_as & SHOW_AS_STRM) show_strm_flags(flags);
struct stream_interface;
+/* CS endpoint flags */
+ enum {
+ CS_EP_NONE = 0x00000000, /* For initialization purposes */
+
+ /* Endpoint types */
+ CS_EP_T_MUX = 0x00000001, /* The endpoint is a mux (the target may be NULL before the mux init) */
+ CS_EP_T_APPLET = 0x00000002, /* The endpoint is an applet */
+ };
+
/* conn_stream flags */
enum {
CS_FL_NONE = 0x00000000, /* Just for initialization purposes */
/* flags set by the mux relayed to the stream */
CS_FL_WEBSOCKET = 0x00200000, /* websocket stream */
-
- CS_FL_ENDP_MUX = 0x00400000, /* Endpoint is a mux */
- CS_FL_ENDP_APP = 0x00800000, /* Endpoint is an applet */
};
/* cs_shutr() modes */
char name[8]; /* data layer name, zero-terminated */
};
+
+struct cs_endpoint {
+ void *target;
+ void *ctx;
+ unsigned int flags;
+};
+
/*
* This structure describes the elements of a connection relevant to a stream
*/
enum obj_type obj_type; /* differentiates connection from applet context */
/* 3 bytes hole here */
unsigned int flags; /* CS_FL_* */
- void *end; /* points to the end point (MUX stream or appctx) */
+ struct cs_endpoint *endp; /* points to the end point (MUX stream or appctx) */
enum obj_type *app; /* points to the applicative point (stream or check) */
struct stream_interface *si;
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */
- void *ctx; /* endpoint-specific context */
};
#define IS_HTX_CS(cs) (cs_conn(cs) && IS_HTX_CONN(__cs_conn(cs)))
+struct cs_endpoint *cs_endpoint_new();
+void cs_endpoint_free(struct cs_endpoint *endp);
+
struct conn_stream *cs_new();
void cs_free(struct conn_stream *cs);
void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx);
void cs_detach_endp(struct conn_stream *cs);
void cs_detach_app(struct conn_stream *cs);
+/* Returns the endpoint target without any control */
+static inline void *__cs_endp_target(const struct conn_stream *cs)
+{
+ return cs->endp->target;
+}
+
+/* Returns the endpoint context without any control */
+static inline void *__cs_endp_ctx(const struct conn_stream *cs)
+{
+ return cs->endp->ctx;
+}
+
/* Returns the connection from a cs if the endpoint is a mux stream. Otherwise
* NULL is returned. __cs_conn() returns the connection without any control
* while cs_conn() check the endpoint type.
*/
static inline struct connection *__cs_conn(const struct conn_stream *cs)
{
- return cs->ctx;
+ return __cs_endp_ctx(cs);
}
static inline struct connection *cs_conn(const struct conn_stream *cs)
{
- if (cs->flags & CS_FL_ENDP_MUX)
+ if (cs->endp->flags & CS_EP_T_MUX)
return __cs_conn(cs);
return NULL;
}
return (conn ? conn->mux : NULL);
}
+/* Returns the mux from a cs if the endpoint is a mux. Otherwise
+ * NULL is returned. __cs_mux() returns the mux without any control
+ * while cs_mux() check the endpoint type.
+ */
+static inline void *__cs_mux(const struct conn_stream *cs)
+{
+ return __cs_endp_target(cs);
+}
+static inline struct appctx *cs_mux(const struct conn_stream *cs)
+{
+ if (cs->endp->flags & CS_EP_T_MUX)
+ return __cs_mux(cs);
+ return NULL;
+}
+
/* Returns the appctx from a cs if the endpoint is an appctx. Otherwise
* NULL is returned. __cs_appctx() returns the appctx without any control
* while cs_appctx() check the endpoint type.
*/
static inline struct appctx *__cs_appctx(const struct conn_stream *cs)
{
- return cs->end;
+ return __cs_endp_target(cs);
}
static inline struct appctx *cs_appctx(const struct conn_stream *cs)
{
- if (cs->flags & CS_FL_ENDP_APP)
+ if (cs->endp->flags & CS_EP_T_APPLET)
return __cs_appctx(cs);
return NULL;
}
#include <haproxy/stream_interface.h>
DECLARE_POOL(pool_head_connstream, "conn_stream", sizeof(struct conn_stream));
+DECLARE_POOL(pool_head_cs_endpoint, "cs_endpoint", sizeof(struct cs_endpoint));
+void cs_endpoint_init(struct cs_endpoint *endp)
+{
+ endp->target = NULL;
+ endp->ctx = NULL;
+ endp->flags = CS_EP_NONE;
+}
+
+struct cs_endpoint *cs_endpoint_new()
+{
+ struct cs_endpoint *endp;
+
+ endp = pool_alloc(pool_head_cs_endpoint);
+ if (unlikely(!endp))
+ return NULL;
+
+ cs_endpoint_init(endp);
+ return endp;
+}
+
+void cs_endpoint_free(struct cs_endpoint *endp)
+{
+ pool_free(pool_head_cs_endpoint, endp);
+}
/* Tries to allocate a new conn_stream and initialize its main fields. On
* failure, nothing is allocated and NULL is returned.
struct conn_stream *cs_new()
{
struct conn_stream *cs;
+ struct cs_endpoint *endp;
cs = pool_alloc(pool_head_connstream);
+
if (unlikely(!cs))
- return NULL;
+ goto alloc_error;
cs->obj_type = OBJ_TYPE_CS;
cs->flags = CS_FL_NONE;
- cs->end = NULL;
cs->app = NULL;
- cs->ctx = NULL;
cs->si = NULL;
cs->data_cb = NULL;
+ endp = cs_endpoint_new();
+ if (unlikely(!endp))
+ goto alloc_error;
+ cs->endp = endp;
+
return cs;
+
+ alloc_error:
+ pool_free(pool_head_connstream, cs);
+ return NULL;
}
/* Releases a conn_stream previously allocated by cs_new(), as well as any
void cs_free(struct conn_stream *cs)
{
si_free(cs->si);
+ if (cs->endp) {
+ cs_endpoint_free(cs->endp);
+ }
pool_free(pool_head_connstream, cs);
}
{
struct connection *conn = ctx;
- cs->end = endp;
- cs->ctx = ctx;
+ cs->endp->target = endp;
+ cs->endp->ctx = ctx;
+ cs->endp->flags |= CS_EP_T_MUX;
if (!conn->ctx)
conn->ctx = cs;
if (cs_strm(cs)) {
}
else if (cs_check(cs))
cs->data_cb = &check_conn_cb;
- cs->flags |= CS_FL_ENDP_MUX;
}
/* Attaches a conn_stream to an applet endpoint and sets the endpoint ctx */
{
struct appctx *appctx = endp;
- cs->end = endp;
- cs->ctx = ctx;
+ cs->endp->target = endp;
+ cs->endp->ctx = ctx;
+ cs->endp->flags |= CS_EP_T_APPLET;
appctx->owner = cs;
if (cs->si) {
cs->si->ops = &si_applet_ops;
cs->data_cb = NULL;
}
- cs->flags |= CS_FL_ENDP_APP;
}
/* Attaches a conn_stream to a app layer and sets the relevant callbacks */
if (unlikely(!cs->si))
return -1;
- if (cs_conn(cs)) {
+ if (cs->endp->flags & CS_EP_T_MUX) {
cs->si->ops = &si_conn_ops;
cs->data_cb = &si_conn_cb;
}
- else if (cs_appctx(cs)) {
+ else if (cs->endp->flags & CS_EP_T_APPLET) {
cs->si->ops = &si_applet_ops;
cs->data_cb = NULL;
}
*/
void cs_detach_endp(struct conn_stream *cs)
{
- struct connection *conn;
- struct appctx *appctx;
+ if (cs->endp->flags & CS_EP_T_MUX) {
+ struct connection *conn = cs_conn(cs);
- if ((conn = cs_conn(cs))) {
if (conn->mux) {
/* TODO: handle unsubscribe for healthchecks too */
if (cs->si && cs->si->wait_event.events != 0)
conn_free(conn);
}
}
- else if ((appctx = cs_appctx(cs))) {
+ else if (cs->endp->flags & CS_EP_T_APPLET) {
+ struct appctx *appctx = cs_appctx(cs);
+
if (cs->si)
si_applet_release(cs->si);
appctx_free(appctx);
}
+ if (cs->endp) {
+ cs_endpoint_init(cs->endp);
+ }
+
/* FIXME: Rest CS for now but must be reviewed. CS flags are only
* connection related for now but this will evolved
*/
cs->flags = CS_FL_NONE;
- cs->end = NULL;
- cs->ctx = NULL;
if (cs->si)
cs->si->ops = &si_embedded_ops;
cs->data_cb = NULL;
cs->si = NULL;
cs->data_cb = NULL;
- if (cs->end == NULL)
+ if (!cs->endp || !cs->endp->target)
cs_free(cs);
}
size_t h3_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
size_t total = 0;
- struct qcs *qcs = cs->end;
+ struct qcs *qcs = __cs_mux(cs);
struct htx *htx;
enum htx_blk_type btype;
struct htx_blk *blk;
static size_t hq_interop_snd_buf(struct conn_stream *cs, struct buffer *buf,
size_t count, int flags)
{
- struct qcs *qcs = cs->end;
+ struct qcs *qcs = __cs_mux(cs);
struct htx *htx;
enum htx_blk_type btype;
struct htx_blk *blk;
cs_attach_endp_mux(cs, fstrm, fconn->conn);
fstrm->cs = cs;
fstrm->sess = sess;
- cs->end = fstrm;
fconn->nb_cs++;
TRACE_LEAVE(FCGI_EV_FSTRM_NEW, fconn->conn, fstrm);
*/
static void fcgi_detach(struct conn_stream *cs)
{
- struct fcgi_strm *fstrm = cs->end;
+ struct fcgi_strm *fstrm = __cs_mux(cs);
struct fcgi_conn *fconn;
struct session *sess;
TRACE_ENTER(FCGI_EV_STRM_END, (fstrm ? fstrm->fconn->conn : NULL), fstrm);
- cs->end = NULL;
- cs->ctx = NULL;
if (!fstrm) {
TRACE_LEAVE(FCGI_EV_STRM_END);
return;
/* shutr() called by the conn_stream (mux_ops.shutr) */
static void fcgi_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{
- struct fcgi_strm *fstrm = cs->end;
+ struct fcgi_strm *fstrm = __cs_mux(cs);
TRACE_POINT(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm);
if (cs->flags & CS_FL_KILL_CONN)
/* shutw() called by the conn_stream (mux_ops.shutw) */
static void fcgi_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{
- struct fcgi_strm *fstrm = cs->end;
+ struct fcgi_strm *fstrm = __cs_mux(cs);
TRACE_POINT(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm);
if (cs->flags & CS_FL_KILL_CONN)
*/
static int fcgi_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct fcgi_strm *fstrm = cs->end;
+ struct fcgi_strm *fstrm = __cs_mux(cs);
struct fcgi_conn *fconn = fstrm->fconn;
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
*/
static int fcgi_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct fcgi_strm *fstrm = cs->end;
+ struct fcgi_strm *fstrm = __cs_mux(cs);
struct fcgi_conn *fconn = fstrm->fconn;
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
*/
static size_t fcgi_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- struct fcgi_strm *fstrm = cs->end;
+ struct fcgi_strm *fstrm = __cs_mux(cs);
struct fcgi_conn *fconn = fstrm->fconn;
size_t ret = 0;
*/
static size_t fcgi_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- struct fcgi_strm *fstrm = cs->end;
+ struct fcgi_strm *fstrm = __cs_mux(cs);
struct fcgi_conn *fconn = fstrm->fconn;
size_t total = 0;
size_t ret;
*/
static void h1_detach(struct conn_stream *cs)
{
- struct h1s *h1s = cs->end;
+ struct h1s *h1s = __cs_mux(cs);
struct h1c *h1c;
struct session *sess;
int is_not_first;
TRACE_ENTER(H1_EV_STRM_END, h1s ? h1s->h1c->conn : NULL, h1s);
- cs->end = NULL;
- cs->ctx = NULL;
if (!h1s) {
TRACE_LEAVE(H1_EV_STRM_END);
return;
static void h1_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{
- struct h1s *h1s = cs->end;
+ struct h1s *h1s = __cs_mux(cs);
struct h1c *h1c;
if (!h1s)
static void h1_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{
- struct h1s *h1s = cs->end;
+ struct h1s *h1s = __cs_mux(cs);
struct h1c *h1c;
if (!h1s)
*/
static int h1_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct h1s *h1s = cs->end;
+ struct h1s *h1s = __cs_mux(cs);
if (!h1s)
return 0;
*/
static int h1_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct h1s *h1s = cs->end;
+ struct h1s *h1s = __cs_mux(cs);
struct h1c *h1c;
if (!h1s)
*/
static size_t h1_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- struct h1s *h1s = cs->end;
+ struct h1s *h1s = __cs_mux(cs);
struct h1c *h1c = h1s->h1c;
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res);
size_t ret = 0;
/* Called from the upper layer, to send data */
static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- struct h1s *h1s = cs->end;
+ struct h1s *h1s = __cs_mux(cs);
struct h1c *h1c;
size_t total = 0;
/* Send and get, using splicing */
static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count)
{
- struct h1s *h1s = cs->end;
+ struct h1s *h1s = __cs_mux(cs);
struct h1c *h1c = h1s->h1c;
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res);
int ret = 0;
static int h1_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
{
- struct h1s *h1s = cs->end;
+ struct h1s *h1s = __cs_mux(cs);
struct h1c *h1c = h1s->h1c;
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
int ret = 0;
*/
static void h2_detach(struct conn_stream *cs)
{
- struct h2s *h2s = cs->end;
+ struct h2s *h2s = __cs_mux(cs);
struct h2c *h2c;
struct session *sess;
TRACE_ENTER(H2_EV_STRM_END, h2s ? h2s->h2c->conn : NULL, h2s);
- cs->end = NULL;
- cs->ctx = NULL;
if (!h2s) {
TRACE_LEAVE(H2_EV_STRM_END);
return;
/* shutr() called by the conn_stream (mux_ops.shutr) */
static void h2_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{
- struct h2s *h2s = cs->end;
+ struct h2s *h2s = __cs_mux(cs);
TRACE_ENTER(H2_EV_STRM_SHUT, h2s->h2c->conn, h2s);
if (cs->flags & CS_FL_KILL_CONN)
/* shutw() called by the conn_stream (mux_ops.shutw) */
static void h2_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{
- struct h2s *h2s = cs->end;
+ struct h2s *h2s = __cs_mux(cs);
TRACE_ENTER(H2_EV_STRM_SHUT, h2s->h2c->conn, h2s);
if (cs->flags & CS_FL_KILL_CONN)
*/
static int h2_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct h2s *h2s = cs->end;
+ struct h2s *h2s = __cs_mux(cs);
struct h2c *h2c = h2s->h2c;
TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2c->conn, h2s);
*/
static int h2_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct h2s *h2s = cs->end;
+ struct h2s *h2s = __cs_mux(cs);
TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2s->h2c->conn, h2s);
*/
static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- struct h2s *h2s = cs->end;
+ struct h2s *h2s = __cs_mux(cs);
struct h2c *h2c = h2s->h2c;
struct htx *h2s_htx = NULL;
struct htx *buf_htx = NULL;
*/
static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- struct h2s *h2s = cs->end;
+ struct h2s *h2s = __cs_mux(cs);
size_t total = 0;
size_t ret;
struct htx *htx;
TRACE_ENTER(PT_EV_STRM_END, conn, cs);
- cs->end = NULL;
- cs->ctx = NULL;
-
/* Subscribe, to know if we got disconnected */
if (!conn_is_back(conn) && conn->owner != NULL &&
!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
static void qc_detach(struct conn_stream *cs)
{
- struct qcs *qcs = cs->end;
+ struct qcs *qcs = __cs_mux(cs);
struct qcc *qcc = qcs->qcc;
TRACE_ENTER(QMUX_EV_STRM_END, qcc->conn, qcs);
- cs->ctx = NULL;
qcs->cs = NULL;
--qcc->nb_cs;
static size_t qc_rcv_buf(struct conn_stream *cs, struct buffer *buf,
size_t count, int flags)
{
- struct qcs *qcs = cs->end;
+ struct qcs *qcs = __cs_mux(cs);
struct htx *qcs_htx = NULL;
struct htx *cs_htx = NULL;
size_t ret = 0;
static size_t qc_snd_buf(struct conn_stream *cs, struct buffer *buf,
size_t count, int flags)
{
- struct qcs *qcs = cs->end;
+ struct qcs *qcs = __cs_mux(cs);
size_t ret;
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
static int qc_subscribe(struct conn_stream *cs, int event_type,
struct wait_event *es)
{
- return qcs_subscribe(cs->end, event_type, es);
+ return qcs_subscribe(__cs_mux(cs), event_type, es);
}
/* Called from the upper layer, to unsubscribe <es> from events <event_type>.
*/
static int qc_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
- struct qcs *qcs = cs->end;
+ struct qcs *qcs = __cs_mux(cs);
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
BUG_ON(qcs->subs && qcs->subs != es);
/* applets do not release session yet */
/* FIXME: Handle it in appctx_free ??? */
- must_free_sess = objt_appctx(sess->origin) && sess->origin == s->csf->end;
+ must_free_sess = objt_appctx(sess->origin) && sess->origin == __cs_endp_target(s->csf);
/* FIXME: ATTENTION, si CSF est libere avant, ca plante !!!! */
cs_detach_endp(s->csb);
strm->csf->si,
si_state_str(strm->csf->si->state),
strm->csf->si->flags,
- obj_type_name(strm->csf->end),
- obj_base_ptr(strm->csf->end),
+ (strm->csf->endp->flags & CS_EP_T_MUX ? "CONN" : "APPCTX"),
+ __cs_endp_target(strm->csf),
strm->csf->si->exp ?
tick_is_expired(strm->csf->si->exp, now_ms) ? "<PAST>" :
human_time(TICKS_TO_MS(strm->csf->si->exp - now_ms),
strm->csb->si,
si_state_str(strm->csb->si->state),
strm->csb->si->flags,
- obj_type_name(strm->csb->end),
- obj_base_ptr(strm->csb->end),
+ (strm->csb->endp->flags & CS_EP_T_MUX ? "CONN" : "APPCTX"),
+ __cs_endp_target(strm->csb),
strm->csb->si->exp ?
tick_is_expired(strm->csb->si->exp, now_ms) ? "<PAST>" :
human_time(TICKS_TO_MS(strm->csb->si->exp - now_ms),
strm->csb->si->err_type, strm->csb->si->wait_event.events);
cs = strm->csf;
- chunk_appendf(&trash, " cs=%p csf=0x%08x endp=%p\n", cs, cs->flags, cs->end);
+ chunk_appendf(&trash, " cs=%p csf=0x%08x endp=%p,0x%08x\n", cs, cs->flags, cs->endp->target, cs->endp->flags);
if ((conn = cs_conn(cs)) != NULL) {
chunk_appendf(&trash,
}
cs = strm->csb;
- chunk_appendf(&trash, " cs=%p csf=0x%08x end=%p\n", cs, cs->flags, cs->end);
+ chunk_appendf(&trash, " cs=%p csb=0x%08x endp=%p,0x%08x\n", cs, cs->flags, cs->endp->target, cs->endp->flags);
if ((conn = cs_conn(cs)) != NULL) {
chunk_appendf(&trash,
" co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",