2009-07-10 16:06:36 +04:00
/*
 * stream.c: APIs for managing client streams
 *
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Author: Daniel P. Berrange <berrange@redhat.com>
 */
# include <config.h>
# include "stream.h"
2011-05-16 21:13:11 +04:00
# include "remote.h"
2009-07-10 16:06:36 +04:00
# include "memory.h"
# include "logging.h"
2011-05-16 21:13:11 +04:00
# include "virnetserverclient.h"
2011-05-14 19:46:00 +04:00
# include "virterror_internal.h"
# define VIR_FROM_THIS VIR_FROM_STREAMS
2009-07-10 16:06:36 +04:00
2011-05-16 21:13:11 +04:00
/*
 * Report an error in this file's error domain (VIR_FROM_THIS).
 * The error code and all remaining arguments are forwarded unchanged to
 * virReportErrorHelper together with the call site's file, function and
 * line; callers in this file pass a format string followed by its values.
 */
# define virNetError(code, ...) \
virReportErrorHelper ( VIR_FROM_THIS , code , __FILE__ , \
__FUNCTION__ , __LINE__ , __VA_ARGS__ )
struct daemonClientStream {
daemonClientPrivatePtr priv ;
2011-08-31 20:01:01 +04:00
int refs ;
2011-05-16 21:13:11 +04:00
virNetServerProgramPtr prog ;
virStreamPtr st ;
int procedure ;
int serial ;
unsigned int recvEOF : 1 ;
unsigned int closed : 1 ;
int filterID ;
virNetMessagePtr rx ;
int tx ;
daemonClientStreamPtr next ;
} ;
2009-08-24 23:53:48 +04:00
static int
2011-05-16 21:13:11 +04:00
daemonStreamHandleWrite ( virNetServerClientPtr client ,
daemonClientStream * stream ) ;
2009-08-24 23:53:48 +04:00
static int
2011-05-16 21:13:11 +04:00
daemonStreamHandleRead ( virNetServerClientPtr client ,
daemonClientStream * stream ) ;
2009-08-24 23:57:16 +04:00
static int
2011-05-16 21:13:11 +04:00
daemonStreamHandleFinish ( virNetServerClientPtr client ,
daemonClientStream * stream ,
virNetMessagePtr msg ) ;
2009-08-24 23:53:48 +04:00
static int
2011-05-16 21:13:11 +04:00
daemonStreamHandleAbort ( virNetServerClientPtr client ,
daemonClientStream * stream ,
virNetMessagePtr msg ) ;
2009-08-24 23:53:48 +04:00
static void
2011-05-16 21:13:11 +04:00
daemonStreamUpdateEvents ( daemonClientStream * stream )
2009-08-24 23:53:48 +04:00
{
int newEvents = 0 ;
if ( stream - > rx )
newEvents | = VIR_STREAM_EVENT_WRITABLE ;
2009-08-24 23:57:16 +04:00
if ( stream - > tx & & ! stream - > recvEOF )
newEvents | = VIR_STREAM_EVENT_READABLE ;
2009-08-24 23:53:48 +04:00
virStreamEventUpdateCallback ( stream - > st , newEvents ) ;
}
2011-05-16 21:13:11 +04:00
/*
* Invoked when an outgoing data packet message has been fully sent .
* This simply re - enables TX of further data .
*
* The idea is to stop the daemon growing without bound due to
* fast stream , but slow client
*/
static void
2011-09-26 20:51:47 +04:00
daemonStreamMessageFinished ( virNetMessagePtr msg ATTRIBUTE_UNUSED ,
2011-05-16 21:13:11 +04:00
void * opaque )
{
daemonClientStream * stream = opaque ;
VIR_DEBUG ( " stream=%p proc=%d serial=%d " ,
stream , msg - > header . proc , msg - > header . serial ) ;
stream - > tx = 1 ;
daemonStreamUpdateEvents ( stream ) ;
2011-08-31 20:01:01 +04:00
daemonFreeClientStream ( NULL , stream ) ;
2011-05-16 21:13:11 +04:00
}
2009-08-24 23:53:48 +04:00
2011-07-08 15:33:52 +04:00
/*
 * Free function registered together with the stream event callback.
 * The opaque value is the client that was handed to
 * virStreamEventAddCallback(); it is passed on to
 * virNetServerClientFree().
 */
