time_t intervalStart;
int timer;
- virNetMessagePtr response;
- int responseTimer;
-
virKeepAliveSendFunc sendCB;
virKeepAliveDeadFunc deadCB;
virKeepAliveFreeFunc freeCB;
static virNetMessagePtr
-virKeepAliveMessage(int proc)
+virKeepAliveMessage(virKeepAlivePtr ka, int proc)
{
virNetMessagePtr msg;
+ const char *procstr = NULL;
- if (!(msg = virNetMessageNew(false)))
+ switch (proc) {
+ case KEEPALIVE_PROC_PING:
+ procstr = "request";
+ break;
+ case KEEPALIVE_PROC_PONG:
+ procstr = "response";
+ break;
+ default:
+ VIR_WARN("Refusing to send unknown keepalive message: %d", proc);
return NULL;
+ }
+
+ if (!(msg = virNetMessageNew(false)))
+ goto error;
msg->header.prog = KEEPALIVE_PROGRAM;
msg->header.vers = KEEPALIVE_PROTOCOL_VERSION;
if (virNetMessageEncodeHeader(msg) < 0 ||
virNetMessageEncodePayloadEmpty(msg) < 0) {
virNetMessageFree(msg);
- return NULL;
- }
-
- return msg;
-}
-
-
-static void
-virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg)
-{
- const char *proc = NULL;
- void *client = ka->client;
- virKeepAliveSendFunc sendCB = ka->sendCB;
-
- switch (msg->header.proc) {
- case KEEPALIVE_PROC_PING:
- proc = "request";
- break;
- case KEEPALIVE_PROC_PONG:
- proc = "response";
- break;
- }
-
- if (!proc) {
- VIR_WARN("Refusing to send unknown keepalive message: %d",
- msg->header.proc);
- virNetMessageFree(msg);
- return;
+ goto error;
}
- VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client);
+ VIR_DEBUG("Sending keepalive %s to client %p", procstr, ka->client);
PROBE(RPC_KEEPALIVE_SEND,
"ka=%p client=%p prog=%d vers=%d proc=%d",
ka, ka->client, msg->header.prog, msg->header.vers, msg->header.proc);
- ka->refs++;
- virKeepAliveUnlock(ka);
-
- if (sendCB(client, msg) < 0) {
- VIR_WARN("Failed to send keepalive %s to client %p", proc, client);
- virNetMessageFree(msg);
- }
-
- virKeepAliveLock(ka);
- ka->refs--;
-}
-
-
-static void
-virKeepAliveScheduleResponse(virKeepAlivePtr ka)
-{
- if (ka->responseTimer == -1)
- return;
-
- VIR_DEBUG("Scheduling keepalive response to client %p", ka->client);
-
- if (!ka->response &&
- !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) {
- VIR_WARN("Failed to generate keepalive response");
- return;
- }
+ return msg;
- virEventUpdateTimeout(ka->responseTimer, 0);
+error:
+ VIR_WARN("Failed to generate keepalive %s", procstr);
+ VIR_FREE(msg);
+ return NULL;
}
} else {
ka->countToDeath--;
ka->intervalStart = now;
- *msg = virKeepAliveMessage(KEEPALIVE_PROC_PING);
+ *msg = virKeepAliveMessage(ka, KEEPALIVE_PROC_PING);
virEventUpdateTimeout(ka->timer, ka->interval * 1000);
return false;
}
virKeepAlivePtr ka = opaque;
virNetMessagePtr msg = NULL;
bool dead;
+ void *client;
virKeepAliveLock(ka);
+ client = ka->client;
dead = virKeepAliveTimerInternal(ka, &msg);
- if (dead) {
- virKeepAliveDeadFunc deadCB = ka->deadCB;
- void *client = ka->client;
-
- ka->refs++;
- virKeepAliveUnlock(ka);
- deadCB(client);
- virKeepAliveLock(ka);
- ka->refs--;
- } else if (msg) {
- virKeepAliveSend(ka, msg);
- }
+ if (!dead && !msg)
+ goto cleanup;
+ ka->refs++;
virKeepAliveUnlock(ka);
-}
-
-
-static void
-virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
-{
- virKeepAlivePtr ka = opaque;
- virNetMessagePtr msg;
-
- virKeepAliveLock(ka);
- VIR_DEBUG("ka=%p, client=%p, response=%p",
- ka, ka->client, ka->response);
-
- if (ka->response) {
- msg = ka->response;
- ka->response = NULL;
- virKeepAliveSend(ka, msg);
+ if (dead) {
+ ka->deadCB(client);
+ } else if (ka->sendCB(client, msg) < 0) {
+ VIR_WARN("Failed to send keepalive request to client %p", client);
+ virNetMessageFree(msg);
}
- virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1);
+ virKeepAliveLock(ka);
+ ka->refs--;
+cleanup:
virKeepAliveUnlock(ka);
}
ka->deadCB = deadCB;
ka->freeCB = freeCB;
- ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer,
- ka, virKeepAliveTimerFree);
- if (ka->responseTimer < 0) {
- virKeepAliveFree(ka);
- return NULL;
- }
- /* the timer now has a reference to ka */
- ka->refs++;
-
PROBE(RPC_KEEPALIVE_NEW,
"ka=%p client=%p refs=%d",
ka, ka->client, ka->refs);
static void
-virKeepAliveStopInternal(virKeepAlivePtr ka, bool all)
+virKeepAliveStopInternal(virKeepAlivePtr ka, bool all ATTRIBUTE_UNUSED)
{
virKeepAliveLock(ka);
ka->timer = -1;
}
- if (all) {
- if (ka->responseTimer > 0) {
- virEventRemoveTimeout(ka->responseTimer);
- ka->responseTimer = -1;
- }
-
- virNetMessageFree(ka->response);
- ka->response = NULL;
- }
-
virKeepAliveUnlock(ka);
}
bool
virKeepAliveCheckMessage(virKeepAlivePtr ka,
- virNetMessagePtr msg)
+ virNetMessagePtr msg,
+ virNetMessagePtr *response)
{
bool ret = false;
VIR_DEBUG("ka=%p, client=%p, msg=%p",
ka, ka ? ka->client : "(null)", msg);
+ *response = NULL;
if (!ka)
return false;
switch (msg->header.proc) {
case KEEPALIVE_PROC_PING:
VIR_DEBUG("Got keepalive request from client %p", ka->client);
- virKeepAliveScheduleResponse(ka);
+ *response = virKeepAliveMessage(ka, KEEPALIVE_PROC_PONG);
break;
case KEEPALIVE_PROC_PONG:
bool virKeepAliveTrigger(virKeepAlivePtr ka,
virNetMessagePtr *msg);
bool virKeepAliveCheckMessage(virKeepAlivePtr ka,
- virNetMessagePtr msg);
+ virNetMessagePtr msg,
+ virNetMessagePtr *response);
#endif /* __VIR_KEEPALIVE_H__ */
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
virNetClientCallPtr thiscall);
+static int virNetClientQueueNonBlocking(virNetClientPtr client,
+ virNetMessagePtr msg);
static void virNetClientLock(virNetClientPtr client)
static int
virNetClientCallDispatch(virNetClientPtr client)
{
+ virNetMessagePtr response = NULL;
+
PROBE(RPC_CLIENT_MSG_RX,
"client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u",
client, client->msg.bufferLength,
client->msg.header.prog, client->msg.header.vers, client->msg.header.proc,
client->msg.header.type, client->msg.header.status, client->msg.header.serial);
- if (virKeepAliveCheckMessage(client->keepalive, &client->msg))
+ if (virKeepAliveCheckMessage(client->keepalive, &client->msg, &response)) {
+ if (response &&
+ virNetClientQueueNonBlocking(client, response) < 0) {
+ VIR_WARN("Could not queue keepalive response");
+ virNetMessageFree(response);
+ }
return 0;
+ }
switch (client->msg.header.type) {
case VIR_NET_REPLY: /* Normal RPC replies */
virNetClientCallRemovePredicate(&client->waitDispatch,
virNetClientIOEventLoopRemoveDone,
NULL);
+ virNetClientIOUpdateCallback(client, true);
+
done:
virNetClientUnlock(client);
}
}
+static int
+virNetClientQueueNonBlocking(virNetClientPtr client,
+ virNetMessagePtr msg)
+{
+ virNetClientCallPtr call;
+
+ PROBE(RPC_CLIENT_MSG_TX_QUEUE,
+ "client=%p len=%zu prog=%u vers=%u proc=%u"
+ " type=%u status=%u serial=%u",
+ client, msg->bufferLength,
+ msg->header.prog, msg->header.vers, msg->header.proc,
+ msg->header.type, msg->header.status, msg->header.serial);
+
+ if (!(call = virNetClientCallNew(msg, false, true)))
+ return -1;
+
+ virNetClientCallQueue(&client->waitDispatch, call);
+ return 0;
+}
+
+
/*
* Returns 1 if the call was queued and will be completed later (only
* for nonBlock==true), 0 if the call was completed and -1 on error.
virNetServerClientCloseFunc privateDataCloseFunc;
virKeepAlivePtr keepalive;
- int keepaliveFilter;
};
static void virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque);
static void virNetServerClientUpdateEvent(virNetServerClientPtr client);
static void virNetServerClientDispatchRead(virNetServerClientPtr client);
+static int virNetServerClientSendMessageLocked(virNetServerClientPtr client,
+ virNetMessagePtr msg);
static void virNetServerClientLock(virNetServerClientPtr client)
{
client->readonly = readonly;
client->tlsCtxt = tls;
client->nrequests_max = nrequests_max;
- client->keepaliveFilter = -1;
client->sockTimer = virEventAddTimeout(-1, virNetServerClientSockTimerFunc,
client, NULL);
return;
}
- if (client->keepaliveFilter >= 0)
- virNetServerClientRemoveFilterLocked(client, client->keepaliveFilter);
-
if (client->keepalive) {
virKeepAliveStop(client->keepalive);
ka = client->keepalive;
} else {
/* Grab the completed message */
virNetMessagePtr msg = client->rx;
+ virNetMessagePtr response = NULL;
virNetServerClientFilterPtr filter;
size_t i;
msg->header.prog, msg->header.vers, msg->header.proc,
msg->header.type, msg->header.status, msg->header.serial);
+ if (virKeepAliveCheckMessage(client->keepalive, msg, &response)) {
+ virNetMessageFree(msg);
+ client->nrequests--;
+ msg = NULL;
+
+ if (response &&
+ virNetServerClientSendMessageLocked(client, response) < 0)
+ virNetMessageFree(response);
+ }
+
/* Maybe send off for queue against a filter */
- filter = client->filters;
- while (filter) {
- int ret = filter->func(client, msg, filter->opaque);
- if (ret < 0) {
- virNetMessageFree(msg);
- msg = NULL;
- if (ret < 0)
- client->wantClose = true;
- break;
- }
- if (ret > 0) {
- msg = NULL;
- break;
- }
+ if (msg) {
+ filter = client->filters;
+ while (filter) {
+ int ret = filter->func(client, msg, filter->opaque);
+ if (ret < 0) {
+ virNetMessageFree(msg);
+ msg = NULL;
+ if (ret < 0)
+ client->wantClose = true;
+ break;
+ }
+ if (ret > 0) {
+ msg = NULL;
+ break;
+ }
- filter = filter->next;
+ filter = filter->next;
+ }
}
/* Send off to for normal dispatch to workers */
}
-int virNetServerClientSendMessage(virNetServerClientPtr client,
- virNetMessagePtr msg)
+static int
+virNetServerClientSendMessageLocked(virNetServerClientPtr client,
+ virNetMessagePtr msg)
{
int ret = -1;
VIR_DEBUG("msg=%p proc=%d len=%zu offset=%zu",
msg, msg->header.proc,
msg->bufferLength, msg->bufferOffset);
- virNetServerClientLock(client);
-
msg->donefds = 0;
if (client->sock && !client->wantClose) {
PROBE(RPC_SERVER_CLIENT_MSG_TX_QUEUE,
ret = 0;
}
+ return ret;
+}
+
+int virNetServerClientSendMessage(virNetServerClientPtr client,
+ virNetMessagePtr msg)
+{
+ int ret;
+
+ virNetServerClientLock(client);
+ ret = virNetServerClientSendMessageLocked(client, msg);
virNetServerClientUnlock(client);
return ret;
virNetServerClientFree(opaque);
}
-static int
-virNetServerClientKeepAliveFilter(virNetServerClientPtr client,
- virNetMessagePtr msg,
- void *opaque ATTRIBUTE_UNUSED)
-{
- if (virKeepAliveCheckMessage(client->keepalive, msg)) {
- virNetMessageFree(msg);
- client->nrequests--;
- return 1;
- }
-
- return 0;
-}
-
int
virNetServerClientInitKeepAlive(virNetServerClientPtr client,
int interval,
/* keepalive object has a reference to client */
client->refs++;
- client->keepaliveFilter =
- virNetServerClientAddFilterLocked(client,
- virNetServerClientKeepAliveFilter,
- NULL);
- if (client->keepaliveFilter < 0)
- goto cleanup;
-
client->keepalive = ka;
ka = NULL;