/* Destroy the task attached to this applet */
task_destroy(spoe_appctx->task);
- /* Notify all waiting streams */
+ /* Report an error to all streams in the appctx waiting queue */
list_for_each_entry_safe(ctx, back, &spoe_appctx->waiting_queue, list) {
LIST_DELETE(&ctx->list);
LIST_INIT(&ctx->list);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
- /* If the applet was processing a fragmented frame, notify the
- * corresponding stream. */
+ /* If the applet was processing a fragmented frame, report an error to
+ * the corresponding stream. */
if (spoe_appctx->frag_ctx.ctx) {
ctx = spoe_appctx->frag_ctx.ctx;
ctx->spoe_appctx = NULL;
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
- if (!LIST_ISEMPTY(&agent->rt[tid].waiting_queue)) {
+ if (!LIST_ISEMPTY(&agent->rt[tid].applets)) {
+ /* If there are still some running applets, remove reference on
+ * the current one from streams in the async waiting queue. In
+ * async mode, the ACK may be received from another appctx.
+ */
list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
if (ctx->spoe_appctx == spoe_appctx)
ctx->spoe_appctx = NULL;
goto end;
}
else {
- /* It is the last running applet and the sending and waiting
- * queues are not empty. Try to start a new one if HAproxy is
- * not stopping.
+ /* It is the last running applet and the sending and async
+ * waiting queues are not empty. So try to start a new applet if
+ * HAproxy is not stopping. On success, we remove reference on
+ * the current appctx from streams in the async waiting queue.
+ * In async mode, the ACK may be received from another appctx.
*/
if (!stopping &&
(!LIST_ISEMPTY(&agent->rt[tid].sending_queue) || !LIST_ISEMPTY(&agent->rt[tid].waiting_queue)) &&
- spoe_create_appctx(agent->spoe_conf))
+ spoe_create_appctx(agent->spoe_conf)) {
+ list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
+ if (ctx->spoe_appctx == spoe_appctx)
+ ctx->spoe_appctx = NULL;
+ }
goto end;
+ }
- /* otherwise, notify all waiting streams */
+ /* Otherwise, report an error to all streams in the sending and
+ * async waiting queues.
+ */
list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
LIST_DELETE(&ctx->list);
LIST_INIT(&ctx->list);