-
-
Notifications
You must be signed in to change notification settings - Fork 117
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #20 from dmunozv04/Refactoring
Refactoring
- Loading branch information
Showing
12 changed files
with
333 additions
and
190 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
import argparse | ||
from . import config_setup | ||
from . import main | ||
from . import macos_install | ||
import json | ||
import os | ||
import logging | ||
|
||
def load_config(config_file): | ||
try: | ||
with open(config_file) as f: | ||
config = json.load(f) | ||
except: | ||
if os.getenv('iSPBTV_docker'): | ||
print("You are running in docker, you have to mount the config file.\nPlease check the README.md for more information.") | ||
else: | ||
config = {} #Create blank config to setup | ||
return config | ||
|
||
|
||
def app_start(): | ||
parser = argparse.ArgumentParser(description='iSponsorblockTV') | ||
parser.add_argument('--file', '-f', default='config.json', help='config file') | ||
parser.add_argument('--setup', '-s', action='store_true', help='setup the program') | ||
parser.add_argument('--debug', '-d', action='store_true', help='debug mode') | ||
parser.add_argument('--macos_install', action='store_true', help='install in macOS') | ||
args = parser.parse_args() | ||
|
||
config = load_config(args.file) | ||
if args.debug: | ||
logging.basicConfig(level=logging.DEBUG) | ||
if args.setup: #Setup the config file | ||
config_setup.main(config, args.file, args.debug) | ||
if args.macos_install: | ||
macos_install.main() | ||
|
||
else: | ||
try: #Check if config file has the correct structure | ||
config["atvs"], config["apikey"], config["skip_categories"] | ||
except: #If not, ask to setup the program | ||
print("invalid config file, please run with --setup") | ||
os.exit() | ||
main.main(config["atvs"], config["apikey"], config["skip_categories"], args.debug) | ||
|
||
if __name__ == "__main__": | ||
app_start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
import plistlib | ||
import os | ||
from . import config_setup | ||
|
||
default_plist = {"Label": "com.dmunozv04iSponsorBlockTV", | ||
"RunAtLoad": True, | ||
"StartInterval": 20, | ||
"EnvironmentVariables": {"PYTHONUNBUFFERED": "YES"}, | ||
"StandardErrorPath": "", #Fill later | ||
"StandardOutPath": "", | ||
"ProgramArguments" : "", | ||
"WorkingDirectory": "" | ||
} | ||
def create_plist(path): | ||
plist = default_plist | ||
plist["ProgramArguments"] = [path + "/iSponsorBlockTV-macos"] | ||
plist["StandardErrorPath"] = path + "/iSponsorBlockTV.error.log" | ||
plist["StandardOutPath"] = path + "/iSponsorBlockTV.out.log" | ||
plist["WorkingDirectory"] = path | ||
launchd_path = os.path.expanduser("~/Library/LaunchAgents/") | ||
path_to_save = launchd_path + "com.dmunozv04.iSponsorBlockTV.plist" | ||
|
||
with open(path_to_save, 'wb') as fp: | ||
plistlib.dump(plist, fp) | ||
|
||
def run_setup(file): | ||
config = {} | ||
config_setup.main(config, file, debug=False) | ||
|
||
def main(): | ||
correct_path = os.path.expanduser("~/iSponsorBlockTV") | ||
if os.path.isfile(correct_path + "/iSponsorBlockTV-macos"): | ||
print("Program is on the right path") | ||
print("The launch daemon will now be installed") | ||
create_plist(correct_path) | ||
run_setup(correct_path + "/config.json") | ||
print("Launch daemon installed. Please restart the computer to enable it or use:\n launchctl load ~/Library/LaunchAgents/com.dmunozv04.iSponsorBlockTV.plist") | ||
else: | ||
if not os.path.exists(correct_path): | ||
os.makedirs(correct_path) | ||
print("Please move the program to the correct path: " + correct_path + "opeing now on finder...") | ||
os.system("open -R " + correct_path) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
import asyncio | ||
import pyatv | ||
import aiohttp | ||
from cache import AsyncTTL | ||
import time | ||
import logging | ||
|
||
def listToTuple(function): | ||
def wrapper(*args): | ||
args = [tuple(x) if type(x) == list else x for x in args] | ||
result = function(*args) | ||
result = tuple(result) if type(result) == list else result | ||
return result | ||
return wrapper | ||
|
||
class MyPushListener(pyatv.interface.PushListener): | ||
task = None | ||
apikey = None | ||
rc = None | ||
|
||
web_session = None | ||
categories = ["sponsor"] | ||
|
||
def __init__(self, apikey, atv, categories, web_session): | ||
self.apikey = apikey | ||
self.rc = atv.remote_control | ||
self.web_session = web_session | ||
self.categories = categories | ||
self.atv = atv | ||
|
||
|
||
def playstatus_update(self, updater, playstatus): | ||
logging.debug("Playstatus update" + str(playstatus)) | ||
try: | ||
self.task.cancel() | ||
except: | ||
pass | ||
time_start = time.time() | ||
self.task = asyncio.create_task(process_playstatus(playstatus, self.apikey, self.rc, self.web_session, self.categories, self.atv, time_start)) | ||
def playstatus_error(self, updater, exception): | ||
logging.error(exception) | ||
print("stopped") | ||
|
||
|
||
|
||
async def process_playstatus(playstatus, apikey, rc, web_session, categories, atv, time_start): | ||
logging.debug("App playing is:" + str(atv.metadata.app.identifier)) | ||
if playstatus.device_state == playstatus.device_state.Playing and atv.metadata.app.identifier == "com.google.ios.youtube": | ||
vid_id = await get_vid_id(playstatus.title, playstatus.artist, apikey, web_session) | ||
print(vid_id) | ||
segments, duration = await get_segments(vid_id, web_session, categories) | ||
print(segments) | ||
await time_to_segment(segments, playstatus.position, rc, time_start) | ||
|
||
|
||
@AsyncTTL(time_to_live=300, maxsize=5) | ||
async def get_vid_id(title, artist, api_key, web_session): | ||
url = f"https://youtube.googleapis.com/youtube/v3/search?q={title} - {artist}&key={api_key}&maxResults=1" | ||
async with web_session.get(url) as response: | ||
response = await response.json() | ||
vid_id = response["items"][0]["id"]["videoId"] | ||
return vid_id | ||
|
||
@listToTuple | ||
@AsyncTTL(time_to_live=300, maxsize=5) | ||
async def get_segments(vid_id, web_session, categories = ["sponsor"]): | ||
params = {"videoID": vid_id, | ||
"category": categories, | ||
"actionType": "skip", | ||
"service": "youtube"} | ||
headers = {'Accept': 'application/json'} | ||
url = "https://sponsor.ajay.app/api/skipSegments" | ||
async with web_session.get(url, headers = headers, params = params) as response: | ||
response = await response.json() | ||
segments = [] | ||
try: | ||
duration = response[0]["videoDuration"] | ||
for i in response: | ||
segment = i["segment"] | ||
try: | ||
#Get segment before to check if they are too close to each other | ||
segment_before_end = segments[-1][1] | ||
segment_before_start = segments[-1][0] | ||
|
||
except: | ||
segment_before_end = -10 | ||
if segment[0] - segment_before_end < 1: #Less than 1 second appart, combine them and skip them together | ||
segment = [segment_before_start, segment[1]] | ||
segments.pop() | ||
segments.append(segment) | ||
except: | ||
duration = 0 | ||
return segments, duration | ||
|
||
|
||
async def time_to_segment(segments, position, rc, time_start): | ||
position = position + (time.time() - time_start) | ||
for segment in segments: | ||
if position < 2 and (position >= segment[0] and position < segment[1]): | ||
next_segment = [position, segment[1]] | ||
break | ||
if segment[0] > position: | ||
next_segment = segment | ||
break | ||
time_to_next = next_segment[0] - position | ||
await skip(time_to_next, next_segment[1], rc) | ||
|
||
async def skip(time_to, position, rc): | ||
await asyncio.sleep(time_to) | ||
await rc.set_position(position) | ||
|
||
|
||
async def connect_atv(loop, identifier, airplay_credentials): | ||
"""Find a device and print what is playing.""" | ||
print("Discovering devices on network...") | ||
atvs = await pyatv.scan(loop, identifier = identifier) | ||
|
||
if not atvs: | ||
print("No device found, will retry") | ||
return | ||
|
||
config = atvs[0] | ||
config.set_credentials(pyatv.Protocol.AirPlay, airplay_credentials) | ||
|
||
print(f"Connecting to {config.address}") | ||
return await pyatv.connect(config, loop) | ||
|
||
|
||
async def loop_atv(event_loop, atv_config, apikey, categories, web_session): | ||
identifier = atv_config["identifier"] | ||
airplay_credentials = atv_config["airplay_credentials"] | ||
atv = await connect_atv(event_loop, identifier, airplay_credentials) | ||
if atv: | ||
listener = MyPushListener(apikey, atv, categories, web_session) | ||
|
||
atv.push_updater.listener = listener | ||
atv.push_updater.start() | ||
print("Push updater started") | ||
while True: | ||
await asyncio.sleep(20) | ||
try: | ||
atv.metadata.app | ||
except: | ||
print("Reconnecting to Apple TV") | ||
#reconnect to apple tv | ||
atv = await connect_atv(event_loop, identifier, airplay_credentials) | ||
if atv: | ||
listener = MyPushListener(apikey, atv, categories, web_session) | ||
|
||
atv.push_updater.listener = listener | ||
atv.push_updater.start() | ||
print("Push updater started") | ||
|
||
|
||
|
||
|
||
|
||
|
||
def main(atv_configs, apikey, categories, debug): | ||
loop = asyncio.get_event_loop_policy().get_event_loop() | ||
if debug: | ||
loop.set_debug(True) | ||
asyncio.set_event_loop(loop) | ||
web_session = aiohttp.ClientSession() | ||
for i in atv_configs: | ||
loop.create_task(loop_atv(loop, i, apikey, categories, web_session)) | ||
loop.run_forever() | ||
|
||
|
||
if __name__ == "__main__": | ||
print("starting") | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from iSponsorBlockTV import helpers | ||
import sys | ||
import os | ||
|
||
if getattr(sys, 'frozen', False): | ||
os.environ['SSL_CERT_FILE'] = os.path.join(sys._MEIPASS, 'lib', 'cert.pem') | ||
helpers.app_start() |
Oops, something went wrong.