2012-07-12 12:28:27 +02:00
package PVE::QMPClient ;
use strict ;
use PVE::QemuServer ;
use IO::Multiplex ;
2012-08-27 13:41:24 +02:00
use POSIX qw( EINTR EAGAIN ) ;
2012-07-12 12:28:27 +02:00
use JSON ;
2012-08-27 13:41:24 +02:00
use Time::HiRes qw( usleep gettimeofday tv_interval ) ;
2012-10-29 12:15:43 +01:00
use Scalar::Util qw( weaken ) ;
2012-12-06 08:39:03 +01:00
use PVE::IPCC ;
2012-08-27 13:41:24 +02:00
2012-07-12 12:28:27 +02:00
use Data::Dumper ;
# Qemu Monitor Protocol (QMP) client.
#
# This implementation uses IO::Multiplex (libio-multiplex-perl) and
# allows you to issue qmp commands to different VMs in parallel.
# Note: kvm can only handle 1 connection, so we close connections asap
# Constructor.
#
# Arguments:
#   $class   - class name the new object is blessed into
#   $eventcb - optional code reference; stored as $self->{eventcb} only
#              when it is true
#
# Returns the new client object. The object owns an IO::Multiplex
# instance and registers itself as that multiplexer's callback object.
sub new {
    my ($class, $eventcb) = @_;

    # direct method call; the indirect object form ("new IO::Multiplex")
    # is ambiguous to the parser and discouraged
    my $mux = IO::Multiplex->new();

    my $self = bless {
        mux => $mux,
        fhs => {}, # $vmid => fh
        fhs_lookup => {}, # $fh => $vmid
        queue => {},
        current => {},
        errors => {},
    }, $class;

    $self->{eventcb} = $eventcb if $eventcb;

    $mux->set_callback_object($self);

    # $self references $mux; weaken the mux's '_object' slot (presumably
    # the back-reference stored by set_callback_object - confirm against
    # IO::Multiplex) so perl doesn't treat the pair as a circular
    # reference.
    # NOTE(review): the original comment mentions deleting mux in
    # DESTROY; no DESTROY is visible in this part of the file.
    weaken($mux->{_object});

    return $self;
}
2012-12-06 08:39:03 +01:00
# add a single command to the queue for later execution
2012-07-12 12:28:27 +02:00
# with queue_execute()
# Append one command to the pending list of a VM.
#
# Arguments:
#   $vmid     - key of the per-VM queue in $self->{queue}
#   $callback - stored as the command's 'callback' entry (may be undef)
#   $execute  - stored as the command's 'execute' entry
#   %params   - remaining key/value pairs; stored (by reference) as the
#               command's 'arguments' entry
#
# Nothing is sent here; the command is only stored in $self->{queue}.
sub queue_cmd {
    my ($self, $vmid, $callback, $execute, %params) = @_;

    my $entry = {
        execute   => $execute,
        arguments => \%params,
        callback  => $callback,
    };

    push @{$self->{queue}->{$vmid}}, $entry;
}
# execute a single command
# Run exactly one command for a VM and return its result.
#
# Arguments:
#   $vmid    - VM the command is sent to
#   $cmd     - hash reference with at least a true 'execute' entry and
#              optionally 'arguments'. The hash is modified in place: a
#              'callback' entry is set and 'arguments' defaults to {}.
#   $timeout - optional timeout passed on to queue_execute(); when it is
#              false a default is chosen from the command name.
#
# Any commands already queued for $vmid are replaced by this one.
#
# Returns the 'return' member of the response handed to the callback.
# Dies if no command was given or if an error was recorded for $vmid.
sub cmd {
    my ($self, $vmid, $cmd, $timeout) = @_;

    die "no command specified" if !($cmd && $cmd->{execute});

    # the callback stores the payload of the response in $result
    my $result;
    $cmd->{callback} = sub {
        my ($vmid, $resp) = @_;
        $result = $resp->{'return'};
    };
    $cmd->{arguments} = {} if !defined($cmd->{arguments});

    $self->{queue}->{$vmid} = [ $cmd ];

    if (!$timeout) {
        my $name = $cmd->{execute};

        # commands which get a 10 minute timeout
        my %is_slow = map { $_ => 1 } qw(
            savevm-start
            savevm-end
            query-backup
            backup-cancel
            query-savevm
            delete-drive-snapshot
            snapshot-drive
        );

        # hack: monitor sometime blocks
        if ($name eq 'query-migrate') {
            $timeout = 60*60; # 1 hour
        } elsif ($name =~ m/^(eject|change)/) {
            $timeout = 60; # note: cdrom mount command is slow
        } elsif ($is_slow{$name}) {
            $timeout = 10*60; # 10 mins ?
        } else {
            $timeout = 3; # default
        }
    }

    $self->queue_execute($timeout);

    my $cmdstr = $cmd->{execute} || '';
    die "VM $vmid qmp command '$cmdstr' failed - $self->{errors}->{$vmid}"
        if defined($self->{errors}->{$vmid});

    return $result;
}
# File-scoped sequence counter used to build command ids.
my $cmdid_seq = 0;

# Returns the next command id: the process id ($$) and the incremented
# counter, joined by a colon.
my $next_cmdid = sub {
    $cmdid_seq += 1;
    return "$$:$cmdid_seq";
};
# Close the connection of a VM, if there is one.
#
# Removes the handle from both lookup tables ($self->{fhs} and
# $self->{fhs_lookup}) and closes it through the multiplexer. Does
# nothing when no handle is registered for $vmid.
my $close_connection = sub {
    my ($self, $vmid) = @_;

    my $sock = $self->{fhs}->{$vmid} or return;

    delete $self->{fhs}->{$vmid};
    delete $self->{fhs_lookup}->{$sock};

    $self->{mux}->close($sock);
};
# Connect to the QMP socket of a VM and register the handle.
#
# Arguments:
#   $vmid    - VM to connect to; the socket path comes from
#              PVE::QemuServer::qmp_socket($vmid)
#   $timeout - seconds to keep retrying (defaults to 1 when false)
#
# A failed connect is retried every 100ms as long as the error is EINTR
# or EAGAIN and the timeout has not elapsed; any other error is fatal.
#
# On success the handle is stored in $self->{fhs} and
# $self->{fhs_lookup}, added to the multiplexer, and returned.
my $open_connection = sub {
    my ($self, $vmid, $timeout) = @_;

    my $sname = PVE::QemuServer::qmp_socket($vmid);

    $timeout = 1 if !$timeout;

    my $starttime = [gettimeofday];
    my $count = 0;
    my $fh;

    while (1) {
        $count++;

        $fh = IO::Socket::UNIX->new(Peer => $sname, Blocking => 0, Timeout => 1);
        last if $fh;

        # only interrupted/temporary failures are worth a retry
        die "unable to connect to VM $vmid socket - $!\n"
            if $! != EINTR && $! != EAGAIN;

        my $elapsed = tv_interval($starttime, [gettimeofday]);
        die "unable to connect to VM $vmid socket - timeout after $count retries\n"
            if $elapsed >= $timeout;

        usleep(100000);
    }

    $self->{fhs}->{$vmid} = $fh;
    $self->{fhs_lookup}->{$fh} = $vmid;
    $self->{mux}->add($fh);

    return $fh;
};
# Advance the per-VM command queues by one step.
#
# For every VM in $self->{queue} that has an open connection:
#   - if an error is recorded, the connection is closed
#   - if a command is still waiting for its response, it counts as running
#   - if the queue is empty, the connection is closed
#   - otherwise the next command is taken from the queue, becomes the
#     current command, gets a fresh id and is sent as JSON
#
# 'add-fd' and 'getfd' commands carry a local file descriptor in their
# 'fd' argument. It is removed from the arguments and passed along with
# the command using PVE::IPCC::sendfd() instead of a plain write.
#
# Errors raised while sending are stored in $self->{errors}->{$vmid}.
#
# Ends the multiplexer loop when nothing is running any more.
# Returns the number of VMs with a command in flight.
my $check_queue = sub {
    my ($self) = @_;

    my $running = 0;

    foreach my $vmid (keys %{$self->{queue}}) {
        my $fh = $self->{fhs}->{$vmid};
        next if !$fh;

        if ($self->{errors}->{$vmid}) {
            $close_connection->($self, $vmid);
            next;
        }

        if ($self->{current}->{$vmid}) { # command running, waiting for response
            $running++;
            next;
        }

        if (!scalar(@{$self->{queue}->{$vmid}})) { # no more commands for the VM
            $close_connection->($self, $vmid);
            next;
        }

        eval {
            my $cmd = $self->{current}->{$vmid} = shift @{$self->{queue}->{$vmid}};
            $cmd->{id} = $next_cmdid->();

            my $fd = -1;
            if ($cmd->{execute} eq 'add-fd' || $cmd->{execute} eq 'getfd') {
                $fd = delete $cmd->{arguments}->{fd};
                # an undefined fd would compare as 0 below and silently
                # pass descriptor 0 to the VM, so refuse it here
                die "missing 'fd' argument for command '$cmd->{execute}'\n"
                    if !defined($fd);
            }

            my $qmpcmd = to_json({
                execute => $cmd->{execute},
                arguments => $cmd->{arguments},
                id => $cmd->{id}});

            if ($fd >= 0) {
                my $ret = PVE::IPCC::sendfd(fileno($fh), $fd, $qmpcmd);
                die "sendfd failed" if $ret < 0;
            } else {
                $self->{mux}->write($fh, $qmpcmd);
            }
        };
        if (my $err = $@) {
            $self->{errors}->{$vmid} = $err;
        } else {
            $running++;
        }
    }

    $self->{mux}->endloop() if !$running;

    return $running;
};
# execute all queued commands
# Run everything that was queued and wait until it is finished.
#
# Arguments:
#   $timeout - timeout in seconds (defaults to 3 when false); used for
#              connecting and set as timeout on every opened handle
#
# For each VM with queued commands a connection is opened and a
# 'qmp_capabilities' command is put in front of its queue. Connection
# errors are warned about and stored in $self->{errors}->{$vmid}; they
# do not abort the other VMs. The multiplexer loop then runs until no
# command is in flight any more.
#
# Afterwards all connections are closed and the queue/connection state
# is reset. $self->{errors} is kept so callers can inspect it.
sub queue_execute {
    my ($self, $timeout) = @_;

    $timeout = 3 if !$timeout;

    $self->{current} = {};
    $self->{errors} = {};

    # open all necessary connections
    foreach my $vmid (keys %{$self->{queue}}) {
        next if !scalar(@{$self->{queue}->{$vmid}}); # no commands for the VM

        eval {
            my $fh = $open_connection->($self, $vmid, $timeout);
            my $cmd = { execute => 'qmp_capabilities', arguments => {} };
            unshift @{$self->{queue}->{$vmid}}, $cmd;
            $self->{mux}->set_timeout($fh, $timeout);
        };
        if (my $err = $@) {
            warn $err;
            $self->{errors}->{$vmid} = $err;
        }
    }

    my $running;
    for (;;) {
        $running = $check_queue->($self);
        last if !$running;
        $self->{mux}->loop;
    }

    # make sure we close everything
    foreach my $vmid (keys %{$self->{fhs}}) {
        $close_connection->($self, $vmid);
    }

    # Reset the state. Every slot needs its own hash: a chained
    # assignment ($a = $b = {}) would store the same reference in all of
    # them, so that on reuse of the object a write to 'fhs' would
    # overwrite the 'queue' entry of the same VM.
    $self->{queue} = {};
    $self->{current} = {};
    $self->{fhs} = {};
    $self->{fhs_lookup} = {};
}
# mux_input is called when input is available on one of
# the descriptors.
# Multiplexer callback for incoming data.
#
# Arguments:
#   $mux   - the multiplexer (unused here)
#   $fh    - handle the data arrived on
#   $input - reference to the input buffer of that handle
#
# Nothing happens until the buffer contains a "}\r\n". Then everything
# up to the last such terminator is removed from the buffer and parsed
# line by line as JSON:
#   - the monitor greeting (QMP key) is skipped
#   - an error description raises an error, unless it is the
#     "Connection can not be completed immediately" message
#   - events are passed to the event callback, if one is set
#   - anything else must be the response to the current command of the
#     VM; its id is verified and the command's callback is invoked
#
# Errors are stored in $self->{errors}->{$vmid}. Finally the queue is
# checked again.
sub mux_input {
    my ($self, $mux, $fh, $input) = @_;

    return if $$input !~ s/^(.*})\r\n(.*)$/$2/so;

    my $raw = $1;

    my $vmid = $self->{fhs_lookup}->{$fh};
    if (!$vmid) {
        warn "internal error - unable to lookup vmid";
        return;
    }

    eval {
        for my $line (split("\n", $raw)) {
            my $msg = from_json($line);

            next if defined($msg->{QMP}); # skip monitor greeting

            if (exists($msg->{error}->{desc})) {
                my $desc = $msg->{error}->{desc};
                chomp $desc;
                next if $desc =~ m/Connection can not be completed immediately/;
                die "$desc\n";
            }

            if (defined($msg->{event})) {
                my $eventcb = $self->{eventcb};
                $eventcb->($msg) if $eventcb;
                next;
            }

            my $cmdid = $msg->{id};
            die "received responsed without command id\n" if !$cmdid;

            my $curcmd = $self->{current}->{$vmid};
            die "unable to lookup current command for VM $vmid\n" if !$curcmd;

            # the command is done, whatever the checks below say
            delete $self->{current}->{$vmid};

            die "got wrong command id '$cmdid' (expected $curcmd->{id})\n"
                if $curcmd->{id} ne $cmdid;

            my $callback = $curcmd->{callback};
            $callback->($vmid, $msg) if $callback;
        }
    };
    if (my $err = $@) {
        $self->{errors}->{$vmid} = $err;
    }

    $check_queue->($self);
}
# mux_timeout is called when the timeout set on one of
# the descriptors expires.
# Multiplexer callback for an expired timeout on $fh.
#
# Records "got timeout" as error for the VM the handle belongs to (if
# the handle is known) and checks the queue again.
sub mux_timeout {
    my ($self, $mux, $fh) = @_;

    my $vmid = $self->{fhs_lookup}->{$fh};
    $self->{errors}->{$vmid} = "got timeout\n" if $vmid;

    $check_queue->($self);
}
2012-07-13 07:06:22 +02:00
1 ;