Advancing on actor

This commit is contained in:
Adolfo Gómez García 2019-11-29 09:14:49 +01:00
parent e5b4fb393f
commit 79f61098af
12 changed files with 315 additions and 687 deletions

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014 Virtual Cable S.L.
# Copyright (c) 2014-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,

View File

View File

@ -0,0 +1,16 @@
import typing
if typing.TYPE_CHECKING:
from ..service import CommonService
class Handler:
_service: 'CommonService'
_method: str
_params: typing.MutableMapping[str, str]
def __init__(self, service: 'CommonService', method: str, params: typing.MutableMapping[str, str]):
self._service = service
self._method = method
self._params = params

View File

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import typing
from . import handler
if typing.TYPE_CHECKING:
from ..service import CommonService
class LocalProvider(handler.Handler):
def post_login(self) -> typing.Any:
return 'ok'
def post_logout(self) -> typing.Any:
return 'ok'
def post_ping(self) -> typing.Any:
return 'ok'

View File

@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import typing
from .. import tools
from . import handler
from ..log import logger
if typing.TYPE_CHECKING:
from ..service import CommonService
class PublicProvider(handler.Handler):
def post_logoff(self) -> typing.Any:
logger.debug('Sending LOGOFF to clients')
# TODO: invoke through service
return 'ok'
# Alias
post_logout = post_logoff
def post_message(self) -> typing.Any:
logger.debug('Sending MESSAGE to clients')
if 'message' not in self._params:
raise Exception('Invalid message parameters')
# TODO: invoke through service
return 'ok'
def post_script(self) -> typing.Any:
logger.debug('Received script: {}'.format(self._params))
if 'script' not in self._params:
raise Exception('Invalid script parameters')
if 'user' in self._params:
logger.debug('Sending SCRIPT to client')
# TODO: invoke through service
else:
# Execute script at server space, that is, here
# as a parallel thread
th = tools.ScriptExecutorThread(self._params['script'])
th.start()
return 'ok'
def post_preConnect(self) -> typing.Any:
logger.debug('Received Pre connection')
if 'user' not in self._params or 'protocol' not in self._params:
raise Exception('Invalid preConnect parameters')
return self._service.preConnect(self._params['user'], self._params['protocol'])
def get_information(self) -> typing.Any:
# Return something useful? :)
return 'UDS Actor Secure Server'
def get_uuid(self) -> typing.Any:
return self._service._cfg.own_token # pylint: disable=protected-access

View File

@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import threading
import http.server
import json
import time
import ssl
import typing
from ..log import logger
from .. import certs
from .public import PublicProvider
from .local import LocalProvider
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from ..service import CommonService
from .handler import Handler
LISTEN_PORT = 43910
startTime = time.time()
class HTTPServerHandler(http.server.BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.0'
server_version = 'UDS Actor Server'
sys_version = ''
_uuid: typing.Optional[str] = None
_service: typing.Optional['CommonService'] = None
def sendJsonResponse(self, result: typing.Optional[typing.Any] = None, error: typing.Optional[str] = None, code: int = 200) -> None:
data = json.dumps({'result': result, 'error': error})
self.send_response(code)
self.send_header('Content-type', 'application/json')
self.send_header('Content-Length', str(len(data)))
self.send_header('Server: ', self.server_version)
self.end_headers()
self.wfile.write(data.encode())
def process(self, method: str, params: typing.MutableMapping[str, str]) -> None:
# Very simple path & params splitter
path = self.path.split('?')[0][1:].split('/')
handlerType: typing.Optional[typing.Type['Handler']] = None
logger.debug('Path: {}, uuid: {}'.format(path, self._uuid))
if len(path) == 3 and path[0] == 'actor' and path[1] == self._uuid:
# public method
handlerType = PublicProvider
elif len(path) == 2 and path[0] == 'ui':
# private method, only from localhost
handlerType = LocalProvider
if not handlerType or not self._service:
self.sendJsonResponse(error='Forbidden', code=403)
return
try:
result = getattr(handlerType(self._service, method, params), method + '_' + path[-1])()
except AttributeError:
self.sendJsonResponse(error='Method not found', code=404)
return
except Exception as e:
logger.error('Got exception executing {} {}: {}'.format(method, '/'.join(path), str(e)))
self.sendJsonResponse(error=str(e), code=500)
return
self.sendJsonResponse(result)
def do_GET(self) -> None:
try:
params = {v.split('=')[0]: v.split('=')[1] for v in self.path.split('?')[1].split('&')}
except Exception:
params = {}
self.process('get', params)
def do_POST(self) -> None:
try:
length = int(str(self.headers.get('content-length', '0')))
content = self.rfile.read(length)
logger.debug('length: {}, content >>{}<<'.format(length, content))
params: typing.MutableMapping[str, str] = json.loads(content)
except Exception as e:
logger.error('Got exception executing POST {}: {}'.format(self.path, str(e)))
self.sendJsonResponse(error=str(e), code=500)
return
self.process('post', params)
def log_error(self, format, *args): # pylint: disable=redefined-builtin
logger.error('ERROR ' + format % args)
def log_message(self, format, *args): # pylint: disable=redefined-builtin
logger.info('INFO ' + format % args)
class HTTPServerThread(threading.Thread):
_server: http.server.HTTPServer
def __init__(self, service: 'CommonService'):
super().__init__()
HTTPServerHandler._uuid = service._cfg.own_token # pylint: disable=protected-access
HTTPServerHandler._service = service # pylint: disable=protected-access
self.certFile = certs.createSelfSignedCert()
self._server = http.server.HTTPServer(('0.0.0.0', LISTEN_PORT), HTTPServerHandler)
self._server.socket = ssl.wrap_socket(self._server.socket, certfile=self.certFile, server_side=True)
def stop(self) -> None:
logger.debug('Stopping REST Service')
self._server.shutdown()
def run(self):
self._server.serve_forever()

View File

@ -1,233 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
from __future__ import unicode_literals
from udsactor.log import logger
from udsactor import utils
from udsactor.certs import createSelfSignedCert
from udsactor.scriptThread import ScriptExecutorThread
import threading
import string
import random
import json
import six
from six.moves import socketserver # @UnresolvedImport, pylint: disable=import-error
from six.moves import BaseHTTPServer # @UnresolvedImport, pylint: disable=import-error
import time
import ssl
startTime = time.time()
class HTTPServerHandler(BaseHTTPServer.BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.0'
server_version = 'UDS Actor Server'
sys_version = ''
uuid = None
service = None
lock = threading.Lock()
def sendJsonError(self, code, message):
self.send_response(code)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'error': message}))
return
def sendJsonResponse(self, data):
self.send_response(200)
data = json.dumps(data)
self.send_header('Content-type', 'application/json')
self.send_header('Content-Length', len(data))
self.end_headers()
# Send the html message
self.wfile.write(data)
def do_GET(self):
# Very simple path & params splitter
path = self.path.split('?')[0][1:].split('/')
try:
params = dict((v.split('=') for v in self.path.split('?')[1].split('&')))
except Exception:
params = {}
if path[0] != HTTPServerHandler.uuid:
self.sendJsonError(403, 'Forbidden')
return
if len(path) != 2:
self.sendJsonResponse("UDS Actor has been running for {} seconds".format(time.time() - startTime))
return
try:
operation = getattr(self, 'get_' + path[1])
result = operation(params) # Protect not POST methods
except AttributeError:
self.sendJsonError(404, 'Method not found')
return
except Exception as e:
logger.error('Got exception executing GET {}: {}'.format(path[1], utils.toUnicode(e.message)))
self.sendJsonError(500, str(e))
return
self.sendJsonResponse(result)
def do_POST(self):
path = self.path.split('?')[0][1:].split('/')
if path[0] != HTTPServerHandler.uuid:
self.sendJsonError(403, 'Forbidden')
return
if len(path) != 2:
self.sendJsonError(400, 'Invalid request')
return
try:
HTTPServerHandler.lock.acquire()
length = int(self.headers.get('content-length'))
content = self.rfile.read(length).decode('utf8')
logger.debug('length: {}, content >>{}<<'.format(length, content))
params = json.loads(content)
operation = getattr(self, 'post_' + path[1])
result = operation(params) # Protect not POST methods
except AttributeError:
self.sendJsonError(404, 'Method not found')
return
except Exception as e:
logger.error('Got exception executing POST {}: {}'.format(path[1], utils.toUnicode(e.message)))
self.sendJsonError(500, str(e))
return
finally:
HTTPServerHandler.lock.release()
self.sendJsonResponse(result)
def post_logoff(self, params):
logger.debug('Sending LOGOFF to clients')
HTTPServerHandler.service.ipc.sendLoggofMessage()
return 'ok'
# Alias
post_logout = post_logoff
def post_message(self, params):
logger.debug('Sending MESSAGE to clients')
if 'message' not in params:
raise Exception('Invalid message parameters')
HTTPServerHandler.service.ipc.sendMessageMessage(params['message'])
return 'ok'
def post_script(self, params):
logger.debug('Received script: {}'.format(params))
if 'script' not in params:
raise Exception('Invalid script parameters')
if 'user' in params:
logger.debug('Sending SCRIPT to clients')
HTTPServerHandler.service.ipc.sendScriptMessage(params['script'])
else:
# Execute script at server space, that is, here
# as a parallel thread
th = ScriptExecutorThread(params['script'])
th.start()
return 'ok'
def post_preConnect(self, params):
logger.debug('Received Pre connection')
if 'user' not in params or 'protocol' not in params:
raise Exception('Invalid preConnect parameters')
return HTTPServerHandler.service.preConnect(params.get('user'), params.get('protocol'))
def get_information(self, params):
# TODO: Return something useful? :)
return 'Up and running'
def get_uuid(self, params):
return self.service.api.uuid
def log_error(self, fmt, *args):
logger.error('HTTP ' + fmt % args)
def log_message(self, fmt, *args):
logger.info('HTTP ' + fmt % args)
class HTTPServerThread(threading.Thread):
def __init__(self, address, service):
super(self.__class__, self).__init__()
if HTTPServerHandler.uuid is None:
HTTPServerHandler.uuid = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(48))
self.certFile = createSelfSignedCert()
HTTPServerHandler.service = service
self.initiateServer(address)
def getPort(self):
return self.address[1]
def getIp(self):
return self.address[0]
def initiateServer(self, address):
self.address = (address[0], address[1]) # Copy address & keep it for future reference...
addr = ('0.0.0.0', address[1]) # Adapt to listen on 0.0.0.0
self.server = socketserver.TCPServer(addr, HTTPServerHandler)
self.server.socket = ssl.wrap_socket(self.server.socket, certfile=self.certFile, server_side=True)
def getServerUrl(self):
return 'https://{}:{}/{}'.format(self.getIp(), self.getPort(), HTTPServerHandler.uuid)
def stop(self):
logger.debug('Stopping REST Service')
self.server.shutdown()
def restart(self, address=None):
if address is None:
# address = self.server.server_address
address = self.address
self.address = (address[0], self.address[1]) # Copy address & keep it for future reference, port is never changed once assigned on init
# Listening on 0.0.0.0, does not need to restart listener..
# self.stop()
# self.initiateServer(address)
def run(self):
self.server.serve_forever()

