mirror of
https://gitlab.com/libvirt/libvirt.git
synced 2025-02-09 13:57:27 +03:00
Handle incoming data streams in libvirtd
* daemon/stream.c: Handle incoming stream data packets, queuing until stream becomes writable. Handle stream completion handshake * po/POTFILES.in: Add daemon/stream.c
This commit is contained in:
parent
11573f3ec1
commit
d790a66d6b
305
daemon/stream.c
305
daemon/stream.c
@ -28,6 +28,93 @@
|
|||||||
#include "dispatch.h"
|
#include "dispatch.h"
|
||||||
#include "logging.h"
|
#include "logging.h"
|
||||||
|
|
||||||
|
static int
|
||||||
|
remoteStreamHandleWrite(struct qemud_client *client,
|
||||||
|
struct qemud_client_stream *stream);
|
||||||
|
static int
|
||||||
|
remoteStreamHandleFinish(struct qemud_client *client,
|
||||||
|
struct qemud_client_stream *stream,
|
||||||
|
struct qemud_client_message *msg);
|
||||||
|
static int
|
||||||
|
remoteStreamHandleAbort(struct qemud_client *client,
|
||||||
|
struct qemud_client_stream *stream,
|
||||||
|
struct qemud_client_message *msg);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
remoteStreamUpdateEvents(struct qemud_client_stream *stream)
|
||||||
|
{
|
||||||
|
int newEvents = 0;
|
||||||
|
if (stream->rx)
|
||||||
|
newEvents |= VIR_STREAM_EVENT_WRITABLE;
|
||||||
|
|
||||||
|
virStreamEventUpdateCallback(stream->st, newEvents);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Callback that gets invoked when a stream becomes writable/readable
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
remoteStreamEvent(virStreamPtr st, int events, void *opaque)
|
||||||
|
{
|
||||||
|
struct qemud_client *client = opaque;
|
||||||
|
struct qemud_client_stream *stream;
|
||||||
|
|
||||||
|
/* XXX sub-optimal - we really should be taking the server lock
|
||||||
|
* first, but we have no handle to the server object
|
||||||
|
* We're lucky to get away with it for now, due to this callback
|
||||||
|
* executing in the main thread, but this should really be fixed
|
||||||
|
*/
|
||||||
|
virMutexLock(&client->lock);
|
||||||
|
|
||||||
|
stream = remoteFindClientStream(client, st);
|
||||||
|
|
||||||
|
if (!stream) {
|
||||||
|
VIR_WARN("event for client=%p stream st=%p, but missing stream state", client, st);
|
||||||
|
virStreamEventRemoveCallback(st);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
DEBUG("st=%p events=%d", st, events);
|
||||||
|
|
||||||
|
if (events & VIR_STREAM_EVENT_WRITABLE) {
|
||||||
|
if (remoteStreamHandleWrite(client, stream) < 0) {
|
||||||
|
remoteRemoveClientStream(client, stream);
|
||||||
|
qemudDispatchClientFailure(client);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!stream->closed &&
|
||||||
|
(events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) {
|
||||||
|
int ret;
|
||||||
|
remote_error rerr;
|
||||||
|
memset(&rerr, 0, sizeof rerr);
|
||||||
|
stream->closed = 1;
|
||||||
|
virStreamAbort(stream->st);
|
||||||
|
if (events & VIR_STREAM_EVENT_HANGUP)
|
||||||
|
remoteDispatchFormatError(&rerr, "%s", _("stream had unexpected termination"));
|
||||||
|
else
|
||||||
|
remoteDispatchFormatError(&rerr, "%s", _("stream had I/O failure"));
|
||||||
|
ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial);
|
||||||
|
remoteRemoveClientStream(client, stream);
|
||||||
|
if (ret < 0)
|
||||||
|
qemudDispatchClientFailure(client);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stream->closed) {
|
||||||
|
remoteRemoveClientStream(client, stream);
|
||||||
|
} else {
|
||||||
|
remoteStreamUpdateEvents(stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
virMutexUnlock(&client->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* @client: a locked client object
|
* @client: a locked client object
|
||||||
@ -38,10 +125,54 @@
|
|||||||
* -1 on fatal client error
|
* -1 on fatal client error
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
remoteStreamFilter(struct qemud_client *client ATTRIBUTE_UNUSED,
|
remoteStreamFilter(struct qemud_client *client,
|
||||||
struct qemud_client_message *msg ATTRIBUTE_UNUSED,
|
struct qemud_client_message *msg, void *opaque)
|
||||||
void *opaque ATTRIBUTE_UNUSED)
|
|
||||||
{
|
{
|
||||||
|
struct qemud_client_stream *stream = opaque;
|
||||||
|
|
||||||
|
if (msg->hdr.serial == stream->serial &&
|
||||||
|
msg->hdr.proc == stream->procedure &&
|
||||||
|
msg->hdr.type == REMOTE_STREAM) {
|
||||||
|
DEBUG("Incoming rx=%p serial=%d proc=%d status=%d",
|
||||||
|
stream->rx, msg->hdr.proc, msg->hdr.serial, msg->hdr.status);
|
||||||
|
|
||||||
|
/* If there are queued packets, we need to queue all further
|
||||||
|
* messages, since they must be processed strictly in order.
|
||||||
|
* If there are no queued packets, then OK/ERROR messages
|
||||||
|
* should be processed immediately. Data packets are still
|
||||||
|
* queued to only be processed when the stream is marked as
|
||||||
|
* writable.
|
||||||
|
*/
|
||||||
|
if (stream->rx) {
|
||||||
|
qemudClientMessageQueuePush(&stream->rx, msg);
|
||||||
|
remoteStreamUpdateEvents(stream);
|
||||||
|
} else {
|
||||||
|
int ret = 0;
|
||||||
|
switch (msg->hdr.status) {
|
||||||
|
case REMOTE_OK:
|
||||||
|
ret = remoteStreamHandleFinish(client, stream, msg);
|
||||||
|
if (ret == 0)
|
||||||
|
qemudClientMessageRelease(client, msg);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REMOTE_CONTINUE:
|
||||||
|
qemudClientMessageQueuePush(&stream->rx, msg);
|
||||||
|
remoteStreamUpdateEvents(stream);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REMOTE_ERROR:
|
||||||
|
default:
|
||||||
|
ret = remoteStreamHandleAbort(client, stream, msg);
|
||||||
|
if (ret == 0)
|
||||||
|
qemudClientMessageRelease(client, msg);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ret < 0)
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,6 +250,10 @@ int remoteAddClientStream(struct qemud_client *client,
|
|||||||
|
|
||||||
DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial);
|
DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial);
|
||||||
|
|
||||||
|
if (virStreamEventAddCallback(stream->st, 0,
|
||||||
|
remoteStreamEvent, client, NULL) < 0)
|
||||||
|
return -1;
|
||||||
|
|
||||||
if (tmp) {
|
if (tmp) {
|
||||||
while (tmp->next)
|
while (tmp->next)
|
||||||
tmp = tmp->next;
|
tmp = tmp->next;
|
||||||
@ -132,6 +267,8 @@ int remoteAddClientStream(struct qemud_client *client,
|
|||||||
|
|
||||||
stream->tx = 1;
|
stream->tx = 1;
|
||||||
|
|
||||||
|
remoteStreamUpdateEvents(stream);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -208,3 +345,165 @@ remoteRemoveClientStream(struct qemud_client *client,
|
|||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Returns:
|
||||||
|
* -1 if fatal error occurred
|
||||||
|
* 0 if message was fully processed
|
||||||
|
* 1 if message is still being processed
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
remoteStreamHandleWriteData(struct qemud_client *client,
|
||||||
|
struct qemud_client_stream *stream,
|
||||||
|
struct qemud_client_message *msg)
|
||||||
|
{
|
||||||
|
remote_error rerr;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
DEBUG("stream=%p proc=%d serial=%d len=%d offset=%d",
|
||||||
|
stream, msg->hdr.proc, msg->hdr.serial, msg->bufferLength, msg->bufferOffset);
|
||||||
|
|
||||||
|
memset(&rerr, 0, sizeof rerr);
|
||||||
|
|
||||||
|
ret = virStreamSend(stream->st,
|
||||||
|
msg->buffer + msg->bufferOffset,
|
||||||
|
msg->bufferLength - msg->bufferOffset);
|
||||||
|
|
||||||
|
if (ret > 0) {
|
||||||
|
msg->bufferOffset += ret;
|
||||||
|
|
||||||
|
/* Partial write, so indicate we have more todo later */
|
||||||
|
if (msg->bufferOffset < msg->bufferLength)
|
||||||
|
return 1;
|
||||||
|
} else if (ret == -2) {
|
||||||
|
/* Blocking, so indicate we have more todo later */
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
VIR_INFO0("Stream send failed");
|
||||||
|
stream->closed = 1;
|
||||||
|
remoteDispatchConnError(&rerr, client->conn);
|
||||||
|
return remoteSerializeReplyError(client, &rerr, &msg->hdr);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Process an finish handshake from the client.
|
||||||
|
*
|
||||||
|
* Returns a REMOTE_OK confirmation if successful, or a REMOTE_ERROR
|
||||||
|
* if there was a stream error
|
||||||
|
*
|
||||||
|
* Returns 0 if successfully sent RPC reply, -1 upon fatal error
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
remoteStreamHandleFinish(struct qemud_client *client,
|
||||||
|
struct qemud_client_stream *stream,
|
||||||
|
struct qemud_client_message *msg)
|
||||||
|
{
|
||||||
|
remote_error rerr;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
DEBUG("stream=%p proc=%d serial=%d",
|
||||||
|
stream, msg->hdr.proc, msg->hdr.serial);
|
||||||
|
|
||||||
|
memset(&rerr, 0, sizeof rerr);
|
||||||
|
|
||||||
|
stream->closed = 1;
|
||||||
|
ret = virStreamFinish(stream->st);
|
||||||
|
|
||||||
|
if (ret < 0) {
|
||||||
|
remoteDispatchConnError(&rerr, client->conn);
|
||||||
|
return remoteSerializeReplyError(client, &rerr, &msg->hdr);
|
||||||
|
} else {
|
||||||
|
/* Send zero-length confirm */
|
||||||
|
if (remoteSendStreamData(client, stream, NULL, 0) < 0)
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Process an abort request from the client.
|
||||||
|
*
|
||||||
|
* Returns 0 if successfully aborted, -1 upon error
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
remoteStreamHandleAbort(struct qemud_client *client,
|
||||||
|
struct qemud_client_stream *stream,
|
||||||
|
struct qemud_client_message *msg)
|
||||||
|
{
|
||||||
|
remote_error rerr;
|
||||||
|
|
||||||
|
DEBUG("stream=%p proc=%d serial=%d",
|
||||||
|
stream, msg->hdr.proc, msg->hdr.serial);
|
||||||
|
|
||||||
|
memset(&rerr, 0, sizeof rerr);
|
||||||
|
|
||||||
|
stream->closed = 1;
|
||||||
|
virStreamAbort(stream->st);
|
||||||
|
|
||||||
|
if (msg->hdr.status == REMOTE_ERROR)
|
||||||
|
remoteDispatchFormatError(&rerr, "%s", _("stream aborted at client request"));
|
||||||
|
else {
|
||||||
|
VIR_WARN("unexpected stream status %d", msg->hdr.status);
|
||||||
|
remoteDispatchFormatError(&rerr, _("stream aborted with unexpected status %d"),
|
||||||
|
msg->hdr.status);
|
||||||
|
}
|
||||||
|
|
||||||
|
return remoteSerializeReplyError(client, &rerr, &msg->hdr);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Called when the stream is signalled has being able to accept
|
||||||
|
* data writes. Will process all pending incoming messages
|
||||||
|
* until they're all gone, or I/O blocks
|
||||||
|
*
|
||||||
|
* Returns 0 on success, or -1 upon fatal error
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
remoteStreamHandleWrite(struct qemud_client *client,
|
||||||
|
struct qemud_client_stream *stream)
|
||||||
|
{
|
||||||
|
struct qemud_client_message *msg, *tmp;
|
||||||
|
|
||||||
|
DEBUG("stream=%p", stream);
|
||||||
|
|
||||||
|
msg = stream->rx;
|
||||||
|
while (msg && !stream->closed) {
|
||||||
|
int ret;
|
||||||
|
switch (msg->hdr.status) {
|
||||||
|
case REMOTE_OK:
|
||||||
|
ret = remoteStreamHandleFinish(client, stream, msg);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REMOTE_CONTINUE:
|
||||||
|
ret = remoteStreamHandleWriteData(client, stream, msg);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REMOTE_ERROR:
|
||||||
|
default:
|
||||||
|
ret = remoteStreamHandleAbort(client, stream, msg);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ret == 0)
|
||||||
|
qemudClientMessageQueueServe(&stream->rx);
|
||||||
|
else if (ret < 0)
|
||||||
|
return -1;
|
||||||
|
else
|
||||||
|
break; /* still processing data */
|
||||||
|
|
||||||
|
tmp = msg->next;
|
||||||
|
qemudClientMessageRelease(client, msg);
|
||||||
|
msg = tmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
daemon/dispatch.c
|
daemon/dispatch.c
|
||||||
daemon/libvirtd.c
|
daemon/libvirtd.c
|
||||||
daemon/remote.c
|
daemon/remote.c
|
||||||
|
daemon/stream.c
|
||||||
src/conf/domain_conf.c
|
src/conf/domain_conf.c
|
||||||
src/conf/interface_conf.c
|
src/conf/interface_conf.c
|
||||||
src/conf/network_conf.c
|
src/conf/network_conf.c
|
||||||
|
Loading…
x
Reference in New Issue
Block a user