/* public variables */
-extern volatile struct fdlist fd_cache;
-extern volatile struct fdlist fd_cache_local[MAX_THREADS];
-
extern volatile struct fdlist update_list;
extern unsigned long *polled_mask;
-extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
-
extern THREAD_LOCAL int *fd_updt; // FD updates list
extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
extern volatile int ha_used_fds; // Number of FDs we're currently using
-__decl_hathreads(extern HA_RWLOCK_T __attribute__((aligned(64))) fdcache_lock); /* global lock to protect fd_cache array */
-
/* Deletes an FD from the fdsets.
* The file descriptor is also closed.
*/
*/
void run_poller();
-/* Scan and process the cached events. This should be called right after
- * the poller.
- */
-void fd_process_cached_events();
-
void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off);
void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
}
}
-/* Allocates a cache entry for a file descriptor if it does not yet have one.
- * This can be done at any time.
- */
-static inline void fd_alloc_cache_entry(const int fd)
-{
- _HA_ATOMIC_OR(&fd_cache_mask, fdtab[fd].thread_mask);
- if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
- fd_add_to_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd, offsetof(struct fdtab, cache));
- else
- fd_add_to_fd_list(&fd_cache, fd, offsetof(struct fdtab, cache));
-}
-
-/* Removes entry used by fd <fd> from the FD cache and replaces it with the
- * last one.
- * If the fd has no entry assigned, return immediately.
- */
-static inline void fd_release_cache_entry(const int fd)
-{
- if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
- fd_rm_from_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd, offsetof(struct fdtab, cache));
- else
- fd_rm_from_fd_list(&fd_cache, fd, offsetof(struct fdtab, cache));
-}
-
-/* This function automatically enables/disables caching for an entry depending
- * on its state. It is only called on state changes.
- */
-static inline void fd_update_cache(int fd)
-{
- /* only READY and ACTIVE states (the two with both flags set) require a cache entry */
- if (((fdtab[fd].state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
- ((fdtab[fd].state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) {
- fd_alloc_cache_entry(fd);
- }
- else {
- fd_release_cache_entry(fd);
- }
-}
-
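+/* The former cache helpers above selected the lock-free per-thread list only
+ * when (thread_mask & (thread_mask - 1)) was zero, i.e. when at most one
+ * thread bit was set. A minimal stand-alone sketch of that test, with a
+ * hypothetical helper name, kept for illustration only:
+ */
+static inline int thread_mask_has_single_bit(unsigned long mask)
+{
+	/* non-zero only when exactly one bit is set in <mask> */
+	return mask && !(mask & (mask - 1));
+}
+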
/* Disable processing recv events on fd <fd> */
static inline void fd_stop_recv(int fd)
{
unsigned char old, new;
- unsigned long locked;
old = fdtab[fd].state;
do {
if ((old ^ new) & FD_EV_POLLED_R)
updt_fd_polling(fd);
-
- locked = atleast2(fdtab[fd].thread_mask);
- if (locked)
- HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
- fd_update_cache(fd); /* need an update entry to change the state */
- if (locked)
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Disable processing send events on fd <fd> */
static inline void fd_stop_send(int fd)
{
unsigned char old, new;
- unsigned long locked;
old = fdtab[fd].state;
do {
if ((old ^ new) & FD_EV_POLLED_W)
updt_fd_polling(fd);
-
- locked = atleast2(fdtab[fd].thread_mask);
- if (locked)
- HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
- fd_update_cache(fd); /* need an update entry to change the state */
- if (locked)
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Disable processing of events on fd <fd> for both directions. */
static inline void fd_stop_both(int fd)
{
unsigned char old, new;
- unsigned long locked;
old = fdtab[fd].state;
do {
if ((old ^ new) & FD_EV_POLLED_RW)
updt_fd_polling(fd);
-
- locked = atleast2(fdtab[fd].thread_mask);
- if (locked)
- HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
- fd_update_cache(fd); /* need an update entry to change the state */
- if (locked)
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */
static inline void fd_cant_recv(const int fd)
{
unsigned char old, new;
- unsigned long locked;
old = fdtab[fd].state;
do {
if ((old ^ new) & FD_EV_POLLED_R)
updt_fd_polling(fd);
-
- locked = atleast2(fdtab[fd].thread_mask);
- if (locked)
- HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
- fd_update_cache(fd); /* need an update entry to change the state */
- if (locked)
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> may receive again without polling. */
static inline void fd_may_recv(const int fd)
{
- unsigned long locked;
-
/* marking ready never changes polled status */
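+	/* note: HA_ATOMIC_BTS atomically sets the READY_R bit and returns its
+	 * previous value, so we only fall through on a 0->1 transition of the
+	 * readiness bit
+	 */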
if ((fdtab[fd].state & FD_EV_READY_R) ||
HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_R_BIT))
return;
-
- locked = atleast2(fdtab[fd].thread_mask);
- if (locked)
- HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
- fd_update_cache(fd); /* need an update entry to change the state */
- if (locked)
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Disable readiness when polled. This is useful to interrupt reading when it
static inline void fd_done_recv(const int fd)
{
unsigned char old, new;
- unsigned long locked;
old = fdtab[fd].state;
do {
if ((old ^ new) & FD_EV_POLLED_R)
updt_fd_polling(fd);
-
- locked = atleast2(fdtab[fd].thread_mask);
- if (locked)
- HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
- fd_update_cache(fd); /* need an update entry to change the state */
- if (locked)
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> cannot send anymore without polling (EAGAIN detected). */
static inline void fd_cant_send(const int fd)
{
unsigned char old, new;
- unsigned long locked;
old = fdtab[fd].state;
do {
if ((old ^ new) & FD_EV_POLLED_W)
updt_fd_polling(fd);
-
- locked = atleast2(fdtab[fd].thread_mask);
- if (locked)
- HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
- fd_update_cache(fd); /* need an update entry to change the state */
- if (locked)
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> may send again without polling (EAGAIN not detected). */
static inline void fd_may_send(const int fd)
{
- unsigned long locked;
-
/* marking ready never changes polled status */
if ((fdtab[fd].state & FD_EV_READY_W) ||
HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_W_BIT))
return;
-
- locked = atleast2(fdtab[fd].thread_mask);
- if (locked)
- HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
- fd_update_cache(fd); /* need an update entry to change the state */
- if (locked)
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Prepare FD <fd> to try to receive */
static inline void fd_want_recv(int fd)
{
unsigned char old, new;
- unsigned long locked;
old = fdtab[fd].state;
do {
if (old & FD_EV_ACTIVE_R)
return;
- new = old | FD_EV_ACTIVE_R;
- if (!(new & FD_EV_READY_R))
- new |= FD_EV_POLLED_R;
+ new = old | FD_EV_ACTIVE_R | FD_EV_POLLED_R;
} while (unlikely(!_HA_ATOMIC_CAS(&fdtab[fd].state, &old, new)));
if ((old ^ new) & FD_EV_POLLED_R)
updt_fd_polling(fd);
-
- locked = atleast2(fdtab[fd].thread_mask);
- if (locked)
- HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
- fd_update_cache(fd); /* need an update entry to change the state */
- if (locked)
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
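+/* With the speculative-readiness shortcut removed above, wanting to receive
+ * always subscribes the FD to polling; readiness is reported back through
+ * fd_update_events() and EAGAIN is signalled with fd_cant_recv(). A minimal
+ * sketch of the resulting calling pattern (hypothetical helper, assumes
+ * <sys/socket.h> and <errno.h> are available):
+ */
+static inline ssize_t sketch_try_recv(int fd, void *buf, size_t len)
+{
+	ssize_t ret = recv(fd, buf, len, MSG_DONTWAIT);
+
+	if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
+		fd_cant_recv(fd); /* poll before retrying */
+	return ret;
+}
+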
/* Prepare FD <fd> to try to send */
static inline void fd_want_send(int fd)
{
unsigned char old, new;
- unsigned long locked;
old = fdtab[fd].state;
do {
if (old & FD_EV_ACTIVE_W)
return;
- new = old | FD_EV_ACTIVE_W;
- if (!(new & FD_EV_READY_W))
- new |= FD_EV_POLLED_W;
+ new = old | FD_EV_ACTIVE_W | FD_EV_POLLED_W;
} while (unlikely(!_HA_ATOMIC_CAS(&fdtab[fd].state, &old, new)));
if ((old ^ new) & FD_EV_POLLED_W)
updt_fd_polling(fd);
-
- locked = atleast2(fdtab[fd].thread_mask);
- if (locked)
- HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
- fd_update_cache(fd); /* need an update entry to change the state */
- if (locked)
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Update events seen for FD <fd> and its state if needed. This should be called
if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
fd_may_send(fd);
+
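+	/* with the fd cache gone, the I/O handler is invoked directly from
+	 * here once the new events have been accounted for
+	 */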
+ if (fdtab[fd].iocb)
+ fdtab[fd].iocb(fd);
}
/* Prepares <fd> for being polled */
*/
struct activity {
unsigned int loops; // complete loops in run_poll_loop()
- unsigned int wake_cache; // active fd_cache prevented poll() from sleeping
unsigned int wake_tasks; // active tasks prevented poll() from sleeping
unsigned int wake_signal; // pending signal prevented poll() from sleeping
unsigned int poll_exp; // number of times poll() sees an expired timeout (includes wake_*)
li = fdt.owner;
chunk_printf(&trash,
- " %5d : st=0x%02x(R:%c%c%c W:%c%c%c) ev=0x%02x(%c%c%c%c%c) [%c%c] cnext=%d cprev=%d tmask=0x%lx umask=0x%lx owner=%p iocb=%p(%s)",
+ " %5d : st=0x%02x(R:%c%c%c W:%c%c%c) ev=0x%02x(%c%c%c%c%c) [%c%c] tmask=0x%lx umask=0x%lx owner=%p iocb=%p(%s)",
fd,
fdt.state,
(fdt.state & FD_EV_POLLED_R) ? 'P' : 'p',
(fdt.ev & FD_POLL_IN) ? 'I' : 'i',
fdt.linger_risk ? 'L' : 'l',
fdt.cloned ? 'C' : 'c',
- fdt.cache.next,
- fdt.cache.prev,
fdt.thread_mask, fdt.update_mask,
fdt.owner,
fdt.iocb,
chunk_appendf(&trash, "thread_id: %u (%u..%u)\n", tid + 1, 1, global.nbthread);
chunk_appendf(&trash, "date_now: %lu.%06lu\n", (long)now.tv_sec, (long)now.tv_usec);
chunk_appendf(&trash, "loops:"); SHOW_TOT(thr, activity[thr].loops);
- chunk_appendf(&trash, "wake_cache:"); SHOW_TOT(thr, activity[thr].wake_cache);
chunk_appendf(&trash, "wake_tasks:"); SHOW_TOT(thr, activity[thr].wake_tasks);
chunk_appendf(&trash, "wake_signal:"); SHOW_TOT(thr, activity[thr].wake_signal);
chunk_appendf(&trash, "poll_exp:"); SHOW_TOT(thr, activity[thr].poll_exp);
chunk_appendf(buf,
"%c%cThread %-2u: act=%d glob=%d wq=%d rq=%d tl=%d tlsz=%d rqsz=%d\n"
- " stuck=%d fdcache=%d prof=%d",
+ " stuck=%d prof=%d",
(thr == calling_tid) ? '*' : ' ', stuck ? '>' : ' ', thr + 1,
thread_has_tasks(),
!!(global_tasks_mask & thr_bit),
task_per_thread[thr].task_list_size,
task_per_thread[thr].rqueue_size,
stuck,
- !!(fd_cache_mask & thr_bit),
!!(task_profiling_mask & thr_bit));
chunk_appendf(buf,
tv_leaving_poll(wait_time, status);
thread_harmless_end();
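+	/* the thread is not sleeping in the poller anymore, clear its bit
+	 * from sleeping_thread_mask
+	 */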
+ if (sleeping_thread_mask & tid_bit)
+ _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
/* process polled events */
}
thread_harmless_now();
+ if (sleeping_thread_mask & tid_bit)
+ _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
/*
* Determine how long to wait for events to materialise on the port.
tv_leaving_poll(wait_time, status);
thread_harmless_end();
+ if (sleeping_thread_mask & tid_bit)
+ _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
for (count = 0; count < status; count++) {
unsigned int n = 0;
} while (!_HA_ATOMIC_CAS(&maxfd, &old_maxfd, new_maxfd));
thread_harmless_now();
+ if (sleeping_thread_mask & tid_bit)
+ _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
fd_nbupdt = 0;
tv_leaving_poll(delta_ms, status);
thread_harmless_end();
+ if (sleeping_thread_mask & tid_bit)
+ _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
if (status <= 0)
return;
* as published by the Free Software Foundation; either version
* 2 of the License, or (at your option) any later version.
*
- * This code implements an events cache for file descriptors. It remembers the
- * readiness of a file descriptor after a return from poll() and the fact that
- * an I/O attempt failed on EAGAIN. Events in the cache which are still marked
- * ready and active are processed just as if they were reported by poll().
- *
- * This serves multiple purposes. First, it significantly improves performance
- * by avoiding to subscribe to polling unless absolutely necessary, so most
- * events are processed without polling at all, especially send() which
- * benefits from the socket buffers. Second, it is the only way to support
- * edge-triggered pollers (eg: EPOLL_ET). And third, it enables I/O operations
- * that are backed by invisible buffers. For example, SSL is able to read a
- * whole socket buffer and not deliver it to the application buffer because
- * it's full. Unfortunately, it won't be reported by a poller anymore until
- * some new activity happens. The only way to call it again thus is to keep
- * this readiness information in the cache and to access it without polling
- * once the FD is enabled again.
- *
- * One interesting feature of the cache is that it maintains the principle
- * of speculative I/O introduced in haproxy 1.3 : the first time an event is
- * enabled, the FD is considered as ready so that the I/O attempt is performed
- * via the cache without polling. And the polling happens only when EAGAIN is
- * first met. This avoids polling for HTTP requests, especially when the
- * defer-accept mode is used. It also avoids polling for sending short data
- * such as requests to servers or short responses to clients.
- *
- * The cache consists in a list of active events and a list of updates.
- * Active events are events that are expected to come and that we must report
- * to the application until it asks to stop or asks to poll. Updates are new
- * requests for changing an FD state. Updates are the only way to create new
- * events. This is important because it means that the number of cached events
- * cannot increase between updates and will only grow one at a time while
- * processing updates. All updates must always be processed, though events
- * might be processed by small batches if required.
- *
* There is no direct link between the FD and the updates list. There is only a
* bit in the fdtab[] to indicate that a file descriptor is already present in
* the updates list. Once an fd is present in the updates list, it will have to
* is that unhandled events will still wake the poller up. Using an edge-
* triggered poller such as EPOLL_ET will solve this issue though.
*
- * Since we do not want to scan all the FD list to find cached I/O events,
- * we store them in a list consisting in a linear array holding only the FD
- * indexes right now. Note that a closed FD cannot exist in the cache, because
- * it is closed by fd_delete() which in turn calls fd_release_cache_entry()
- * which always removes it from the list.
- *
- * The FD array has to hold a back reference to the cache. This reference is
- * always valid unless the FD is not in the cache and is not updated, in which
- * case the reference points to index 0.
- *
* The event state for an FD, as found in fdtab[].state, is maintained for each
* direction. The state field is built this way, with R bits in the low nibble
* and W bits in the high nibble for ease of access and debugging :
struct poller cur_poller;
int nbpollers = 0;
-volatile struct fdlist fd_cache ; // FD events cache
-volatile struct fdlist fd_cache_local[MAX_THREADS]; // FD events local for each thread
volatile struct fdlist update_list; // Global update list
-unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
-
THREAD_LOCAL int *fd_updt = NULL; // FD updates list
THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list
THREAD_LOCAL int poller_rd_pipe = -1; // Pipe to wake the thread
if (cur_poller.clo)
cur_poller.clo(fd);
- fd_release_cache_entry(fd);
fdtab[fd].state = 0;
port_range_release_port(fdinfo[fd].port_range, fdinfo[fd].local_port);
fd_dodelete(fd, 0);
}
-static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
-{
- int fd, old_fd, e;
- unsigned long locked;
-
- for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].cache.next) {
- if (fd == -2) {
- fd = old_fd;
- continue;
- } else if (fd <= -3)
- fd = -fd - 4;
- if (fd == -1)
- break;
- old_fd = fd;
- if (!(fdtab[fd].thread_mask & tid_bit))
- continue;
- if (fdtab[fd].cache.next < -3)
- continue;
-
- _HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
- locked = atleast2(fdtab[fd].thread_mask);
- if (locked && HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
- activity[tid].fd_lock++;
- continue;
- }
-
- e = fdtab[fd].state;
- fdtab[fd].ev &= FD_POLL_STICKY;
-
- if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
- fdtab[fd].ev |= FD_POLL_IN;
-
- if ((e & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
- fdtab[fd].ev |= FD_POLL_OUT;
-
- if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) {
- if (locked)
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
- fdtab[fd].iocb(fd);
- }
- else {
- fd_release_cache_entry(fd);
- if (locked)
- HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
- }
- }
-}
-
-/* Scan and process the cached events. This should be called right after
- * the poller. The loop may cause new entries to be created, for example
- * if a listener causes an accept() to initiate a new incoming connection
- * wanting to attempt an recv().
- */
-void fd_process_cached_events()
-{
- _HA_ATOMIC_AND(&fd_cache_mask, ~tid_bit);
- fdlist_process_cached_events(&fd_cache_local[tid]);
- fdlist_process_cached_events(&fd_cache);
-}
-
#if defined(USE_CLOSEFROM)
void my_closefrom(int start)
{
if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)
goto fail_info;
- fd_cache.first = fd_cache.last = -1;
update_list.first = update_list.last = -1;
for (p = 0; p < global.maxsock; p++) {
-		fdtab[p].cache.next = -3;
fdtab[p].update.next = -3;
}
- for (p = 0; p < global.nbthread; p++)
- fd_cache_local[p].first = fd_cache_local[p].last = -1;
do {
bp = NULL;
/* expire immediately if events are pending */
wake = 1;
- if (fd_cache_mask & tid_bit)
- activity[tid].wake_cache++;
- else if (thread_has_tasks())
+ if (thread_has_tasks())
activity[tid].wake_tasks++;
else if (signal_queue_len && tid == 0)
activity[tid].wake_signal++;
/* The poller will ensure it returns around <next> */
cur_poller.poll(&cur_poller, next, wake);
- if (sleeping_thread_mask & tid_bit)
- _HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
- fd_process_cached_events();
activity[tid].loops++;
}
#define H1C_F_CS_ERROR 0x00001000 /* connection must be closed ASAP because an error occurred */
#define H1C_F_CS_SHUTW_NOW 0x00002000 /* connection must be shut down for writes ASAP */
#define H1C_F_CS_SHUTDOWN 0x00004000 /* connection is shut down for read and writes */
-#define H1C_F_CS_WAIT_CONN 0x00008000 /* waiting for the connection establishment */
#define H1C_F_WAIT_NEXT_REQ 0x00010000 /* waiting for the next request to start, use keep-alive timeout */
#define H1C_F_UPG_H2C 0x00020000 /* set if an upgrade to h2 should be done */
t->expire = tick_add(now_ms, h1c->timeout);
}
- if (!(conn->flags & CO_FL_CONNECTED) || (conn->flags & CO_FL_HANDSHAKE))
- h1c->flags |= H1C_F_CS_WAIT_CONN;
-
/* Always Create a new H1S */
if (!h1s_create(h1c, conn->ctx, sess))
goto fail;
if (h1c->wait_event.events & SUB_RETRY_RECV)
return (b_data(&h1c->ibuf));
- if (!(conn->flags & CO_FL_ERROR) && h1c->flags & H1C_F_CS_WAIT_CONN) {
- conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
- return 0;
- }
-
if (!h1_recv_allowed(h1c)) {
rcvd = 1;
goto end;
if (conn->flags & CO_FL_ERROR)
return 0;
- if (h1c->flags & H1C_F_CS_WAIT_CONN) {
- if (!(h1c->wait_event.events & SUB_RETRY_SEND))
- conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_SEND, &h1c->wait_event);
- return 0;
- }
-
if (!b_data(&h1c->obuf))
goto end;
if (!conn->ctx)
return -1;
- if (h1c->flags & H1C_F_CS_WAIT_CONN) {
- if (!(conn->flags & (CO_FL_CONNECTED|CO_FL_ERROR)) ||
- (!(conn->flags & CO_FL_ERROR) && (conn->flags & CO_FL_HANDSHAKE)))
- goto end;
- h1c->flags &= ~H1C_F_CS_WAIT_CONN;
- h1_wake_stream_for_send(h1s);
- }
-
if (!h1s) {
if (h1c->flags & (H1C_F_CS_ERROR|H1C_F_CS_SHUTDOWN) ||
conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH) ||
static void h1_reset(struct connection *conn)
{
- struct h1c *h1c = conn->ctx;
- /* Reset the flags, and let the mux know we're waiting for a connection */
- h1c->flags = H1C_F_CS_WAIT_CONN;
}
static int h1_wake(struct connection *conn)
{
struct wait_event *sw;
struct h1s *h1s = cs->ctx;
+	struct h1c *h1c;
	if (!h1s)
		return -1;
+	h1c = h1s->h1c;
BUG_ON(h1s->send_wait != NULL || (sw->events & SUB_RETRY_SEND));
sw->events |= SUB_RETRY_SEND;
h1s->send_wait = sw;
+ /*
+	 * If the conn_stream attempts to subscribe, and the
+ * mux isn't subscribed to the connection, then it
+ * probably means the connection wasn't established
+ * yet, so we have to subscribe.
+ */
+ if (!(h1c->wait_event.events & SUB_RETRY_SEND))
+ h1c->conn->xprt->subscribe(h1c->conn,
+ h1c->conn->xprt_ctx,
+ SUB_RETRY_SEND,
+ &h1c->wait_event);
return 0;
default:
break;
return 0;
h1c = h1s->h1c;
- if (h1c->flags & H1C_F_CS_WAIT_CONN)
+
+ /* If we're not connected yet, or we're waiting for a handshake, stop
+ * now, as we don't want to remove everything from the channel buffer
+ * before we're sure we can send it.
+ */
+ if (!(h1c->conn->flags & CO_FL_CONNECTED) ||
+ (h1c->conn->flags & CO_FL_HANDSHAKE))
return 0;
while (count) {
if (l->options & LI_O_ACC_CIP)
cli_conn->flags |= CO_FL_ACCEPT_CIP;
- conn_xprt_want_recv(cli_conn);
if (conn_xprt_init(cli_conn) < 0)
goto out_free_conn;