* Fixed the fact that Centos has an old release of six, that does not

support, for example, byte2int method..
* Added some logs and minor fixes
* Added callbacks for LOGIN/LOGOUT client messages
This commit is contained in:
Adolfo Gómez García 2014-11-24 00:47:08 +01:00
parent c72f0dd46e
commit cca8fee453
7 changed files with 63 additions and 10 deletions

View File

@ -82,7 +82,7 @@ install-udsactor:
# If for red hat based, copy init.d
ifeq ($(DISTRO),rh)
mkdir -p $(INITDIR)
cp debian/init $(INITDIR)/udsactor
cp debian/udsactor.init $(INITDIR)/udsactor
ln -s /usr/share/UDSActor/UDSActorConfig.py $(SBINDIR)/UDSActorConfig
ln -s /usr/share/UDSActor/UDSActorUser.py $(BINDIR)/UDSActorTool
endif

View File

@ -39,6 +39,7 @@ systemctl enable udsactor.service > /dev/null 2>&1
%preun
systemctl disable udsactor.service > /dev/null 2>&1
systemctl stop udsactor.service > /dev/null 2>&1
%postun
# $1 == 0 on uninstall, == 1 on upgrade for preun and postun (just a reminder for me... :) )

View File

@ -37,3 +37,16 @@ __build__ = 0x010700
__author__ = 'Adolfo Gómez'
__license__ = "BSD 3-clause"
__copyright__ = "Copyright 2014 VirtualCable S.L.U."
# On centos, old six release does not includes byte2int, nor six.PY2
import six
if not hasattr(six, 'byte2int'):
if six.PY3:
import operator
six.byte2int = operator.itemgetter(0)
else:
def _byte2int(bs):
return ord(bs[0])
six.byte2int = _byte2int

View File

@ -35,7 +35,7 @@ import threading
import sys
import six
import traceback
import pickle
import time
from udsactor.utils import toUnicode
from udsactor.log import logger
@ -101,22 +101,30 @@ class ClientProcessor(threading.Thread):
self.running = False
def processRequest(self, msg, data):
print('Got message {}, with data {}'.format(msg, data))
if self.parent.clientMessageProcessor is not None:
self.parent.clientMessageProcessor(msg, data)
def run(self):
self.running = True
self.clientSocket.setblocking(0)
logger.debug('Client processor running')
state = None
recv_msg = None
recv_data = None
while self.running:
logger.debug('Iteration')
# Slice some time, we do not need "realtime"
time.sleep(1)
try:
counter = 1024
while counter > 0: # So we process at least the incoming queue every XX bytes readed
counter -= 1
b = self.clientSocket.recv(1)
if b == b'':
# Client disconnected
self.running = False
break
buf = six.byte2int(b) # Empty buffer, this is set as non-blocking
if state is None:
@ -155,6 +163,12 @@ class ClientProcessor(threading.Thread):
except socket.error as e:
# If no data is present, no problem at all, pass to check messages
pass
except Exception as e:
tb = traceback.format_exc()
logger.error('Error: {}, trace: {}'.format(e, tb))
if self.running is False:
break
try:
msg = self.messages.get(block=False)
@ -176,6 +190,7 @@ class ClientProcessor(threading.Thread):
except Exception as e:
logger.error('Invalid message in queue: {}'.format(e))
logger.debug('Client processor stopped')
try:
self.clientSocket.close()
except Exception:
@ -184,14 +199,14 @@ class ClientProcessor(threading.Thread):
class ServerIPC(threading.Thread):
def __init__(self, listenPort, infoParams=None):
def __init__(self, listenPort, clientMessageProcessor=None):
super(self.__class__, self).__init__()
self.port = listenPort
self.running = False
self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.threads = []
self.infoParams = infoParams
self.clientMessageProcessor = clientMessageProcessor
def stop(self):
logger.debug('Stopping Server IPC')
@ -249,13 +264,14 @@ class ServerIPC(threading.Thread):
while True:
try:
(clientSocket, address) = self.serverSocket.accept()
# Stop processiong if thread is mean to stop
# Stop processing if thread is mean to stop
if self.running is False:
break
logger.debug('Got connection from {}'.format(address))
self.cleanupFinishedThreads() # House keeping
logger.debug('Starting new thread, current: {}'.format(self.threads))
t = ClientProcessor(self, clientSocket)
self.threads.append(t)
t.start()
@ -286,6 +302,7 @@ class ClientIPC(threading.Thread):
return None
def sendRequestMessage(self, msg, data=None):
logger.debug('Sending request for msg: {}, {}'.format(msg, data))
if data is None:
data = b''
@ -318,6 +335,7 @@ class ClientIPC(threading.Thread):
try:
buf = self.clientSocket.recv(number - len(msg))
if buf == b'':
logger.debug('Buf {}, msg {}'.format(buf, msg))
self.running = False
break
msg += buf
@ -325,6 +343,7 @@ class ClientIPC(threading.Thread):
pass
if self.running is False:
logger.debug('Not running, returning None')
return None
return msg
@ -356,6 +375,9 @@ class ClientIPC(threading.Thread):
except socket.timeout: # Timeout is here so we can get stop thread
continue
if self.running is False:
break
# Now we get message basic data (msg + datalen)
msg = bytearray(self.receiveBytes(3))
@ -380,7 +402,8 @@ class ClientIPC(threading.Thread):
self.running = False
return
except Exception as e:
logger.error('Error: {}'.format(e.args))
tb = traceback.format_exc()
logger.error('Error: {}, trace: {}'.format(e, tb))
try:
self.clientSocket.close()

View File

@ -44,6 +44,7 @@ from udsactor.linux.daemon import Daemon
from udsactor.linux import renamer
import sys
import time
try:
from prctl import set_proctitle
except Exception: # Platform may not include prctl, so in case it's not available, we let the "name" as is
@ -124,8 +125,10 @@ def usage():
sys.exit(2)
if __name__ == '__main__':
initCfg()
logger.setLevel(20000)
if len(sys.argv) == 3:
logger.debug('Running client udsactor')
client = None
try:
client = ipc.ClientIPC(IPC_PORT)

View File

@ -61,7 +61,7 @@ class Logger(object):
# If remote logger is available, notify message to it
try:
if self.remoteLogger is not None and self.remoteLogger.isConnected:
if self.remoteLogger is not None and self.remoteLogger.isConnected and level >= INFO:
self.remoteLogger.log(level, message)
except Exception as e:
self.logger.log(FATAL, 'Error notifying log to broker: {}'.format(e.message))

View File

@ -207,12 +207,25 @@ class CommonService(object):
except Exception as e:
logger.warn('Got an error notifiying IPs to broker: {} (will retry in a bit)'.format(e.message.decode('windows-1250', 'ignore')))
def clientMessageProcessor(self, msg, data):
logger.debug('Got message {}'.format(msg))
if self.api is None:
logger.info('Rest api not ready')
return
if msg == ipc.REQ_LOGIN:
self.api.login(data)
elif msg == ipc.REQ_LOGOUT:
self.api.logout(data)
elif msg == ipc.REQ_INFORMATION:
logger.debug('Requested information')
def initIPC(self):
# ******************************************
# * Initialize listener IPC & REST threads *
# ******************************************
logger.debug('Starting IPC listener at {}'.format(IPC_PORT))
self.ipc = ipc.ServerIPC(IPC_PORT)
self.ipc = ipc.ServerIPC(IPC_PORT, clientMessageProcessor=self.clientMessageProcessor)
self.ipc.start()
if self.api.mac in self.knownIps: