struct task *(*fct)(struct task *));
void stream_int_unregister_handler(struct stream_interface *si);
+static inline const struct protocol *si_ctrl(struct stream_interface *si)
+{
+ return si->conn.ctrl;
+}
+
+static inline const struct sock_ops *si_data(struct stream_interface *si)
+{
+ return si->conn.data;
+}
+
static inline void clear_target(struct target *dest)
{
dest->type = TARG_TYPE_NONE;
static inline void stream_interface_prepare(struct stream_interface *si, const struct sock_ops *ops)
{
- memcpy(&si->sock, ops, sizeof(si->sock));
+ si->conn.data = ops;
}
if (si->flags & SI_FL_FROM_SET)
return;
- if (!si->proto || !si->proto->get_src)
+ if (!si_ctrl(si) || !si_ctrl(si)->get_src)
return;
- if (si->proto->get_src(si->fd, (struct sockaddr *)&si->addr.from,
- sizeof(si->addr.from),
- si->target.type != TARG_TYPE_CLIENT) == -1)
+ if (si_ctrl(si)->get_src(si->fd, (struct sockaddr *)&si->addr.from,
+ sizeof(si->addr.from),
+ si->target.type != TARG_TYPE_CLIENT) == -1)
return;
si->flags |= SI_FL_FROM_SET;
}
if (si->flags & SI_FL_TO_SET)
return;
- if (!si->proto || !si->proto->get_dst)
+ if (!si_ctrl(si) || !si_ctrl(si)->get_dst)
return;
- if (si->proto->get_dst(si->fd, (struct sockaddr *)&si->addr.to,
- sizeof(si->addr.to),
- si->target.type != TARG_TYPE_CLIENT) == -1)
+ if (si_ctrl(si)->get_dst(si->fd, (struct sockaddr *)&si->addr.to,
+ sizeof(si->addr.to),
+ si->target.type != TARG_TYPE_CLIENT) == -1)
return;
si->flags |= SI_FL_TO_SET;
}
+/* Sends a shutr to the connection using the data layer */
+static inline void si_shutr(struct stream_interface *si)
+{
+ si_data(si)->shutr(si);
+}
+
+/* Sends a shutw to the connection using the data layer */
+static inline void si_shutw(struct stream_interface *si)
+{
+ si_data(si)->shutw(si);
+}
+
+/* Calls the data state update on the stream interfaace */
+static inline void si_update(struct stream_interface *si)
+{
+ si_data(si)->update(si);
+}
+
+/* Calls chk_rcv on the connection using the data layer */
+static inline void si_chk_rcv(struct stream_interface *si)
+{
+ si_data(si)->chk_rcv(si);
+}
+
+/* Calls chk_snd on the connection using the data layer */
+static inline void si_chk_snd(struct stream_interface *si)
+{
+ si_data(si)->chk_snd(si);
+}
+
+/* Calls chk_snd on the connection using the ctrl layer */
+static inline int si_connect(struct stream_interface *si)
+{
+ if (unlikely(!si_ctrl(si) || !si_ctrl(si)->connect))
+ return SN_ERR_INTERNAL;
+ return si_ctrl(si)->connect(si);
+}
#endif /* _PROTO_STREAM_INTERFACE_H */
struct si_applet;
struct stream_interface;
+/* This structure describes a connection with its methods and data.
+ * A connection may be performed to proxy or server via a local or remote
+ * socket, and can also be made to an internal applet. It can support
+ * several data schemes (applet, raw, ssl, ...). It can support several
+ * connection control schemes, generally a protocol for socket-oriented
+ * connections, but other methods for applets.
+ */
+struct connection {
+ const struct sock_ops *data; /* operations at the data layer */
+ const struct protocol *ctrl; /* operations at the control layer, generally a protocol */
+};
+
struct target {
int type;
union {
unsigned int err_type; /* first error detected, one of SI_ET_* */
void *err_loc; /* commonly the server, NULL when SI_ET_NONE */
- struct sock_ops sock; /* socket level operations */
- struct protocol *proto; /* socket protocol */
+ struct connection conn; /* descriptor for a connection */
void (*release)(struct stream_interface *); /* handler to call after the last close() */
/* set the correct protocol on the output stream interface */
if (s->target.type == TARG_TYPE_SERVER) {
- s->req->cons->proto = target_srv(&s->target)->proto;
+ s->req->cons->conn.ctrl = target_srv(&s->target)->proto;
stream_interface_prepare(s->req->cons, target_srv(&s->target)->sock);
}
else if (s->target.type == TARG_TYPE_PROXY) {
/* proxies exclusively run on sock_raw right now */
- s->req->cons->proto = protocol_by_family(s->req->cons->addr.to.ss_family);
+ s->req->cons->conn.ctrl = protocol_by_family(s->req->cons->addr.to.ss_family);
stream_interface_prepare(s->req->cons, &sock_raw);
- if (!s->req->cons->proto)
+ if (!si_ctrl(s->req->cons))
return SN_ERR_INTERNAL;
}
else
if (s->fe->options2 & PR_O2_SRC_ADDR)
s->req->cons->flags |= SI_FL_SRC_ADDR;
- err = s->req->cons->proto->connect(s->req->cons);
+ err = si_connect(s->req->cons);
if (err != SN_ERR_NONE)
return err;
/* Let's close for real now. We just close the request
* side, the conditions below will complete if needed.
*/
- si->sock.shutw(si);
+ si_shutw(si);
break;
}
else if (si->applet.st0 == STAT_CLI_GETREQ) {
* we forward the close to the request side so that it flows upstream to
* the client.
*/
- si->sock.shutw(si);
+ si_shutw(si);
}
if ((req->flags & BF_SHUTW) && (si->state == SI_ST_EST) && (si->applet.st0 < STAT_CLI_OUTPUT)) {
* the client side has closed. So we'll forward this state downstream
* on the response buffer.
*/
- si->sock.shutr(si);
+ si_shutr(si);
res->flags |= BF_READ_NULL;
}
/* update all other flags and resync with the other side */
- si->sock.update(si);
+ si_update(si);
/* we don't want to expire timeouts while we're processing requests */
si->ib->rex = TICK_ETERNITY;
if (s->txn.meth == HTTP_METH_POST) {
if (stats_http_redir(si, s->be->uri_auth)) {
si->applet.st0 = 1;
- si->sock.shutw(si);
+ si_shutw(si);
}
} else {
if (stats_dump_http(si, s->be->uri_auth)) {
si->applet.st0 = 1;
- si->sock.shutw(si);
+ si_shutw(si);
}
}
}
if ((res->flags & BF_SHUTR) && (si->state == SI_ST_EST))
- si->sock.shutw(si);
+ si_shutw(si);
if ((req->flags & BF_SHUTW) && (si->state == SI_ST_EST) && si->applet.st0) {
- si->sock.shutr(si);
+ si_shutr(si);
res->flags |= BF_READ_NULL;
}
/* update all other flags and resync with the other side */
- si->sock.update(si);
+ si_update(si);
/* we don't want to expire timeouts while we're processing requests */
si->ib->rex = TICK_ETERNITY;
si->applet.st0 = PEER_SESSION_END;
/* fall through */
case PEER_SESSION_END: {
- si->sock.shutw(si);
- si->sock.shutr(si);
+ si_shutw(si);
+ si_shutr(si);
si->ib->flags |= BF_READ_NULL;
goto quit;
}
}
}
out:
- si->sock.update(si);
+ si_update(si);
si->ob->flags |= BF_READ_DONTWAIT;
/* we don't want to expire timeouts while we're processing requests */
si->ib->rex = TICK_ETERNITY;
s->si[0].state = s->si[0].prev_state = SI_ST_EST;
s->si[0].err_type = SI_ET_NONE;
s->si[0].err_loc = NULL;
- s->si[0].proto = NULL;
+ s->si[0].conn.ctrl = NULL;
s->si[0].release = NULL;
s->si[0].send_proxy_ofs = 0;
set_target_client(&s->si[0].target);
s->si[1].conn_retries = p->conn_retries;
s->si[1].err_type = SI_ET_NONE;
s->si[1].err_loc = NULL;
- s->si[1].proto = peer->proto;
+ s->si[1].conn.ctrl = peer->proto;
s->si[1].release = NULL;
s->si[1].send_proxy_ofs = 0;
set_target_proxy(&s->si[1].target, s->be);
}
/* prepare to return without error. */
- si->sock.shutr(si);
- si->sock.shutw(si);
+ si_shutr(si);
+ si_shutw(si);
si->err_type = SI_ET_NONE;
si->err_loc = NULL;
si->state = SI_ST_CLO;
http_silent_debug(__LINE__, s);
s->req->cons->flags |= SI_FL_NOLINGER | SI_FL_NOHALF;
- s->req->cons->sock.shutr(s->req->cons);
- s->req->cons->sock.shutw(s->req->cons);
+ si_shutr(s->req->cons);
+ si_shutw(s->req->cons);
http_silent_debug(__LINE__, s);
fdtab[fd].cb[DIR_WR].f = tcp_connect_write;
}
else {
- fdtab[fd].cb[DIR_RD].f = si->sock.read;
- fdtab[fd].cb[DIR_WR].f = si->sock.write;
+ fdtab[fd].cb[DIR_RD].f = si_data(si)->read;
+ fdtab[fd].cb[DIR_WR].f = si_data(si)->write;
}
fdinfo[fd].peeraddr = (struct sockaddr *)&si->addr.to;
/* The FD is ready now, we can hand the handlers to the socket layer
* and forward the event there to start working on the socket.
*/
- fdtab[fd].cb[DIR_RD].f = si->sock.read;
- fdtab[fd].cb[DIR_WR].f = si->sock.write;
+ fdtab[fd].cb[DIR_RD].f = si_data(si)->read;
+ fdtab[fd].cb[DIR_WR].f = si_data(si)->write;
fdtab[fd].state = FD_STREADY;
si->exp = TICK_ETERNITY;
- return si->sock.write(fd);
+ return si_data(si)->write(fd);
out_wakeup:
task_wakeup(si->owner, TASK_WOKEN_IO);
s->si[0].state = s->si[0].prev_state = SI_ST_EST;
s->si[0].err_type = SI_ET_NONE;
s->si[0].err_loc = NULL;
- s->si[0].proto = l->proto;
+ s->si[0].conn.ctrl = l->proto;
s->si[0].release = NULL;
s->si[0].send_proxy_ofs = 0;
set_target_client(&s->si[0].target);
s->si[1].err_type = SI_ET_NONE;
s->si[1].conn_retries = 0; /* used for logging too */
s->si[1].err_loc = NULL;
- s->si[1].proto = NULL;
+ s->si[1].conn.ctrl = NULL;
s->si[1].release = NULL;
s->si[1].send_proxy_ofs = 0;
clear_target(&s->si[1].target);
fdtab[cfd].owner = &s->si[0];
fdtab[cfd].state = FD_STREADY;
fdtab[cfd].flags = 0;
- fdtab[cfd].cb[DIR_RD].f = s->si[0].sock.read;
- fdtab[cfd].cb[DIR_WR].f = s->si[0].sock.write;
+ fdtab[cfd].cb[DIR_RD].f = si_data(&s->si[0])->read;
+ fdtab[cfd].cb[DIR_WR].f = si_data(&s->si[0])->write;
fdinfo[cfd].peeraddr = (struct sockaddr *)&s->si[0].addr.from;
fdinfo[cfd].peerlen = sizeof(s->si[0].addr.from);
EV_FD_SET(cfd, DIR_RD);
(((req->flags & (BF_OUT_EMPTY|BF_WRITE_ACTIVITY)) == BF_OUT_EMPTY) ||
s->be->options & PR_O_ABRT_CLOSE)))) {
/* give up */
- si->sock.shutw(si);
+ si_shutw(si);
si->err_type |= SI_ET_CONN_ABRT;
si->err_loc = target_srv(&s->target);
si->flags &= ~SI_FL_CAP_SPLICE;
process_srv_queue(target_srv(&s->target));
/* shutw is enough so stop a connecting socket */
- si->sock.shutw(si);
+ si_shutw(si);
si->ob->flags |= BF_WRITE_ERROR;
si->ib->flags |= BF_READ_ERROR;
rep->analysers |= s->fe->fe_rsp_ana | s->be->be_rsp_ana;
rep->flags |= BF_READ_ATTACHED; /* producer is now attached */
- if (si->proto) {
+ if (si_ctrl(si)) {
/* real connections have timeouts */
req->wto = s->be->timeout.server;
rep->rto = s->be->timeout.server;
process_srv_queue(srv);
/* Failed and not retryable. */
- si->sock.shutr(si);
- si->sock.shutw(si);
+ si_shutr(si);
+ si_shutw(si);
si->ob->flags |= BF_WRITE_ERROR;
s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
if (srv)
srv->counters.failed_conns++;
s->be->be_counters.failed_conns++;
- si->sock.shutr(si);
- si->sock.shutw(si);
+ si_shutr(si);
+ si_shutw(si);
si->ob->flags |= BF_WRITE_TIMEOUT;
if (!si->err_type)
si->err_type = SI_ET_QUEUE_TO;
/* give up */
si->exp = TICK_ETERNITY;
s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
- si->sock.shutr(si);
- si->sock.shutw(si);
+ si_shutr(si);
+ si_shutw(si);
si->err_type |= SI_ET_QUEUE_ABRT;
si->state = SI_ST_CLO;
if (s->srv_error)
(si->ob->flags & BF_OUT_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) {
/* give up */
si->exp = TICK_ETERNITY;
- si->sock.shutr(si);
- si->sock.shutw(si);
+ si_shutr(si);
+ si_shutw(si);
si->err_type |= SI_ET_CONN_ABRT;
si->state = SI_ST_CLO;
if (s->srv_error)
return;
/* we did not get any server, let's check the cause */
- si->sock.shutr(si);
- si->sock.shutw(si);
+ si_shutr(si);
+ si_shutw(si);
si->ob->flags |= BF_WRITE_ERROR;
if (!si->err_type)
si->err_type = SI_ET_CONN_OTHER;
if (unlikely((s->req->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) == BF_WRITE_TIMEOUT)) {
s->req->cons->flags |= SI_FL_NOLINGER;
- s->req->cons->sock.shutw(s->req->cons);
+ si_shutw(s->req->cons);
}
if (unlikely((s->req->flags & (BF_SHUTR|BF_READ_TIMEOUT)) == BF_READ_TIMEOUT)) {
if (s->req->prod->flags & SI_FL_NOHALF)
s->req->prod->flags |= SI_FL_NOLINGER;
- s->req->prod->sock.shutr(s->req->prod);
+ si_shutr(s->req->prod);
}
buffer_check_timeouts(s->rep);
if (unlikely((s->rep->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) == BF_WRITE_TIMEOUT)) {
s->rep->cons->flags |= SI_FL_NOLINGER;
- s->rep->cons->sock.shutw(s->rep->cons);
+ si_shutw(s->rep->cons);
}
if (unlikely((s->rep->flags & (BF_SHUTR|BF_READ_TIMEOUT)) == BF_READ_TIMEOUT)) {
if (s->rep->prod->flags & SI_FL_NOHALF)
s->rep->prod->flags |= SI_FL_NOLINGER;
- s->rep->prod->sock.shutr(s->rep->prod);
+ si_shutr(s->rep->prod);
}
}
srv = target_srv(&s->target);
if (unlikely(s->si[0].flags & SI_FL_ERR)) {
if (s->si[0].state == SI_ST_EST || s->si[0].state == SI_ST_DIS) {
- s->si[0].sock.shutr(&s->si[0]);
- s->si[0].sock.shutw(&s->si[0]);
+ si_shutr(&s->si[0]);
+ si_shutw(&s->si[0]);
stream_int_report_error(&s->si[0]);
if (!(s->req->analysers) && !(s->rep->analysers)) {
s->be->be_counters.cli_aborts++;
if (unlikely(s->si[1].flags & SI_FL_ERR)) {
if (s->si[1].state == SI_ST_EST || s->si[1].state == SI_ST_DIS) {
- s->si[1].sock.shutr(&s->si[1]);
- s->si[1].sock.shutw(&s->si[1]);
+ si_shutr(&s->si[1]);
+ si_shutw(&s->si[1]);
stream_int_report_error(&s->si[1]);
s->be->be_counters.failed_resp++;
if (srv)
/* shutdown(write) pending */
if (unlikely((s->req->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_OUT_EMPTY)) == (BF_SHUTW_NOW|BF_OUT_EMPTY)))
- s->req->cons->sock.shutw(s->req->cons);
+ si_shutw(s->req->cons);
/* shutdown(write) done on server side, we must stop the client too */
if (unlikely((s->req->flags & (BF_SHUTW|BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTW &&
if (unlikely((s->req->flags & (BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTR_NOW)) {
if (s->req->prod->flags & SI_FL_NOHALF)
s->req->prod->flags |= SI_FL_NOLINGER;
- s->req->prod->sock.shutr(s->req->prod);
+ si_shutr(s->req->prod);
}
/* it's possible that an upper layer has requested a connection setup or abort.
s->req->cons->state = SI_ST_REQ; /* new connection requested */
s->req->cons->conn_retries = s->be->conn_retries;
if (unlikely(s->req->cons->target.type == TARG_TYPE_APPLET &&
- !(s->req->cons->proto && s->req->cons->proto->connect))) {
+ !(si_ctrl(s->req->cons) && si_ctrl(s->req->cons)->connect))) {
s->req->cons->state = SI_ST_EST; /* connection established */
s->rep->flags |= BF_READ_ATTACHED; /* producer is now attached */
s->req->wex = TICK_ETERNITY;
/* shutdown(write) pending */
if (unlikely((s->rep->flags & (BF_SHUTW|BF_OUT_EMPTY|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW)))
- s->rep->cons->sock.shutw(s->rep->cons);
+ si_shutw(s->rep->cons);
/* shutdown(write) done on the client side, we must stop the server too */
if (unlikely((s->rep->flags & (BF_SHUTW|BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTW) &&
if (unlikely((s->rep->flags & (BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTR_NOW)) {
if (s->rep->prod->flags & SI_FL_NOHALF)
s->rep->prod->flags |= SI_FL_NOLINGER;
- s->rep->prod->sock.shutr(s->rep->prod);
+ si_shutr(s->rep->prod);
}
if (s->req->prod->state == SI_ST_DIS || s->req->cons->state == SI_ST_DIS)
session_process_counters(s);
if (s->rep->cons->state == SI_ST_EST && s->rep->cons->target.type != TARG_TYPE_APPLET)
- s->rep->cons->sock.update(s->rep->cons);
+ si_update(s->rep->cons);
if (s->req->cons->state == SI_ST_EST && s->req->cons->target.type != TARG_TYPE_APPLET)
- s->req->cons->sock.update(s->req->cons);
+ si_update(s->req->cons);
s->req->flags &= ~(BF_READ_NULL|BF_READ_PARTIAL|BF_WRITE_NULL|BF_WRITE_PARTIAL|BF_READ_ATTACHED);
s->rep->flags &= ~(BF_READ_NULL|BF_READ_PARTIAL|BF_WRITE_NULL|BF_WRITE_PARTIAL|BF_READ_ATTACHED);
#include <proto/pipe.h>
#include <proto/protocols.h>
#include <proto/sock_raw.h>
+#include <proto/stream_interface.h>
#include <proto/task.h>
#include <types/global.h>
si->flags |= SI_FL_WAIT_ROOM;
EV_FD_CLR(fd, DIR_RD);
b->rex = TICK_ETERNITY;
- b->cons->sock.chk_snd(b->cons);
+ si_chk_snd(b->cons);
return 1;
}
(b->i == 0 && (b->cons->flags & SI_FL_WAIT_DATA))) {
int last_len = b->pipe ? b->pipe->data : 0;
- b->cons->sock.chk_snd(b->cons);
+ si_chk_snd(b->cons);
/* check if the consumer has freed some space */
if (!(b->flags & BF_FULL) &&
/* the producer might be waiting for more room to store data */
if (likely((b->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
(b->prod->flags & SI_FL_WAIT_ROOM)))
- b->prod->sock.chk_rcv(b->prod);
+ si_chk_rcv(b->prod);
/* we have to wake up if there is a special event or if we don't have
* any more data to forward and it's not planned to send any more.
return;
if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW))
- si->sock.shutw(si);
+ si_shutw(si);
if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
si->flags |= SI_FL_WAIT_DATA;
old_flags = si->flags;
if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
(si->ob->prod->flags & SI_FL_WAIT_ROOM)))
- si->ob->prod->sock.chk_rcv(si->ob->prod);
+ si_chk_rcv(si->ob->prod);
if (((si->ib->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
(si->ib->cons->flags & SI_FL_WAIT_DATA)) {
- si->ib->cons->sock.chk_snd(si->ib->cons);
+ si_chk_snd(si->ib->cons);
/* check if the consumer has freed some space */
if (!(si->ib->flags & BF_FULL))
si->flags &= ~SI_FL_WAIT_ROOM;
DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si->owner);
stream_interface_prepare(si, &stream_int_embedded);
- si->proto = NULL;
+ si->conn.ctrl = NULL;
set_target_applet(&si->target, app);
si->applet.state = 0;
si->release = app->release;
DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner);
stream_interface_prepare(si, &stream_int_task);
- si->proto = NULL;
+ si->conn.ctrl = NULL;
clear_target(&si->target);
si->release = NULL;
si->flags |= SI_FL_WAIT_DATA;