View File

@ -1,438 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
from __future__ import unicode_literals
import socket
import threading
import sys
import traceback
import pickle
import errno
import time
import six
from udsactor.utils import toUnicode
from udsactor.log import logger
# The IPC Server will wait for connections from clients
# Clients will open socket, and wait for data from server
# The messages sent (from server) will be the following (subject to future changes):
# Message_id Data Action
# ------------ -------- --------------------------
# MSG_LOGOFF None Logout user from session
# MSG_MESSAGE message,level Display a message with level (INFO, WARN, ERROR, FATAL) # TODO: Include level, right now only has message
# MSG_SCRIPT python script Execute an specific python script INSIDE CLIENT environment (this messages is not sent right now)
# The messages received (sent from client) will be the following:
# Message_id Data Action
# ------------ -------- --------------------------
# REQ_LOGOUT Logout user from session
# REQ_INFORMATION None Request information from ipc server (maybe configuration parameters in a near future)
# REQ_LOGIN python script Execute an specific python script INSIDE CLIENT environment (this messages is not sent right now)
#
# All messages are in the form:
# BYTE
# 0 1-2 3 4 ...
# MSG_ID DATA_LENGTH (little endian) Data (can be 0 length)
# With a previos "MAGIC" header in fron of each message
MSG_LOGOFF = 0xA1
MSG_MESSAGE = 0xB2
MSG_SCRIPT = 0xC3
MSG_INFORMATION = 0xD4
MSG_TICKET = 0x90
# Request messages
REQ_INFORMATION = MSG_INFORMATION
REQ_LOGIN = 0xE5
REQ_LOGOUT = MSG_LOGOFF
REQ_TICKET = MSG_TICKET
VALID_REQUESTS = (REQ_INFORMATION, REQ_LOGIN, REQ_LOGOUT, REQ_TICKET)
VALID_MESSAGES = (MSG_LOGOFF, MSG_MESSAGE, MSG_SCRIPT, MSG_INFORMATION)
REQ_INFORMATION = 0xAA
# Reverse msgs dict for debugging
REV_DICT = {
MSG_LOGOFF: 'MSG_LOGOFF',
MSG_MESSAGE: 'MSG_MESSAGE',
MSG_SCRIPT: 'MSG_SCRIPT',
MSG_INFORMATION: 'MSG_INFORMATION',
REQ_TICKET: 'REQ_TICKET',
REQ_LOGIN: 'REQ_LOGIN',
REQ_LOGOUT: 'REQ_LOGOUT'
}
MAGIC = b'\x55\x44\x53\x00' # UDS in hexa with a padded 0 to the right
# Allows notifying login/logout from client for linux platform
ALLOW_LOG_METHODS = sys.platform != 'win32'
# States for client processor
ST_SECOND_BYTE = 0x01
ST_RECEIVING = 0x02
ST_PROCESS_MESSAGE = 0x02
class ClientProcessor(threading.Thread):
def __init__(self, parent, clientSocket):
super(ClientProcessor, self).__init__()
self.parent = parent
self.clientSocket = clientSocket
self.running = False
self.messages = six.moves.queue.Queue(32) # @UndefinedVariable
def stop(self):
logger.debug('Stoping client processor')
self.running = False
def processRequest(self, msg, data):
logger.debug('Got Client message {}={}'.format(msg, REV_DICT.get(msg)))
if self.parent.clientMessageProcessor is not None:
self.parent.clientMessageProcessor(msg, data)
def run(self):
self.running = True
self.clientSocket.setblocking(0)
state = None
recv_msg = None
recv_data = None
while self.running:
try:
counter = 1024
while counter > 0: # So we process at least the incoming queue every XX bytes readed
counter -= 1
b = self.clientSocket.recv(1)
if b == b'':
# Client disconnected
self.running = False
self.processRequest(REQ_LOGOUT, 'CLIENT_CONNECTION_LOST')
break
buf = six.byte2int(b) # Empty buffer, this is set as non-blocking
if state is None:
if buf in VALID_REQUESTS:
logger.debug('State set to {}'.format(buf))
state = buf
recv_msg = buf
continue # Get next byte
else:
logger.debug('Got unexpected data {}'.format(buf))
elif state in VALID_REQUESTS:
logger.debug('First length byte is {}'.format(buf))
msg_len = buf
state = ST_SECOND_BYTE
continue
elif state == ST_SECOND_BYTE:
msg_len += buf << 8
logger.debug('Second length byte is {}, len is {}'.format(buf, msg_len))
if msg_len == 0:
self.processRequest(recv_msg, None)
state = None
break
state = ST_RECEIVING
recv_data = b''
continue
elif state == ST_RECEIVING:
recv_data += six.int2byte(buf)
msg_len -= 1
if msg_len == 0:
self.processRequest(recv_msg, recv_data)
recv_data = None
state = None
break
else:
logger.debug('Got invalid message from request: {}, state: {}'.format(buf, state))
except socket.error as e:
# If no data is present, no problem at all, pass to check messages
pass
except Exception as e:
tb = traceback.format_exc()
logger.error('Error: {}, trace: {}'.format(e, tb))
if self.running is False:
break
try:
msg = self.messages.get(block=True, timeout=1)
except six.moves.queue.Empty: # No message got in time @UndefinedVariable
continue
logger.debug('Got message {}={}'.format(msg, REV_DICT.get(msg)))
try:
m = msg[1] if msg[1] is not None else b''
ln = len(m)
data = MAGIC + six.int2byte(msg[0]) + six.int2byte(ln & 0xFF) + six.int2byte(ln >> 8) + m
try:
self.clientSocket.sendall(data)
except socket.error as e:
# Send data error
logger.debug('Socket connection is no more available: {}'.format(e.args))
self.running = False
except Exception as e:
logger.error('Invalid message in queue: {}'.format(e))
logger.debug('Client processor stopped')
try:
self.clientSocket.close()
except Exception:
pass # If can't close, nothing happens, just end thread
class ServerIPC(threading.Thread):
def __init__(self, listenPort, clientMessageProcessor=None):
super(ServerIPC, self).__init__()
self.port = listenPort
self.running = False
self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.threads = []
self.clientMessageProcessor = clientMessageProcessor
def stop(self):
logger.debug('Stopping Server IPC')
self.running = False
for t in self.threads:
t.stop()
socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect(('localhost', self.port))
self.serverSocket.close()
for t in self.threads:
t.join()
def sendMessage(self, msgId, msgData):
'''
Notify message to all listening threads
'''
logger.debug('Sending message {}({}),{} to all clients'.format(msgId, REV_DICT.get(msgId), msgData))
# Convert to bytes so length is correctly calculated
if isinstance(msgData, six.text_type):
msgData = msgData.encode('utf8')
for t in self.threads:
if t.isAlive():
logger.debug('Sending to {}'.format(t))
t.messages.put((msgId, msgData))
def sendLoggofMessage(self):
self.sendMessage(MSG_LOGOFF, '')
def sendMessageMessage(self, message):
self.sendMessage(MSG_MESSAGE, message)
def sendScriptMessage(self, script):
self.sendMessage(MSG_SCRIPT, script)
def sendInformationMessage(self, info):
self.sendMessage(MSG_INFORMATION, pickle.dumps(info))
# This one is the only one dumped in json, be care with this!!
def sendTicketMessage(self, ticketData):
self.sendMessage(MSG_TICKET, json.dumps(ticketData))
def cleanupFinishedThreads(self):
'''
Cleans up current threads list
'''
aliveThreads = []
for t in self.threads:
if t.isAlive():
logger.debug('Thread {} is alive'.format(t))
aliveThreads.append(t)
self.threads[:] = aliveThreads
def run(self):
self.running = True
self.serverSocket.bind(('localhost', self.port))
self.serverSocket.setblocking(1)
self.serverSocket.listen(4)
while True:
try:
(clientSocket, address) = self.serverSocket.accept()
# Stop processing if thread is mean to stop
if self.running is False:
break
logger.debug('Got connection from {}'.format(address))
self.cleanupFinishedThreads() # House keeping
logger.debug('Starting new thread, current: {}'.format(self.threads))
t = ClientProcessor(self, clientSocket)
self.threads.append(t)
t.start()
except Exception as e:
logger.error('Got an exception on Server ipc thread: {}'.format(e))
class ClientIPC(threading.Thread):
def __init__(self, listenPort):
super(ClientIPC, self).__init__()
self.port = listenPort
self.running = False
self.clientSocket = None
self.messages = six.moves.queue.Queue(32) # @UndefinedVariable
self.connect()
def stop(self):
self.running = False
def getMessage(self):
while self.running:
try:
return self.messages.get(timeout=1)
except six.moves.queue.Empty: # @UndefinedVariable
continue
return None
def sendRequestMessage(self, msg, data=None):
logger.debug('Sending request for msg: {}({}), {}'.format(msg, REV_DICT.get(msg), data))
if data is None:
data = b''
if isinstance(data, six.text_type): # Convert to bytes if necessary
data = data.encode('utf-8')
ln = len(data)
msg = six.int2byte(msg) + six.int2byte(ln & 0xFF) + six.int2byte(ln >> 8) + data
self.clientSocket.sendall(msg)
def requestInformation(self):
self.sendRequestMessage(REQ_INFORMATION)
def sendLogin(self, username):
self.sendRequestMessage(REQ_LOGIN, username)
def sendLogout(self, username):
self.sendRequestMessage(REQ_LOGOUT, username)
def requestTicket(self, ticketId, secure=True):
self.sendRequestMessage(REQ_TICKET, json.dumps({'ticketId': ticketId, 'secure': secure}))
def messageReceived(self):
'''
Override this method to automatically get notified on new message
received. Message is at self.messages queue
'''
pass # Messa
def receiveBytes(self, number):
msg = b''
while self.running and len(msg) < number:
try:
buf = self.clientSocket.recv(number - len(msg))
if buf == b'':
logger.debug('Buf {}, msg {}({})'.format(buf, msg, REV_DICT.get(msg)))
self.running = False
break
msg += buf
except socket.timeout:
pass
if self.running is False:
logger.debug('Not running, returning None')
return None
return msg
def connect(self):
self.clientSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.clientSocket.connect(('localhost', self.port))
self.clientSocket.settimeout(2) # 2 seconds timeout
def run(self):
self.running = True
while self.running:
try:
msg = b''
# We look for magic message header
while self.running: # Wait for MAGIC
try:
buf = self.clientSocket.recv(len(MAGIC) - len(msg))
if buf == b'':
self.running = False
break
msg += buf
if len(msg) != len(MAGIC):
continue # Do not have message
if msg != MAGIC: # Skip first byte an continue searchong
msg = msg[1:]
continue
break
except socket.timeout: # Timeout is here so we can get stop thread
continue
if self.running is False:
break
# Now we get message basic data (msg + datalen)
msg = bytearray(self.receiveBytes(3))
# We have the magic header, here comes the message itself
if msg is None:
continue
msgId = msg[0]
dataLen = msg[1] + (msg[2] << 8)
if msgId not in VALID_MESSAGES:
raise Exception('Invalid message id: {}'.format(msgId))
data = self.receiveBytes(dataLen)
if data is None:
continue
self.messages.put((msgId, data))
self.messageReceived()
except socket.error as e:
if e.errno == errno.EINTR:
time.sleep(1) #
continue # Ignore interrupted system call
logger.error('Communication with server got an error: {}'.format(toUnicode(e.strerror)))
# self.running = False
return
except Exception as e:
tb = traceback.format_exc()
logger.error('Error: {}, trace: {}'.format(e, tb))
try:
self.clientSocket.close()
except Exception:
pass # If can't close, nothing happens, just end thread

