* time by stopping consuming any incoming data
* off the socket....
*/
- char *incoming;
- size_t incomingOffset;
- size_t incomingLength;
+ virNetMessagePtr rx;
bool incomingEOF;
virNetClientStreamEventCallback cb;
if (!st->cb)
return;
- VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
+ VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
- if (((st->incomingOffset || st->incomingEOF) &&
+ if (((st->rx || st->incomingEOF) &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
VIR_DEBUG("Enabling event timer");
if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
- (st->incomingOffset || st->incomingEOF))
+ (st->rx || st->incomingEOF))
events |= VIR_STREAM_EVENT_READABLE;
if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
events |= VIR_STREAM_EVENT_WRITABLE;
- VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
+ VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx);
if (events) {
virNetClientStreamEventCallback cb = st->cb;
void *cbOpaque = st->cbOpaque;
virNetClientStreamPtr st = obj;
virResetError(&st->err);
- VIR_FREE(st->incoming);
+ while (st->rx) {
+ virNetMessagePtr msg = st->rx;
+ virNetMessageQueueServe(&st->rx);
+ virNetMessageFree(msg);
+ }
virObjectUnref(st->prog);
}
int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
virNetMessagePtr msg)
{
- int ret = -1;
- size_t need;
+ virNetMessagePtr tmp_msg;
+
+ VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg);
+
+ /* Unfortunately, we must allocate new message as the one we
+ * get in @msg is going to be cleared later in the process. */
+
+ if (!(tmp_msg = virNetMessageNew(false)))
+ return -1;
+
+ /* Copy header */
+ memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header));
+
+ /* Steal message buffer */
+ tmp_msg->buffer = msg->buffer;
+ tmp_msg->bufferLength = msg->bufferLength;
+ tmp_msg->bufferOffset = msg->bufferOffset;
+ msg->buffer = NULL;
+ msg->bufferLength = msg->bufferOffset = 0;
virObjectLock(st);
- need = msg->bufferLength - msg->bufferOffset;
- if (need) {
- size_t avail = st->incomingLength - st->incomingOffset;
- if (need > avail) {
- size_t extra = need - avail;
- if (VIR_REALLOC_N(st->incoming,
- st->incomingLength + extra) < 0) {
- VIR_DEBUG("Out of memory handling stream data");
- goto cleanup;
- }
- st->incomingLength += extra;
- }
- memcpy(st->incoming + st->incomingOffset,
- msg->buffer + msg->bufferOffset,
- msg->bufferLength - msg->bufferOffset);
- st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
- } else {
- st->incomingEOF = true;
- }
+ virNetMessageQueuePush(&st->rx, tmp_msg);
- VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
- st->incomingOffset, st->incomingLength,
- st->incomingEOF);
virNetClientStreamEventTimerUpdate(st);
- ret = 0;
-
- cleanup:
virObjectUnlock(st);
- return ret;
+ return 0;
}
bool nonblock)
{
int rv = -1;
+ size_t want;
+
VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
st, client, data, nbytes, nonblock);
virObjectLock(st);
- if (!st->incomingOffset && !st->incomingEOF) {
+ if (!st->rx && !st->incomingEOF) {
virNetMessagePtr msg;
int ret;
goto cleanup;
}
- VIR_DEBUG("After IO %zu", st->incomingOffset);
- if (st->incomingOffset) {
- int want = st->incomingOffset;
- if (want > nbytes)
- want = nbytes;
- memcpy(data, st->incoming, want);
- if (want < st->incomingOffset) {
- memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
- st->incomingOffset -= want;
- } else {
- VIR_FREE(st->incoming);
- st->incomingOffset = st->incomingLength = 0;
+ VIR_DEBUG("After IO rx=%p", st->rx);
+ want = nbytes;
+ while (want && st->rx) {
+ virNetMessagePtr msg = st->rx;
+ size_t len = want;
+
+ if (len > msg->bufferLength - msg->bufferOffset)
+ len = msg->bufferLength - msg->bufferOffset;
+
+ if (!len)
+ break;
+
+ memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len);
+ want -= len;
+ msg->bufferOffset += len;
+
+ if (msg->bufferOffset == msg->bufferLength) {
+ virNetMessageQueueServe(&st->rx);
+ virNetMessageFree(msg);
}
- rv = want;
- } else {
- rv = 0;
}
+ rv = nbytes - want;
virNetClientStreamEventTimerUpdate(st);