}
+static bool
+virNetClientIOUpdateEvents(virNetClientCallPtr call,
+ void *opaque)
+{
+ int *events = opaque;
+
+ if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
+ *events |= VIR_EVENT_HANDLE_WRITABLE;
+
+ return false;
+}
+
+
static void virNetClientIOUpdateCallback(virNetClientPtr client,
bool enableCallback)
{
int events = 0;
- if (enableCallback)
+
+ if (enableCallback) {
events |= VIR_EVENT_HANDLE_READABLE;
+ virNetClientCallMatchPredicate(client->waitDispatch,
+ virNetClientIOUpdateEvents,
+ &events);
+ }
virNetSocketUpdateIOCallback(client->sock, events);
}
goto done;
}
- if (virNetClientIOHandleInput(client) < 0) {
- VIR_WARN("Something went wrong during async message processing");
- virNetSocketRemoveIOCallback(sock);
+ if (events & VIR_EVENT_HANDLE_WRITABLE) {
+ if (virNetClientIOHandleOutput(client) < 0)
+ virNetSocketRemoveIOCallback(sock);
+ }
+
+ if (events & VIR_EVENT_HANDLE_READABLE) {
+ if (virNetClientIOHandleInput(client) < 0)
+ virNetSocketRemoveIOCallback(sock);
}
+ /* Remove completed calls or signal their threads. */
+ virNetClientCallRemovePredicate(&client->waitDispatch,
+ virNetClientIOEventLoopRemoveDone,
+ NULL);
done:
virNetClientUnlock(client);
}