{
cs->obj_type = OBJ_TYPE_CS;
cs->flags = CS_FL_NONE;
+ LIST_INIT(&cs->wait_list.list);
LIST_INIT(&cs->send_wait_list);
cs->conn = conn;
}
/* Releases a conn_stream previously allocated by cs_new() */
static inline void cs_free(struct conn_stream *cs)
{
+ if (cs->wait_list.task)
+ tasklet_free(cs->wait_list.task);
pool_free(pool_head_connstream, cs);
}
if (!likely(cs))
return NULL;
+ cs->wait_list.task = tasklet_new();
+ if (!likely(cs->wait_list.task)) {
+ cs_free(cs);
+ return NULL;
+ }
if (!conn) {
conn = conn_new();
if (!likely(conn)) {
struct mux_ops {
int (*init)(struct connection *conn); /* early initialization */
void (*recv)(struct connection *conn); /* mux-layer recv callback */
- void (*send)(struct connection *conn); /* mux-layer send callback */
int (*wake)(struct connection *conn); /* mux-layer callback to report activity, mandatory */
void (*update_poll)(struct conn_stream *cs); /* commit cs flags to mux/conn */
size_t (*rcv_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */
*/
struct data_cb {
void (*recv)(struct conn_stream *cs); /* data-layer recv callback */
- void (*send)(struct conn_stream *cs); /* data-layer send callback */
int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */
int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
char name[8]; /* data layer name, zero-terminated */
enum obj_type obj_type; /* differentiates connection from applet context */
unsigned int flags; /* CS_FL_* */
struct connection *conn; /* xprt-level connection */
+ struct wait_list wait_list; /* We're in a wait list for send */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
void *data; /* pointer to upper layer's entity (eg: stream interface) */
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */
static int tcpcheck_get_step_id(struct check *);
static char * tcpcheck_get_step_comment(struct check *, int);
static int tcpcheck_main(struct check *);
+static void __event_srv_chk_w(struct conn_stream *cs);
static struct pool_head *pool_head_email_alert = NULL;
static struct pool_head *pool_head_tcpcheck_rule = NULL;
* the connection acknowledgement. If the proxy requires L7 health-checks,
* it sends the request. In other cases, it calls set_server_check_status()
* to set check->status, check->duration and check->result.
+ */
+static struct task *event_srv_chk_w(struct task *task, void *ctx, unsigned short state)
+{
+ struct conn_stream *cs = ctx;
+ struct check __maybe_unused *check = cs->data;
+
+ HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock);
+ __event_srv_chk_w(cs);
+ HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock);
+ return NULL;
+}
+
+/* same as above but protected by the server lock.
*
* Please do NOT place any return statement in this function and only leave
- * via the out_unlock label.
+ * via the out label. NOTE THAT THIS FUNCTION DOESN'T LOCK, YOU PROBABLY WANT
+ * TO USE event_srv_chk_w() instead.
*/
-static void event_srv_chk_w(struct conn_stream *cs)
+static void __event_srv_chk_w(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct check *check = cs->data;
struct server *s = check->server;
struct task *t = check->task;
- HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock);
if (unlikely(check->result == CHK_RES_FAILED))
goto out_wakeup;
- if (conn->flags & CO_FL_HANDSHAKE)
- goto out_unlock;
+ if (conn->flags & CO_FL_HANDSHAKE) {
+ if (cs->wait_list.task->process != event_srv_chk_w) {
+ cs->wait_list.task->process = event_srv_chk_w;
+ cs->wait_list.task->context = cs;
+ }
+ LIST_ADDQ(&conn->send_wait_list, &cs->wait_list.list);
+ goto out;
+ }
if (retrieve_errno_from_socket(conn)) {
chk_report_conn_err(check, errno, 0);
/* wake() will take care of calling tcpcheck_main() */
if (check->type == PR_O2_TCPCHK_CHK)
- goto out_unlock;
+ goto out;
if (b_data(&check->bo)) {
b_del(&check->bo, conn->mux->snd_buf(cs, &check->bo, b_data(&check->bo), 0));
b_realign_if_empty(&check->bo);
-
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) {
chk_report_conn_err(check, errno, 0);
__cs_stop_both(cs);
goto out_wakeup;
}
- if (b_data(&check->bo))
- goto out_unlock;
+ if (b_data(&check->bo)) {
+ if (!cs->wait_list.task->process) {
+ cs->wait_list.task->process = event_srv_chk_w;
+ cs->wait_list.task->context = cs;
+ }
+ conn->mux->subscribe(cs, SUB_CAN_SEND, &cs->wait_list);
+ goto out;
+ }
}
/* full request sent, we allow up to <timeout.check> if nonzero for a response */
task_wakeup(t, TASK_WOKEN_IO);
out_nowake:
__cs_stop_send(cs); /* nothing more to write */
- out_unlock:
- HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock);
+ out:
+ return;
}
/*
ret = tcpcheck_main(check);
cs = check->cs;
conn = cs_conn(cs);
- }
+ } else if (LIST_ISEMPTY(&cs->wait_list.list))
+ __event_srv_chk_w(cs);
if (unlikely(conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) {
/* We may get error reports bypassing the I/O handlers, typically
struct data_cb check_conn_cb = {
.recv = event_srv_chk_r,
- .send = event_srv_chk_w,
.wake = wake_srv_chk,
.name = "CHCK",
};
{
struct connection *conn = fdtab[fd].owner;
unsigned int flags;
+ int can_send = 0;
if (unlikely(!conn)) {
activity[tid].conn_dead++;
* both of which will be detected below.
*/
flags = 0;
- conn->mux->send(conn);
+ can_send = LIST_ISEMPTY(&conn->send_wait_list);
while (!LIST_ISEMPTY(&conn->send_wait_list)) {
struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
struct wait_list *, list);
* Note that the wake callback is allowed to release the connection and
* the fd (and return < 0 in this case).
*/
- if ((((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) ||
+ if ((can_send || (((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) ||
((flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) != CO_FL_CONNECTED &&
- (conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED)) &&
+ (conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED))) &&
conn->mux->wake(conn) < 0)
return;
struct list fctl_list; /* list of streams blocked by connection's fctl */
struct buffer_wait buf_wait; /* wait list for buffer allocations */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
+ struct wait_list wait_list; /* We're in a wait list, to send */
};
/* H2 stream state, in h2s->st */
};
static struct task *h2_timeout_task(struct task *t, void *context, unsigned short state);
+static struct task *h2_send(struct task *t, void *ctx, unsigned short state);
/*****************************************************/
/* functions below are for dynamic buffer management */
t->expire = tick_add(now_ms, h2c->timeout);
}
+ h2c->wait_list.task = tasklet_new();
+ if (!h2c->wait_list.task)
+ goto fail;
+ h2c->wait_list.task->process = h2_send;
+ h2c->wait_list.task->context = conn;
+
h2c->ddht = hpack_dht_alloc(h2_settings_header_table_size);
if (!h2c->ddht)
goto fail;
task_queue(t);
conn_xprt_want_recv(conn);
LIST_INIT(&h2c->send_wait_list);
+ LIST_INIT(&h2c->wait_list.list);
/* mux->wake will be called soon to complete the operation */
return 0;
fail:
if (t)
task_free(t);
+ if (h2c->wait_list.task)
+ tasklet_free(h2c->wait_list.task);
pool_free(pool_head_h2c, h2c);
return -1;
}
task_wakeup(h2c->task, TASK_WOKEN_OTHER);
h2c->task = NULL;
}
+ if (h2c->wait_list.task)
+ tasklet_free(h2c->wait_list.task);
pool_free(pool_head_h2c, h2c);
}
h2s->flags &= ~H2_SF_BLK_ANY;
if (h2s->cs) {
- h2s->cs->data_cb->send(h2s->cs);
h2s->cs->data_cb->wake(h2s->cs);
} else {
h2s_send_rst_stream(h2c, h2s);
h2s->flags &= ~H2_SF_BLK_ANY;
if (h2s->cs) {
- h2s->cs->data_cb->send(h2s->cs);
h2s->cs->data_cb->wake(h2s->cs);
} else {
h2s_send_rst_stream(h2c, h2s);
return;
}
-/* callback called on send event by the connection handler */
-static void h2_send(struct connection *conn)
+/* Try to send data if possible */
+static struct task *h2_send(struct task *t, void *ctx, unsigned short state)
{
+ struct connection *conn = ctx;
struct h2c *h2c = conn->mux_ctx;
int done;
if (conn->flags & CO_FL_ERROR)
- return;
+ return NULL;
if (conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) {
/* a handshake was requested */
- return;
+ return NULL;
}
/* This loop is quite simple : it tries to fill as much as it can from
}
}
+ /* We're done, no more to send */
+ if (!b_data(&h2c->mbuf))
+ return NULL;
+schedule:
+ if (LIST_ISEMPTY(&h2c->wait_list.list))
+ conn->xprt->subscribe(conn, SUB_CAN_SEND, &h2c->wait_list);
+ return NULL;
}
/* callback called on any event by the connection handler.
else
h2c->task->expire = TICK_ETERNITY;
}
+
+ h2_send(NULL, conn, 0);
return 0;
}
else if (LIST_ISEMPTY(&h2s->list)) {
if (h2s->flags & H2_SF_BLK_MFCTL)
LIST_ADDQ(&h2s->h2c->fctl_list, &h2s->list);
- else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM))
- LIST_ADDQ(&h2s->h2c->send_list, &h2s->list);
}
return total;
const struct mux_ops h2_ops = {
.init = h2_init,
.recv = h2_recv,
- .send = h2_send,
.wake = h2_wake,
.update_poll = h2_update_poll,
.rcv_buf = h2_rcv_buf,
cs_update_mux_polling(cs);
}
-/* callback to be used by default for the pass-through mux. It simply calls the
- * data layer send() callback which must be set.
- */
-static void mux_pt_send(struct connection *conn)
-{
- struct conn_stream *cs = conn->mux_ctx;
-
- if (conn->flags & CO_FL_ERROR)
- cs->flags |= CS_FL_ERROR;
- cs->data_cb->send(cs);
- cs_update_mux_polling(cs);
-}
-
/*
* Attach a new stream to a connection
* (Used for outgoing connections)
const struct mux_ops mux_pt_ops = {
.init = mux_pt_init,
.recv = mux_pt_recv,
- .send = mux_pt_send,
.wake = mux_pt_wake,
.update_poll = mux_pt_update_poll,
.rcv_buf = mux_pt_rcv_buf,
static void stream_int_chk_rcv_applet(struct stream_interface *si);
static void stream_int_chk_snd_applet(struct stream_interface *si);
static void si_cs_recv_cb(struct conn_stream *cs);
-static void si_cs_send_cb(struct conn_stream *cs);
static int si_cs_wake_cb(struct conn_stream *cs);
static int si_idle_conn_wake_cb(struct conn_stream *cs);
static void si_idle_conn_null_cb(struct conn_stream *cs);
+static struct task * si_cs_send(struct task *t, void *ctx, unsigned short state);
/* stream-interface operations for embedded tasks */
struct si_ops si_embedded_ops = {
struct data_cb si_conn_cb = {
.recv = si_cs_recv_cb,
- .send = si_cs_send_cb,
.wake = si_cs_wake_cb,
.name = "STRM",
};
struct data_cb si_idle_conn_cb = {
.recv = si_idle_conn_null_cb,
- .send = si_idle_conn_null_cb,
.wake = si_idle_conn_wake_cb,
.name = "IDLE",
};
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
+ /* If we have data to send, try it now */
+ if (!channel_is_empty(oc) && objt_cs(si->end))
+ si_cs_send(NULL, objt_cs(si->end), 0);
+
/* process consumer side */
if (channel_is_empty(oc)) {
struct connection *conn = objt_cs(si->end) ? objt_cs(si->end)->conn : NULL;
* caller to commit polling changes. The caller should check conn->flags
* for errors.
*/
-static void si_cs_send(struct conn_stream *cs)
+static struct task * si_cs_send(struct task *t, void *ctx, unsigned short state)
{
+ struct conn_stream *cs = ctx;
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *oc = si_oc(si);
int ret;
+ int did_send = 0;
+
+ /* We're already waiting to be able to send, give up */
+ if (!LIST_ISEMPTY(&cs->wait_list.list))
+ return NULL;
+
+ if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
+ return NULL;
+
+ if (conn->flags & CO_FL_HANDSHAKE) {
+ /* a handshake was requested */
+ /* Schedule ourself to be woken up once the handshake is done */
+ LIST_ADDQ(&conn->send_wait_list, &cs->wait_list.list);
+ return NULL;
+ }
+
+ /* we might have been called just after an asynchronous shutw */
+ if (si_oc(si)->flags & CF_SHUTW)
+ return NULL;
/* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL;
if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
ret = conn->mux->snd_pipe(cs, oc->pipe);
- if (ret > 0)
+ if (ret > 0) {
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA | CF_WRITE_EVENT;
+ did_send = 1;
+ }
if (!oc->pipe->data) {
put_pipe(oc->pipe);
}
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return;
+ return NULL;
}
/* At this point, the pipe is empty, but we may still have data pending
* in the normal buffer.
*/
if (!co_data(oc))
- return;
+ goto wake_others;
/* when we're here, we already know that there is no spliced
* data left, and that there are sendable buffered data.
ret = conn->mux->snd_buf(cs, &oc->buf, co_data(oc), send_flag);
if (ret > 0) {
+ did_send = 1;
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA | CF_WRITE_EVENT;
co_set_data(oc, co_data(oc) - ret);
*/
}
}
+ /* We couldn't send all of our data, let the mux know we'd like to send more */
+ if (co_data(oc)) {
+ if (!cs->wait_list.task->process) {
+ cs->wait_list.task->process = si_cs_send;
+ cs->wait_list.task->context = ctx;
+ }
+ conn->mux->subscribe(cs, SUB_CAN_SEND, &cs->wait_list);
+ }
+wake_others:
+ /* Maybe somebody was waiting for this conn_stream, wake them */
+ if (did_send) {
+ while (!LIST_ISEMPTY(&cs->send_wait_list)) {
+ struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n,
+ struct wait_list *, list);
+ LIST_DEL(&sw->list);
+ LIST_INIT(&sw->list);
+ tasklet_wakeup(sw->task);
+ }
+ }
+ return NULL;
}
/* This function is designed to be called from within the stream handler to
__cs_want_send(cs);
- si_cs_send(cs);
+ si_cs_send(NULL, cs, 0);
if (cs->flags & CS_FL_ERROR || cs->conn->flags & CO_FL_ERROR) {
/* Write error on the file descriptor */
__cs_stop_both(cs);
return;
}
-/*
- * This is the callback which is called by the connection layer to send data
- * from the buffer to the connection. It iterates over the transport layer's
- * snd_buf function.
- */
-static void si_cs_send_cb(struct conn_stream *cs)
-{
- struct connection *conn = cs->conn;
- struct stream_interface *si = cs->data;
-
- if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return;
-
- if (conn->flags & CO_FL_HANDSHAKE)
- /* a handshake was requested */
- return;
-
- /* we might have been called just after an asynchronous shutw */
- if (si_oc(si)->flags & CF_SHUTW)
- return;
-
- /* OK there are data waiting to be sent */
- si_cs_send(cs);
-
- /* OK all done */
- return;
-}
-
/*
* This function propagates a null read received on a socket-based connection.
* It updates the stream interface. If the stream interface has SI_FL_NOHALF,