assert(clientInfo);
assert(clientInfo->hasQueue());
assert(clientInfo->hasQueue(queue));
- assert(!clientInfo->selectWaiting);
assert(clientInfo->eventWaiting);
clientInfo->eventWaiting = false;
do {
- // check that the head descriptor is still relevant
- const int head = clientInfo->quotaPeekFd();
- Comm::IoCallback *ccb = COMMIO_FD_WRITECB(head);
+ clientInfo->writeOrDequeue();
+ if (clientInfo->selectWaiting)
+ return;
+ } while (clientInfo->hasQueue());
- if (fd_table[head].clientInfo == clientInfo &&
- clientInfo->quotaPeekReserv() == ccb->quotaQueueReserv &&
- !fd_table[head].closing()) {
+ debugs(77, 3, "emptied queue");
+}
+
+void
+ClientInfo::writeOrDequeue()
+{
+ assert(!selectWaiting);
+ const auto head = quotaPeekFd();
+ const auto &headFde = fd_table[head];
+ CallBack(headFde.codeContext, [&] {
+ const auto ccb = COMMIO_FD_WRITECB(head);
+ // check that the head descriptor is still relevant
+ if (headFde.clientInfo == this &&
+ quotaPeekReserv() == ccb->quotaQueueReserv &&
+ !headFde.closing()) {
// wait for the head descriptor to become ready for writing
Comm::SetSelect(head, COMM_SELECT_WRITE, Comm::HandleWrite, ccb, 0);
- clientInfo->selectWaiting = true;
- return;
+ selectWaiting = true;
+ } else {
+ quotaDequeue(); // remove the no longer relevant descriptor
}
-
- clientInfo->quotaDequeue(); // remove the no longer relevant descriptor
- // and continue looking for a relevant one
- } while (clientInfo->hasQueue());
-
- debugs(77,3, HERE << "emptied queue");
+ });
}
bool
debugs(77,5, "clt" << (const char*)clientInfo->key <<
": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size());
fds.push_back(fd);
+ fd_table[fd].codeContext = CodeContext::Current();
return ++ins;
}