bool expectReply;
bool nonBlock;
bool haveThread;
- bool sentSomeData;
virCond cond;
};
+static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
+ virNetClientCallPtr thiscall);
+
+
static void virNetClientLock(virNetClientPtr client)
{
virMutexLock(&client->lock);
virNetClientLock(client);
- /* If there is a thread polling for data on the socket, set wantClose flag
- * and wake the thread up or just immediately close the socket when no-one
- * is polling on it.
+ client->wantClose = true;
+
+ /* If there is a thread polling for data on the socket, wake the thread up
+ * otherwise try to pass the buck to a possibly waiting thread. If no
+ * thread is waiting, virNetClientIOEventLoopPassTheBuck will clean the
+ * queue and close the client because we set client->wantClose.
*/
- if (client->waitDispatch) {
+ if (client->haveTheBuck) {
char ignore = 1;
size_t len = sizeof(ignore);
- client->wantClose = true;
if (safewrite(client->wakeupSendFD, &ignore, len) != len)
VIR_ERROR(_("failed to wake up polling thread"));
} else {
- virNetClientCloseLocked(client);
+ virNetClientIOEventLoopPassTheBuck(client, NULL);
}
virNetClientUnlock(client);
ret = virNetSocketWrite(client->sock,
thecall->msg->buffer + thecall->msg->bufferOffset,
thecall->msg->bufferLength - thecall->msg->bufferOffset);
- if (ret > 0 || virNetSocketHasPendingData(client->sock))
- thecall->sentSomeData = true;
if (ret <= 0)
return ret;
}
-static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
- void *opaque)
+static bool
+virNetClientIOEventLoopDetachNonBlocking(virNetClientCallPtr call,
+ void *opaque)
{
virNetClientCallPtr thiscall = opaque;
- if (call == thiscall)
- return false;
-
- if (!call->nonBlock)
- return false;
-
- if (call->sentSomeData) {
- /*
- * If some data has been sent we must keep it in the list,
- * but still wakeup any thread
- */
- if (call->haveThread) {
- VIR_DEBUG("Waking up sleep %p", call);
- virCondSignal(&call->cond);
- } else {
- VIR_DEBUG("Keeping unfinished call %p in the list", call);
- }
- return false;
- } else {
- /*
- * If no data has been sent, we can remove it from the list.
- * Wakup any thread, otherwise free the caller ourselves
- */
- if (call->haveThread) {
- VIR_DEBUG("Waking up sleep %p", call);
- virCondSignal(&call->cond);
- } else {
- VIR_DEBUG("Removing call %p", call);
- if (call->expectReply)
- VIR_WARN("Got a call expecting a reply but without a waiting thread");
- ignore_value(virCondDestroy(&call->cond));
- VIR_FREE(call->msg);
- VIR_FREE(call);
- }
+ if (call != thiscall && call->nonBlock && call->haveThread) {
+ VIR_DEBUG("Waking up sleep %p", call);
+ call->haveThread = false;
+ virCondSignal(&call->cond);
return true;
}
+
+ return false;
}
-static void
-virNetClientIOEventLoopRemoveAll(virNetClientPtr client,
- virNetClientCallPtr thiscall)
+static bool
+virNetClientIOEventLoopRemoveAll(virNetClientCallPtr call,
+ void *opaque)
{
- if (!client->waitDispatch)
- return;
+ virNetClientCallPtr thiscall = opaque;
- if (client->waitDispatch == thiscall) {
- /* just pretend nothing was sent and the caller will free the call */
- thiscall->sentSomeData = false;
- } else {
- virNetClientCallPtr call = client->waitDispatch;
- virNetClientCallRemove(&client->waitDispatch, call);
- ignore_value(virCondDestroy(&call->cond));
- VIR_FREE(call->msg);
- VIR_FREE(call);
- }
+ if (call == thiscall)
+ return false;
+
+ VIR_DEBUG("Removing call %p", call);
+ ignore_value(virCondDestroy(&call->cond));
+ VIR_FREE(call->msg);
+ VIR_FREE(call);
+ return true;
}
-static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall)
+static void
+virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
+ virNetClientCallPtr thiscall)
{
VIR_DEBUG("Giving up the buck %p", thiscall);
virNetClientCallPtr tmp = client->waitDispatch;
VIR_DEBUG("No thread to pass the buck to");
if (client->wantClose) {
virNetClientCloseLocked(client);
- virNetClientIOEventLoopRemoveAll(client, thiscall);
+ virNetClientCallRemovePredicate(&client->waitDispatch,
+ virNetClientIOEventLoopRemoveAll,
+ thiscall);
}
}
-static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED)
+static bool
+virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call,
+ void *opaque ATTRIBUTE_UNUSED)
{
- return call->nonBlock;
+ return call->nonBlock && call->haveThread;
}
/*
if (virNetSocketHasCachedData(client->sock) || client->wantClose)
timeout = 0;
- /* If there are any non-blocking calls in the queue,
- * then we don't want to sleep in poll()
+ /* If there are any non-blocking calls with an associated thread
+ * in the queue, then we don't want to sleep in poll()
*/
if (virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopWantNonBlock,
}
/* If we were woken up because a new non-blocking call was queued,
- * we need to re-poll to check if we can send it.
+ * we need to re-poll to check if we can send it. To be precise, we
+ * will re-poll even if a blocking call arrived when unhandled
+ * non-blocking calls are still in the queue. But this can't hurt.
*/
if (virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopWantNonBlock,
NULL)) {
- VIR_DEBUG("New non-blocking call arrived; repolling");
+ VIR_DEBUG("The queue contains new non-blocking call(s);"
+ " repolling");
continue;
}
}
}
/* Iterate through waiting calls and if any are
- * complete, remove them from the dispatch list..
+ * complete, remove them from the dispatch list.
*/
virNetClientCallRemovePredicate(&client->waitDispatch,
virNetClientIOEventLoopRemoveDone,
thiscall);
- /* Iterate through waiting calls and if any are
- * non-blocking, remove them from the dispatch list...
+ /* Iterate through waiting calls and wake up and detach threads
+ * attached to non-blocking calls.
*/
- virNetClientCallRemovePredicate(&client->waitDispatch,
- virNetClientIOEventLoopRemoveNonBlocking,
- thiscall);
+ virNetClientCallMatchPredicate(client->waitDispatch,
+ virNetClientIOEventLoopDetachNonBlocking,
+ thiscall);
/* Now see if *we* are done */
if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
return 2;
}
- /* We're not done, but we're non-blocking */
+ /* We're not done, but we're non-blocking; keep the call queued */
if (thiscall->nonBlock) {
+ thiscall->haveThread = false;
virNetClientIOEventLoopPassTheBuck(client, thiscall);
- if (thiscall->sentSomeData) {
- return 1;
- } else {
- virNetClientCallRemove(&client->waitDispatch, thiscall);
- return 0;
- }
+ return 1;
}
if (fds[0].revents & (POLLHUP | POLLERR)) {
}
}
-
error:
virNetClientCallRemove(&client->waitDispatch, thiscall);
virNetClientIOEventLoopPassTheBuck(client, thiscall);
goto cleanup;
}
- /* If we're non-blocking, get outta here */
+ /* If we're non-blocking, we were either queued (and detached) or the
+ * call was not sent because of an error.
+ */
if (thiscall->nonBlock) {
- if (thiscall->sentSomeData)
+ if (!thiscall->haveThread)
rv = 1; /* In progress */
else
rv = 0; /* none at all */
/*
- * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * Returns 2 if fully sent, 1 if queued (only for nonBlock==true),
* 0 if nothing sent (only for nonBlock==true) and -1 on error
*/
static int virNetClientSendInternal(virNetClientPtr client,
ret = virNetClientIO(client, call);
- /* If partially sent, then the call is still on the dispatch queue */
- if (ret == 1) {
- call->haveThread = false;
- } else {
- ignore_value(virCondDestroy(&call->cond));
- }
+ /* If queued, the call will be finished and freed later by another thread;
+ * we're done. */
+ if (ret == 1)
+ return 1;
+
+ ignore_value(virCondDestroy(&call->cond));
cleanup:
- if (ret != 1)
- VIR_FREE(call);
+ VIR_FREE(call);
return ret;
}