diff --git a/pynput_service.py b/pynput_service.py new file mode 100644 index 000000000..c51e9a524 --- /dev/null +++ b/pynput_service.py @@ -0,0 +1,236 @@ +from pynput.keyboard import Key, Controller +from pynput.keyboard._xorg import KeyCode +from pynput._util.xorg import display_manager +import Xlib +from pynput._util.xorg import * +import Xlib +import os +import sys +import socket + +KeyCode._from_symbol("\0") # test + +DEAD_KEYS = { + '`': 65104, + '´': 65105, + '^': 65106, + '~': 65107, + '¯': 65108, + '˘': 65109, + '˙': 65110, + '¨': 65111, + '˚': 65112, + '˝': 65113, + 'ˇ': 65114, + '¸': 65115, + '˛': 65116, + '℩': 65117, # ? + '゛': 65118, # ? + '゚ ': 65119, + 'ٜ': 65120, + '↪': 65121, + ' ̛': 65122, +} + + + +def my_keyboard_mapping(display): + """Generates a mapping from *keysyms* to *key codes* and required + modifier shift states. + + :param Xlib.display.Display display: The display for which to retrieve the + keyboard mapping. + + :return: the keyboard mapping + """ + mapping = {} + + shift_mask = 1 << 0 + group_mask = alt_gr_mask(display) + + # Iterate over all keysym lists in the keyboard mapping + min_keycode = display.display.info.min_keycode + keycode_count = display.display.info.max_keycode - min_keycode + 1 + for index, keysyms in enumerate(display.get_keyboard_mapping( + min_keycode, keycode_count)): + key_code = index + min_keycode + + # Normalise the keysym list to yield a tuple containing the two groups + normalized = keysym_normalize(keysyms) + if not normalized: + continue + + # Iterate over the groups to extract the shift and modifier state + for groups, group in zip(normalized, (False, True)): + for keysym, shift in zip(groups, (False, True)): + + if not keysym: + continue + shift_state = 0 \ + | (shift_mask if shift else 0) \ + | (group_mask if group else 0) + + # !!!: Save all keycode combinations of keysym + if keysym in mapping: + mapping[keysym].append((key_code, shift_state)) + else: + mapping[keysym] = [(key_code, shift_state)] + return mapping + + +class MyController(Controller): + def _update_keyboard_mapping(self): + """Updates the keyboard mapping. + """ + with display_manager(self._display) as dm: + self._keyboard_mapping = my_keyboard_mapping(dm) + + def send_event(self, event, keycode, shift_state): + with display_manager(self._display) as dm, self.modifiers as modifiers: + # Under certain cimcumstances, such as when running under Xephyr, + # the value returned by dm.get_input_focus is an int + window = dm.get_input_focus().focus + send_event = getattr( + window, + 'send_event', + lambda event: dm.send_event(window, event)) + send_event(event( + detail=keycode, + state=shift_state | self._shift_mask(modifiers), + time=0, + root=dm.screen().root, + window=window, + same_screen=0, + child=Xlib.X.NONE, + root_x=0, root_y=0, event_x=0, event_y=0)) + + def fake_input(self, keycode, is_press): + with display_manager(self._display) as dm: + Xlib.ext.xtest.fake_input( + dm, + Xlib.X.KeyPress if is_press else Xlib.X.KeyRelease, + keycode) + + def _handle(self, key, is_press): + """Resolves a key identifier and sends a keyboard event. + :param event: The *X* keyboard event. + :param int keysym: The keysym to handle. + """ + event = Xlib.display.event.KeyPress if is_press \ + else Xlib.display.event.KeyRelease + keysym = self._keysym(key) + + if key.vk is not None: + keycode = self._display.keysym_to_keycode(key.vk) + self.fake_input(keycode, is_press) + # Otherwise use XSendEvent; we need to use this in the general case to + # work around problems with keyboard layouts + self._emit('_on_fake_event', key, is_press) + return + + # Make sure to verify that the key was resolved + if keysym is None: + raise self.InvalidKeyException(key) + + # There may be multiple keycodes for keysym in keyboard_mapping + keycode_flag = len(self.keyboard_mapping[keysym]) == 1 + if keycode_flag: + keycode, shift_state = self.keyboard_mapping[keysym][0] + else: + keycode, shift_state = self._display.keysym_to_keycode(keysym), 0 + + keycode_set = set(map(lambda x: x[0], self.keyboard_mapping[keysym])) + # The keycode of the dead key is inconsistent, The keysym has multiple combinations of a keycode. + if keycode != self._display.keysym_to_keycode(keysym) \ + or (keycode_flag == False and keycode == list(keycode_set)[0] and len(keycode_set) == 1): + deakkey_chr = str(key).replace("'", '') + keysym = DEAD_KEYS[deakkey_chr] + keycode, shift_state = self.keyboard_mapping[keysym][0] + + # If the key has a virtual key code, use that immediately with + # fake_input; fake input,being an X server extension, has access to + # more internal state that we do + + try: + with self.modifiers as modifiers: + alt_gr = Key.alt_gr in modifiers + # !!!: Send_event can't support lock screen, this condition cann't be modified + if alt_gr: + self.send_event( + event, keycode, shift_state) + else: + self.fake_input(keycode, is_press) + except KeyError: + with self._borrow_lock: + keycode, index, count = self._borrows[keysym] + self._send_key( + event, + keycode, + index_to_shift(self._display, index)) + count += 1 if is_press else -1 + self._borrows[keysym] = (keycode, index, count) + + # Notify any running listeners + self._emit('_on_fake_event', key, is_press) + + +keyboard = MyController() + +server_address = sys.argv[1] +if not os.path.exists(os.path.dirname(server_address)): + os.makedirs(os.path.dirname(server_address)) + +try: + os.unlink(server_address) +except OSError: + if os.path.exists(server_address): + raise + +server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) +server.bind(server_address) +server.listen(1) +clientsocket, address = server.accept() +os.system('chmod a+rw %s' % server_address) +print("Got pynput connection") + + +def loop(): + global keyboard + buf = [] + while True: + data = clientsocket.recv(1024) + if not data: + print("Connection broken") + break + buf.extend(data) + while buf: + n = buf[0] + n = n + 1 + if len(buf) < n: + break + msg = bytearray(buf[1:n]).decode("utf-8") + buf = buf[n:] + if len(msg) < 2: + continue + if msg[1] == "\0": + keyboard = MyController() + print("Keyboard reset") + continue + if len(msg) == 2: + name = msg[1] + else: + name = KeyCode._from_symbol(msg[1:]) + if str(name) == "<0>": + continue + try: + if msg[0] == "p": + keyboard.press(name) + else: + keyboard.release(name) + except Exception as e: + print('[x] error key',e) + + +loop() +clientsocket.close() +server.close()