From cca8fee453f36abbdd676b9994130bb5fad4f6cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20G=C3=B3mez=20Garc=C3=ADa?= Date: Mon, 24 Nov 2014 00:47:08 +0100 Subject: [PATCH] * Fixed the fact that Centos has an old release of six, that does not support, for example, byte2int method.. * Added some logs and minor fixes * Added callbacks for LOGIN/LOGOUT client messages --- actors/linux/Makefile | 2 +- actors/linux/udsactor-1.7.0.spec | 1 + actors/src/udsactor/__init__.py | 13 ++++++++ actors/src/udsactor/ipc.py | 35 ++++++++++++++++---- actors/src/udsactor/linux/UDSActorService.py | 5 ++- actors/src/udsactor/log.py | 2 +- actors/src/udsactor/service.py | 15 ++++++++- 7 files changed, 63 insertions(+), 10 deletions(-) diff --git a/actors/linux/Makefile b/actors/linux/Makefile index def32bbc..138a34f0 100644 --- a/actors/linux/Makefile +++ b/actors/linux/Makefile @@ -82,7 +82,7 @@ install-udsactor: # If for red hat based, copy init.d ifeq ($(DISTRO),rh) mkdir -p $(INITDIR) - cp debian/init $(INITDIR)/udsactor + cp debian/udsactor.init $(INITDIR)/udsactor ln -s /usr/share/UDSActor/UDSActorConfig.py $(SBINDIR)/UDSActorConfig ln -s /usr/share/UDSActor/UDSActorUser.py $(BINDIR)/UDSActorTool endif diff --git a/actors/linux/udsactor-1.7.0.spec b/actors/linux/udsactor-1.7.0.spec index a10966a7..3de40da2 100644 --- a/actors/linux/udsactor-1.7.0.spec +++ b/actors/linux/udsactor-1.7.0.spec @@ -39,6 +39,7 @@ systemctl enable udsactor.service > /dev/null 2>&1 %preun systemctl disable udsactor.service > /dev/null 2>&1 +systemctl stop udsactor.service > /dev/null 2>&1 %postun # $1 == 0 on uninstall, == 1 on upgrade for preun and postun (just a reminder for me... :) ) diff --git a/actors/src/udsactor/__init__.py b/actors/src/udsactor/__init__.py index 752d94ba..7f115997 100644 --- a/actors/src/udsactor/__init__.py +++ b/actors/src/udsactor/__init__.py @@ -37,3 +37,16 @@ __build__ = 0x010700 __author__ = 'Adolfo Gómez' __license__ = "BSD 3-clause" __copyright__ = "Copyright 2014 VirtualCable S.L.U." + + +# On centos, old six release does not includes byte2int, nor six.PY2 +import six + +if not hasattr(six, 'byte2int'): + if six.PY3: + import operator + six.byte2int = operator.itemgetter(0) + else: + def _byte2int(bs): + return ord(bs[0]) + six.byte2int = _byte2int diff --git a/actors/src/udsactor/ipc.py b/actors/src/udsactor/ipc.py index 32d1b16d..9e6c0ac8 100644 --- a/actors/src/udsactor/ipc.py +++ b/actors/src/udsactor/ipc.py @@ -35,7 +35,7 @@ import threading import sys import six import traceback -import pickle +import time from udsactor.utils import toUnicode from udsactor.log import logger @@ -101,22 +101,30 @@ class ClientProcessor(threading.Thread): self.running = False def processRequest(self, msg, data): - print('Got message {}, with data {}'.format(msg, data)) + if self.parent.clientMessageProcessor is not None: + self.parent.clientMessageProcessor(msg, data) def run(self): self.running = True self.clientSocket.setblocking(0) + logger.debug('Client processor running') + state = None recv_msg = None recv_data = None while self.running: + logger.debug('Iteration') + # Slice some time, we do not need "realtime" + time.sleep(1) try: counter = 1024 while counter > 0: # So we process at least the incoming queue every XX bytes readed counter -= 1 b = self.clientSocket.recv(1) if b == b'': + # Client disconnected + self.running = False break buf = six.byte2int(b) # Empty buffer, this is set as non-blocking if state is None: @@ -155,6 +163,12 @@ class ClientProcessor(threading.Thread): except socket.error as e: # If no data is present, no problem at all, pass to check messages pass + except Exception as e: + tb = traceback.format_exc() + logger.error('Error: {}, trace: {}'.format(e, tb)) + + if self.running is False: + break try: msg = self.messages.get(block=False) @@ -176,6 +190,7 @@ class ClientProcessor(threading.Thread): except Exception as e: logger.error('Invalid message in queue: {}'.format(e)) + logger.debug('Client processor stopped') try: self.clientSocket.close() except Exception: @@ -184,14 +199,14 @@ class ClientProcessor(threading.Thread): class ServerIPC(threading.Thread): - def __init__(self, listenPort, infoParams=None): + def __init__(self, listenPort, clientMessageProcessor=None): super(self.__class__, self).__init__() self.port = listenPort self.running = False self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.threads = [] - self.infoParams = infoParams + self.clientMessageProcessor = clientMessageProcessor def stop(self): logger.debug('Stopping Server IPC') @@ -249,13 +264,14 @@ class ServerIPC(threading.Thread): while True: try: (clientSocket, address) = self.serverSocket.accept() - # Stop processiong if thread is mean to stop + # Stop processing if thread is mean to stop if self.running is False: break logger.debug('Got connection from {}'.format(address)) self.cleanupFinishedThreads() # House keeping + logger.debug('Starting new thread, current: {}'.format(self.threads)) t = ClientProcessor(self, clientSocket) self.threads.append(t) t.start() @@ -286,6 +302,7 @@ class ClientIPC(threading.Thread): return None def sendRequestMessage(self, msg, data=None): + logger.debug('Sending request for msg: {}, {}'.format(msg, data)) if data is None: data = b'' @@ -318,6 +335,7 @@ class ClientIPC(threading.Thread): try: buf = self.clientSocket.recv(number - len(msg)) if buf == b'': + logger.debug('Buf {}, msg {}'.format(buf, msg)) self.running = False break msg += buf @@ -325,6 +343,7 @@ class ClientIPC(threading.Thread): pass if self.running is False: + logger.debug('Not running, returning None') return None return msg @@ -356,6 +375,9 @@ class ClientIPC(threading.Thread): except socket.timeout: # Timeout is here so we can get stop thread continue + if self.running is False: + break + # Now we get message basic data (msg + datalen) msg = bytearray(self.receiveBytes(3)) @@ -380,7 +402,8 @@ class ClientIPC(threading.Thread): self.running = False return except Exception as e: - logger.error('Error: {}'.format(e.args)) + tb = traceback.format_exc() + logger.error('Error: {}, trace: {}'.format(e, tb)) try: self.clientSocket.close() diff --git a/actors/src/udsactor/linux/UDSActorService.py b/actors/src/udsactor/linux/UDSActorService.py index 08b32026..250857ee 100644 --- a/actors/src/udsactor/linux/UDSActorService.py +++ b/actors/src/udsactor/linux/UDSActorService.py @@ -44,6 +44,7 @@ from udsactor.linux.daemon import Daemon from udsactor.linux import renamer import sys +import time try: from prctl import set_proctitle except Exception: # Platform may not include prctl, so in case it's not available, we let the "name" as is @@ -124,8 +125,10 @@ def usage(): sys.exit(2) if __name__ == '__main__': - initCfg() + logger.setLevel(20000) + if len(sys.argv) == 3: + logger.debug('Running client udsactor') client = None try: client = ipc.ClientIPC(IPC_PORT) diff --git a/actors/src/udsactor/log.py b/actors/src/udsactor/log.py index fbefdeaf..00d79683 100644 --- a/actors/src/udsactor/log.py +++ b/actors/src/udsactor/log.py @@ -61,7 +61,7 @@ class Logger(object): # If remote logger is available, notify message to it try: - if self.remoteLogger is not None and self.remoteLogger.isConnected: + if self.remoteLogger is not None and self.remoteLogger.isConnected and level >= INFO: self.remoteLogger.log(level, message) except Exception as e: self.logger.log(FATAL, 'Error notifying log to broker: {}'.format(e.message)) diff --git a/actors/src/udsactor/service.py b/actors/src/udsactor/service.py index 195875b0..bfa94f8f 100644 --- a/actors/src/udsactor/service.py +++ b/actors/src/udsactor/service.py @@ -207,12 +207,25 @@ class CommonService(object): except Exception as e: logger.warn('Got an error notifiying IPs to broker: {} (will retry in a bit)'.format(e.message.decode('windows-1250', 'ignore'))) + def clientMessageProcessor(self, msg, data): + logger.debug('Got message {}'.format(msg)) + if self.api is None: + logger.info('Rest api not ready') + return + + if msg == ipc.REQ_LOGIN: + self.api.login(data) + elif msg == ipc.REQ_LOGOUT: + self.api.logout(data) + elif msg == ipc.REQ_INFORMATION: + logger.debug('Requested information') + def initIPC(self): # ****************************************** # * Initialize listener IPC & REST threads * # ****************************************** logger.debug('Starting IPC listener at {}'.format(IPC_PORT)) - self.ipc = ipc.ServerIPC(IPC_PORT) + self.ipc = ipc.ServerIPC(IPC_PORT, clientMessageProcessor=self.clientMessageProcessor) self.ipc.start() if self.api.mac in self.knownIps: