| import sys |
| import os |
| import subprocess |
| import gi |
| |
| gi.require_version('Gdk', '4.0') |
| gi.require_version('Gtk', '4.0') |
| |
| from gi.repository import GLib, GObject, Gdk, Gtk |
| from pydbus import SessionBus |
| |
| verbose = True |
| |
| remote_desktop = None |
| screen_cast = None |
| session = None |
| stream_path = None |
| done = False |
| |
| def terminate(): |
| sys.exit(1) |
| |
| loop = None |
| |
| def quit_cb(loop): |
| loop.quit() |
| print('timed out while waiting') |
| |
| def wait(millis): |
| global loop |
| before = GLib.get_monotonic_time() |
| loop = GLib.MainLoop() |
| GLib.timeout_add(millis, quit_cb, loop) |
| loop.run() |
| if verbose: |
| time = (GLib.get_monotonic_time() - before) / 1000 |
| print(f'waited for {time} milliseconds') |
| |
| display = None |
| window = None |
| entry = None |
| expected_change = None |
| |
| def key_pressed_cb (controller, keyval, keycode, state): |
| global expected_change |
| global loop |
| |
| if verbose: |
| print(f'got key press: {keyval}, state {state}') |
| assert expected_change != None, 'Unexpected key press' |
| assert expected_change['type'] == 'press', 'Key press event expected' |
| assert keyval == expected_change['keyval'], 'Unexpected keyval in key press event' |
| assert state == expected_change['state'], 'Unexpected state in key press event' |
| |
| expected_change = None |
| loop.quit() |
| |
| def key_released_cb (controller, keyval, keycode, state): |
| global expected_change |
| global loop |
| |
| if verbose: |
| print(f'got key release: {keyval}, state {state}') |
| assert expected_change != None, 'Unexpected key release' |
| assert expected_change['type'] == 'release', 'Key release event expected' |
| assert keyval == expected_change['keyval'], 'Unexpected keyval in key release event' |
| assert state == expected_change['state'], 'Unexpected state in key release event' |
| |
| expected_change = None |
| loop.quit() |
| |
| def motion_cb (controller, x, y): |
| global expected_change |
| global loop |
| |
| if verbose: |
| print(f'got motion: {x}, {y}') |
| if expected_change != None: |
| assert expected_change['type'] == 'motion', 'Motion event expected' |
| assert x == expected_change['x'], 'Unexpected x coord in motion event' |
| assert y == expected_change['y'], 'Unexpected y coord in motion event' |
| expected_change = None |
| loop.quit() |
| |
| def enter_cb (controller, x, y): |
| global expected_change |
| global loop |
| |
| if verbose: |
| print(f'got enter: {x}, {y}') |
| assert expected_change != None, 'Unexpected enter' |
| assert expected_change['type'] == 'enter', 'Enter event expected' |
| assert x == expected_change['x'], 'Unexpected x coord in enter event' |
| assert y == expected_change['y'], 'Unexpected y coord in enter event' |
| |
| expected_change = None |
| loop.quit() |
| |
| def pressed_cb(controller, n, x, y): |
| global expected_change |
| global loop |
| |
| if verbose: |
| print(f'got pressed') |
| assert expected_change != None, 'Unexpected event' |
| assert expected_change['type'] == 'press', 'Button press expected' |
| assert expected_change['button'] == controller.get_current_button(), 'Unexpected button pressed' |
| assert x == expected_change['x'], 'Unexpected x coord in motion event' |
| assert y == expected_change['y'], 'Unexpected y coord in motion event' |
| |
| expected_change = None |
| loop.quit() |
| |
| def released_cb(controller, n, x, y): |
| global expected_change |
| global loop |
| |
| if verbose: |
| print(f'got released') |
| assert expected_change != None, 'Unexpected event' |
| assert expected_change['type'] == 'release', 'Button release expected' |
| |
| expected_change = None |
| loop.quit() |
| |
| def expect_key_press(keyval, state, timeout): |
| global expected_change |
| expected_change = { |
| 'type' : 'press', |
| 'keyval' : keyval, |
| 'state' : state |
| } |
| wait(timeout) |
| assert expected_change == None, 'Expected event did not happen' |
| |
| def expect_key_release(keyval, state, timeout): |
| global expected_change |
| expected_change = { |
| 'type' : 'release', |
| 'keyval' : keyval, |
| 'state' : state |
| } |
| wait(timeout) |
| assert expected_change == None, 'Expected event did not happen' |
| |
| def expect_motion(x, y, timeout): |
| global expected_change |
| expected_change = { |
| 'type' : 'motion', |
| 'x' : x, |
| 'y' : y |
| } |
| wait(timeout) |
| assert expected_change == None, 'Expected event did not happen' |
| |
| def expect_enter(x, y, timeout): |
| global expected_change |
| expected_change = { |
| 'type' : 'enter', |
| 'x' : x, |
| 'y' : y |
| } |
| wait(timeout) |
| assert expected_change == None, 'Expected event did not happen' |
| |
| def expect_button_press(button, x, y, timeout): |
| global expected_change |
| expected_change = { |
| 'type' : 'press', |
| 'button' : button, |
| 'x' : x, |
| 'y' : y |
| } |
| wait(timeout) |
| assert expected_change == None, 'Button press did not arrive' |
| |
| def expect_button_release(button, x, y, timeout): |
| global expected_change |
| expected_change = { |
| 'type' : 'release', |
| 'button' : button, |
| 'x' : x, |
| 'y' : y |
| } |
| wait(timeout) |
| assert expected_change == None, 'Button release did not arrive' |
| |
| def got_active(object, pspec): |
| global loop |
| object.disconnect_by_func(got_active) |
| loop.quit() |
| |
| def launch_observer(): |
| global display |
| global window |
| |
| if verbose: |
| print('launch observer') |
| |
| if display == None: |
| display = Gdk.Display.open(os.getenv('WAYLAND_DISPLAY')) |
| |
| window = Gtk.Window.new() |
| |
| controller = Gtk.EventControllerKey.new() |
| controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE) |
| controller.connect('key-pressed', key_pressed_cb) |
| controller.connect('key-released', key_released_cb) |
| window.add_controller(controller) |
| |
| controller = Gtk.EventControllerMotion.new() |
| controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE) |
| controller.connect('enter', enter_cb) |
| controller.connect('motion', motion_cb) |
| window.add_controller(controller) |
| |
| controller = Gtk.GestureClick.new() |
| controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE) |
| controller.connect('pressed', pressed_cb) |
| controller.connect('released', released_cb) |
| window.add_controller(controller) |
| |
| window.connect('notify::is-active', got_active) |
| window.maximize() |
| window.present() |
| |
| wait(2000) |
| |
| assert window.is_active(), 'Observer not active' |
| assert window.get_width() == 1024, 'Window not maximized' |
| assert window.get_height() == 768, 'Window not maximized' |
| |
| # we need to wait out the map animation, or pointer coords will be off |
| wait(1000) |
| |
| def launch_entry(): |
| global display |
| global window |
| global entry |
| |
| if verbose: |
| print('launch entry') |
| |
| if display == None: |
| display = Gdk.Display.open(os.getenv('WAYLAND_DISPLAY')) |
| |
| window = Gtk.Window.new() |
| |
| entry = Gtk.Entry.new() |
| |
| window.set_child(entry) |
| |
| window.connect('notify::is-active', got_active) |
| window.maximize() |
| window.present() |
| |
| wait(500) |
| |
| assert window.is_active(), "Observer not active" |
| assert window.get_width() == 1024, "Window not maximized" |
| assert window.get_height() == 768, "Window not maximized" |
| |
| # we need to wait out the map animation, or pointer coords will be off |
| wait(1000) |
| |
| def stop_observer(): |
| global window |
| window.destroy() |
| window = None |
| |
| def expect_entry_text(text): |
| assert text == entry.get_text(), 'Unexpected entry text: ' + entry.get_text() |
| |
| def key_press(keyval): |
| if verbose: |
| print(f'press key {keyval}') |
| session.NotifyKeyboardKeysym(keyval, True) |
| |
| def key_release(keyval): |
| if verbose: |
| print(f'release key {keyval}') |
| session.NotifyKeyboardKeysym(keyval, False) |
| |
| buttons = { |
| 1 : 0x110, |
| 2 : 0x111, |
| 3 : 0x112 |
| } |
| |
| def button_press(button): |
| if verbose: |
| print(f'press button {button}') |
| session.NotifyPointerButton(buttons[button], True) |
| |
| def button_release(button): |
| if verbose: |
| print(f'release button {button}') |
| session.NotifyPointerButton(buttons[button], False) |
| |
| def pointer_move(x, y): |
| if verbose: |
| print(f'pointer move {x} {y}') |
| session.NotifyPointerMotionAbsolute(stream_path, x, y) |
| |
| def basic_keyboard_tests(): |
| try: |
| if verbose: |
| print('Starting basic keyboard tests') |
| |
| launch_observer() |
| |
| key_press(Gdk.KEY_a) |
| expect_key_press(keyval=Gdk.KEY_a, state=0, timeout=100) |
| |
| key_release(Gdk.KEY_a) |
| expect_key_release(keyval=Gdk.KEY_a, state=0, timeout=100) |
| |
| key_press(Gdk.KEY_Control_L) |
| expect_key_press(keyval=Gdk.KEY_Control_L, state=0, timeout=100) |
| |
| key_press(Gdk.KEY_x) |
| expect_key_press(keyval=Gdk.KEY_x, state=Gdk.ModifierType.CONTROL_MASK, timeout=100) |
| |
| key_release(Gdk.KEY_Control_L) |
| expect_key_release(keyval=Gdk.KEY_Control_L, state=Gdk.ModifierType.CONTROL_MASK, timeout=100) |
| |
| key_release(Gdk.KEY_x) |
| expect_key_release(keyval=Gdk.KEY_x, state=0, timeout=100) |
| |
| stop_observer() |
| except AssertionError as e: |
| print(f'Error in basic_keyboard_tests: {e}') |
| terminate() |
| |
| def quick_typing_test(): |
| try: |
| launch_entry() |
| |
| key_press(Gdk.KEY_T) |
| key_release(Gdk.KEY_T) |
| key_press(Gdk.KEY_e) |
| key_release(Gdk.KEY_e) |
| key_press(Gdk.KEY_s) |
| key_release(Gdk.KEY_s) |
| key_press(Gdk.KEY_t) |
| key_release(Gdk.KEY_t) |
| |
| wait(100) |
| expect_entry_text('Test') |
| |
| stop_observer() |
| except AssertionError as e: |
| print(f'Error in quick_typing_test: {e}') |
| terminate() |
| |
| def basic_pointer_tests(): |
| try: |
| if verbose: |
| print('Starting basic pointer tests') |
| |
| pointer_move(-100.0, -100.0) |
| launch_observer() |
| |
| # observer window is maximized, so window coords == global coords |
| pointer_move(500.0, 300.0) |
| expect_enter(x=500, y=300, timeout=200) |
| |
| pointer_move(400.0, 200.0) |
| expect_motion(x=400, y=200, timeout=200) |
| |
| button_press(1) |
| expect_button_press(button=1, x=400, y=200, timeout=200) |
| |
| pointer_move(220.0, 200.0) |
| expect_motion(x=220, y=200, timeout=200) |
| |
| button_release(1) |
| expect_button_release(button=1, x=220, y=200, timeout=200) |
| |
| stop_observer() |
| except AssertionError as e: |
| print(f'Error in basic_pointer_tests: {e}') |
| terminate() |
| |
| ds_window = None |
| ds = None |
| |
| def drag_begin(controller, drag): |
| global expected_change |
| global loop |
| |
| if verbose: |
| print('got drag begin') |
| assert expected_change != None, 'Unexpected drag begin' |
| assert expected_change['type'] == 'drag', 'Drag begin expected' |
| |
| expected_change = None |
| loop.quit() |
| |
| def launch_drag_source(value): |
| global display |
| global ds_window |
| global ds |
| |
| if verbose: |
| print('launch drag source') |
| |
| if display == None: |
| display = Gdk.Display.open(os.getenv('WAYLAND_DISPLAY')) |
| |
| ds_window = Gtk.Window.new() |
| ds_window.set_title('Drag Source') |
| |
| ds = Gtk.DragSource.new() |
| ds.set_content(Gdk.ContentProvider.new_for_value(value)) |
| ds_window.add_controller(ds) |
| ds.connect('drag-begin', drag_begin) |
| |
| controller = Gtk.GestureClick.new() |
| controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE) |
| controller.connect('pressed', pressed_cb) |
| controller.connect('released', released_cb) |
| ds_window.add_controller(controller) |
| |
| ds_window.connect('notify::is-active', got_active) |
| ds_window.maximize() |
| ds_window.present() |
| |
| wait(2000) |
| |
| assert ds_window.is_active(), 'drag source not active' |
| assert ds_window.get_width() == 1024, 'Window not maximized' |
| assert ds_window.get_height() == 768, 'Window not maximized' |
| |
| # we need to wait out the map animation, or pointer coords will be off |
| wait(1000) |
| |
| def stop_drag_source(): |
| global ds_window |
| ds_window.destroy() |
| ds_window = None |
| |
| dt_window = None |
| |
| def do_drop(controller, value, x, y): |
| global expected_change |
| global loop |
| |
| if verbose: |
| print(f'got drop {value}') |
| assert expected_change != None, 'Unexpected drop begin' |
| assert expected_change['type'] == 'drop', 'Drop expected' |
| assert expected_change['value'] == value, 'Unexpected value dropped' |
| |
| expected_change = None |
| loop.quit() |
| |
| def launch_drop_target(): |
| global display |
| global dt_window |
| |
| if verbose: |
| print('launch drop target') |
| |
| if display == None: |
| display = Gdk.Display.open(os.getenv('WAYLAND_DISPLAY')) |
| |
| dt_window = Gtk.Window.new() |
| dt_window.set_title('Drop Target') |
| |
| controller = Gtk.DropTarget.new(GObject.TYPE_STRING, Gdk.DragAction.COPY) |
| dt_window.add_controller(controller) |
| controller.connect('drop', do_drop) |
| |
| dt_window.connect('notify::is-active', got_active) |
| dt_window.maximize() |
| dt_window.present() |
| |
| wait(2000) |
| |
| assert dt_window.is_active(), 'drop target not active' |
| assert dt_window.get_width() == 1024, 'Window not maximized' |
| assert dt_window.get_height() == 768, 'Window not maximized' |
| |
| # we need to wait out the map animation, or pointer coords will be off |
| wait(1000) |
| |
| def stop_drop_target(): |
| global dt_window |
| dt_window.destroy() |
| dt_window = None |
| |
| def expect_drag(timeout): |
| global expected_change |
| expected_change = { |
| 'type' : 'drag', |
| } |
| wait(timeout) |
| assert expected_change == None, 'DND operation not started' |
| |
| def expect_drop(value, timeout): |
| global expected_change |
| expected_change = { |
| 'type' : 'drop', |
| 'value' : value |
| } |
| wait(timeout) |
| assert expected_change == None, 'Drop has not happened' |
| |
| def dnd_tests(): |
| try: |
| if verbose: |
| print('Starting dnd tests') |
| |
| pointer_move(-100, -100) |
| |
| launch_drag_source('abc') |
| wait(100); |
| |
| pointer_move(100, 100) |
| wait(100); |
| button_press(1) |
| expect_button_press(button=1, x=100, y=100, timeout=300) |
| # need to wait out the MIN_TIME_TO_DND |
| wait(150) |
| |
| pointer_move(120, 150) |
| expect_drag(timeout=2000) |
| |
| launch_drop_target() |
| wait(100); |
| button_release(1) |
| expect_drop('abc', timeout=2000) |
| |
| stop_drop_target() |
| stop_drag_source() |
| except AssertionError as e: |
| print(f'Error in dnd_tests: {e}') |
| terminate() |
| |
| def session_closed_cb(): |
| print('Session closed') |
| |
| def run_commands(): |
| basic_keyboard_tests() |
| basic_pointer_tests() |
| dnd_tests() |
| quick_typing_test() |
| |
| def mutter_appeared(name): |
| global remote_desktop |
| global session |
| global stream_path |
| global done |
| |
| if verbose: |
| print('mutter appeared on the bus') |
| |
| remote_desktop = bus.get('org.gnome.Mutter.RemoteDesktop', |
| '/org/gnome/Mutter/RemoteDesktop') |
| device_types = remote_desktop.Get('org.gnome.Mutter.RemoteDesktop', 'SupportedDeviceTypes') |
| assert device_types & 1 == 1, 'No keyboard' |
| assert device_types & 2 == 2, 'No pointer' |
| |
| screen_cast = bus.get('org.gnome.Mutter.ScreenCast', |
| '/org/gnome/Mutter/ScreenCast') |
| |
| session_path = remote_desktop.CreateSession() |
| session = bus.get('org.gnome.Mutter.RemoteDesktop', session_path) |
| session.onClosed = session_closed_cb |
| |
| screen_cast_session_path = screen_cast.CreateSession({ 'remote-desktop-session-id' : GLib.Variant('s', session.SessionId)}) |
| screen_cast_session = bus.get('org.gnome.Mutter.ScreenCast', screen_cast_session_path) |
| |
| stream_path = screen_cast_session.RecordMonitor('Meta-0', {}) |
| session.Start() |
| |
| # work around lack of initial devices |
| key_press(Gdk.KEY_Control_L) |
| key_release(Gdk.KEY_Control_L) |
| pointer_move(-100, -100) |
| |
| run_commands() |
| |
| session.Stop() |
| |
| if verbose: |
| print('Done running commands, exiting...') |
| done = True |
| |
| def mutter_vanished(): |
| global done |
| if remote_desktop != None: |
| if verbose: |
| print('mutter left the bus') |
| done = True |
| |
| bus = SessionBus() |
| bus.watch_name('org.gnome.Mutter.RemoteDesktop', 0, mutter_appeared, mutter_vanished) |
| |
| try: |
| while not done: |
| GLib.MainContext.default().iteration(True) |
| except KeyboardInterrupt: |
| print('Interrupted') |