2009-07-10 13:06:36 +01:00
/*
* stream . c : APIs for managing client streams
*
2014-09-03 13:39:21 -06:00
* Copyright ( C ) 2009 - 2014 Red Hat , Inc .
2009-07-10 13:06:36 +01:00
*
* This library is free software ; you can redistribute it and / or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation ; either
* version 2.1 of the License , or ( at your option ) any later version .
*
* This library is distributed in the hope that it will be useful ,
* but WITHOUT ANY WARRANTY ; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the GNU
* Lesser General Public License for more details .
*
* You should have received a copy of the GNU Lesser General Public
2012-09-20 16:30:55 -06:00
* License along with this library . If not , see
2012-07-21 18:06:23 +08:00
* < http : //www.gnu.org/licenses/>.
2009-07-10 13:06:36 +01:00
*
* Author : Daniel P . Berrange < berrange @ redhat . com >
*/
# include <config.h>
# include "stream.h"
2011-05-16 18:13:11 +01:00
# include "remote.h"
2012-12-12 18:06:53 +00:00
# include "viralloc.h"
2012-12-12 17:59:27 +00:00
# include "virlog.h"
2011-05-16 18:13:11 +01:00
# include "virnetserverclient.h"
2012-12-13 18:21:53 +00:00
# include "virerror.h"
2016-04-11 11:58:19 +02:00
# include "libvirt_internal.h"
2011-05-14 17:46:00 +02:00
# define VIR_FROM_THIS VIR_FROM_STREAMS
2009-07-10 13:06:36 +01:00
2014-02-28 12:16:17 +00:00
VIR_LOG_INIT ( " daemon.stream " ) ;
2011-05-16 18:13:11 +01:00
struct daemonClientStream {
daemonClientPrivatePtr priv ;
2011-08-31 17:01:01 +01:00
int refs ;
2011-05-16 18:13:11 +01:00
virNetServerProgramPtr prog ;
virStreamPtr st ;
int procedure ;
2016-03-29 17:33:14 +02:00
unsigned int serial ;
2011-05-16 18:13:11 +01:00
2016-04-04 12:46:19 +02:00
bool recvEOF ;
bool closed ;
2011-05-16 18:13:11 +01:00
int filterID ;
virNetMessagePtr rx ;
2016-04-04 12:49:48 +02:00
bool tx ;
2011-05-16 18:13:11 +01:00
2016-04-11 16:50:04 +02:00
bool allowSkip ;
2016-04-11 11:58:19 +02:00
size_t dataLen ; /* How much data is there remaining until we see a hole */
2016-04-11 16:50:04 +02:00
2011-05-16 18:13:11 +01:00
daemonClientStreamPtr next ;
} ;
2009-08-24 20:53:48 +01:00
static int
2011-05-16 18:13:11 +01:00
daemonStreamHandleWrite ( virNetServerClientPtr client ,
daemonClientStream * stream ) ;
2009-08-24 20:53:48 +01:00
static int
2011-05-16 18:13:11 +01:00
daemonStreamHandleRead ( virNetServerClientPtr client ,
daemonClientStream * stream ) ;
2009-08-24 20:57:16 +01:00
static int
2011-05-16 18:13:11 +01:00
daemonStreamHandleFinish ( virNetServerClientPtr client ,
daemonClientStream * stream ,
virNetMessagePtr msg ) ;
2009-08-24 20:53:48 +01:00
static int
2011-05-16 18:13:11 +01:00
daemonStreamHandleAbort ( virNetServerClientPtr client ,
daemonClientStream * stream ,
virNetMessagePtr msg ) ;
2009-08-24 20:53:48 +01:00
static void
2011-05-16 18:13:11 +01:00
daemonStreamUpdateEvents ( daemonClientStream * stream )
2009-08-24 20:53:48 +01:00
{
int newEvents = 0 ;
2016-04-25 13:18:26 -04:00
if ( stream - > closed )
return ;
2009-08-24 20:53:48 +01:00
if ( stream - > rx )
newEvents | = VIR_STREAM_EVENT_WRITABLE ;
2009-08-24 20:57:16 +01:00
if ( stream - > tx & & ! stream - > recvEOF )
newEvents | = VIR_STREAM_EVENT_READABLE ;
2009-08-24 20:53:48 +01:00
virStreamEventUpdateCallback ( stream - > st , newEvents ) ;
}
2011-05-16 18:13:11 +01:00
/*
* Invoked when an outgoing data packet message has been fully sent .
* This simply re - enables TX of further data .
*
* The idea is to stop the daemon growing without bound due to
* fast stream , but slow client
*/
static void
2016-03-29 17:33:14 +02:00
daemonStreamMessageFinished ( virNetMessagePtr msg ,
2011-05-16 18:13:11 +01:00
void * opaque )
{
daemonClientStream * stream = opaque ;
2016-03-29 17:33:14 +02:00
VIR_DEBUG ( " stream=%p proc=%d serial=%u " ,
2011-05-16 18:13:11 +01:00
stream , msg - > header . proc , msg - > header . serial ) ;
2016-04-04 12:49:48 +02:00
stream - > tx = true ;
2011-05-16 18:13:11 +01:00
daemonStreamUpdateEvents ( stream ) ;
2011-08-31 17:01:01 +01:00
daemonFreeClientStream ( NULL , stream ) ;
2011-05-16 18:13:11 +01:00
}
2009-08-24 20:53:48 +01:00
2011-07-08 12:33:52 +01:00
2009-08-24 20:53:48 +01:00
/*
* Callback that gets invoked when a stream becomes writable / readable
*/
static void
2011-05-16 18:13:11 +01:00
daemonStreamEvent ( virStreamPtr st , int events , void * opaque )
2009-08-24 20:53:48 +01:00
{
2011-05-16 18:13:11 +01:00
virNetServerClientPtr client = opaque ;
daemonClientStream * stream ;
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData ( client ) ;
2009-08-24 20:53:48 +01:00
2011-05-16 18:13:11 +01:00
virMutexLock ( & priv - > lock ) ;
2009-08-24 20:53:48 +01:00
2011-05-16 18:13:11 +01:00
stream = priv - > streams ;
while ( stream ) {
if ( stream - > st = = st )
break ;
stream = stream - > next ;
}
2009-08-24 20:53:48 +01:00
if ( ! stream ) {
VIR_WARN ( " event for client=%p stream st=%p, but missing stream state " , client , st ) ;
virStreamEventRemoveCallback ( st ) ;
goto cleanup ;
}
2011-05-16 18:13:11 +01:00
VIR_DEBUG ( " st=%p events=%d EOF=%d closed=%d " , st , events , stream - > recvEOF , stream - > closed ) ;
2009-08-24 20:53:48 +01:00
2011-10-07 17:38:59 +01:00
if ( ! stream - > closed & &
( events & VIR_STREAM_EVENT_WRITABLE ) ) {
2011-05-16 18:13:11 +01:00
if ( daemonStreamHandleWrite ( client , stream ) < 0 ) {
daemonRemoveClientStream ( client , stream ) ;
virNetServerClientClose ( client ) ;
2009-08-24 20:53:48 +01:00
goto cleanup ;
}
}
2011-10-07 17:38:59 +01:00
if ( ! stream - > closed & & ! stream - > recvEOF & &
( events & ( VIR_STREAM_EVENT_READABLE ) ) ) {
events = events & ~ ( VIR_STREAM_EVENT_READABLE ) ;
2011-05-16 18:13:11 +01:00
if ( daemonStreamHandleRead ( client , stream ) < 0 ) {
daemonRemoveClientStream ( client , stream ) ;
virNetServerClientClose ( client ) ;
2009-08-24 20:57:16 +01:00
goto cleanup ;
}
2013-04-09 13:24:02 +01:00
/* If we detected EOF during read processing,
* then clear hangup / error conditions , since
* we want the client to see the EOF message
* we just sent them
*/
if ( stream - > recvEOF )
events = events & ~ ( VIR_STREAM_EVENT_HANGUP |
VIR_STREAM_EVENT_ERROR ) ;
2009-08-24 20:57:16 +01:00
}
2011-05-16 18:13:11 +01:00
/* If we have a completion/abort message, always process it */
if ( stream - > rx ) {
virNetMessagePtr msg = stream - > rx ;
switch ( msg - > header . status ) {
case VIR_NET_CONTINUE :
/* nada */
break ;
case VIR_NET_OK :
virNetMessageQueueServe ( & stream - > rx ) ;
if ( daemonStreamHandleFinish ( client , stream , msg ) < 0 ) {
virNetMessageFree ( msg ) ;
daemonRemoveClientStream ( client , stream ) ;
virNetServerClientClose ( client ) ;
goto cleanup ;
}
break ;
case VIR_NET_ERROR :
default :
virNetMessageQueueServe ( & stream - > rx ) ;
if ( daemonStreamHandleAbort ( client , stream , msg ) < 0 ) {
virNetMessageFree ( msg ) ;
daemonRemoveClientStream ( client , stream ) ;
virNetServerClientClose ( client ) ;
goto cleanup ;
}
break ;
}
}
2011-10-07 17:38:59 +01:00
/* If we got HANGUP, we need to only send an empty
* packet so the client sees an EOF and cleans up
*/
if ( ! stream - > closed & & ! stream - > recvEOF & &
( events & VIR_STREAM_EVENT_HANGUP ) ) {
virNetMessagePtr msg ;
events & = ~ ( VIR_STREAM_EVENT_HANGUP ) ;
2016-04-04 12:49:48 +02:00
stream - > tx = false ;
2016-04-04 12:46:19 +02:00
stream - > recvEOF = true ;
2011-10-07 17:38:59 +01:00
if ( ! ( msg = virNetMessageNew ( false ) ) ) {
daemonRemoveClientStream ( client , stream ) ;
virNetServerClientClose ( client ) ;
goto cleanup ;
}
msg - > cb = daemonStreamMessageFinished ;
msg - > opaque = stream ;
stream - > refs + + ;
if ( virNetServerProgramSendStreamData ( remoteProgram ,
client ,
msg ,
stream - > procedure ,
stream - > serial ,
" " , 0 ) < 0 ) {
virNetMessageFree ( msg ) ;
daemonRemoveClientStream ( client , stream ) ;
virNetServerClientClose ( client ) ;
goto cleanup ;
}
}
2009-08-24 20:53:48 +01:00
if ( ! stream - > closed & &
( events & ( VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP ) ) ) {
int ret ;
2011-05-16 18:13:11 +01:00
virNetMessagePtr msg ;
virNetMessageError rerr ;
2017-05-29 16:29:36 +02:00
virErrorPtr origErr = virSaveLastError ( ) ;
2011-05-16 18:13:11 +01:00
memset ( & rerr , 0 , sizeof ( rerr ) ) ;
2016-04-04 12:46:19 +02:00
stream - > closed = true ;
2010-08-17 16:30:51 +01:00
virStreamEventRemoveCallback ( stream - > st ) ;
2009-08-24 20:53:48 +01:00
virStreamAbort ( stream - > st ) ;
2017-05-29 16:29:36 +02:00
if ( origErr & & origErr - > code ! = VIR_ERR_OK ) {
virSetError ( origErr ) ;
} else {
if ( events & VIR_STREAM_EVENT_HANGUP )
virReportError ( VIR_ERR_RPC ,
" %s " , _ ( " stream had unexpected termination " ) ) ;
else
virReportError ( VIR_ERR_RPC ,
" %s " , _ ( " stream had I/O failure " ) ) ;
}
2017-07-20 06:44:21 -04:00
virFreeError ( origErr ) ;
2011-05-16 18:13:11 +01:00
Fix tracking of RPC messages wrt streams
Commit 2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing
- if (msg->header.type == VIR_NET_REPLY) {
+ if (msg->header.type == VIR_NET_REPLY ||
+ (msg->header.type == VIR_NET_STREAM &&
+ msg->header.status != VIR_NET_CONTINUE)) {
client->nrequests--;
In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.
Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.
Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
src/rpc/virnetserverprogram.c: Switch over to use
virNetMessageClear() and pass in the 'bool tracked' value
when creating messages.
2011-08-31 17:42:58 +01:00
msg = virNetMessageNew ( false ) ;
2011-05-16 18:13:11 +01:00
if ( ! msg ) {
ret = - 1 ;
} else {
ret = virNetServerProgramSendStreamError ( remoteProgram ,
client ,
msg ,
& rerr ,
stream - > procedure ,
stream - > serial ) ;
}
daemonRemoveClientStream ( client , stream ) ;
2009-08-24 20:53:48 +01:00
if ( ret < 0 )
2011-05-16 18:13:11 +01:00
virNetServerClientClose ( client ) ;
2009-08-24 20:53:48 +01:00
goto cleanup ;
}
if ( stream - > closed ) {
2011-05-16 18:13:11 +01:00
daemonRemoveClientStream ( client , stream ) ;
2009-08-24 20:53:48 +01:00
} else {
2011-05-16 18:13:11 +01:00
daemonStreamUpdateEvents ( stream ) ;
2009-08-24 20:53:48 +01:00
}
2014-03-25 07:45:38 +01:00
cleanup :
2011-05-16 18:13:11 +01:00
virMutexUnlock ( & priv - > lock ) ;
2009-08-24 20:53:48 +01:00
}
2009-07-10 13:06:36 +01:00
/*
* @ client : a locked client object
*
* Invoked by the main loop when filtering incoming messages .
*
* Returns 1 if the message was processed , 0 if skipped ,
* - 1 on fatal client error
*/
static int
2011-09-26 18:51:47 +02:00
daemonStreamFilter ( virNetServerClientPtr client ATTRIBUTE_UNUSED ,
2011-05-16 18:13:11 +01:00
virNetMessagePtr msg ,
void * opaque )
2009-07-10 13:06:36 +01:00
{
2011-05-16 18:13:11 +01:00
daemonClientStream * stream = opaque ;
int ret = 0 ;
2009-08-24 20:53:48 +01:00
2011-05-16 18:13:11 +01:00
virMutexLock ( & stream - > priv - > lock ) ;
2016-04-04 14:54:46 +02:00
if ( msg - > header . type ! = VIR_NET_STREAM & &
msg - > header . type ! = VIR_NET_STREAM_HOLE )
2011-05-16 18:13:11 +01:00
goto cleanup ;
if ( ! virNetServerProgramMatches ( stream - > prog , msg ) )
goto cleanup ;
if ( msg - > header . proc ! = stream - > procedure | |
msg - > header . serial ! = stream - > serial )
goto cleanup ;
2016-03-29 17:33:14 +02:00
VIR_DEBUG ( " Incoming client=%p, rx=%p, serial=%u, proc=%d, status=%d " ,
2011-05-16 18:13:11 +01:00
client , stream - > rx , msg - > header . proc ,
msg - > header . serial , msg - > header . status ) ;
virNetMessageQueuePush ( & stream - > rx , msg ) ;
daemonStreamUpdateEvents ( stream ) ;
ret = 1 ;
2014-03-25 07:45:38 +01:00
cleanup :
2011-05-16 18:13:11 +01:00
virMutexUnlock ( & stream - > priv - > lock ) ;
return ret ;
2009-07-10 13:06:36 +01:00
}
/*
* @ conn : a connection object to associate the stream with
2011-05-16 18:13:11 +01:00
* @ header : the method call to associate with the stream
2009-07-10 13:06:36 +01:00
*
* Creates a new stream for this conn
*
* Returns a new stream object , or NULL upon OOM
*/
2011-05-16 18:13:11 +01:00
daemonClientStream *
daemonCreateClientStream ( virNetServerClientPtr client ,
virStreamPtr st ,
virNetServerProgramPtr prog ,
2016-04-11 16:50:04 +02:00
virNetMessageHeaderPtr header ,
bool allowSkip )
2009-07-10 13:06:36 +01:00
{
2011-05-16 18:13:11 +01:00
daemonClientStream * stream ;
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData ( client ) ;
2009-07-10 13:06:36 +01:00
2016-03-29 17:33:14 +02:00
VIR_DEBUG ( " client=%p, proc=%d, serial=%u, st=%p " ,
2011-05-16 18:13:11 +01:00
client , header - > proc , header - > serial , st ) ;
2009-07-10 13:06:36 +01:00
2013-07-04 11:58:18 +02:00
if ( VIR_ALLOC ( stream ) < 0 )
2009-07-10 13:06:36 +01:00
return NULL ;
2011-08-31 17:01:01 +01:00
stream - > refs = 1 ;
2011-05-16 18:13:11 +01:00
stream - > priv = priv ;
2012-07-11 14:35:52 +01:00
stream - > prog = virObjectRef ( prog ) ;
2011-05-16 18:13:11 +01:00
stream - > procedure = header - > proc ;
stream - > serial = header - > serial ;
stream - > filterID = - 1 ;
stream - > st = st ;
2016-04-11 16:50:04 +02:00
stream - > allowSkip = allowSkip ;
2009-07-10 13:06:36 +01:00
return stream ;
}
/*
* @ stream : an unused client stream
*
* Frees the memory associated with this inactive client
* stream
*/
2011-05-16 18:13:11 +01:00
int daemonFreeClientStream ( virNetServerClientPtr client ,
daemonClientStream * stream )
2009-07-10 13:06:36 +01:00
{
2011-05-16 18:13:11 +01:00
virNetMessagePtr msg ;
int ret = 0 ;
2009-07-10 13:06:36 +01:00
if ( ! stream )
2011-05-16 18:13:11 +01:00
return 0 ;
2011-08-31 17:01:01 +01:00
stream - > refs - - ;
if ( stream - > refs )
return 0 ;
2016-03-29 17:33:14 +02:00
VIR_DEBUG ( " client=%p, proc=%d, serial=%u " ,
2011-05-16 18:13:11 +01:00
client , stream - > procedure , stream - > serial ) ;
2009-07-10 13:06:36 +01:00
2012-07-11 14:35:52 +01:00
virObjectUnref ( stream - > prog ) ;
2009-07-10 13:06:36 +01:00
msg = stream - > rx ;
while ( msg ) {
2011-05-16 18:13:11 +01:00
virNetMessagePtr tmp = msg - > next ;
2011-08-14 15:44:45 -07:00
if ( client ) {
/* Send a dummy reply to free up 'msg' & unblock client rx */
Fix tracking of RPC messages wrt streams
Commit 2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing
- if (msg->header.type == VIR_NET_REPLY) {
+ if (msg->header.type == VIR_NET_REPLY ||
+ (msg->header.type == VIR_NET_STREAM &&
+ msg->header.status != VIR_NET_CONTINUE)) {
client->nrequests--;
In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.
Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.
Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
src/rpc/virnetserverprogram.c: Switch over to use
virNetMessageClear() and pass in the 'bool tracked' value
when creating messages.
2011-08-31 17:42:58 +01:00
virNetMessageClear ( msg ) ;
2011-08-14 15:44:45 -07:00
msg - > header . type = VIR_NET_REPLY ;
if ( virNetServerClientSendMessage ( client , msg ) < 0 ) {
virNetServerClientImmediateClose ( client ) ;
virNetMessageFree ( msg ) ;
ret = - 1 ;
}
} else {
2011-05-16 18:13:11 +01:00
virNetMessageFree ( msg ) ;
}
2009-07-10 13:06:36 +01:00
msg = tmp ;
}
2014-11-30 10:19:38 -05:00
virObjectUnref ( stream - > st ) ;
2009-07-10 13:06:36 +01:00
VIR_FREE ( stream ) ;
2011-05-16 18:13:11 +01:00
return ret ;
2009-07-10 13:06:36 +01:00
}
/*
* @ client : a locked client to add the stream to
* @ stream : a stream to add
*/
2011-05-16 18:13:11 +01:00
int daemonAddClientStream ( virNetServerClientPtr client ,
daemonClientStream * stream ,
bool transmit )
2009-07-10 13:06:36 +01:00
{
2016-03-29 17:33:14 +02:00
VIR_DEBUG ( " client=%p, proc=%d, serial=%u, st=%p, transmit=%d " ,
2011-05-16 18:13:11 +01:00
client , stream - > procedure , stream - > serial , stream - > st , transmit ) ;
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData ( client ) ;
2009-07-10 13:06:36 +01:00
2011-05-16 18:13:11 +01:00
if ( stream - > filterID ! = - 1 ) {
VIR_WARN ( " Filter already added to client %p " , client ) ;
return - 1 ;
}
2009-07-10 13:06:36 +01:00
2009-08-24 20:53:48 +01:00
if ( virStreamEventAddCallback ( stream - > st , 0 ,
2011-07-08 12:33:52 +01:00
daemonStreamEvent , client ,
2012-07-11 14:35:52 +01:00
virObjectFreeCallback ) < 0 )
2009-08-24 20:53:48 +01:00
return - 1 ;
2012-07-11 14:35:52 +01:00
virObjectRef ( client ) ;
2011-05-16 18:13:11 +01:00
if ( ( stream - > filterID = virNetServerClientAddFilter ( client ,
daemonStreamFilter ,
stream ) ) < 0 ) {
virStreamEventRemoveCallback ( stream - > st ) ;
return - 1 ;
2009-07-10 13:06:36 +01:00
}
2009-09-30 12:47:43 +02:00
if ( transmit )
2016-04-04 12:49:48 +02:00
stream - > tx = true ;
2009-07-10 13:06:36 +01:00
2011-05-16 18:13:11 +01:00
virMutexLock ( & priv - > lock ) ;
stream - > next = priv - > streams ;
priv - > streams = stream ;
2009-08-24 20:53:48 +01:00
2011-05-16 18:13:11 +01:00
daemonStreamUpdateEvents ( stream ) ;
2009-07-10 13:06:36 +01:00
2011-05-16 18:13:11 +01:00
virMutexUnlock ( & priv - > lock ) ;
2009-07-10 13:06:36 +01:00
2011-05-16 18:13:11 +01:00
return 0 ;
2009-07-10 13:06:36 +01:00
}
/*
* @ client : a locked client object
* @ stream : an inactive , closed stream object
*
* Removes a stream from the list of active streams for the client
*
2016-09-21 14:06:09 +05:30
* Returns 0 if the stream was removed , - 1 if it doesn ' t exist
2009-07-10 13:06:36 +01:00
*/
int
2011-05-16 18:13:11 +01:00
daemonRemoveClientStream ( virNetServerClientPtr client ,
daemonClientStream * stream )
2009-07-10 13:06:36 +01:00
{
2016-03-29 17:33:14 +02:00
VIR_DEBUG ( " client=%p, proc=%d, serial=%u, st=%p " ,
2011-05-16 18:13:11 +01:00
client , stream - > procedure , stream - > serial , stream - > st ) ;
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData ( client ) ;
daemonClientStream * curr = priv - > streams ;
daemonClientStream * prev = NULL ;
if ( stream - > filterID ! = - 1 ) {
virNetServerClientRemoveFilter ( client ,
stream - > filterID ) ;
stream - > filterID = - 1 ;
2009-07-10 13:06:36 +01:00
}
2010-08-17 16:30:51 +01:00
if ( ! stream - > closed ) {
2016-04-25 14:16:07 -04:00
stream - > closed = true ;
2010-08-17 16:30:51 +01:00
virStreamEventRemoveCallback ( stream - > st ) ;
2009-07-10 13:06:36 +01:00
virStreamAbort ( stream - > st ) ;
2010-08-17 16:30:51 +01:00
}
2009-07-10 13:06:36 +01:00
while ( curr ) {
if ( curr = = stream ) {
if ( prev )
prev - > next = curr - > next ;
else
2011-05-16 18:13:11 +01:00
priv - > streams = curr - > next ;
return daemonFreeClientStream ( client , stream ) ;
2009-07-10 13:06:36 +01:00
}
prev = curr ;
curr = curr - > next ;
}
return - 1 ;
}
2009-08-24 20:53:48 +01:00
2011-08-14 15:44:45 -07:00
void
daemonRemoveAllClientStreams ( daemonClientStream * stream )
{
daemonClientStream * tmp ;
VIR_DEBUG ( " stream=%p " , stream ) ;
while ( stream ) {
tmp = stream - > next ;
if ( ! stream - > closed ) {
2016-04-25 14:16:07 -04:00
stream - > closed = true ;
2011-08-14 15:44:45 -07:00
virStreamEventRemoveCallback ( stream - > st ) ;
virStreamAbort ( stream - > st ) ;
}
daemonFreeClientStream ( NULL , stream ) ;
VIR_DEBUG ( " next stream=%p " , tmp ) ;
stream = tmp ;
}
}
2009-08-24 20:53:48 +01:00
/*
* Returns :
* - 1 if fatal error occurred
* 0 if message was fully processed
* 1 if message is still being processed
*/
static int
2011-05-16 18:13:11 +01:00
daemonStreamHandleWriteData ( virNetServerClientPtr client ,
daemonClientStream * stream ,
virNetMessagePtr msg )
2009-08-24 20:53:48 +01:00
{
int ret ;
2016-03-29 17:33:14 +02:00
VIR_DEBUG ( " client=%p, stream=%p, proc=%d, serial=%u, len=%zu, offset=%zu " ,
2011-05-16 18:13:11 +01:00
client , stream , msg - > header . proc , msg - > header . serial ,
msg - > bufferLength , msg - > bufferOffset ) ;
2009-08-24 20:53:48 +01:00
ret = virStreamSend ( stream - > st ,
msg - > buffer + msg - > bufferOffset ,
msg - > bufferLength - msg - > bufferOffset ) ;
if ( ret > 0 ) {
msg - > bufferOffset + = ret ;
/* Partial write, so indicate we have more todo later */
if ( msg - > bufferOffset < msg - > bufferLength )
return 1 ;
} else if ( ret = = - 2 ) {
/* Blocking, so indicate we have more todo later */
return 1 ;
} else {
2011-05-16 18:13:11 +01:00
virNetMessageError rerr ;
memset ( & rerr , 0 , sizeof ( rerr ) ) ;
2011-05-09 17:24:09 +08:00
VIR_INFO ( " Stream send failed " ) ;
2016-04-04 12:46:19 +02:00
stream - > closed = true ;
2016-04-25 13:15:48 -04:00
virStreamEventRemoveCallback ( stream - > st ) ;
virStreamAbort ( stream - > st ) ;
2011-05-16 18:13:11 +01:00
return virNetServerProgramSendReplyError ( stream - > prog ,
client ,
msg ,
& rerr ,
& msg - > header ) ;
2009-08-24 20:53:48 +01:00
}
return 0 ;
}
/*
2011-12-03 17:06:07 -07:00
* Process a finish handshake from the client .
2009-08-24 20:53:48 +01:00
*
2011-05-16 18:13:11 +01:00
* Returns a VIR_NET_OK confirmation if successful , or a VIR_NET_ERROR
2009-08-24 20:53:48 +01:00
* if there was a stream error
*
* Returns 0 if successfully sent RPC reply , - 1 upon fatal error
*/
static int
2011-05-16 18:13:11 +01:00
daemonStreamHandleFinish ( virNetServerClientPtr client ,
daemonClientStream * stream ,
virNetMessagePtr msg )
2009-08-24 20:53:48 +01:00
{
int ret ;
2016-03-29 17:33:14 +02:00
VIR_DEBUG ( " client=%p, stream=%p, proc=%d, serial=%u " ,
2011-05-16 18:13:11 +01:00
client , stream , msg - > header . proc , msg - > header . serial ) ;
2009-08-24 20:53:48 +01:00
2016-04-04 12:46:19 +02:00
stream - > closed = true ;
2010-08-17 16:30:51 +01:00
virStreamEventRemoveCallback ( stream - > st ) ;
2009-08-24 20:53:48 +01:00
ret = virStreamFinish ( stream - > st ) ;
if ( ret < 0 ) {
2011-05-16 18:13:11 +01:00
virNetMessageError rerr ;
memset ( & rerr , 0 , sizeof ( rerr ) ) ;
return virNetServerProgramSendReplyError ( stream - > prog ,
client ,
msg ,
& rerr ,
& msg - > header ) ;
2009-08-24 20:53:48 +01:00
} else {
/* Send zero-length confirm */
2011-05-16 18:13:11 +01:00
return virNetServerProgramSendStreamData ( stream - > prog ,
client ,
msg ,
stream - > procedure ,
stream - > serial ,
NULL , 0 ) ;
2009-08-24 20:53:48 +01:00
}
}
/*
* Process an abort request from the client .
*
* Returns 0 if successfully aborted , - 1 upon error
*/
static int
2011-05-16 18:13:11 +01:00
daemonStreamHandleAbort ( virNetServerClientPtr client ,
daemonClientStream * stream ,
virNetMessagePtr msg )
2009-08-24 20:53:48 +01:00
{
2016-03-29 17:33:14 +02:00
VIR_DEBUG ( " client=%p, stream=%p, proc=%d, serial=%u " ,
2011-05-16 18:13:11 +01:00
client , stream , msg - > header . proc , msg - > header . serial ) ;
2016-04-24 16:28:18 -04:00
int ret ;
bool raise_error = false ;
2009-08-24 20:53:48 +01:00
2016-04-04 12:46:19 +02:00
stream - > closed = true ;
2010-08-17 16:30:51 +01:00
virStreamEventRemoveCallback ( stream - > st ) ;
2016-04-24 16:28:18 -04:00
ret = virStreamAbort ( stream - > st ) ;
2009-08-24 20:53:48 +01:00
2014-09-03 13:39:21 -06:00
if ( msg - > header . status = = VIR_NET_ERROR ) {
2016-04-24 16:28:18 -04:00
VIR_INFO ( " stream aborted at client request " ) ;
raise_error = ( ret < 0 ) ;
2014-09-03 13:39:21 -06:00
} else {
2012-07-18 19:30:53 +01:00
virReportError ( VIR_ERR_RPC ,
_ ( " stream aborted with unexpected status %d " ) ,
msg - > header . status ) ;
2016-04-24 16:28:18 -04:00
raise_error = true ;
2009-08-24 20:53:48 +01:00
}
2016-04-24 16:28:18 -04:00
if ( raise_error ) {
virNetMessageError rerr ;
memset ( & rerr , 0 , sizeof ( rerr ) ) ;
return virNetServerProgramSendReplyError ( remoteProgram ,
client ,
msg ,
& rerr ,
& msg - > header ) ;
} else {
/* Send zero-length confirm */
return virNetServerProgramSendStreamData ( stream - > prog ,
client ,
msg ,
stream - > procedure ,
stream - > serial ,
NULL , 0 ) ;
}
2009-08-24 20:53:48 +01:00
}
2017-05-12 11:26:33 +02:00
static int
daemonStreamHandleHole ( virNetServerClientPtr client ,
daemonClientStream * stream ,
virNetMessagePtr msg )
{
int ret ;
virNetStreamHole data ;
VIR_DEBUG ( " client=%p, stream=%p, proc=%d, serial=%u " ,
client , stream , msg - > header . proc , msg - > header . serial ) ;
/* Let's check if client plays nicely and advertised usage of
* sparse stream upfront . */
if ( ! stream - > allowSkip ) {
virReportError ( VIR_ERR_RPC , " %s " ,
_ ( " Unexpected stream hole " ) ) ;
return - 1 ;
}
if ( virNetMessageDecodePayload ( msg ,
( xdrproc_t ) xdr_virNetStreamHole ,
& data ) < 0 )
return - 1 ;
ret = virStreamSendHole ( stream - > st , data . length , data . flags ) ;
if ( ret < 0 ) {
virNetMessageError rerr ;
memset ( & rerr , 0 , sizeof ( rerr ) ) ;
VIR_INFO ( " Stream send hole failed " ) ;
stream - > closed = true ;
virStreamEventRemoveCallback ( stream - > st ) ;
virStreamAbort ( stream - > st ) ;
return virNetServerProgramSendReplyError ( stream - > prog ,
client ,
msg ,
& rerr ,
& msg - > header ) ;
}
return 0 ;
}
2009-08-24 20:53:48 +01:00
/*
* Called when the stream is signalled has being able to accept
* data writes . Will process all pending incoming messages
* until they ' re all gone , or I / O blocks
*
* Returns 0 on success , or - 1 upon fatal error
*/
static int
2011-05-16 18:13:11 +01:00
daemonStreamHandleWrite ( virNetServerClientPtr client ,
daemonClientStream * stream )
2009-08-24 20:53:48 +01:00
{
2011-05-16 18:13:11 +01:00
VIR_DEBUG ( " client=%p, stream=%p " , client , stream ) ;
2009-08-24 20:53:48 +01:00
2011-05-16 18:13:11 +01:00
while ( stream - > rx & & ! stream - > closed ) {
virNetMessagePtr msg = stream - > rx ;
2009-08-24 20:53:48 +01:00
int ret ;
2011-05-16 18:13:11 +01:00
2017-05-12 11:26:33 +02:00
if ( msg - > header . type = = VIR_NET_STREAM_HOLE ) {
/* Handle special case when the client sent us a hole.
* Otherwise just carry on with processing stream
* data . */
ret = daemonStreamHandleHole ( client , stream , msg ) ;
} else if ( msg - > header . type = = VIR_NET_STREAM ) {
switch ( msg - > header . status ) {
case VIR_NET_OK :
ret = daemonStreamHandleFinish ( client , stream , msg ) ;
break ;
case VIR_NET_CONTINUE :
ret = daemonStreamHandleWriteData ( client , stream , msg ) ;
break ;
case VIR_NET_ERROR :
default :
ret = daemonStreamHandleAbort ( client , stream , msg ) ;
break ;
}
} else {
virReportError ( VIR_ERR_RPC ,
_ ( " Unexpected message type: %d " ) ,
msg - > header . type ) ;
ret = - 1 ;
2009-08-24 20:53:48 +01:00
}
2011-05-16 18:13:11 +01:00
if ( ret > 0 )
break ; /* still processing data from msg */
2009-08-24 20:53:48 +01:00
2011-05-16 18:13:11 +01:00
virNetMessageQueueServe ( & stream - > rx ) ;
if ( ret < 0 ) {
virNetMessageFree ( msg ) ;
2011-07-08 12:54:29 +01:00
virNetServerClientImmediateClose ( client ) ;
2011-05-16 18:13:11 +01:00
return - 1 ;
}
2011-06-30 12:28:10 +01:00
/* 'CONTINUE' messages don't send a reply (unless error
* occurred ) , so to release the ' msg ' object we need to
* send a fake zero - length reply . Nothing actually gets
* onto the wire , but this causes the client to reset
* its active request count / throttling
*/
if ( msg - > header . status = = VIR_NET_CONTINUE ) {
Fix tracking of RPC messages wrt streams
Commit 2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing
- if (msg->header.type == VIR_NET_REPLY) {
+ if (msg->header.type == VIR_NET_REPLY ||
+ (msg->header.type == VIR_NET_STREAM &&
+ msg->header.status != VIR_NET_CONTINUE)) {
client->nrequests--;
In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.
Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.
Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
src/rpc/virnetserverprogram.c: Switch over to use
virNetMessageClear() and pass in the 'bool tracked' value
when creating messages.
2011-08-31 17:42:58 +01:00
virNetMessageClear ( msg ) ;
2011-06-30 12:28:10 +01:00
msg - > header . type = VIR_NET_REPLY ;
if ( virNetServerClientSendMessage ( client , msg ) < 0 ) {
virNetMessageFree ( msg ) ;
2011-07-08 12:54:29 +01:00
virNetServerClientImmediateClose ( client ) ;
2011-06-30 12:28:10 +01:00
return - 1 ;
}
}
2009-08-24 20:53:48 +01:00
}
return 0 ;
}
2009-08-24 20:57:16 +01:00
/*
* Invoked when a stream is signalled as having data
2012-10-11 18:31:20 +02:00
* available to read . This reads up to one message
2009-08-24 20:57:16 +01:00
* worth of data , and then queues that for transmission
* to the client .
*
2016-12-01 22:50:08 +02:00
* Returns 0 if data was queued for TX , or an error RPC
2009-08-24 20:57:16 +01:00
* was sent , or - 1 on fatal error , indicating client should
* be killed
*/
static int
2011-05-16 18:13:11 +01:00
daemonStreamHandleRead ( virNetServerClientPtr client ,
daemonClientStream * stream )
2009-08-24 20:57:16 +01:00
{
2016-04-08 16:56:28 +02:00
virNetMessagePtr msg = NULL ;
virNetMessageError rerr ;
2009-08-24 20:57:16 +01:00
char * buffer ;
2013-09-30 17:27:51 +01:00
size_t bufferLen = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX ;
2016-04-08 16:56:28 +02:00
int ret = - 1 ;
int rv ;
2016-04-11 11:58:19 +02:00
int inData = 0 ;
long long length = 0 ;
2009-08-24 20:57:16 +01:00
2011-08-16 16:20:58 -07:00
VIR_DEBUG ( " client=%p, stream=%p tx=%d closed=%d " ,
client , stream , stream - > tx , stream - > closed ) ;
/* We might have had an event pending before we shut
* down the stream , so if we ' re marked as closed ,
* then do nothing
*/
if ( stream - > closed )
return 0 ;
2009-08-24 20:57:16 +01:00
/* Shouldn't ever be called unless we're marked able to
* transmit , but doesn ' t hurt to check */
if ( ! stream - > tx )
return 0 ;
2016-04-08 16:56:28 +02:00
memset ( & rerr , 0 , sizeof ( rerr ) ) ;
2009-08-24 20:57:16 +01:00
if ( VIR_ALLOC_N ( buffer , bufferLen ) < 0 )
return - 1 ;
2016-04-08 16:56:28 +02:00
if ( ! ( msg = virNetMessageNew ( false ) ) )
goto cleanup ;
2016-04-11 11:58:19 +02:00
if ( stream - > allowSkip & & stream - > dataLen = = 0 ) {
/* Handle skip. We want to send some data to the client. But we might
* be in a hole . Seek to next data . But if we are in data already , just
* carry on . */
rv = virStreamInData ( stream - > st , & inData , & length ) ;
VIR_DEBUG ( " rv=%d inData=%d length=%lld " , rv , inData , length ) ;
if ( rv < 0 ) {
if ( virNetServerProgramSendStreamError ( remoteProgram ,
client ,
msg ,
& rerr ,
stream - > procedure ,
stream - > serial ) < 0 )
goto cleanup ;
msg = NULL ;
/* We're done with this call */
goto done ;
} else {
if ( ! inData & & length ) {
stream - > tx = false ;
msg - > cb = daemonStreamMessageFinished ;
msg - > opaque = stream ;
stream - > refs + + ;
if ( virNetServerProgramSendStreamHole ( remoteProgram ,
client ,
msg ,
stream - > procedure ,
stream - > serial ,
length ,
0 ) < 0 )
goto cleanup ;
msg = NULL ;
/* We have successfully sent stream skip to the other side.
* To keep streams in sync seek locally too . */
virStreamSendHole ( stream - > st , length , 0 ) ;
/* We're done with this call */
goto done ;
}
}
stream - > dataLen = length ;
}
if ( stream - > allowSkip & &
bufferLen > stream - > dataLen )
bufferLen = stream - > dataLen ;
2016-04-08 16:56:28 +02:00
rv = virStreamRecv ( stream - > st , buffer , bufferLen ) ;
if ( rv = = - 2 ) {
2009-08-24 20:57:16 +01:00
/* Should never get this, since we're only called when we know
* we ' re readable , but hey things change . . . */
2016-04-08 16:56:28 +02:00
} else if ( rv < 0 ) {
if ( virNetServerProgramSendStreamError ( remoteProgram ,
client ,
msg ,
& rerr ,
stream - > procedure ,
stream - > serial ) < 0 )
goto cleanup ;
msg = NULL ;
2009-08-24 20:57:16 +01:00
} else {
2016-04-11 11:58:19 +02:00
if ( stream - > allowSkip )
stream - > dataLen - = rv ;
2016-04-04 12:49:48 +02:00
stream - > tx = false ;
2016-04-08 16:56:28 +02:00
if ( rv = = 0 )
2016-04-04 12:46:19 +02:00
stream - > recvEOF = true ;
2011-05-16 18:13:11 +01:00
2016-04-08 16:56:28 +02:00
msg - > cb = daemonStreamMessageFinished ;
msg - > opaque = stream ;
stream - > refs + + ;
if ( virNetServerProgramSendStreamData ( remoteProgram ,
client ,
msg ,
stream - > procedure ,
stream - > serial ,
buffer , rv ) < 0 )
goto cleanup ;
msg = NULL ;
2009-08-24 20:57:16 +01:00
}
2016-04-11 11:58:19 +02:00
done :
2016-04-08 16:56:28 +02:00
ret = 0 ;
cleanup :
2009-08-24 20:57:16 +01:00
VIR_FREE ( buffer ) ;
2016-04-08 16:56:28 +02:00
virNetMessageFree ( msg ) ;
2009-08-24 20:57:16 +01:00
return ret ;
}