View File

@ -33,5 +33,4 @@ import win32serviceutil
from .service import UDSActorSvc
if __name__ == '__main__':
win32serviceutil.HandleCommandLine(UDSActorSvc)
win32serviceutil.HandleCommandLine(UDSActorSvc)

View File

@ -256,7 +256,21 @@ class ActorV2Log(ActorV2Action):
logger.debug('Args: %s, Params: %s', self._args, self._params)
return actorResult('ok')
class ActorV2IpChange(ActorV2Action):
class ActorV2Ready(ActorV2Action):
"""
Notifies the service is ready
"""
name = 'ready'
def setCommsUrl(self, userService: UserService):
url = 'https://{}/actor/{}'.format(userService.getLoggedIP(), userService.uuid)
userService.setCommsUrl(url)
def post(self):
logger.debug('Args: %s, Params: %s', self._args, self._params)
return actorResult('ok')
class ActorV2IpChange(ActorV2Ready):
"""
Notifies an IP change
"""
@ -269,16 +283,6 @@ class ActorV2IpChange(ActorV2Action):
logger.debug('Args: %s, Params: %s', self._args, self._params)
return actorResult('ok')
class ActorV2Ready(ActorV2Action):
"""
Notifies the service is ready
"""
name = 'ready'
def post(self):
logger.debug('Args: %s, Params: %s', self._args, self._params)
return actorResult('ok')
class ActorV2Ticket(ActorV2Action):
"""
Gets an stored ticket

View File

@ -491,7 +491,7 @@ class UserServiceManager:
logger.debug('No uuid to retrieve because agent does not supports notifications')
return True # UUid is valid because it is not supported checking it
version = typing.cast(str, userService.getProperty('actor_version', ''))
version = userService.getProperty('actor_version') or ''
# Just for 2.0 or newer, previous actors will not support this method.
# Also externally supported agents will not support this method (as OpenGnsys)
if '-' in version or version < '2.0.0':
@ -504,7 +504,11 @@ class UserServiceManager:
r = proxy.doProxyRequest(url=url, timeout=5)
else:
r = requests.get(url, verify=False, timeout=5)
uuid = json.loads(r.content)
if version >= '3.0.0': # New type of response: {'result': uuid}
uuid = json.loads(r.content)['result']
else:
uuid = json.loads(r.content)
if uuid != userService.uuid:
logger.info('The requested machine has uuid %s and the expected was %s', uuid, userService.uuid)