c->flags &= ~CO_FL_XPRT_RD_ENA;
}
-static inline void __cs_data_want_recv(struct conn_stream *cs)
+static inline void __cs_want_recv(struct conn_stream *cs)
{
cs->flags |= CS_FL_DATA_RD_ENA;
}
-static inline void __cs_data_stop_recv(struct conn_stream *cs)
+static inline void __cs_stop_recv(struct conn_stream *cs)
{
cs->flags &= ~CS_FL_DATA_RD_ENA;
}
-static inline void cs_data_want_recv(struct conn_stream *cs)
+static inline void cs_want_recv(struct conn_stream *cs)
{
- __cs_data_want_recv(cs);
+ __cs_want_recv(cs);
cs_update_mux_polling(cs);
}
-static inline void cs_data_stop_recv(struct conn_stream *cs)
+static inline void cs_stop_recv(struct conn_stream *cs)
{
- __cs_data_stop_recv(cs);
+ __cs_stop_recv(cs);
cs_update_mux_polling(cs);
}
c->flags &= ~(CO_FL_XPRT_WR_ENA | CO_FL_XPRT_RD_ENA);
}
-static inline void __cs_data_want_send(struct conn_stream *cs)
+static inline void __cs_want_send(struct conn_stream *cs)
{
cs->flags |= CS_FL_DATA_WR_ENA;
}
-static inline void __cs_data_stop_send(struct conn_stream *cs)
+static inline void __cs_stop_send(struct conn_stream *cs)
{
cs->flags &= ~CS_FL_DATA_WR_ENA;
}
-static inline void cs_data_stop_send(struct conn_stream *cs)
+static inline void cs_stop_send(struct conn_stream *cs)
{
- __cs_data_stop_send(cs);
+ __cs_stop_send(cs);
cs_update_mux_polling(cs);
}
-static inline void cs_data_want_send(struct conn_stream *cs)
+static inline void cs_want_send(struct conn_stream *cs)
{
- __cs_data_want_send(cs);
+ __cs_want_send(cs);
cs_update_mux_polling(cs);
}
-static inline void __cs_data_stop_both(struct conn_stream *cs)
+static inline void __cs_stop_both(struct conn_stream *cs)
{
cs->flags &= ~(CS_FL_DATA_WR_ENA | CS_FL_DATA_RD_ENA);
}
-static inline void cs_data_stop_both(struct conn_stream *cs)
+static inline void cs_stop_both(struct conn_stream *cs)
{
- __cs_data_stop_both(cs);
+ __cs_stop_both(cs);
cs_update_mux_polling(cs);
}
c->xprt->shutw(c, 0);
}
+/* shut read after draining possibly pending data */
+static inline void cs_shutr(struct conn_stream *cs)
+{
+ __cs_stop_recv(cs);
+
+ /* clean data-layer shutdown */
+ if (cs->conn->mux && cs->conn->mux->shutr)
+ cs->conn->mux->shutr(cs, 1);
+}
+
+/* shut read after disabling lingering */
+static inline void cs_shutr_hard(struct conn_stream *cs)
+{
+ __cs_stop_recv(cs);
+
+ /* unclean data-layer shutdown */
+ if (cs->conn->mux && cs->conn->mux->shutr)
+ cs->conn->mux->shutr(cs, 0);
+}
+
+static inline void cs_shutw(struct conn_stream *cs)
+{
+ __cs_stop_send(cs);
+
+ /* clean data-layer shutdown */
+ if (cs->conn->mux && cs->conn->mux->shutw)
+ cs->conn->mux->shutw(cs, 1);
+}
+
+static inline void cs_shutw_hard(struct conn_stream *cs)
+{
+ __cs_stop_send(cs);
+
+ /* unclean data-layer shutdown */
+ if (cs->conn->mux && cs->conn->mux->shutw)
+ cs->conn->mux->shutw(cs, 0);
+}
+
+
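/* Illustrative sketch (not part of this patch): how a data layer is expected
 * to choose between the clean and hard write-shutdown variants above, in the
 * spirit of stream_int_shutw_conn() further down. SI_FL_NOLINGER is the
 * existing stream-interface flag.
 */
static inline void example_cs_close_write(struct stream_interface *si,
                                          struct conn_stream *cs)
{
	if (si->flags & SI_FL_NOLINGER)
		cs_shutw_hard(cs); /* aborted transfer: skip the TLS close-notify */
	else
		cs_shutw(cs);      /* orderly close: let the mux/xprt signal it */
}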
/* detect sock->data read0 transition */
static inline int conn_xprt_read0_pending(struct connection *c)
{
{
conn->obj_type = OBJ_TYPE_CONN;
conn->flags = CO_FL_NONE;
- conn->data = NULL;
conn->tmp_early_data = -1;
conn->mux = NULL;
conn->mux_ctx = NULL;
return conn;
}
-/* Tries to allocate a new conn_stream and initialize its main fields. The
- * connection is returned on success, NULL on failure. The connection must
- * be released using pool_free2() or conn_free().
+/* Releases a conn_stream previously allocated by cs_new() */
+static inline void cs_free(struct conn_stream *cs)
+{
+ pool_free2(pool2_connstream, cs);
+}
+
+/* Tries to allocate a new conn_stream and initialize its main fields. If
+ * <conn> is NULL, then a new connection is allocated on the fly, initialized,
+ * and assigned to cs->conn ; this connection will then have to be released
+ * using pool_free2() or conn_free(). The conn_stream is initialized and added
+ * to the mux's stream list on success, then returned. On failure, nothing is
+ * allocated and NULL is returned.
*/
static inline struct conn_stream *cs_new(struct connection *conn)
{
struct conn_stream *cs;
cs = pool_alloc2(pool2_connstream);
- if (likely(cs != NULL))
- cs_init(cs, conn);
- return cs;
-}
+ if (!likely(cs))
+ return NULL;
-/* Releases a conn_stream previously allocated by cs_new() */
-static inline void cs_free(struct conn_stream *cs)
-{
- pool_free2(pool2_connstream, cs);
+ if (!conn) {
+ conn = conn_new();
+ if (!likely(conn)) {
+ cs_free(cs);
+ return NULL;
+ }
+ conn_init(conn);
+ }
+
+ cs_init(cs, conn);
+ return cs;
}
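/* Illustrative sketch (not part of this patch): the outgoing allocation
 * pattern enabled by cs_new(NULL), where the connection is created on the
 * fly. cs_destroy() is the helper used elsewhere in this patch to close and
 * release both objects together; its body is not shown here.
 */
static inline struct conn_stream *example_cs_alloc_outgoing(void)
{
	struct conn_stream *cs;

	cs = cs_new(NULL);          /* also allocates and initializes cs->conn */
	if (!cs)
		return NULL;        /* nothing was allocated */

	/* conn_prepare()/conn_install_mux() would normally follow here;
	 * the caller later releases everything with cs_destroy(cs).
	 */
	return cs;
}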
/* Releases a connection previously allocated by conn_new() */
static inline void conn_free(struct connection *conn)
{
- if (conn->mux && conn->mux->release)
- conn->mux->release(conn);
pool_free2(pool2_connection, conn);
}
conn->flags |= CO_FL_ADDR_TO_SET;
}
-/* Attaches a connection to an owner and assigns a data layer */
-static inline void conn_attach(struct connection *conn, void *owner, const struct data_cb *data)
+/* Attaches a conn_stream to a data layer and sets the relevant callbacks */
+static inline void cs_attach(struct conn_stream *cs, void *data, const struct data_cb *data_cb)
{
- conn->data = data;
- conn->owner = owner;
+ cs->data_cb = data_cb;
+ cs->data = data;
}
/* Installs the connection's mux layer for upper context <ctx>.
return conn->mux->name;
}
-static inline const char *conn_get_data_name(const struct connection *conn)
+static inline const char *cs_get_data_name(const struct conn_stream *cs)
{
- if (!conn->data)
+ if (!cs->data_cb)
return "NONE";
- return conn->data->name;
+ return cs->data_cb->name;
}
/* registers pointer to transport layer <id> (XPRT_*) */
extern struct data_cb sess_conn_cb;
struct stream *stream_new(struct session *sess, enum obj_type *origin);
-int stream_create_from_conn(struct connection *conn);
+int stream_create_from_cs(struct conn_stream *cs);
/* perform minimal initializations, report 0 in case of error, 1 if OK. */
int init_stream();
*/
static inline void si_release_endpoint(struct stream_interface *si)
{
- struct connection *conn;
+ struct conn_stream *cs;
struct appctx *appctx;
if (!si->end)
return;
- if ((conn = objt_conn(si->end))) {
- LIST_DEL(&conn->list);
- conn_stop_tracking(conn);
- conn_full_close(conn);
- conn_free(conn);
- }
+ if ((cs = objt_cs(si->end)))
+ cs_destroy(cs);
else if ((appctx = objt_appctx(si->end))) {
if (appctx->applet->release && si->state < SI_ST_DIS)
appctx->applet->release(appctx);
* connection will also be added at the head of this list. This connection
* remains assigned to the stream interface it is currently attached to.
*/
-static inline void si_idle_conn(struct stream_interface *si, struct list *pool)
+static inline void si_idle_cs(struct stream_interface *si, struct list *pool)
{
- struct connection *conn = __objt_conn(si->end);
+ struct conn_stream *cs = __objt_cs(si->end);
+ struct connection *conn = cs->conn;
if (pool)
LIST_ADD(pool, &conn->list);
- conn_attach(conn, si, &si_idle_conn_cb);
- conn_xprt_want_recv(conn);
+ cs_attach(cs, si, &si_idle_conn_cb);
+ cs_want_recv(cs);
}
-/* Attach connection <conn> to the stream interface <si>. The stream interface
+/* Attach conn_stream <cs> to the stream interface <si>. The stream interface
 * is configured to work with a connection and the connection is configured
 * with a stream interface data layer.
*/
-static inline void si_attach_conn(struct stream_interface *si, struct connection *conn)
+static inline void si_attach_cs(struct stream_interface *si, struct conn_stream *cs)
{
si->ops = &si_conn_ops;
- si->end = &conn->obj_type;
- conn_attach(conn, si, &si_conn_cb);
+ si->end = &cs->obj_type;
+ cs_attach(cs, si, &si_conn_cb);
}
/* Returns true if a connection is attached to the stream interface <si> and
*/
static inline int si_conn_ready(struct stream_interface *si)
{
- struct connection *conn = objt_conn(si->end);
+ struct connection *conn = cs_conn(objt_cs(si->end));
return conn && conn_ctrl_ready(conn) && conn_xprt_ready(conn);
}
si->flags &= ~SI_FL_WANT_GET;
}
-/* Try to allocate a new connection and assign it to the interface. If
+/* Try to allocate a new conn_stream and assign it to the interface. If
* an endpoint was previously allocated, it is released first. The newly
- * allocated connection is initialized, assigned to the stream interface,
+ * allocated conn_stream is initialized, assigned to the stream interface,
* and returned.
*/
-static inline struct connection *si_alloc_conn(struct stream_interface *si)
+static inline struct conn_stream *si_alloc_cs(struct stream_interface *si, struct connection *conn)
{
- struct connection *conn;
+ struct conn_stream *cs;
si_release_endpoint(si);
- conn = conn_new();
- if (conn)
- si_attach_conn(si, conn);
+ cs = cs_new(conn);
+ if (cs)
+ si_attach_cs(si, cs);
- return conn;
+ return cs;
}
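/* Illustrative sketch (not part of this patch): the call-site pattern for
 * si_alloc_cs() when no connection is reused, as done by connect_server()
 * later in this patch. Passing NULL lets cs_new() allocate the connection
 * as well; it is then reached via cs_conn(). The s/srv_cs/srv_conn variables
 * are assumed from that caller.
 */
	srv_cs = si_alloc_cs(&s->si[1], NULL);
	if (!srv_cs)
		return SF_ERR_RESOURCE;
	srv_conn = cs_conn(srv_cs);   /* freshly allocated connection */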
/* Release the interface's existing endpoint (connection or appctx) and
/* Calls chk_snd on the connection using the ctrl layer */
static inline int si_connect(struct stream_interface *si)
{
- struct connection *conn = objt_conn(si->end);
+ struct conn_stream *cs = objt_cs(si->end);
+ struct connection *conn = cs_conn(cs);
int ret = SF_ERR_NONE;
if (unlikely(!conn || !conn->ctrl || !conn->ctrl->connect))
/* reuse the existing connection */
if (!channel_is_empty(si_oc(si))) {
/* we'll have to send a request there. */
- conn_xprt_want_send(conn);
+ cs_want_send(cs);
}
/* the connection is established */
struct check {
struct xprt_ops *xprt; /* transport layer operations for health checks */
- struct connection *conn; /* connection state for health checks */
+ struct conn_stream *cs; /* conn_stream state for health checks */
unsigned short port; /* the port to use for the health checks */
struct buffer *bi, *bo; /* input and output buffers to send/recv check */
struct task *task; /* the task associated to the health check processing, NULL if disabled */
* data movement. It may abort a connection by returning < 0.
*/
struct data_cb {
- void (*recv)(struct connection *conn); /* data-layer recv callback */
- void (*send)(struct connection *conn); /* data-layer send callback */
- int (*wake)(struct connection *conn); /* data-layer callback to report activity */
+ void (*recv)(struct conn_stream *cs); /* data-layer recv callback */
+ void (*send)(struct conn_stream *cs); /* data-layer send callback */
+ int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */
char name[8]; /* data layer name, zero-terminated */
};
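/* Illustrative sketch (not part of this patch): a minimal data_cb instance
 * using the new conn_stream-based prototypes, in the style of si_conn_cb and
 * check_conn_cb from this patch. The example_* callbacks are hypothetical.
 */
static void example_recv(struct conn_stream *cs) { /* pull data through cs->conn->mux */ }
static void example_send(struct conn_stream *cs) { /* push pending output data */ }
static int  example_wake(struct conn_stream *cs) { return 0; /* report activity */ }

struct data_cb example_data_cb = {
	.recv = example_recv,
	.send = example_send,
	.wake = example_wake,
	.name = "EXMPL",
};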
const struct protocol *ctrl; /* operations at the socket layer */
const struct xprt_ops *xprt; /* operations at the transport layer */
const struct mux_ops *mux; /* mux layer operations. Must be set before xprt->init() */
- const struct data_cb *data; /* data layer callbacks. Must be set before xprt->init() */
void *xprt_ctx; /* general purpose pointer, initialized to NULL */
void *mux_ctx; /* mux-specific context, initialized to NULL */
- void *owner; /* pointer to upper layer's entity (eg: session, stream interface) */
+ void *owner; /* pointer to the owner session for incoming connections, or NULL */
int xprt_st; /* transport layer state, initialized to zero */
int tmp_early_data; /* 1st byte of early data, if any */
union conn_handle handle; /* connection handle at the socket layer */
srv = NULL;
s->target = NULL;
- conn = objt_conn(s->si[1].end);
+ conn = cs_conn(objt_cs(s->si[1].end));
if (conn &&
(conn->flags & CO_FL_CONNECTED) &&
s->target = &s->be->obj_type;
}
else if ((s->be->options & PR_O_HTTP_PROXY) &&
- (conn = objt_conn(s->si[1].end)) &&
- is_addr(&conn->addr.to)) {
+ conn && is_addr(&conn->addr.to)) {
/* in proxy mode, we need a valid destination address */
s->target = &s->be->obj_type;
}
int assign_server_address(struct stream *s)
{
struct connection *cli_conn = objt_conn(strm_orig(s));
- struct connection *srv_conn = objt_conn(s->si[1].end);
+ struct connection *srv_conn = cs_conn(objt_cs(s->si[1].end));
#ifdef DEBUG_FULL
fprintf(stderr,"assign_server_address : s=%p\n",s);
struct server *srv = objt_server(s->target);
struct conn_src *src;
struct connection *cli_conn;
- struct connection *srv_conn = objt_conn(s->si[1].end);
+ struct connection *srv_conn = cs_conn(objt_cs(s->si[1].end));
if (srv && srv->conn_src.opts & CO_SRC_BIND)
src = &srv->conn_src;
{
struct connection *cli_conn;
struct connection *srv_conn;
- struct connection *old_conn;
+ struct conn_stream *srv_cs;
+ struct conn_stream *old_cs;
struct server *srv;
int reuse = 0;
int err;
srv = objt_server(s->target);
- srv_conn = objt_conn(s->si[1].end);
+ srv_cs = objt_cs(s->si[1].end);
+ srv_conn = cs_conn(srv_cs);
if (srv_conn)
reuse = s->target == srv_conn->target;
if (srv && !reuse) {
- old_conn = srv_conn;
- if (old_conn) {
+ old_cs = srv_cs;
+ if (old_cs) {
srv_conn = NULL;
- old_conn->owner = NULL;
+ srv_cs->data = NULL;
si_detach_endpoint(&s->si[1]);
/* note: if the connection was in a server's idle
* queue, it doesn't get dequeued.
LIST_DEL(&srv_conn->list);
LIST_INIT(&srv_conn->list);
- if (srv_conn->owner) {
- si_detach_endpoint(srv_conn->owner);
- if (old_conn && !(old_conn->flags & CO_FL_PRIVATE)) {
- si_attach_conn(srv_conn->owner, old_conn);
- si_idle_conn(srv_conn->owner, NULL);
+ /* XXX cognet: this assumes only 1 conn_stream per
+ * connection; to be revisited later
+ */
+ srv_cs = srv_conn->mux_ctx;
+
+ if (srv_conn->mux == &mux_pt_ops && srv_cs->data) {
+ si_detach_endpoint(srv_cs->data);
+ if (old_cs && !(old_cs->conn->flags & CO_FL_PRIVATE)) {
+ si_attach_cs(srv_cs->data, old_cs);
+ si_idle_cs(srv_cs->data, NULL);
}
}
- si_attach_conn(&s->si[1], srv_conn);
+ si_attach_cs(&s->si[1], srv_cs);
reuse = 1;
}
/* we may have to release our connection if we couldn't swap it */
- if (old_conn && !old_conn->owner) {
- LIST_DEL(&old_conn->list);
- conn_full_close(old_conn);
- conn_free(old_conn);
- }
+ if (old_cs && !old_cs->data)
+ cs_destroy(old_cs);
}
if (reuse) {
}
}
- if (!reuse)
- srv_conn = si_alloc_conn(&s->si[1]);
- else {
+ if (!reuse) {
+ srv_cs = si_alloc_cs(&s->si[1], NULL);
+ srv_conn = cs_conn(srv_cs);
+ } else {
/* reusing our connection, take it out of the idle list */
LIST_DEL(&srv_conn->list);
LIST_INIT(&srv_conn->list);
}
- if (!srv_conn)
+ if (!srv_cs)
return SF_ERR_RESOURCE;
if (!(s->flags & SF_ADDR_SET)) {
/* set the correct protocol on the output stream interface */
if (srv) {
conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), srv->xprt);
- conn_install_mux(srv_conn, &mux_pt_ops, srv_conn);
+ /* XXX: Pick the right mux, when we finally have one */
+ conn_install_mux(srv_conn, &mux_pt_ops, srv_cs);
}
else if (obj_type(s->target) == OBJ_TYPE_PROXY) {
/* proxies exclusively run on raw_sock right now */
conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), xprt_get(XPRT_RAW));
- if (!objt_conn(s->si[1].end) || !objt_conn(s->si[1].end)->ctrl)
+ if (!objt_cs(s->si[1].end) || !objt_cs(s->si[1].end)->conn->ctrl)
return SF_ERR_INTERNAL;
- conn_install_mux(srv_conn, &mux_pt_ops, srv_conn);
+ /* XXX: Pick the right mux, when we finally have one */
+ conn_install_mux(srv_conn, &mux_pt_ops, srv_cs);
}
else
return SF_ERR_INTERNAL; /* how did we get there ? */
conn_get_to_addr(cli_conn);
}
- si_attach_conn(&s->si[1], srv_conn);
+ si_attach_cs(&s->si[1], srv_cs);
assign_tproxy_address(s);
}
else {
/* the connection is being reused, just re-attach it */
- si_attach_conn(&s->si[1], srv_conn);
+ si_attach_cs(&s->si[1], srv_cs);
s->flags |= SF_SRV_REUSED;
}
*/
static void chk_report_conn_err(struct check *check, int errno_bck, int expired)
{
- struct connection *conn = check->conn;
+ struct conn_stream *cs = check->cs;
+ struct connection *conn = cs_conn(cs);
const char *err_msg;
struct chunk *chk;
int step;
* it sends the request. In other cases, it calls set_server_check_status()
* to set check->status, check->duration and check->result.
*/
-static void event_srv_chk_w(struct connection *conn)
+static void event_srv_chk_w(struct conn_stream *cs)
{
- struct check *check = conn->owner;
+ struct connection *conn = cs->conn;
+ struct check *check = cs->data;
struct server *s = check->server;
struct task *t = check->task;
if (retrieve_errno_from_socket(conn)) {
chk_report_conn_err(check, errno, 0);
- __conn_xprt_stop_both(conn);
+ __cs_stop_both(cs);
goto out_wakeup;
}
return;
if (check->bo->o) {
- conn->xprt->snd_buf(conn, check->bo, 0);
+ conn->mux->snd_buf(cs, check->bo, 0);
if (conn->flags & CO_FL_ERROR) {
chk_report_conn_err(check, errno, 0);
- __conn_xprt_stop_both(conn);
+ __cs_stop_both(cs);
goto out_wakeup;
}
if (check->bo->o)
out_wakeup:
task_wakeup(t, TASK_WOKEN_IO);
out_nowake:
- __conn_xprt_stop_send(conn); /* nothing more to write */
+ __cs_stop_send(cs); /* nothing more to write */
}
/*
* call it with a proper error status like HCHK_STATUS_L7STS, HCHK_STATUS_L6RSP,
* etc.
*/
-static void event_srv_chk_r(struct connection *conn)
+static void event_srv_chk_r(struct conn_stream *cs)
{
- struct check *check = conn->owner;
+ struct connection *conn = cs->conn;
+ struct check *check = cs->data;
struct server *s = check->server;
struct task *t = check->task;
char *desc;
done = 0;
- conn->xprt->rcv_buf(conn, check->bi, check->bi->size);
+ conn->mux->rcv_buf(cs, check->bi, check->bi->size);
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
done = 1;
if ((conn->flags & CO_FL_ERROR) && !check->bi->i) {
* range quickly. To avoid sending RSTs all the time, we first try to
* drain pending data.
*/
- __conn_xprt_stop_both(conn);
- conn_xprt_shutw(conn);
+ __cs_stop_both(cs);
+ cs_shutw(cs);
/* OK, let's not stay here forever */
if (check->result == CHK_RES_FAILED)
return;
wait_more_data:
- __conn_xprt_want_recv(conn);
+ __cs_want_recv(cs);
}
/*
* It returns 0 on normal cases, <0 if at least one close() has happened on the
* connection (eg: reconnect).
*/
-static int wake_srv_chk(struct connection *conn)
+static int wake_srv_chk(struct conn_stream *cs)
{
- struct check *check = conn->owner;
+ struct connection *conn = cs->conn;
+ struct check *check = cs->data;
int ret = 0;
/* we may have to make progress on the TCP checks */
if (check->type == PR_O2_TCPCHK_CHK) {
ret = tcpcheck_main(check);
- conn = check->conn;
+ cs = check->cs;
+ conn = cs_conn(cs);
}
if (unlikely(conn->flags & CO_FL_ERROR)) {
* we expect errno to still be valid.
*/
chk_report_conn_err(check, errno, 0);
-
- __conn_xprt_stop_both(conn);
+ __cs_stop_both(cs);
task_wakeup(check->task, TASK_WOKEN_IO);
}
else if (!(conn->flags & (CO_FL_XPRT_RD_ENA|CO_FL_XPRT_WR_ENA|CO_FL_HANDSHAKE))) {
{
struct check *check = t->context;
struct server *s = check->server;
- struct connection *conn = check->conn;
+ struct conn_stream *cs = check->cs;
+ struct connection *conn = cs_conn(cs);
struct protocol *proto;
struct tcpcheck_rule *tcp_rule = NULL;
int ret;
}
/* prepare a new connection */
- conn = check->conn = conn_new();
- if (!check->conn)
+ cs = check->cs = cs_new(NULL);
+ if (!check->cs)
return SF_ERR_RESOURCE;
+ conn = cs->conn;
if (is_addr(&check->addr)) {
/* we'll connect to the check addr specified on the server */
i = srv_check_healthcheck_port(check);
if (i == 0) {
- conn->owner = check;
+ cs->data = check;
return SF_ERR_CHK_PORT;
}
proto = protocol_by_family(conn->addr.to.ss_family);
conn_prepare(conn, proto, check->xprt);
- conn_install_mux(conn, &mux_pt_ops, conn);
- conn_attach(conn, check, &check_conn_cb);
+ conn_install_mux(conn, &mux_pt_ops, cs);
+ cs_attach(cs, check, &check_conn_cb);
conn->target = &s->obj_type;
/* no client address */
{
struct check *check = t->context;
struct server *s = check->server;
- struct connection *conn = check->conn;
+ struct conn_stream *cs = check->cs;
+ struct connection *conn = cs_conn(cs);
int rv;
int ret;
int expired = tick_is_expired(t->expire, now_ms);
check->bo->o = 0;
ret = connect_conn_chk(t);
- conn = check->conn;
+ cs = check->cs;
+ conn = cs_conn(cs);
switch (ret) {
case SF_ERR_UP:
}
if (check->type)
- conn_xprt_want_recv(conn); /* prepare for reading a possible reply */
+ cs_want_recv(cs); /* prepare for reading a possible reply */
task_set_affinity(t, tid_bit);
goto reschedule;
}
/* here, we have seen a synchronous error, no fd was allocated */
- if (conn) {
- conn_free(conn);
- check->conn = conn = NULL;
+ if (cs) {
+ cs_destroy(cs);
+ cs = check->cs = NULL;
+ conn = NULL;
}
check->state &= ~CHK_ST_INPROGRESS;
}
if (conn) {
- conn_free(conn);
- check->conn = conn = NULL;
+ cs_destroy(cs);
+ cs = check->cs = NULL;
+ conn = NULL;
}
if (check->result == CHK_RES_FAILED) {
char *contentptr, *comment;
struct tcpcheck_rule *next;
int done = 0, ret = 0, step = 0;
- struct connection *conn = check->conn;
+ struct conn_stream *cs = check->cs;
+ struct connection *conn = cs_conn(cs);
struct server *s = check->server;
struct task *t = check->task;
struct list *head = check->tcpcheck_rules;
}
/* It's only the rules which will enable send/recv */
- if (conn)
- __conn_xprt_stop_both(conn);
+ if (cs)
+ cs_stop_both(cs);
while (1) {
/* We have to try to flush the output buffer before reading, at
check->current_step->action != TCPCHK_ACT_SEND ||
check->current_step->string_len >= buffer_total_space(check->bo))) {
- __conn_xprt_want_send(conn);
- if (conn->xprt->snd_buf(conn, check->bo, 0) <= 0) {
+ __cs_want_send(cs);
+ if (conn->mux->snd_buf(cs, check->bo, 0) <= 0) {
if (conn->flags & CO_FL_ERROR) {
chk_report_conn_err(check, errno, 0);
- __conn_xprt_stop_both(conn);
+ __cs_stop_both(cs);
goto out_end_tcpcheck;
}
break;
* 2: try to get a new connection
* 3: release and replace the old one on success
*/
- if (check->conn) {
- conn_full_close(check->conn);
+ if (check->cs) {
+ /* XXX: need to kill all CS here as well but not free them yet */
+ conn_full_close(check->cs->conn);
retcode = -1; /* do not reuse the fd! */
}
check->last_started_step = check->current_step;
/* prepare new connection */
- conn = conn_new();
- if (!conn) {
+ cs = cs_new(NULL);
+ if (!cs) {
step = tcpcheck_get_step_id(check);
chunk_printf(&trash, "TCPCHK error allocating connection at step %d", step);
comment = tcpcheck_get_step_comment(check, step);
return retcode;
}
- if (check->conn)
- conn_free(check->conn);
- check->conn = conn;
+ if (check->cs) {
+ if (check->cs->conn)
+ conn_free(check->cs->conn);
+ cs_free(check->cs);
+ }
- conn_attach(conn, check, &check_conn_cb);
+ check->cs = cs;
+ conn = cs->conn;
+ cs_attach(cs, check, &check_conn_cb);
conn->target = &s->obj_type;
/* no client address */
xprt = xprt_get(XPRT_RAW);
}
conn_prepare(conn, proto, xprt);
- conn_install_mux(conn, &mux_pt_ops, conn);
+ conn_install_mux(conn, &mux_pt_ops, cs);
ret = SF_ERR_INTERNAL;
if (proto->connect)
if (unlikely(check->result == CHK_RES_FAILED))
goto out_end_tcpcheck;
- __conn_xprt_want_recv(conn);
- if (conn->xprt->rcv_buf(conn, check->bi, check->bi->size) <= 0) {
+ __cs_want_recv(cs);
+ if (conn->mux->rcv_buf(cs, check->bi, check->bi->size) <= 0) {
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
done = 1;
if ((conn->flags & CO_FL_ERROR) && !check->bi->i) {
if (check->current_step->action == TCPCHK_ACT_EXPECT)
goto tcpcheck_expect;
- __conn_xprt_stop_recv(conn);
+ __cs_stop_recv(cs);
}
}
else {
if (check->current_step->action == TCPCHK_ACT_EXPECT)
goto tcpcheck_expect;
- __conn_xprt_stop_recv(conn);
+ __cs_stop_recv(cs);
}
/* not matched but was supposed to => ERROR */
else {
/* warning, current_step may now point to the head */
if (check->bo->o)
- __conn_xprt_want_send(conn);
+ __cs_want_send(cs);
if (&check->current_step->list != head &&
check->current_step->action == TCPCHK_ACT_EXPECT)
- __conn_xprt_want_recv(conn);
+ __cs_want_recv(cs);
return retcode;
out_end_tcpcheck:
if (check->result == CHK_RES_FAILED)
conn->flags |= CO_FL_ERROR;
- __conn_xprt_stop_both(conn);
+ __cs_stop_both(cs);
return retcode;
}
return "out of memory while allocating check buffer";
}
check->bo->size = global.tune.chksize;
-
return NULL;
}
check->bi = NULL;
free(check->bo);
check->bo = NULL;
- free(check->conn);
- check->conn = NULL;
+ free(check->cs->conn);
+ check->cs->conn = NULL;
+ cs_free(check->cs);
+ check->cs = NULL;
}
void email_alert_free(struct email_alert *alert)
struct cmsghdr *cmsg;
struct stream_interface *si = appctx->owner;
struct stream *s = si_strm(si);
- struct connection *remote = objt_conn(si_opposite(si)->end);
+ struct connection *remote = cs_conn(objt_cs(si_opposite(si)->end));
struct msghdr msghdr;
struct iovec iov;
struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
/* try to report the ALPN value when available (also works for NPN) */
- if (conn && conn->owner == &s->si[0]) {
+ if (conn && conn == cs_conn(objt_cs(s->si[0].end))) {
if (conn_get_alpn(conn, &alpn_str, &alpn_len) && alpn_str) {
int len = MIN(alpn_len, sizeof(alpn) - 1);
memcpy(alpn, alpn_str, len);
static void hlua_socket_handler(struct appctx *appctx)
{
struct stream_interface *si = appctx->owner;
- struct connection *c = objt_conn(si_opposite(si)->end);
+ struct connection *c = cs_conn(objt_cs(si_opposite(si)->end));
if (appctx->ctx.hlua_cosocket.die) {
si_shutw(si);
si = appctx->owner;
s = si_strm(si);
- conn = objt_conn(s->si[1].end);
+ conn = cs_conn(objt_cs(s->si[1].end));
if (!conn) {
xref_unlock(&socket->xref, peer);
lua_pushnil(L);
si = appctx->owner;
s = si_strm(si);
- conn = objt_conn(s->si[1].end);
+ conn = cs_conn(objt_cs(s->si[1].end));
if (!conn) {
xref_unlock(&socket->xref, peer);
lua_pushnil(L);
s = si_strm(si);
/* Initialise connection. */
- conn = si_alloc_conn(&s->si[1]);
+ conn = cs_conn(si_alloc_cs(&s->si[1], NULL));
if (!conn) {
xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "connect: internal error"));
break;
case LOG_FMT_BACKENDIP: // %bi
- conn = objt_conn(s->si[1].end);
+ conn = cs_conn(objt_cs(s->si[1].end));
if (conn)
ret = lf_ip(tmplog, (struct sockaddr *)&conn->addr.from, dst + maxsize - tmplog, tmp);
else
break;
case LOG_FMT_BACKENDPORT: // %bp
- conn = objt_conn(s->si[1].end);
+ conn = cs_conn(objt_cs(s->si[1].end));
if (conn)
ret = lf_port(tmplog, (struct sockaddr *)&conn->addr.from, dst + maxsize - tmplog, tmp);
else
break;
case LOG_FMT_SERVERIP: // %si
- conn = objt_conn(s->si[1].end);
+ conn = cs_conn(objt_cs(s->si[1].end));
if (conn)
ret = lf_ip(tmplog, (struct sockaddr *)&conn->addr.to, dst + maxsize - tmplog, tmp);
else
break;
case LOG_FMT_SERVERPORT: // %sp
- conn = objt_conn(s->si[1].end);
+ conn = cs_conn(objt_cs(s->si[1].end));
if (conn)
ret = lf_port(tmplog, (struct sockaddr *)&conn->addr.to, dst + maxsize - tmplog, tmp);
else
#include <proto/connection.h>
#include <proto/stream.h>
-/* Initialize the mux once it's attached. If conn->mux_ctx is NULL, it is
- * assumed that no data layer has yet been instanciated so the mux is
- * attached to an incoming connection and will instanciate a new stream. If
- * conn->mux_ctx exists, it is assumed that it is an outgoing connection
- * requested for this context. Returns < 0 on error.
+/* Initialize the mux once it's attached. It is expected that conn->mux_ctx
+ * points to the existing conn_stream (for outgoing connections) or NULL (for
+ * incoming ones, in which case one will be allocated and a new stream will be
+ * instanciated). Returns < 0 on error.
*/
static int mux_pt_init(struct connection *conn)
{
- if (!conn->mux_ctx)
- return stream_create_from_conn(conn);
+ struct conn_stream *cs = conn->mux_ctx;
+
+ if (!cs) {
+ cs = cs_new(conn);
+ if (!cs)
+ goto fail;
+
+ if (stream_create_from_cs(cs) < 0)
+ goto fail_free;
+
+ conn->mux_ctx = cs;
+ }
return 0;
+
+ fail_free:
+ cs_free(cs);
+ fail:
+ return -1;
}
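/* Illustrative sketch (not part of this patch): how the two mux_pt_init()
 * paths are reached through conn_install_mux(). The srv_conn/srv_cs and
 * cli_conn variables are assumed from the callers shown in this patch.
 */
	/* outgoing: the conn_stream already exists and becomes the mux context */
	conn_install_mux(srv_conn, &mux_pt_ops, srv_cs);

	/* incoming: no context yet, mux_pt_init() allocates the conn_stream
	 * and creates the stream via stream_create_from_cs()
	 */
	conn_install_mux(cli_conn, &mux_pt_ops, NULL);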
/* callback to be used by default for the pass-through mux. It calls the data
*/
static int mux_pt_wake(struct connection *conn)
{
- return conn->data->wake ? conn->data->wake(conn) : 0;
+ struct conn_stream *cs = conn->mux_ctx;
+ int ret;
+
+ ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0;
+
+ cs_update_mux_polling(cs);
+ return (ret);
}
/* callback used to update the mux's polling flags after changing a cs' status.
*/
static void mux_pt_recv(struct connection *conn)
{
- conn->data->recv(conn);
+ struct conn_stream *cs = conn->mux_ctx;
+
+ if (conn_xprt_read0_pending(conn))
+ cs->flags |= CS_FL_EOS;
+ cs->data_cb->recv(cs);
+ cs_update_mux_polling(cs);
}
/* callback to be used by default for the pass-through mux. It simply calls the
*/
static void mux_pt_send(struct connection *conn)
{
- conn->data->send(conn);
+ struct conn_stream *cs = conn->mux_ctx;
+
+ cs->data_cb->send(cs);
+ cs_update_mux_polling(cs);
}
/*
struct session *sess;
struct stream *s;
struct connection *conn;
+ struct conn_stream *cs;
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
peer->statuscode = PEER_SESS_SC_CONNECTCODE;
if (unlikely((conn = conn_new()) == NULL))
goto out_free_strm;
+ if (unlikely((cs = cs_new(conn)) == NULL))
+ goto out_free_conn;
+
conn_prepare(conn, peer->proto, peer->xprt);
- conn_install_mux(conn, &mux_pt_ops, conn);
- si_attach_conn(&s->si[1], conn);
+ conn_install_mux(conn, &mux_pt_ops, cs);
+ si_attach_cs(&s->si[1], cs);
conn->target = s->target = &s->be->obj_type;
memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
return appctx;
/* Error unrolling */
+ out_free_conn:
+ conn_free(conn);
out_free_strm:
LIST_DEL(&s->list);
pool_free2(pool2_stream, s);
char *path;
/* Note that for now we don't reuse existing proxy connections */
- if (unlikely((conn = si_alloc_conn(&s->si[1])) == NULL)) {
+ if (unlikely((conn = cs_conn(si_alloc_cs(&s->si[1], NULL))) == NULL)) {
txn->req.err_state = txn->req.msg_state;
txn->req.msg_state = HTTP_MSG_ERROR;
txn->status = 500;
int prev_status = s->txn->status;
struct proxy *fe = strm_fe(s);
struct proxy *be = s->be;
+ struct conn_stream *cs;
struct connection *srv_conn;
struct server *srv;
unsigned int prev_flags = s->txn->flags;
* flags. We also need a more accurate method for computing per-request
* data.
*/
- srv_conn = objt_conn(s->si[1].end);
+ /*
+ * XXX cognet: This is probably wrong: it kills a whole connection,
+ * while in the new world order we probably want to just kill the
+ * stream. To be revisited the day we handle multiple streams in one
+ * server connection.
+ */
+ cs = objt_cs(s->si[1].end);
+ srv_conn = cs_conn(cs);
/* unless we're doing keep-alive, we want to quickly close the connection
* to the server.
if (srv_conn && LIST_ISEMPTY(&srv_conn->list)) {
srv = objt_server(srv_conn->target);
if (!srv)
- si_idle_conn(&s->si[1], NULL);
+ si_idle_cs(&s->si[1], NULL);
else if (srv_conn->flags & CO_FL_PRIVATE)
- si_idle_conn(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL));
+ si_idle_cs(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL));
else if (prev_flags & TX_NOT_FIRST)
/* note: we check the request, not the connection, but
* this is valid for strategies SAFE and AGGR, and in
* case of ALWS, we don't care anyway.
*/
- si_idle_conn(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL));
+ si_idle_cs(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL));
else
- si_idle_conn(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL));
+ si_idle_cs(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL));
}
s->req.analysers = strm_li(s) ? strm_li(s)->analysers : 0;
s->res.analysers = 0;
chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id,
dir,
objt_conn(sess->origin) ? (unsigned short)objt_conn(sess->origin)->handle.fd : -1,
- objt_conn(s->si[1].end) ? (unsigned short)objt_conn(s->si[1].end)->handle.fd : -1);
+ objt_cs(s->si[1].end) ? (unsigned short)objt_cs(s->si[1].end)->conn->handle.fd : -1);
for (max = 0; start + max < end; max++)
if (start[max] == '\r' || start[max] == '\n')
/* get the object associated with the stream interface.The
* object can be other thing than a connection. For example,
* it be a appctx. */
- conn = objt_conn(smp->strm->si[dir].end);
+ conn = cs_conn(objt_cs(smp->strm->si[dir].end));
if (!conn)
return 0;
- /* The fd may not be avalaible for the tcp_info struct, and the
+ /* The fd may not be available for the tcp_info struct, and the
syscall can fail. */
optlen = sizeof(info);
if (getsockopt(conn->handle.fd, SOL_TCP, TCP_INFO, &info, &optlen) == -1)
* valid right after the handshake, before the connection's data layer is
* initialized, because it relies on the session to be in conn->owner.
*/
-int stream_create_from_conn(struct connection *conn)
+int stream_create_from_cs(struct conn_stream *cs)
{
struct stream *strm;
- strm = stream_new(conn->owner, &conn->obj_type);
+ strm = stream_new(cs->conn->owner, &cs->obj_type);
if (strm == NULL)
return -1;
{
struct stream *s;
struct task *t;
- struct connection *conn = objt_conn(origin);
+ struct conn_stream *cs = objt_cs(origin);
struct appctx *appctx = objt_appctx(origin);
if (unlikely((s = pool_alloc2(pool2_stream)) == NULL))
s->si[0].hcto = sess->fe->timeout.clientfin;
/* attach the incoming connection to the stream interface now. */
- if (conn)
- si_attach_conn(&s->si[0], conn);
+ if (cs)
+ si_attach_cs(&s->si[0], cs);
else if (appctx)
si_attach_appctx(&s->si[0], appctx);
goto out_fail_accept;
/* finish initialization of the accepted file descriptor */
- if (conn)
- conn_xprt_want_recv(conn);
+ if (cs)
+ cs_want_recv(cs);
else if (appctx)
si_applet_want_get(&s->si[0]);
struct session *sess = strm_sess(s);
struct proxy *fe = sess->fe;
struct bref *bref, *back;
- struct connection *cli_conn = objt_conn(s->si[0].end);
+ struct conn_stream *cli_cs = objt_cs(s->si[0].end);
+ struct connection *cli_conn = cs_conn(cli_cs);
int i;
if (s->pend_pos)
http_end_txn(s);
/* ensure the client-side transport layer is destroyed */
+ /* XXX cognet: wrong for multiple streams in one connection */
if (cli_conn) {
conn_stop_tracking(cli_conn);
conn_full_close(cli_conn);
struct stream_interface *si = &s->si[1];
struct channel *req = &s->req;
struct channel *rep = &s->res;
- struct connection *srv_conn = __objt_conn(si->end);
+ struct connection *srv_conn = __objt_cs(si->end)->conn;
/* If we got an error, or if nothing happened and the connection timed
* out, we must give up. The CER state handler will take care of retry
si->exp = TICK_ETERNITY;
si->state = SI_ST_CER;
+ /* XXX cognet: do we really want to kill the connection here ?
+ * Probably not for multiple streams.
+ */
conn_full_close(srv_conn);
if (si->err_type)
static int sess_update_st_cer(struct stream *s)
{
struct stream_interface *si = &s->si[1];
- struct connection *conn = objt_conn(si->end);
+ struct conn_stream *cs = objt_cs(si->end);
+ struct connection *conn = cs_conn(cs);
/* we probably have to release last stream from the server */
if (objt_server(s->target)) {
req->flags |= CF_WAKE_ONCE;
req->flags &= ~CF_WAKE_CONNECT;
}
- if (objt_conn(si->end)) {
+ if (objt_cs(si->end)) {
/* real connections have timeouts */
req->wto = s->be->timeout.server;
rep->rto = s->be->timeout.server;
if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
req->to_forward &&
(global.tune.options & GTUNE_USE_SPLICE) &&
- (objt_conn(si_f->end) && __objt_conn(si_f->end)->xprt && __objt_conn(si_f->end)->xprt->rcv_pipe) &&
- (objt_conn(si_b->end) && __objt_conn(si_b->end)->xprt && __objt_conn(si_b->end)->xprt->snd_pipe) &&
+ (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe) &&
+ (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe) &&
(pipes_used < global.maxpipes) &&
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
res->to_forward &&
(global.tune.options & GTUNE_USE_SPLICE) &&
- (objt_conn(si_f->end) && __objt_conn(si_f->end)->xprt && __objt_conn(si_f->end)->xprt->snd_pipe) &&
- (objt_conn(si_b->end) && __objt_conn(si_b->end)->xprt && __objt_conn(si_b->end)->xprt->rcv_pipe) &&
+ (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe) &&
+ (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe) &&
(pipes_used < global.maxpipes) &&
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
si_b->prev_state == SI_ST_EST) {
chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n",
s->uniq_id, s->be->id,
- objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1,
- objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1);
+ objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1,
+ objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1);
shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
}
si_f->prev_state == SI_ST_EST) {
chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n",
s->uniq_id, s->be->id,
- objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1,
- objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1);
+ objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1,
+ objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1);
shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
}
}
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n",
s->uniq_id, s->be->id,
- objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1,
- objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1);
+ objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1,
+ objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1);
shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
}
struct tm tm;
extern const char *monthname[12];
char pn[INET6_ADDRSTRLEN];
+ struct conn_stream *cs;
struct connection *conn;
struct appctx *tmpctx;
else
chunk_appendf(&trash, " backend=<NONE> (id=-1 mode=-)");
- conn = objt_conn(strm->si[1].end);
+ cs = objt_cs(strm->si[1].end);
+ conn = cs_conn(cs);
+
if (conn)
conn_get_from_addr(conn);
TICKS_TO_MS(1000)) : "<NEVER>",
strm->si[1].err_type);
- if ((conn = objt_conn(strm->si[0].end)) != NULL) {
+ if ((cs = objt_cs(strm->si[0].end)) != NULL) {
+ conn = cs->conn;
+
chunk_appendf(&trash,
" co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
conn,
conn_get_ctrl_name(conn),
conn_get_xprt_name(conn),
conn_get_mux_name(conn),
- conn_get_data_name(conn),
+ cs_get_data_name(cs),
obj_type_name(conn->target),
obj_base_ptr(conn->target));
tmpctx->applet->name);
}
- if ((conn = objt_conn(strm->si[1].end)) != NULL) {
+ if ((cs = objt_cs(strm->si[1].end)) != NULL) {
+ conn = cs->conn;
+
chunk_appendf(&trash,
" co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
conn,
conn_get_ctrl_name(conn),
conn_get_xprt_name(conn),
conn_get_mux_name(conn),
- conn_get_data_name(conn),
+ cs_get_data_name(cs),
obj_type_name(conn->target),
obj_base_ptr(conn->target));
human_time(TICKS_TO_MS(curr_strm->res.analyse_exp - now_ms),
TICKS_TO_MS(1000)) : "");
- conn = objt_conn(curr_strm->si[0].end);
+ conn = cs_conn(objt_cs(curr_strm->si[0].end));
chunk_appendf(&trash,
" s0=[%d,%1xh,fd=%d,ex=%s]",
curr_strm->si[0].state,
human_time(TICKS_TO_MS(curr_strm->si[0].exp - now_ms),
TICKS_TO_MS(1000)) : "");
- conn = objt_conn(curr_strm->si[1].end);
+ conn = cs_conn(objt_cs(curr_strm->si[1].end));
chunk_appendf(&trash,
" s1=[%d,%1xh,fd=%d,ex=%s]",
curr_strm->si[1].state,
#include <proto/applet.h>
#include <proto/channel.h>
#include <proto/connection.h>
+#include <proto/mux_pt.h>
#include <proto/pipe.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
static void stream_int_shutw_applet(struct stream_interface *si);
static void stream_int_chk_rcv_applet(struct stream_interface *si);
static void stream_int_chk_snd_applet(struct stream_interface *si);
-static void si_conn_recv_cb(struct connection *conn);
-static void si_conn_send_cb(struct connection *conn);
-static int si_conn_wake_cb(struct connection *conn);
-static int si_idle_conn_wake_cb(struct connection *conn);
-static void si_idle_conn_null_cb(struct connection *conn);
+static void si_cs_recv_cb(struct conn_stream *cs);
+static void si_cs_send_cb(struct conn_stream *cs);
+static int si_cs_wake_cb(struct conn_stream *cs);
+static int si_idle_conn_wake_cb(struct conn_stream *cs);
+static void si_idle_conn_null_cb(struct conn_stream *cs);
/* stream-interface operations for embedded tasks */
struct si_ops si_embedded_ops = {
};
struct data_cb si_conn_cb = {
- .recv = si_conn_recv_cb,
- .send = si_conn_send_cb,
- .wake = si_conn_wake_cb,
+ .recv = si_cs_recv_cb,
+ .send = si_cs_send_cb,
+ .wake = si_cs_wake_cb,
.name = "STRM",
};
* we've sent the whole proxy line. Otherwise we use connect().
*/
while (conn->send_proxy_ofs) {
+ struct conn_stream *cs;
int ret;
+ cs = conn->mux_ctx;
/* The target server expects a PROXY line to be sent first.
* If the send_proxy_ofs is negative, it corresponds to the
* offset to start sending from then end of the proxy string
* is attached to a stream interface. Otherwise we can only
* send a LOCAL line (eg: for use with health checks).
*/
- if (conn->data == &si_conn_cb) {
- struct stream_interface *si = conn->owner;
- struct connection *remote = objt_conn(si_opposite(si)->end);
- ret = make_proxy_line(trash.str, trash.size, objt_server(conn->target), remote);
+ if (conn->mux == &mux_pt_ops && cs->data_cb == &si_conn_cb) {
+ struct stream_interface *si = cs->data;
+ struct conn_stream *remote_cs = objt_cs(si_opposite(si)->end);
+ ret = make_proxy_line(trash.str, trash.size, objt_server(conn->target), remote_cs ? remote_cs->conn : NULL);
}
else {
/* The target server expects a LOCAL line to be sent first. Retrieving
* It simply sets the CO_FL_SOCK_RD_SH flag so that si_idle_conn_wake_cb()
* is notified and can kill the connection.
*/
-static void si_idle_conn_null_cb(struct connection *conn)
+static void si_idle_conn_null_cb(struct conn_stream *cs)
{
- conn_sock_drain(conn);
+ conn_sock_drain(cs->conn);
}
/* Callback to be used by connection I/O handlers when some activity is detected
* a close was detected on it. It returns 0 if it did nothing serious, or -1 if
* it killed the connection.
*/
-static int si_idle_conn_wake_cb(struct connection *conn)
+static int si_idle_conn_wake_cb(struct conn_stream *cs)
{
- struct stream_interface *si = conn->owner;
+ struct connection *conn = cs->conn;
+ struct stream_interface *si = cs->data;
if (!conn_ctrl_ready(conn))
return 0;
* connection's polling based on the channels and stream interface's final
* states. The function always returns 0.
*/
-static int si_conn_wake_cb(struct connection *conn)
+static int si_cs_wake_cb(struct conn_stream *cs)
{
- struct stream_interface *si = conn->owner;
+ struct connection *conn = cs->conn;
+ struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
* was done above (eg: maybe some buffers got emptied).
*/
if (channel_is_empty(oc))
- __conn_xprt_stop_send(conn);
+ __cs_stop_send(cs);
if (si->flags & SI_FL_WAIT_ROOM) {
- __conn_xprt_stop_recv(conn);
+ __cs_stop_recv(cs);
}
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
channel_may_recv(ic)) {
- __conn_xprt_want_recv(conn);
+ __cs_want_recv(cs);
}
return 0;
}
/*
* This function is called to send buffer data to a stream socket.
- * It calls the transport layer's snd_buf function. It relies on the
+ * It calls the mux layer's snd_buf function. It relies on the
* caller to commit polling changes. The caller should check conn->flags
* for errors.
*/
-static void si_conn_send(struct connection *conn)
+static void si_cs_send(struct conn_stream *cs)
{
- struct stream_interface *si = conn->owner;
+ struct connection *conn = cs->conn;
+ struct stream_interface *si = cs->data;
struct channel *oc = si_oc(si);
int ret;
/* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL;
- if (oc->pipe && conn->xprt->snd_pipe) {
- ret = conn->xprt->snd_pipe(conn, oc->pipe);
+ if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
+ ret = conn->mux->snd_pipe(cs, oc->pipe);
if (ret > 0)
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA;
if (oc->flags & CF_STREAMER)
send_flag |= CO_SFL_STREAMER;
- ret = conn->xprt->snd_buf(conn, oc->buf, send_flag);
+ ret = conn->mux->snd_buf(cs, oc->buf, send_flag);
if (ret > 0) {
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA;
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
- struct connection *conn = __objt_conn(si->end);
+ struct conn_stream *cs = __objt_cs(si->end);
if (!(ic->flags & CF_SHUTR)) {
/* Read not closed */
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic))
- __conn_xprt_stop_recv(conn);
+ __cs_stop_recv(cs);
else
- __conn_xprt_want_recv(conn);
+ __cs_want_recv(cs);
}
if (!(oc->flags & CF_SHUTW)) {
/* Write not closed */
if (channel_is_empty(oc))
- __conn_xprt_stop_send(conn);
+ __cs_stop_send(cs);
else
- __conn_xprt_want_send(conn);
+ __cs_want_send(cs);
}
- conn_cond_update_xprt_polling(conn);
+ cs_update_mux_polling(cs);
}
/*
*/
static void stream_int_shutr_conn(struct stream_interface *si)
{
- struct connection *conn = __objt_conn(si->end);
+ struct conn_stream *cs = __objt_cs(si->end);
+ struct connection *conn = cs->conn;
struct channel *ic = si_ic(si);
ic->flags &= ~CF_SHUTR_NOW;
return;
if (si_oc(si)->flags & CF_SHUTW) {
+ /* XXX: should just close cs ? */
conn_full_close(conn);
si->state = SI_ST_DIS;
si->exp = TICK_ETERNITY;
}
else if (conn->ctrl) {
/* we want the caller to disable polling on this FD */
- conn_xprt_stop_recv(conn);
+ cs_stop_recv(cs);
}
}
*/
static void stream_int_shutw_conn(struct stream_interface *si)
{
- struct connection *conn = __objt_conn(si->end);
+ struct conn_stream *cs = __objt_cs(si->end);
+ struct connection *conn = cs->conn;
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
/* quick close, the socket is already shut anyway */
}
else if (si->flags & SI_FL_NOLINGER) {
- /* unclean data-layer shutdown */
- conn_xprt_shutw_hard(conn);
+ /* unclean data-layer shutdown, typically an aborted request
+ * or a forwarded shutdown from a client to a server due to
+ * option abortonclose. No need for the TLS layer to try to
+ * emit a shutdown message.
+ */
+ cs_shutw_hard(cs);
}
else {
- /* clean data-layer shutdown */
- conn_xprt_shutw(conn);
- conn_sock_shutw(conn);
+ /* clean data-layer shutdown. This only happens on the
+ * frontend side, or on the backend side when forwarding
+ * a client close in TCP mode or in HTTP TUNNEL mode
+ * while option abortonclose is set. We want the TLS
+ * layer to try to signal it to the peer before we close.
+ */
+ cs_shutw(cs);
/* If the stream interface is configured to disable half-open
* connections, we'll skip the shutdown(), but only if the
static void stream_int_chk_rcv_conn(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
- struct connection *conn = __objt_conn(si->end);
+ struct conn_stream *cs = __objt_cs(si->end);
if (unlikely(si->state > SI_ST_EST || (ic->flags & CF_SHUTR)))
return;
- conn_refresh_polling_flags(conn);
-
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
/* stop reading */
if (!(ic->flags & CF_DONT_READ)) /* full */
si->flags |= SI_FL_WAIT_ROOM;
- __conn_xprt_stop_recv(conn);
+ __cs_stop_recv(cs);
}
else {
/* (re)start reading */
si->flags &= ~SI_FL_WAIT_ROOM;
- __conn_xprt_want_recv(conn);
+ __cs_want_recv(cs);
}
- conn_cond_update_xprt_polling(conn);
+ cs_update_mux_polling(cs);
}
static void stream_int_chk_snd_conn(struct stream_interface *si)
{
struct channel *oc = si_oc(si);
- struct connection *conn = __objt_conn(si->end);
+ struct conn_stream *cs = __objt_cs(si->end);
/* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL;
!(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
return;
- if (conn->flags & CO_FL_XPRT_WR_ENA) {
+ if (cs->flags & CS_FL_DATA_WR_ENA) {
/* already subscribed to write notifications, will be called
* anyway, so let's avoid calling it especially if the reader
* is not ready.
return;
}
- /* Before calling the data-level operations, we have to prepare
- * the polling flags to ensure we properly detect changes.
- */
- conn_refresh_polling_flags(conn);
- __conn_xprt_want_send(conn);
+ __cs_want_send(cs);
- si_conn_send(conn);
- if (conn->flags & CO_FL_ERROR) {
+ si_cs_send(cs);
+ if (cs->conn->flags & CO_FL_ERROR) {
/* Write error on the file descriptor */
- __conn_xprt_stop_both(conn);
+ __cs_stop_both(cs);
si->flags |= SI_FL_ERR;
goto out_wakeup;
}
* ->o limit was reached. Maybe we just wrote the last
* chunk and need to close.
*/
- __conn_xprt_stop_send(conn);
+ __cs_stop_send(cs);
if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
(CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
(si->state == SI_ST_EST)) {
/* Otherwise there are remaining data to be sent in the buffer,
* which means we have to poll before doing so.
*/
- __conn_xprt_want_send(conn);
+ __cs_want_send(cs);
si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
}
/* commit possible polling changes */
- conn_cond_update_polling(conn);
+ cs_update_mux_polling(cs);
}
/*
* This is the callback which is called by the connection layer to receive data
- * into the buffer from the connection. It iterates over the transport layer's
+ * into the buffer from the connection. It iterates over the mux layer's
* rcv_buf function.
*/
-static void si_conn_recv_cb(struct connection *conn)
+static void si_cs_recv_cb(struct conn_stream *cs)
{
- struct stream_interface *si = conn->owner;
+ struct connection *conn = cs->conn;
+ struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si);
int ret, max, cur_read;
int read_poll = MAX_READ_POLL_LOOPS;
return;
/* stop here if we reached the end of data */
- if (conn_xprt_read0_pending(conn))
+ if (cs->flags & CS_FL_EOS)
goto out_shutdown_r;
cur_read = 0;
/* First, let's see if we may splice data across the channel without
* using a buffer.
*/
- if (conn->xprt->rcv_pipe &&
+ if (conn->xprt->rcv_pipe && conn->mux->rcv_pipe &&
(ic->pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
ic->flags & CF_KERN_SPLICING) {
if (buffer_not_empty(ic->buf)) {
}
}
- ret = conn->xprt->rcv_pipe(conn, ic->pipe, ic->to_forward);
+ ret = conn->mux->rcv_pipe(cs, ic->pipe, ic->to_forward);
if (ret < 0) {
/* splice not supported on this end, let's disable it */
ic->flags &= ~CF_KERN_SPLICING;
ic->flags |= CF_READ_PARTIAL;
}
- if (conn_xprt_read0_pending(conn))
+ if (cs->flags & CS_FL_EOS)
goto out_shutdown_r;
if (conn->flags & CO_FL_ERROR)
* could soon be full. Let's stop before needing to poll.
*/
si->flags |= SI_FL_WAIT_ROOM;
- __conn_xprt_stop_recv(conn);
+ __cs_stop_recv(cs);
}
/* splice not possible (anymore), let's go on on standard copy */
break;
}
- ret = conn->xprt->rcv_buf(conn, ic->buf, max);
+ ret = conn->mux->rcv_buf(cs, ic->buf, max);
if (ret <= 0)
break;
}
if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
- if (__conn_xprt_done_recv(conn))
- si->flags |= SI_FL_WAIT_ROOM;
- break;
+ /*
+ * This used to be __conn_xprt_done_recv().
+ * It was changed to accommodate the mux code,
+ * but we may have lost a worthwhile optimization.
+ */
+ __cs_stop_recv(cs);
}
/* if too many bytes were missing from last read, it means that
if (conn->flags & CO_FL_ERROR)
return;
- if (conn_xprt_read0_pending(conn))
+ if (cs->flags & CS_FL_EOS)
/* connection closed */
goto out_shutdown_r;
* from the buffer to the connection. It iterates over the transport layer's
* snd_buf function.
*/
-static void si_conn_send_cb(struct connection *conn)
+static void si_cs_send_cb(struct conn_stream *cs)
{
- struct stream_interface *si = conn->owner;
+ struct connection *conn = cs->conn;
+ struct stream_interface *si = cs->data;
if (conn->flags & CO_FL_ERROR)
return;
return;
/* OK there are data waiting to be sent */
- si_conn_send(conn);
+ si_cs_send(cs);
/* OK all done */
return;
*/
void stream_sock_read0(struct stream_interface *si)
{
- struct connection *conn = __objt_conn(si->end);
+ struct conn_stream *cs = __objt_cs(si->end);
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
if (si->flags & SI_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
/* force flag on ssl to keep stream in cache */
- conn_xprt_shutw_hard(conn);
+ cs_shutw_hard(cs);
goto do_close;
}
/* otherwise that's just a normal read shutdown */
- __conn_xprt_stop_recv(conn);
+ __cs_stop_recv(cs);
return;
do_close:
/* OK we completely close the socket here just as if we went through si_shut[rw]() */
- conn_full_close(conn);
+ conn_full_close(cs->conn);
ic->flags &= ~CF_SHUTR_NOW;
ic->flags |= CF_SHUTR;