* connection is not really dequeued. It will be done during process_stream().
* This function takes all the required locks for the operation. The pendconn
* must be valid, though it doesn't matter if it was already unlinked. Prefer
- * pendconn_cond_unlink() to first check <p>.
+ * pendconn_cond_unlink() to first check <p>. It also forces a serialization
+ * on p->del_lock to make sure another thread currently waking it up finishes
+ * first.
*/
void pendconn_unlink(struct pendconn *p)
{
oldidx = _HA_ATOMIC_LOAD(&p->queue->idx);
HA_SPIN_LOCK(QUEUE_LOCK, &q->lock);
+ HA_SPIN_LOCK(QUEUE_LOCK, &p->del_lock);
+
if (p->node.node.leaf_p) {
eb32_delete(&p->node);
done = 1;
}
+
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &p->del_lock);
HA_SPIN_UNLOCK(QUEUE_LOCK, &q->lock);
if (done) {
*
* The proxy's queue will be consulted only if px_ok is non-zero.
*
- * This function must only be called if the server queue _AND_ the proxy queue
- * are locked (if px_ok is set). Today it is only called by process_srv_queue.
+ * This function must only be called if the server queue is locked _AND_ the
+ * proxy queue is not. Today it is only called by process_srv_queue.
* When a pending connection is dequeued, this function returns 1 if a pendconn
* is dequeued, otherwise 0.
*/
goto use_p;
use_pp:
- /* Let's switch from the server pendconn to the proxy pendconn */
+ /* we'd like to release the proxy lock ASAP to let other threads
+ * work with other servers. But for this we must first hold the
+ * pendconn alive to prevent a removal from its owning stream.
+ */
+ HA_SPIN_LOCK(QUEUE_LOCK, &pp->del_lock);
+
+ /* now the element won't go, we can release the proxy */
__pendconn_unlink_prx(pp);
- HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+
+ pp->strm_flags |= SF_ASSIGNED;
+ pp->target = srv;
+ stream_add_srv_conn(pp->strm, srv);
+
+ /* we must wake the task up before releasing the lock as it's the only
+ * way to make sure the task still exists. The pendconn cannot vanish
+ * under us since the task will need to take the lock anyway and to wait
+ * if it wakes up on a different thread.
+ */
+ task_wakeup(pp->strm->task, TASK_WOKEN_RES);
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &pp->del_lock);
+
_HA_ATOMIC_DEC(&px->queue.length);
_HA_ATOMIC_INC(&px->queue.idx);
- p = pp;
- goto unlinked;
+ return 1;
+
use_p:
+ /* we don't need the px queue lock anymore, we have the server's lock */
if (pp)
- HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
- __pendconn_unlink_srv(p);
- _HA_ATOMIC_DEC(&srv->queue.length);
- _HA_ATOMIC_INC(&srv->queue.idx);
- unlinked:
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+
p->strm_flags |= SF_ASSIGNED;
p->target = srv;
-
stream_add_srv_conn(p->strm, srv);
+ /* we must wake the task up before releasing the lock as it's the only
+ * way to make sure the task still exists. The pendconn cannot vanish
+ * under us since the task will need to take the lock anyway and to wait
+ * if it wakes up on a different thread.
+ */
task_wakeup(p->strm->task, TASK_WOKEN_RES);
+ __pendconn_unlink_srv(p);
+ _HA_ATOMIC_DEC(&srv->queue.length);
+ _HA_ATOMIC_INC(&srv->queue.idx);
return 1;
}
p->node.key = MAKE_KEY(strm->priority_class, strm->priority_offset);
p->strm = strm;
p->strm_flags = strm->flags;
+ HA_SPIN_INIT(&p->del_lock);
strm->pend_pos = p;
px = strm->be;
is_unlinked = !p->node.node.leaf_p;
pendconn_queue_unlock(p);
+ /* serialize to make sure the element was finished processing */
+ HA_SPIN_LOCK(QUEUE_LOCK, &p->del_lock);
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &p->del_lock);
+
if (!is_unlinked)
return 1;