-
Notifications
You must be signed in to change notification settings - Fork 5
/
libvirt-evdev.py
executable file
·179 lines (140 loc) · 5.33 KB
/
libvirt-evdev.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/env python3
import systemd.daemon as sd
import asyncio
import evdev
import evdev.ecodes as e
import os
import re
import sys
import subprocess
import pyudev
import toml
import pickle
async def screen_input_switch(owner):
    """Switch every configured screen to the input source owned by *owner*.

    Walks ``config['screens']`` and, for each source whose ``owner`` entry
    equals *owner*, starts ``/usr/bin/ddccontrol`` with the screen's
    ``address``/``dev`` and the source's ``id``.  The child process is only
    started, not waited for, so this coroutine returns as soon as all
    matching commands have been spawned.

    A missing ``[screens]`` section is treated as "nothing to switch", and a
    failure to start ddccontrol is reported instead of raised: callers await
    this from the input forwarding loop, where an exception would stop
    keyboard/mouse forwarding altogether.

    Args:
        owner: Value compared against each source's ``owner`` entry
            (the callers in this file pass "host", "guest" or "external").
    """
    for screen in config.get('screens', []):
        for source in screen['sources']:
            if source['owner'] != owner:
                continue
            print("Screen input switch: {0} {1}".format(screen['name'], source['name']))
            try:
                await asyncio.create_subprocess_exec(
                    "/usr/bin/ddccontrol",
                    "-r", screen['address'],
                    "-w", source['id'],
                    screen['dev'],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            except OSError as err:
                # e.g. ddccontrol is not installed; keep going with the other screens
                print("Screen input switch failed: {0}".format(err), file=sys.stderr)
async def replicate(source_device):
    """Forward events from a grabbed physical device to its virtual twin.

    Reads events from ``input_devices[source_device]`` and writes each one to
    either ``host_devices[source_device]`` or ``guest_devices[source_device]``
    depending on the global ``current_mode``.

    Releasing Scroll Lock flips ``current_mode`` between "host" and "guest"
    and triggers the matching screen input switch.  Scroll Lock key events
    themselves are always delivered to the host device.

    Args:
        source_device: Key identifying the device in ``input_devices``,
            ``host_devices`` and ``guest_devices``.
    """
    global current_mode
    toggle = {"host": "guest", "guest": "host"}
    try:
        async for event in input_devices[source_device].async_read_loop():
            if event.type == e.EV_SYN:
                # a fresh SYN is emitted via target_device.syn() after each event
                continue
            if event.type == e.EV_KEY and event.code == e.KEY_SCROLLLOCK:
                if event.value == 0:
                    # Toggle based on the shared mode, not on the local `mode`,
                    # which is unbound on the first event and is "host" right
                    # after any Scroll Lock press/repeat event.
                    current_mode = toggle[current_mode]
                    await screen_input_switch(current_mode)
                mode = "host"
            else:
                mode = current_mode
            target_device = {
                "host": host_devices,
                "guest": guest_devices,
            }[mode][source_device]
            target_device.write_event(event)
            target_device.syn()
    except Exception as err:
        # Best effort: the read loop is expected to fail when the device goes
        # away (presumably on unplug); action_input() starts a new replicate()
        # when the device is added again.  Report the reason instead of
        # hiding it, and let cancellation / interpreter exit propagate.
        print(
            "Input replication for {0} stopped: {1!r}".format(source_device, err),
            file=sys.stderr,
        )
def action_input(action, device):
    """udev callback: re-grab a configured input device when it is added.

    Invoked by the pyudev observer for the ``input`` subsystem.  When one of
    the added device's ``DEVLINKS`` equals a path configured in
    ``config['inputs']``, the device is opened, grabbed, stored in
    ``input_devices`` and a new ``replicate`` coroutine is scheduled on the
    main event loop.

    Args:
        action: udev action string; everything except 'add' is ignored.
        device: udev device object; only its ``get('DEVLINKS')`` is used.
    """
    if action != 'add':
        return
    devlinks = device.get('DEVLINKS')
    if not devlinks:
        return
    for link in devlinks.split():
        for name, configured_path in config['inputs'].items():
            if link != configured_path:
                continue
            try:
                grabbed = evdev.InputDevice(link)
                grabbed.grab()
            except OSError as err:
                # This runs in the observer thread; an uncaught exception
                # would end that thread and with it all hot-plug handling.
                print(
                    "Grabbing input device {0} failed: {1}".format(link, err),
                    file=sys.stderr,
                )
                continue
            input_devices[name] = grabbed
            # the callback runs outside the event loop thread, hence threadsafe
            asyncio.run_coroutine_threadsafe(replicate(name), loop)
def action_usb(action, device):
    """udev callback: follow an external USB switch with the screen inputs.

    Invoked by the pyudev observer for the ``usb`` subsystem.  The device is
    considered "the switch" when its udev property named by
    ``config['usb_switch']['property_name']`` equals
    ``config['usb_switch']['property_value']``.  When the switch appears the
    screens are set to the current mode's source; when it disappears they
    are set to the "external" source.

    Does nothing when the config has no ``[usb_switch]`` section, so that a
    config without it cannot raise KeyError inside the observer thread.

    Args:
        action: udev action string; only 'add' and 'remove' are handled.
        device: udev device object; only its ``get(<property>)`` is used.
    """
    usb_switch = config.get('usb_switch')
    if not usb_switch:
        return
    if device.get(usb_switch['property_name']) != usb_switch['property_value']:
        return
    if action == 'add':
        asyncio.run_coroutine_threadsafe(screen_input_switch(current_mode), loop)
    elif action == 'remove':
        asyncio.run_coroutine_threadsafe(screen_input_switch("external"), loop)
if __name__ == '__main__':
    # Script entry point: load the config, grab the physical input devices,
    # create one virtual "host" and one virtual "guest" uinput device per
    # input, publish them under /dev/input/by-id, then forward events until
    # the process is stopped.
    #
    # The names assigned here (config, current_mode, input_devices,
    # host_devices, guest_devices, loop) are module globals that the
    # functions above read.
    #
    # NOTE: every early exit keeps the original exit status 0.
    try:
        config = toml.load('/usr/local/etc/libvirt-evdev.toml')
    except Exception as err:
        print("Reading config file failed.")
        print(err, file=sys.stderr)
        sys.exit(0)
    if 'inputs' not in config:
        print("No [inputs] section in config file.")
        sys.exit(0)
    if not config['inputs']:
        print("No keyboard/mouse in config [inputs] section.")
        sys.exit(0)
    current_mode = "host"

    # Create the event loop before anything can reference it: the udev
    # callbacks started further down use the global `loop` from their own
    # threads and must never observe it undefined.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # read capabilities and grab input devices
    input_devices = {}
    capabilities = {}
    for device in config['inputs']:
        p = '/var/lib/libvirt-evdev/' + device + '.p'
        # use capabilities from real device if connected
        if os.path.exists(config['inputs'][device]):
            input_devices[device] = evdev.InputDevice(config['inputs'][device])
            input_devices[device].grab()
            capabilities[device] = input_devices[device].capabilities()
            # drop the EV_SYN (0) entry before the capabilities are stored
            # and handed to UInput
            capabilities[device].pop(e.EV_SYN, None)
            with open(p, 'wb') as fp:
                pickle.dump(capabilities[device], fp)
        # read stored capabilities otherwise
        elif os.path.exists(p):
            # NOTE(review): pickle.load executes whatever the file contains;
            # this is only safe while /var/lib/libvirt-evdev is writable by
            # trusted users only.
            with open(p, 'rb') as fp:
                capabilities[device] = pickle.load(fp)
    if not capabilities:
        sys.exit(0)

    # create host devices
    host_devices = {
        key: evdev.UInput(cap)
        for (key, cap) in capabilities.items()
    }
    # create guest devices
    guest_devices = {
        key: evdev.UInput(cap)
        for (key, cap) in capabilities.items()
    }

    # Prepare the symlink paths from the devices that actually exist, in the
    # same order as the device dicts.  Deriving them from config['inputs']
    # would misalign the zip() below whenever an input has no capabilities.
    host_device_paths = [
        os.path.join("/dev/input/by-id", "host-%s" % key)
        for key in host_devices
    ]
    guest_device_paths = [
        os.path.join("/dev/input/by-id", "guest-%s" % key)
        for key in guest_devices
    ]
    virtual_devices = list(host_devices.values()) + list(guest_devices.values())
    virtual_device_paths = host_device_paths + guest_device_paths

    os.makedirs('/dev/input/by-id', exist_ok=True)

    # create symlinks
    for (device, path) in zip(virtual_devices, virtual_device_paths):
        try:
            # lexists() also sees a dangling link left over from a previous
            # run, which exists() would miss and which would block the new link
            if os.path.lexists(path):
                os.unlink(path)
            os.symlink(os.fspath(device.device), path)
        except OSError as err:
            # best effort, like the previous unchecked `ln -s` call
            print("Creating symlink {0} failed: {1}".format(path, err), file=sys.stderr)

    # setup replicate input coroutine
    for device in input_devices:
        loop.create_task(replicate(device))

    # setup udev monitoring of input and usb subsystems
    context = pyudev.Context()
    monitor_input = pyudev.Monitor.from_netlink(context)
    monitor_input.filter_by(subsystem='input')
    monitor_usb = pyudev.Monitor.from_netlink(context)
    monitor_usb.filter_by(subsystem='usb')
    observer_input = pyudev.MonitorObserver(monitor_input, action_input)
    observer_usb = pyudev.MonitorObserver(monitor_usb, action_usb)
    observer_input.start()
    observer_usb.start()

    # notify systemd that start is done
    sd.notify(sd.Notification.READY)
    loop.run_forever()