2022-08-29 19:45:06 +08:00
use std ::sync ::{ Arc , RwLock } ;
2021-03-29 15:59:14 +08:00
use crate ::client ::* ;
use hbb_common ::{
allow_err , bail ,
2022-07-31 19:06:49 +08:00
config ::READ_TIMEOUT ,
2021-06-25 19:42:51 +08:00
futures ::{ SinkExt , StreamExt } ,
2021-03-29 15:59:14 +08:00
log ,
message_proto ::* ,
protobuf ::Message as _ ,
2021-07-27 23:53:12 +08:00
rendezvous_proto ::ConnType ,
2021-03-29 15:59:14 +08:00
tcp , timeout ,
2021-06-25 19:42:51 +08:00
tokio ::{ self , net ::TcpStream , sync ::mpsc } ,
2021-03-29 15:59:14 +08:00
tokio_util ::codec ::{ BytesCodec , Framed } ,
ResultType , Stream ,
} ;
fn run_rdp ( port : u16 ) {
2022-05-12 17:35:25 +08:00
std ::process ::Command ::new ( " cmdkey " )
. arg ( " /delete:localhost " )
. output ( )
. ok ( ) ;
let username = std ::env ::var ( " rdp_username " ) . unwrap_or_default ( ) ;
let password = std ::env ::var ( " rdp_password " ) . unwrap_or_default ( ) ;
if ! username . is_empty ( ) | | ! password . is_empty ( ) {
let mut args = vec! [ " /generic:localhost " . to_owned ( ) ] ;
if ! username . is_empty ( ) {
args . push ( format! ( " /user: {} " , username ) ) ;
}
if ! password . is_empty ( ) {
args . push ( format! ( " /pass: {} " , password ) ) ;
}
println! ( " {:?} " , args ) ;
std ::process ::Command ::new ( " cmdkey " )
. args ( & args )
. output ( )
. ok ( ) ;
}
2021-03-29 15:59:14 +08:00
std ::process ::Command ::new ( " mstsc " )
. arg ( format! ( " /v:localhost: {} " , port ) )
. spawn ( )
. ok ( ) ;
}
pub async fn listen (
id : String ,
2022-07-27 00:31:20 +08:00
password : String ,
2021-03-29 15:59:14 +08:00
port : i32 ,
interface : impl Interface ,
ui_receiver : mpsc ::UnboundedReceiver < Data > ,
2022-05-12 17:35:25 +08:00
key : & str ,
token : & str ,
2022-08-29 19:45:06 +08:00
lc : Arc < RwLock < LoginConfigHandler > > ,
remote_host : String ,
remote_port : i32 ,
2021-03-29 15:59:14 +08:00
) -> ResultType < ( ) > {
2021-06-26 01:14:22 +08:00
let listener = tcp ::new_listener ( format! ( " 0.0.0.0: {} " , port ) , true ) . await ? ;
2021-03-29 15:59:14 +08:00
let addr = listener . local_addr ( ) ? ;
log ::info! ( " listening on port {:?} " , addr ) ;
let is_rdp = port = = 0 ;
if is_rdp {
run_rdp ( addr . port ( ) ) ;
}
let mut ui_receiver = ui_receiver ;
loop {
tokio ::select! {
Ok ( ( forward , addr ) ) = listener . accept ( ) = > {
log ::info! ( " new connection from {:?} " , addr ) ;
2022-08-29 19:45:06 +08:00
lc . write ( ) . unwrap ( ) . port_forward = ( remote_host . clone ( ) , remote_port ) ;
2021-03-29 15:59:14 +08:00
let id = id . clone ( ) ;
2022-07-27 00:31:20 +08:00
let password = password . clone ( ) ;
2021-03-29 15:59:14 +08:00
let mut forward = Framed ::new ( forward , BytesCodec ::new ( ) ) ;
2022-07-27 00:31:20 +08:00
match connect_and_login ( & id , & password , & mut ui_receiver , interface . clone ( ) , & mut forward , key , token , is_rdp ) . await {
2021-03-29 15:59:14 +08:00
Ok ( Some ( stream ) ) = > {
let interface = interface . clone ( ) ;
tokio ::spawn ( async move {
if let Err ( err ) = run_forward ( forward , stream ) . await {
2022-10-14 11:19:49 +08:00
interface . msgbox ( " error " , " Error " , & err . to_string ( ) , " " ) ;
2021-03-29 15:59:14 +08:00
}
log ::info! ( " connection from {:?} closed " , addr ) ;
} ) ;
}
Err ( err ) = > {
2022-10-14 11:19:49 +08:00
interface . msgbox ( " error " , " Error " , & err . to_string ( ) , " " ) ;
2021-03-29 15:59:14 +08:00
}
_ = > { }
}
}
d = ui_receiver . recv ( ) = > {
match d {
Some ( Data ::Close ) = > {
break ;
}
Some ( Data ::NewRDP ) = > {
run_rdp ( addr . port ( ) ) ;
}
_ = > { }
}
}
}
}
Ok ( ( ) )
}
async fn connect_and_login (
id : & str ,
2022-07-27 00:31:20 +08:00
password : & str ,
2021-03-29 15:59:14 +08:00
ui_receiver : & mut mpsc ::UnboundedReceiver < Data > ,
interface : impl Interface ,
forward : & mut Framed < TcpStream , BytesCodec > ,
2022-05-12 17:35:25 +08:00
key : & str ,
token : & str ,
2021-07-27 23:53:12 +08:00
is_rdp : bool ,
2022-07-31 19:06:49 +08:00
) -> ResultType < Option < Stream > > {
let mut res = connect_and_login_2 (
id ,
password ,
ui_receiver ,
interface . clone ( ) ,
forward ,
key ,
token ,
is_rdp ,
)
. await ;
if res . is_err ( ) & & interface . is_force_relay ( ) {
res = connect_and_login_2 (
id ,
password ,
ui_receiver ,
interface ,
forward ,
key ,
token ,
is_rdp ,
)
. await ;
}
res
}
async fn connect_and_login_2 (
id : & str ,
password : & str ,
ui_receiver : & mut mpsc ::UnboundedReceiver < Data > ,
interface : impl Interface ,
forward : & mut Framed < TcpStream , BytesCodec > ,
key : & str ,
token : & str ,
is_rdp : bool ,
2021-03-29 15:59:14 +08:00
) -> ResultType < Option < Stream > > {
2021-07-27 23:53:12 +08:00
let conn_type = if is_rdp {
ConnType ::RDP
} else {
ConnType ::PORT_FORWARD
} ;
2022-07-31 19:06:49 +08:00
let ( mut stream , direct ) = Client ::start ( id , key , token , conn_type , interface . clone ( ) ) . await ? ;
2021-03-29 15:59:14 +08:00
let mut interface = interface ;
let mut buffer = Vec ::new ( ) ;
2022-07-31 19:06:49 +08:00
let mut received = false ;
2021-03-29 15:59:14 +08:00
loop {
tokio ::select! {
2022-07-31 19:06:49 +08:00
res = timeout ( READ_TIMEOUT , stream . next ( ) ) = > match res {
2021-03-29 15:59:14 +08:00
Err ( _ ) = > {
bail! ( " Timeout " ) ;
}
Ok ( Some ( Ok ( bytes ) ) ) = > {
2022-07-31 19:06:49 +08:00
received = true ;
2021-03-29 15:59:14 +08:00
let msg_in = Message ::parse_from_bytes ( & bytes ) ? ;
match msg_in . union {
2022-07-14 17:20:01 +08:00
Some ( message ::Union ::Hash ( hash ) ) = > {
2022-07-27 00:31:20 +08:00
interface . handle_hash ( password , hash , & mut stream ) . await ;
2021-03-29 15:59:14 +08:00
}
2022-07-14 17:20:01 +08:00
Some ( message ::Union ::LoginResponse ( lr ) ) = > match lr . union {
Some ( login_response ::Union ::Error ( err ) ) = > {
2021-03-29 15:59:14 +08:00
interface . handle_login_error ( & err ) ;
return Ok ( None ) ;
}
2022-07-14 17:20:01 +08:00
Some ( login_response ::Union ::PeerInfo ( pi ) ) = > {
2021-03-29 15:59:14 +08:00
interface . handle_peer_info ( pi ) ;
break ;
}
_ = > { }
}
2022-07-14 17:20:01 +08:00
Some ( message ::Union ::TestDelay ( t ) ) = > {
2021-03-29 15:59:14 +08:00
interface . handle_test_delay ( t , & mut stream ) . await ;
}
_ = > { }
}
}
2022-07-31 19:06:49 +08:00
Ok ( Some ( Err ( err ) ) ) = > {
log ::error! ( " Connection closed: {} " , err ) ;
interface . set_force_relay ( direct , received ) ;
bail! ( " Connection closed: {} " , err ) ;
}
2021-03-29 15:59:14 +08:00
_ = > {
bail! ( " Reset by the peer " ) ;
}
} ,
d = ui_receiver . recv ( ) = > {
match d {
Some ( Data ::Login ( ( password , remember ) ) ) = > {
interface . handle_login_from_ui ( password , remember , & mut stream ) . await ;
}
_ = > { }
}
} ,
res = forward . next ( ) = > {
if let Some ( Ok ( bytes ) ) = res {
buffer . extend ( bytes ) ;
} else {
return Ok ( None ) ;
}
} ,
}
}
stream . set_raw ( ) ;
if ! buffer . is_empty ( ) {
allow_err! ( stream . send_bytes ( buffer . into ( ) ) . await ) ;
}
Ok ( Some ( stream ) )
}
async fn run_forward ( forward : Framed < TcpStream , BytesCodec > , stream : Stream ) -> ResultType < ( ) > {
log ::info! ( " new port forwarding connection started " ) ;
let mut forward = forward ;
let mut stream = stream ;
loop {
tokio ::select! {
res = forward . next ( ) = > {
if let Some ( Ok ( bytes ) ) = res {
allow_err! ( stream . send_bytes ( bytes . into ( ) ) . await ) ;
} else {
break ;
}
} ,
res = stream . next ( ) = > {
if let Some ( Ok ( bytes ) ) = res {
2022-07-15 01:29:52 +08:00
allow_err! ( forward . send ( bytes ) . await ) ;
2021-03-29 15:59:14 +08:00
} else {
break ;
}
} ,
}
}
Ok ( ( ) )
}