static void
daemonStreamEventFreeFunc(void *opaque)
{
    virNetServerClientFree(opaque);
}
2009-08-24 23:53:48 +04:00
/*
* Callback that gets invoked when a stream becomes writable / readable
*/
static void
2011-05-16 21:13:11 +04:00
daemonStreamEvent ( virStreamPtr st , int events , void * opaque )
2009-08-24 23:53:48 +04:00
{
2011-05-16 21:13:11 +04:00
virNetServerClientPtr client = opaque ;
daemonClientStream * stream ;
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData ( client ) ;
2009-08-24 23:53:48 +04:00
2011-05-16 21:13:11 +04:00
virMutexLock ( & priv - > lock ) ;
2009-08-24 23:53:48 +04:00
2011-05-16 21:13:11 +04:00
stream = priv - > streams ;
while ( stream ) {
if ( stream - > st = = st )
break ;
stream = stream - > next ;
}
2009-08-24 23:53:48 +04:00
if ( ! stream ) {
VIR_WARN ( " event for client=%p stream st=%p, but missing stream state " , client , st ) ;
virStreamEventRemoveCallback ( st ) ;
goto cleanup ;
}
2011-05-16 21:13:11 +04:00
VIR_DEBUG ( " st=%p events=%d EOF=%d closed=%d " , st , events , stream - > recvEOF , stream - > closed ) ;
2009-08-24 23:53:48 +04:00
2011-10-07 20:38:59 +04:00
if ( ! stream - > closed & &
( events & VIR_STREAM_EVENT_WRITABLE ) ) {
2011-05-16 21:13:11 +04:00
if ( daemonStreamHandleWrite ( client , stream ) < 0 ) {
daemonRemoveClientStream ( client , stream ) ;
virNetServerClientClose ( client ) ;
2009-08-24 23:53:48 +04:00
goto cleanup ;
}
}
2011-10-07 20:38:59 +04:00
if ( ! stream - > closed & & ! stream - > recvEOF & &
( events & ( VIR_STREAM_EVENT_READABLE ) ) ) {
events = events & ~ ( VIR_STREAM_EVENT_READABLE ) ;
2011-05-16 21:13:11 +04:00
if ( daemonStreamHandleRead ( client , stream ) < 0 ) {
daemonRemoveClientStream ( client , stream ) ;
virNetServerClientClose ( client ) ;
2009-08-24 23:57:16 +04:00
goto cleanup ;
}
}
2011-05-16 21:13:11 +04:00
/* If we have a completion/abort message, always process it */
if ( stream - > rx ) {
virNetMessagePtr msg = stream - > rx ;
switch ( msg - > header . status ) {
case VIR_NET_CONTINUE :
/* nada */
break ;
case VIR_NET_OK :
virNetMessageQueueServe ( & stream - > rx ) ;
if ( daemonStreamHandleFinish ( client , stream , msg ) < 0 ) {
virNetMessageFree ( msg ) ;
daemonRemoveClientStream ( client , stream ) ;
virNetServerClientClose ( client ) ;
goto cleanup ;
}
break ;
case VIR_NET_ERROR :
default :
virNetMessageQueueServe ( & stream - > rx ) ;
if ( daemonStreamHandleAbort ( client , stream , msg ) < 0 ) {
virNetMessageFree ( msg ) ;
daemonRemoveClientStream ( client , stream ) ;
virNetServerClientClose ( client ) ;
goto cleanup ;
}
break ;
}
}
2011-10-07 20:38:59 +04:00
/* If we got HANGUP, we need to only send an empty
* packet so the client sees an EOF and cleans up
*/
if ( ! stream - > closed & & ! stream - > recvEOF & &
( events & VIR_STREAM_EVENT_HANGUP ) ) {
virNetMessagePtr msg ;
events & = ~ ( VIR_STREAM_EVENT_HANGUP ) ;
stream - > tx = 0 ;
stream - > recvEOF = 1 ;
if ( ! ( msg = virNetMessageNew ( false ) ) ) {
daemonRemoveClientStream ( client , stream ) ;
virNetServerClientClose ( client ) ;
goto cleanup ;
}
msg - > cb = daemonStreamMessageFinished ;
msg - > opaque = stream ;
stream - > refs + + ;
if ( virNetServerProgramSendStreamData ( remoteProgram ,
client ,
msg ,
stream - > procedure ,
stream - > serial ,
" " , 0 ) < 0 ) {
virNetMessageFree ( msg ) ;
daemonRemoveClientStream ( client , stream ) ;
virNetServerClientClose ( client ) ;
goto cleanup ;
}
}
2009-08-24 23:53:48 +04:00
if ( ! stream - > closed & &
( events & ( VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP ) ) ) {
int ret ;
2011-05-16 21:13:11 +04:00
virNetMessagePtr msg ;
virNetMessageError rerr ;
memset ( & rerr , 0 , sizeof ( rerr ) ) ;
2009-08-24 23:53:48 +04:00
stream - > closed = 1 ;
2010-08-17 19:30:51 +04:00
virStreamEventRemoveCallback ( stream - > st ) ;
2009-08-24 23:53:48 +04:00
virStreamAbort ( stream - > st ) ;
if ( events & VIR_STREAM_EVENT_HANGUP )
2011-05-16 21:13:11 +04:00
virNetError ( VIR_ERR_RPC ,
" %s " , _ ( " stream had unexpected termination " ) ) ;
2009-08-24 23:53:48 +04:00
else
2011-05-16 21:13:11 +04:00
virNetError ( VIR_ERR_RPC ,
" %s " , _ ( " stream had I/O failure " ) ) ;
Fix tracking of RPC messages wrt streams
Commit 2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing
- if (msg->header.type == VIR_NET_REPLY) {
+ if (msg->header.type == VIR_NET_REPLY ||
+ (msg->header.type == VIR_NET_STREAM &&
+ msg->header.status != VIR_NET_CONTINUE)) {
client->nrequests--;
In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.
Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.
Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
src/rpc/virnetserverprogram.c: Switch over to use
virNetMessageClear() and pass in the 'bool tracked' value
when creating messages.
2011-08-31 20:42:58 +04:00
msg = virNetMessageNew ( false ) ;
2011-05-16 21:13:11 +04:00
if ( ! msg ) {
ret = - 1 ;
} else {
ret = virNetServerProgramSendStreamError ( remoteProgram ,
client ,
msg ,
& rerr ,
stream - > procedure ,
stream - > serial ) ;
}
daemonRemoveClientStream ( client , stream ) ;
2009-08-24 23:53:48 +04:00
if ( ret < 0 )
2011-05-16 21:13:11 +04:00
virNetServerClientClose ( client ) ;
2009-08-24 23:53:48 +04:00
goto cleanup ;
}
if ( stream - > closed ) {
2011-05-16 21:13:11 +04:00
daemonRemoveClientStream ( client , stream ) ;
2009-08-24 23:53:48 +04:00
} else {
2011-05-16 21:13:11 +04:00
daemonStreamUpdateEvents ( stream ) ;
2009-08-24 23:53:48 +04:00
}
cleanup :
2011-05-16 21:13:11 +04:00
virMutexUnlock ( & priv - > lock ) ;
2009-08-24 23:53:48 +04:00
}
2009-07-10 16:06:36 +04:00
/*
* @ client : a locked client object
*
* Invoked by the main loop when filtering incoming messages .
*
* Returns 1 if the message was processed , 0 if skipped ,
* - 1 on fatal client error
*/
static int
2011-09-26 20:51:47 +04:00
daemonStreamFilter ( virNetServerClientPtr client ATTRIBUTE_UNUSED ,
2011-05-16 21:13:11 +04:00
virNetMessagePtr msg ,
void * opaque )
2009-07-10 16:06:36 +04:00
{
2011-05-16 21:13:11 +04:00
daemonClientStream * stream = opaque ;
int ret = 0 ;
2009-08-24 23:53:48 +04:00
2011-05-16 21:13:11 +04:00
virMutexLock ( & stream - > priv - > lock ) ;
if ( msg - > header . type ! = VIR_NET_STREAM )
goto cleanup ;
if ( ! virNetServerProgramMatches ( stream - > prog , msg ) )
goto cleanup ;
if ( msg - > header . proc ! = stream - > procedure | |
msg - > header . serial ! = stream - > serial )
goto cleanup ;
VIR_DEBUG ( " Incoming client=%p, rx=%p, serial=%d, proc=%d, status=%d " ,
client , stream - > rx , msg - > header . proc ,
msg - > header . serial , msg - > header . status ) ;
virNetMessageQueuePush ( & stream - > rx , msg ) ;
daemonStreamUpdateEvents ( stream ) ;
ret = 1 ;
cleanup :
virMutexUnlock ( & stream - > priv - > lock ) ;
return ret ;
2009-07-10 16:06:36 +04:00
}
/*
* @ conn : a connection object to associate the stream with
2011-05-16 21:13:11 +04:00
* @ header : the method call to associate with the stream
2009-07-10 16:06:36 +04:00
*
* Creates a new stream for this conn
*
* Returns a new stream object , or NULL upon OOM
*/
2011-05-16 21:13:11 +04:00
daemonClientStream *
daemonCreateClientStream ( virNetServerClientPtr client ,
virStreamPtr st ,
virNetServerProgramPtr prog ,
virNetMessageHeaderPtr header )
2009-07-10 16:06:36 +04:00
{
2011-05-16 21:13:11 +04:00
daemonClientStream * stream ;
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData ( client ) ;
2009-07-10 16:06:36 +04:00
2011-05-16 21:13:11 +04:00
VIR_DEBUG ( " client=%p, proc=%d, serial=%d, st=%p " ,
client , header - > proc , header - > serial , st ) ;
2009-07-10 16:06:36 +04:00
2011-05-14 19:46:00 +04:00
if ( VIR_ALLOC ( stream ) < 0 ) {
virReportOOMError ( ) ;
2009-07-10 16:06:36 +04:00
return NULL ;
2011-05-14 19:46:00 +04:00
}
2009-07-10 16:06:36 +04:00
2011-08-31 20:01:01 +04:00
stream - > refs = 1 ;
2011-05-16 21:13:11 +04:00
stream - > priv = priv ;
stream - > prog = prog ;
stream - > procedure = header - > proc ;
stream - > serial = header - > serial ;
stream - > filterID = - 1 ;
stream - > st = st ;
2009-07-10 16:06:36 +04:00
2011-05-16 21:13:11 +04:00
virNetServerProgramRef ( prog ) ;
2009-07-10 16:06:36 +04:00
return stream ;
}
/*
* @ stream : an unused client stream
*
* Frees the memory associated with this inactive client
* stream
*/
2011-05-16 21:13:11 +04:00
int daemonFreeClientStream ( virNetServerClientPtr client ,
daemonClientStream * stream )
2009-07-10 16:06:36 +04:00
{
2011-05-16 21:13:11 +04:00
virNetMessagePtr msg ;
int ret = 0 ;
2009-07-10 16:06:36 +04:00
if ( ! stream )
2011-05-16 21:13:11 +04:00
return 0 ;
2011-08-31 20:01:01 +04:00
stream - > refs - - ;
if ( stream - > refs )
return 0 ;
2011-05-16 21:13:11 +04:00
VIR_DEBUG ( " client=%p, proc=%d, serial=%d " ,
client , stream - > procedure , stream - > serial ) ;
2009-07-10 16:06:36 +04:00
2011-05-16 21:13:11 +04:00
virNetServerProgramFree ( stream - > prog ) ;
2009-07-10 16:06:36 +04:00
msg = stream - > rx ;
while ( msg ) {
2011-05-16 21:13:11 +04:00
virNetMessagePtr tmp = msg - > next ;
2011-08-15 02:44:45 +04:00
if ( client ) {
/* Send a dummy reply to free up 'msg' & unblock client rx */
Fix tracking of RPC messages wrt streams
Commit 2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing
- if (msg->header.type == VIR_NET_REPLY) {
+ if (msg->header.type == VIR_NET_REPLY ||
+ (msg->header.type == VIR_NET_STREAM &&
+ msg->header.status != VIR_NET_CONTINUE)) {
client->nrequests--;
In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.
Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.
Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
src/rpc/virnetserverprogram.c: Switch over to use
virNetMessageClear() and pass in the 'bool tracked' value
when creating messages.
2011-08-31 20:42:58 +04:00
virNetMessageClear ( msg ) ;
2011-08-15 02:44:45 +04:00
msg - > header . type = VIR_NET_REPLY ;
if ( virNetServerClientSendMessage ( client , msg ) < 0 ) {
virNetServerClientImmediateClose ( client ) ;
virNetMessageFree ( msg ) ;
ret = - 1 ;
}
} else {
2011-05-16 21:13:11 +04:00
virNetMessageFree ( msg ) ;
}
2009-07-10 16:06:36 +04:00
msg = tmp ;
}
virStreamFree ( stream - > st ) ;
VIR_FREE ( stream ) ;
2011-05-16 21:13:11 +04:00
return ret ;
2009-07-10 16:06:36 +04:00
}
/*
* @ client : a locked client to add the stream to
* @ stream : a stream to add
*/
2011-05-16 21:13:11 +04:00
int daemonAddClientStream ( virNetServerClientPtr client ,
daemonClientStream * stream ,
bool transmit )
2009-07-10 16:06:36 +04:00
{
2011-05-16 21:13:11 +04:00
VIR_DEBUG ( " client=%p, proc=%d, serial=%d, st=%p, transmit=%d " ,
client , stream - > procedure , stream - > serial , stream - > st , transmit ) ;
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData ( client ) ;
2009-07-10 16:06:36 +04:00
2011-05-16 21:13:11 +04:00
if ( stream - > filterID ! = - 1 ) {
VIR_WARN ( " Filter already added to client %p " , client ) ;
return - 1 ;
}
2009-07-10 16:06:36 +04:00
2009-08-24 23:53:48 +04:00
if ( virStreamEventAddCallback ( stream - > st , 0 ,
2011-07-08 15:33:52 +04:00
daemonStreamEvent , client ,
daemonStreamEventFreeFunc ) < 0 )
2009-08-24 23:53:48 +04:00
return - 1 ;
2011-07-08 15:33:52 +04:00
virNetServerClientRef ( client ) ;
2011-05-16 21:13:11 +04:00
if ( ( stream - > filterID = virNetServerClientAddFilter ( client ,
daemonStreamFilter ,
stream ) ) < 0 ) {
virStreamEventRemoveCallback ( stream - > st ) ;
return - 1 ;
2009-07-10 16:06:36 +04:00
}
2009-09-30 14:47:43 +04:00
if ( transmit )
stream - > tx = 1 ;
2009-07-10 16:06:36 +04:00
2011-05-16 21:13:11 +04:00
virMutexLock ( & priv - > lock ) ;
stream - > next = priv - > streams ;
priv - > streams = stream ;
2009-08-24 23:53:48 +04:00
2011-05-16 21:13:11 +04:00
daemonStreamUpdateEvents ( stream ) ;
2009-07-10 16:06:36 +04:00
2011-05-16 21:13:11 +04:00
virMutexUnlock ( & priv - > lock ) ;
2009-07-10 16:06:36 +04:00
2011-05-16 21:13:11 +04:00
return 0 ;
2009-07-10 16:06:36 +04:00
}
/*
* @ client : a locked client object
* @ stream : an inactive , closed stream object
*
* Removes a stream from the list of active streams for the client
*
* Returns 0 if the stream was removd , - 1 if it doesn ' t exist
*/
int
2011-05-16 21:13:11 +04:00
daemonRemoveClientStream ( virNetServerClientPtr client ,
daemonClientStream * stream )
2009-07-10 16:06:36 +04:00
{
2011-05-16 21:13:11 +04:00
VIR_DEBUG ( " client=%p, proc=%d, serial=%d, st=%p " ,
client , stream - > procedure , stream - > serial , stream - > st ) ;
daemonClientPrivatePtr priv = virNetServerClientGetPrivateData ( client ) ;
daemonClientStream * curr = priv - > streams ;
daemonClientStream * prev = NULL ;
if ( stream - > filterID ! = - 1 ) {
virNetServerClientRemoveFilter ( client ,
stream - > filterID ) ;
stream - > filterID = - 1 ;
2009-07-10 16:06:36 +04:00
}
2010-08-17 19:30:51 +04:00
if ( ! stream - > closed ) {
virStreamEventRemoveCallback ( stream - > st ) ;
2009-07-10 16:06:36 +04:00
virStreamAbort ( stream - > st ) ;
2010-08-17 19:30:51 +04:00
}
2009-07-10 16:06:36 +04:00
while ( curr ) {
if ( curr = = stream ) {
if ( prev )
prev - > next = curr - > next ;
else
2011-05-16 21:13:11 +04:00
priv - > streams = curr - > next ;
return daemonFreeClientStream ( client , stream ) ;
2009-07-10 16:06:36 +04:00
}
prev = curr ;
curr = curr - > next ;
}
return - 1 ;
}
2009-08-24 23:53:48 +04:00
2011-08-15 02:44:45 +04:00
/*
 * @stream: head of a list of client streams (may be NULL)
 *
 * Walks the list and releases every stream. A stream that is not yet
 * marked closed has its event callback removed and is aborted before
 * its reference is dropped. No client is passed to
 * daemonFreeClientStream(), so queued messages are freed rather than
 * answered.
 */
void
daemonRemoveAllClientStreams(daemonClientStream *stream)
{
    daemonClientStream *upcoming;

    VIR_DEBUG(" stream=%p ", stream);

    for (; stream != NULL; stream = upcoming) {
        /* Fetch the successor first; the current entry may be freed */
        upcoming = stream->next;

        if (!stream->closed) {
            virStreamEventRemoveCallback(stream->st);
            virStreamAbort(stream->st);
        }

        daemonFreeClientStream(NULL, stream);

        VIR_DEBUG(" next stream=%p ", upcoming);
    }
}
2009-08-24 23:53:48 +04:00
/*
* Returns :
* - 1 if fatal error occurred
* 0 if message was fully processed
* 1 if message is still being processed
*/
static int
2011-05-16 21:13:11 +04:00
daemonStreamHandleWriteData ( virNetServerClientPtr client ,
daemonClientStream * stream ,
virNetMessagePtr msg )
2009-08-24 23:53:48 +04:00
{
int ret ;
2011-05-16 21:13:11 +04:00
VIR_DEBUG ( " client=%p, stream=%p, proc=%d, serial=%d, len=%zu, offset=%zu " ,
client , stream , msg - > header . proc , msg - > header . serial ,
msg - > bufferLength , msg - > bufferOffset ) ;
2009-08-24 23:53:48 +04:00
ret = virStreamSend ( stream - > st ,
msg - > buffer + msg - > bufferOffset ,
msg - > bufferLength - msg - > bufferOffset ) ;
if ( ret > 0 ) {
msg - > bufferOffset + = ret ;
/* Partial write, so indicate we have more todo later */
if ( msg - > bufferOffset < msg - > bufferLength )
return 1 ;
} else if ( ret = = - 2 ) {
/* Blocking, so indicate we have more todo later */
return 1 ;
} else {
2011-05-16 21:13:11 +04:00
virNetMessageError rerr ;
memset ( & rerr , 0 , sizeof ( rerr ) ) ;
2011-05-09 13:24:09 +04:00
VIR_INFO ( " Stream send failed " ) ;
2009-08-24 23:53:48 +04:00
stream - > closed = 1 ;
2011-05-16 21:13:11 +04:00
return virNetServerProgramSendReplyError ( stream - > prog ,
client ,
msg ,
& rerr ,
& msg - > header ) ;
2009-08-24 23:53:48 +04:00
}
return 0 ;
}
/*
* Process an finish handshake from the client .
*
2011-05-16 21:13:11 +04:00
* Returns a VIR_NET_OK confirmation if successful , or a VIR_NET_ERROR
2009-08-24 23:53:48 +04:00
* if there was a stream error
*
* Returns 0 if successfully sent RPC reply , - 1 upon fatal error
*/
static int
2011-05-16 21:13:11 +04:00
daemonStreamHandleFinish ( virNetServerClientPtr client ,
daemonClientStream * stream ,
virNetMessagePtr msg )
2009-08-24 23:53:48 +04:00
{
int ret ;
2011-05-16 21:13:11 +04:00
VIR_DEBUG ( " client=%p, stream=%p, proc=%d, serial=%d " ,
client , stream , msg - > header . proc , msg - > header . serial ) ;
2009-08-24 23:53:48 +04:00
stream - > closed = 1 ;
2010-08-17 19:30:51 +04:00
virStreamEventRemoveCallback ( stream - > st ) ;
2009-08-24 23:53:48 +04:00
ret = virStreamFinish ( stream - > st ) ;
if ( ret < 0 ) {
2011-05-16 21:13:11 +04:00
virNetMessageError rerr ;
memset ( & rerr , 0 , sizeof ( rerr ) ) ;
return virNetServerProgramSendReplyError ( stream - > prog ,
client ,
msg ,
& rerr ,
& msg - > header ) ;
2009-08-24 23:53:48 +04:00
} else {
/* Send zero-length confirm */
2011-05-16 21:13:11 +04:00
return virNetServerProgramSendStreamData ( stream - > prog ,
client ,
msg ,
stream - > procedure ,
stream - > serial ,
NULL , 0 ) ;
2009-08-24 23:53:48 +04:00
}
}
/*
* Process an abort request from the client .
*
* Returns 0 if successfully aborted , - 1 upon error
*/
static int
2011-05-16 21:13:11 +04:00
daemonStreamHandleAbort ( virNetServerClientPtr client ,
daemonClientStream * stream ,
virNetMessagePtr msg )
2009-08-24 23:53:48 +04:00
{
2011-05-16 21:13:11 +04:00
VIR_DEBUG ( " client=%p, stream=%p, proc=%d, serial=%d " ,
client , stream , msg - > header . proc , msg - > header . serial ) ;
virNetMessageError rerr ;
2009-08-24 23:53:48 +04:00
2011-05-16 21:13:11 +04:00
memset ( & rerr , 0 , sizeof ( rerr ) ) ;
2009-08-24 23:53:48 +04:00
stream - > closed = 1 ;
2010-08-17 19:30:51 +04:00
virStreamEventRemoveCallback ( stream - > st ) ;
2009-08-24 23:53:48 +04:00
virStreamAbort ( stream - > st ) ;
2011-05-16 21:13:11 +04:00
if ( msg - > header . status = = VIR_NET_ERROR )
virNetError ( VIR_ERR_RPC ,
" %s " , _ ( " stream aborted at client request " ) ) ;
2009-08-24 23:53:48 +04:00
else {
2011-05-16 21:13:11 +04:00
VIR_WARN ( " unexpected stream status %d " , msg - > header . status ) ;
virNetError ( VIR_ERR_RPC ,
_ ( " stream aborted with unexpected status %d " ) ,
msg - > header . status ) ;
2009-08-24 23:53:48 +04:00
}
2011-05-16 21:13:11 +04:00
return virNetServerProgramSendReplyError ( remoteProgram ,
client ,
msg ,
& rerr ,
& msg - > header ) ;
2009-08-24 23:53:48 +04:00
}
/*
* Called when the stream is signalled has being able to accept
* data writes . Will process all pending incoming messages
* until they ' re all gone , or I / O blocks
*
* Returns 0 on success , or - 1 upon fatal error
*/
static int
2011-05-16 21:13:11 +04:00
daemonStreamHandleWrite ( virNetServerClientPtr client ,
daemonClientStream * stream )
2009-08-24 23:53:48 +04:00
{
2011-05-16 21:13:11 +04:00
VIR_DEBUG ( " client=%p, stream=%p " , client , stream ) ;
2009-08-24 23:53:48 +04:00
2011-05-16 21:13:11 +04:00
while ( stream - > rx & & ! stream - > closed ) {
virNetMessagePtr msg = stream - > rx ;
2009-08-24 23:53:48 +04:00
int ret ;
2011-05-16 21:13:11 +04:00
switch ( msg - > header . status ) {
case VIR_NET_OK :
ret = daemonStreamHandleFinish ( client , stream , msg ) ;
2009-08-24 23:53:48 +04:00
break ;
2011-05-16 21:13:11 +04:00
case VIR_NET_CONTINUE :
ret = daemonStreamHandleWriteData ( client , stream , msg ) ;
2009-08-24 23:53:48 +04:00
break ;
2011-05-16 21:13:11 +04:00
case VIR_NET_ERROR :
2009-08-24 23:53:48 +04:00
default :
2011-05-16 21:13:11 +04:00
ret = daemonStreamHandleAbort ( client , stream , msg ) ;
2009-08-24 23:53:48 +04:00
break ;
}
2011-05-16 21:13:11 +04:00
if ( ret > 0 )
break ; /* still processing data from msg */
2009-08-24 23:53:48 +04:00
2011-05-16 21:13:11 +04:00
virNetMessageQueueServe ( & stream - > rx ) ;
if ( ret < 0 ) {
virNetMessageFree ( msg ) ;
2011-07-08 15:54:29 +04:00
virNetServerClientImmediateClose ( client ) ;
2011-05-16 21:13:11 +04:00
return - 1 ;
}
2011-06-30 15:28:10 +04:00
/* 'CONTINUE' messages don't send a reply (unless error
* occurred ) , so to release the ' msg ' object we need to
* send a fake zero - length reply . Nothing actually gets
* onto the wire , but this causes the client to reset
* its active request count / throttling
*/
if ( msg - > header . status = = VIR_NET_CONTINUE ) {
Fix tracking of RPC messages wrt streams
Commit 2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing
- if (msg->header.type == VIR_NET_REPLY) {
+ if (msg->header.type == VIR_NET_REPLY ||
+ (msg->header.type == VIR_NET_STREAM &&
+ msg->header.status != VIR_NET_CONTINUE)) {
client->nrequests--;
In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.
Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.
Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
src/rpc/virnetserverprogram.c: Switch over to use
virNetMessageClear() and pass in the 'bool tracked' value
when creating messages.
2011-08-31 20:42:58 +04:00
virNetMessageClear ( msg ) ;
2011-06-30 15:28:10 +04:00
msg - > header . type = VIR_NET_REPLY ;
if ( virNetServerClientSendMessage ( client , msg ) < 0 ) {
virNetMessageFree ( msg ) ;
2011-07-08 15:54:29 +04:00
virNetServerClientImmediateClose ( client ) ;
2011-06-30 15:28:10 +04:00
return - 1 ;
}
}
2009-08-24 23:53:48 +04:00
}
return 0 ;
}
2009-08-24 23:57:16 +04:00
/*
* Invoked when a stream is signalled as having data
* available to read . This reads upto one message
* worth of data , and then queues that for transmission
* to the client .
*
* Returns 0 if data was queued for TX , or a error RPC
* was sent , or - 1 on fatal error , indicating client should
* be killed
*/
static int
2011-05-16 21:13:11 +04:00
daemonStreamHandleRead ( virNetServerClientPtr client ,
daemonClientStream * stream )
2009-08-24 23:57:16 +04:00
{
char * buffer ;
2011-05-16 21:13:11 +04:00
size_t bufferLen = VIR_NET_MESSAGE_PAYLOAD_MAX ;
2009-08-24 23:57:16 +04:00
int ret ;
2011-08-17 03:20:58 +04:00
VIR_DEBUG ( " client=%p, stream=%p tx=%d closed=%d " ,
client , stream , stream - > tx , stream - > closed ) ;
/* We might have had an event pending before we shut
* down the stream , so if we ' re marked as closed ,
* then do nothing
*/
if ( stream - > closed )
return 0 ;
2009-08-24 23:57:16 +04:00
/* Shouldn't ever be called unless we're marked able to
* transmit , but doesn ' t hurt to check */
if ( ! stream - > tx )
return 0 ;
if ( VIR_ALLOC_N ( buffer , bufferLen ) < 0 )
return - 1 ;
ret = virStreamRecv ( stream - > st , buffer , bufferLen ) ;
if ( ret = = - 2 ) {
/* Should never get this, since we're only called when we know
* we ' re readable , but hey things change . . . */
ret = 0 ;
} else if ( ret < 0 ) {
2011-05-16 21:13:11 +04:00
virNetMessagePtr msg ;
virNetMessageError rerr ;
memset ( & rerr , 0 , sizeof ( rerr ) ) ;
2009-08-24 23:57:16 +04:00
Fix tracking of RPC messages wrt streams
Commit 2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing
- if (msg->header.type == VIR_NET_REPLY) {
+ if (msg->header.type == VIR_NET_REPLY ||
+ (msg->header.type == VIR_NET_STREAM &&
+ msg->header.status != VIR_NET_CONTINUE)) {
client->nrequests--;
In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.
Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.
Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
src/rpc/virnetserverprogram.c: Switch over to use
virNetMessageClear() and pass in the 'bool tracked' value
when creating messages.
2011-08-31 20:42:58 +04:00
if ( ! ( msg = virNetMessageNew ( false ) ) )
2011-05-16 21:13:11 +04:00
ret = - 1 ;
else
ret = virNetServerProgramSendStreamError ( remoteProgram ,
client ,
msg ,
& rerr ,
stream - > procedure ,
stream - > serial ) ;
2009-08-24 23:57:16 +04:00
} else {
2011-05-16 21:13:11 +04:00
virNetMessagePtr msg ;
2009-08-24 23:57:16 +04:00
stream - > tx = 0 ;
if ( ret = = 0 )
stream - > recvEOF = 1 ;
Fix tracking of RPC messages wrt streams
Commit 2c85644b0b51fbe5b6244e6773531af29933a727 attempted to
fix a problem with tracking RPC messages from streams by doing
- if (msg->header.type == VIR_NET_REPLY) {
+ if (msg->header.type == VIR_NET_REPLY ||
+ (msg->header.type == VIR_NET_STREAM &&
+ msg->header.status != VIR_NET_CONTINUE)) {
client->nrequests--;
In other words any stream packet, with status NET_OK or NET_ERROR
would cause nrequests to be decremented. This is great if the
packet from from a synchronous virStreamFinish or virStreamAbort
API call, but wildly wrong if from a server initiated abort.
The latter resulted in 'nrequests' being decremented below zero.
This then causes all I/O for that client to be stopped.
Instead of trying to infer whether we need to decrement the
nrequests field, from the message type/status, introduce an
explicit 'bool tracked' field to mark whether the virNetMessagePtr
object is subject to tracking.
Also add a virNetMessageClear function to allow a message
contents to be cleared out, without adversely impacting the
'tracked' field as a naive memset() would do
* src/rpc/virnetmessage.c, src/rpc/virnetmessage.h: Add
a 'bool tracked' field and virNetMessageClear() API
* daemon/remote.c, daemon/stream.c, src/rpc/virnetclientprogram.c,
src/rpc/virnetclientstream.c, src/rpc/virnetserverclient.c,
src/rpc/virnetserverprogram.c: Switch over to use
virNetMessageClear() and pass in the 'bool tracked' value
when creating messages.
2011-08-31 20:42:58 +04:00
if ( ! ( msg = virNetMessageNew ( false ) ) )
2011-05-16 21:13:11 +04:00
ret = - 1 ;
if ( msg ) {
msg - > cb = daemonStreamMessageFinished ;
msg - > opaque = stream ;
2011-08-31 20:01:01 +04:00
stream - > refs + + ;
2011-05-16 21:13:11 +04:00
ret = virNetServerProgramSendStreamData ( remoteProgram ,
client ,
msg ,
stream - > procedure ,
stream - > serial ,
buffer , ret ) ;
}
2009-08-24 23:57:16 +04:00
}
VIR_FREE ( buffer ) ;
return ret ;
}