Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'multi' backend and screen locking detection #170

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
27 changes: 27 additions & 0 deletions ntfy/backends/multi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from importlib import import_module
try:
from ..terminal import is_focused
except ImportError:

def is_focused():
return True


from ..screensaver import is_locked


def notify(title,
message,
locked=None,
focused=None,
unfocused=None,
retcode=None):
for condition, options in ((is_locked, locked), (is_focused, focused),
(lambda: not is_focused(), unfocused)):
for backend_name, backend_options in options.items():
if not condition():
continue
backend = import_module('ntfy.backends.{}'.format(
backend_options.get('backend', backend_name)))
backend_options.pop('backend', None)
backend.notify(title, message, retcode=retcode, **backend_options)
17 changes: 17 additions & 0 deletions ntfy/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ def is_focused():
return True


from .screensaver import is_locked


def run_cmd(args):
if getattr(args, 'pid', False):
return watch_pid(args)
Expand Down Expand Up @@ -64,6 +67,8 @@ def run_cmd(args):
retcode = process.returncode
if args.longer_than is not None and duration <= args.longer_than:
return None, None
if args.locked_only and not is_locked():
return None, None
if args.unfocused_only and is_focused():
return None, None
message = _result_message(args.command if not args.hide_command else None,
Expand Down Expand Up @@ -230,6 +235,12 @@ def default_sender(args):
type=int,
metavar='N',
help="Only notify if the command runs longer than N seconds")
done_parser.add_argument(
'--locked-only',
action='store_true',
default=False,
dest='locked_only',
help='Only notify if the screen is locked')
done_parser.add_argument(
'-b',
'--background-only',
Expand Down Expand Up @@ -283,6 +294,12 @@ def default_sender(args):
type=int,
metavar='N',
help="Only notify if the command runs longer than N seconds")
shell_integration_parser.add_argument(
'--locked-only',
action='store_true',
default=False,
dest='locked_only',
help='Only notify if the screen is locked')
shell_integration_parser.add_argument(
'-f',
'--foreground-too',
Expand Down
131 changes: 131 additions & 0 deletions ntfy/screensaver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from shlex import split
from subprocess import check_output, check_call, CalledProcessError, PIPE
import sys

# some adapted from
# https://github.com/mtorromeo/xdg-utils/blob/master/scripts/xdg-screensaver.in#L540


def xscreensaver_detect():
try:
check_call(split('pgrep xscreensaver'), stdout=PIPE)
except (CalledProcessError, OSError):
return False
else:
return True


def xscreensaver_is_locked():
return 'screen locked' in check_output(split('xscreensaver-command -time'))


def lightlocker_detect():
try:
check_call(split('pgrep light-locker'), stdout=PIPE)
except (CalledProcessError, OSError):
return False
else:
return True


def lightlocker_is_active():
return 'The screensaver is active' in check_output(
split('light-locker-command -q'))


def gnomescreensaver_detect():
try:
import dbus
except ImportError:
return False
bus = dbus.SessionBus()
dbus_obj = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
dbus_iface = dbus.Interface(
dbus_obj, dbus_interface='org.freedesktop.DBus')
try:
dbus_iface.GetNameOwner('org.gnome.ScreenSaver')
except dbus.DBusException as e:
if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NameHasNoOwner':
return False
else:
raise e
else:
return True


def gnomescreensaver_is_locked():
import dbus
bus = dbus.SessionBus()
dbus_obj = bus.get_object('org.gnome.ScreenSaver',
'/org/gnome/ScreenSaver')
dbus_iface = dbus.Interface(
dbus_obj, dbus_interface='org.gnome.ScreenSaver')
return bool(dbus_iface.GetActive())


def matescreensaver_detect():
try:
import dbus
except ImportError:
return False
bus = dbus.SessionBus()
dbus_obj = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
dbus_iface = dbus.Interface(
dbus_obj, dbus_interface='org.freedesktop.DBus')
try:
dbus_iface.GetNameOwner('org.mate.ScreenSaver')
except dbus.DBusException as e:
if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NameHasNoOwner':
return False
else:
raise e
else:
return True


def matescreensaver_is_locked():
import dbus
bus = dbus.SessionBus()
dbus_obj = bus.get_object('org.mate.ScreenSaver', '/org/mate/ScreenSaver')
dbus_iface = dbus.Interface(
dbus_obj, dbus_interface='org.mate.ScreenSaver')
return bool(dbus_iface.GetActive())


def macos_detect():
return sys.platform == 'darwin'


def macos_is_locked():
# Strictly-speaking, this detects whether or not the screensaver is running. The screensaver
# may or may not be locked.
cmd = '''tell application "System Events"
get running of screen saver preferences
end tell'''
screensaver_is_running = check_output(
['osascript', '-e', cmd]) == b'true\n'
if screensaver_is_running:
return True

# The screen may be locked even if the scrensaver is not running. This
# *should* cover that scenario.
# https: // stackoverflow.com/questions/11505255/osx-check-if-the-screen-is-locked
import Quartz
d = Quartz.CGSessionCopyCurrentDictionary()
screen_is_locked = d.get("CGSSessionScreenIsLocked", 0) == 1

return screen_is_locked


def is_locked():
if xscreensaver_detect():
return xscreensaver_is_locked()
if lightlocker_detect():
return lightlocker_is_active()
if gnomescreensaver_detect():
return gnomescreensaver_is_locked()
if matescreensaver_detect():
return matescreensaver_is_locked()
if macos_detect():
return macos_is_locked()
return True
6 changes: 6 additions & 0 deletions ntfy/terminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from os import environ, ttyname
from subprocess import PIPE, Popen, check_output
from sys import platform, stdout
from screensaver import is_locked


def linux_window_is_focused():
Expand Down Expand Up @@ -36,6 +37,11 @@ def darwin_iterm2_shell_is_focused():


def darwin_terminal_shell_is_focused():
# The osascript for detecting window focus throws an error if the screen is
# locked, so we'll check that first.
if is_locked() == True:
return False

focused_tty = osascript_tell(
'Terminal',
'tty of (first tab of (first window whose frontmost is true) '
Expand Down
11 changes: 11 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def test_default(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
self.assertEqual(('"true" succeeded in 0:00 minutes', 0),
run_cmd(args))

Expand All @@ -39,6 +40,7 @@ def test_emoji(self, mock_Popen):
args.no_emoji = False
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
self.assertEqual(
(':white_check_mark: "true" succeeded in 0:00 minutes', 0),
run_cmd(args))
Expand Down Expand Up @@ -70,6 +72,7 @@ def test_failure(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
self.assertEqual(('"false" failed (code 42) in 0:00 minutes', 42),
run_cmd(args))

Expand All @@ -82,6 +85,7 @@ def test_stdout(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
# not actually used
args.stdout = True
args.stderr = False
Expand All @@ -97,6 +101,7 @@ def test_stderr(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
# not actually used
args.stdout = False
args.stderr = True
Expand All @@ -112,6 +117,7 @@ def test_stdout_and_stderr(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
# not actually used
args.stdout = True
args.stderr = True
Expand All @@ -128,6 +134,7 @@ def test_failure_stdout_and_stderr(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
# not actually used
args.stdout = True
args.stderr = True
Expand All @@ -144,6 +151,7 @@ def test_hide_command(self, mock_Popen):
args.pid = None
args.unfocused_only = False
args.hide_command = True
args.locked_only = False
self.assertEqual(('Your command succeeded in 0:00 minutes', 0),
run_cmd(args))

Expand All @@ -155,6 +163,7 @@ def test_formatter(self):
args.longer_than = -1
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
self.assertEqual(('"true" succeeded in 1:05 minutes', 0),
run_cmd(args))

Expand All @@ -166,6 +175,7 @@ def test_formatter_failure(self):
args.longer_than = -1
args.unfocused_only = False
args.hide_command = False
args.locked_only = False
self.assertEqual(('"false" failed (code 1) in 0:10 minutes', 1),
run_cmd(args))

Expand Down Expand Up @@ -199,6 +209,7 @@ def test_watch_pid(self, mock_process):
args = MagicMock()
args.pid = 1
args.unfocused_only = False
args.locked_only = False
self.assertEqual('PID[1]: "cmd" finished in 0:00 minutes',
run_cmd(args)[0])

Expand Down