From 42352e43e9b0d5f2e4dbafd4588001deb70d07d2 Mon Sep 17 00:00:00 2001 From: in03 Date: Fri, 10 Feb 2023 03:03:21 +0000 Subject: [PATCH 1/6] Create draft PR for #9 From 0b2bd40db6dce173b1bf0ce8d3a72b045f35e2cf Mon Sep 17 00:00:00 2001 From: in03 Date: Tue, 21 Feb 2023 15:07:00 +1000 Subject: [PATCH 2/6] refactor: Merge new settings --- src/proxima/cli/queue.py | 5 ++- src/proxima/settings/manager.py | 3 +- src/proxima/types/batch.py | 73 +++++++++++++++++---------------- 3 files changed, 42 insertions(+), 39 deletions(-) diff --git a/src/proxima/cli/queue.py b/src/proxima/cli/queue.py index 4cfed52..259d131 100644 --- a/src/proxima/cli/queue.py +++ b/src/proxima/cli/queue.py @@ -59,7 +59,7 @@ def main(): # handle healthy media. batch.remove_healthy() - batch.handle_existing_unlinked() + batch.get_existing_unlinked() batch.remove_healthy() batch.handle_offline_proxies() app_status = AppStatus("proxima") @@ -122,4 +122,5 @@ def main(): # TODO: Refactor queue module # This module should be CLI/API agnostic -# Move interactivity to the CLI module, then this queue module can move to 'app' +# Move interactivity to the CLI module, +# then this queue module can move to 'app' diff --git a/src/proxima/settings/manager.py b/src/proxima/settings/manager.py index 03d8944..d6ab2e3 100644 --- a/src/proxima/settings/manager.py +++ b/src/proxima/settings/manager.py @@ -195,8 +195,7 @@ def customise_sources( try: - settings = Settings() - + settings = Settings() # type: ignore except ValidationError as e: print( Panel( diff --git a/src/proxima/types/batch.py b/src/proxima/types/batch.py index 36c41cc..879a064 100644 --- a/src/proxima/types/batch.py +++ b/src/proxima/types/batch.py @@ -4,7 +4,7 @@ from dataclasses import asdict from functools import cached_property -from rich import print +from rich import print, progress from rich.console import Console from rich.panel import Panel from rich.prompt import Confirm, Prompt @@ -37,10 +37,12 @@ def project(self): """ Project name derived from first job in batch. - Property is cached to prevent KeyError if handler removes all jobs. + Property is cached to prevent KeyError + if handler removes all jobs. Returns: - project_name: The name of the Resolve project the job refers to + project_name: The name of the Resolve project + the job refers to """ try: return self.batch[0].project.project_name @@ -53,10 +55,12 @@ def timeline(self): """ Timeline name derived from first job in batch. - Timeline is cached to prevent KeyError if handler removes all jobs. + Timeline is cached to prevent KeyError + if handler removes all jobs. Returns: - timeline_name: The name of the Resolve timeline the job refers to + timeline_name: The name of the Resolve timeline + the job refers to """ try: return self.batch[0].project.timeline_name @@ -161,44 +165,40 @@ def remove_healthy(self): """Remove linked and online source media, i.e. \"healthy\" """ self.batch = [x for x in self.batch if not x.is_linked or x.is_offline] - def handle_existing_unlinked(self): + def get_existing_unlinked(self): """ Prompts to link or re-render existing but unlinked media. 
""" logger.info("[cyan]Checking for existing, unlinked media...") - existing_unlinked, mismatch_fail, link_success = [], [], [] - if not self.batch: - raise ValueError("No batch to handle!") - - existing_unlinked = [ + self.existing_unlinked = [ x for x in self.batch if not x.is_linked and x.newest_linkable_proxy ] # Exit early if none - if not len(existing_unlinked) > 0: + if not len(self.existing_unlinked) > 0: logger.debug("[magenta]No existing unlinked media detected.") return # 'Online' handled media so the offline handler doesn't catch it for x in self.batch: - if x in existing_unlinked: + if x in self.existing_unlinked: x.is_offline = False # Log with abbreviated file paths - for x in existing_unlinked: + for x in self.existing_unlinked: logger.debug( f"[magenta] * Existing unlinked - '{x.source.file_name}' <-> {(core.shorten_long_path(x.newest_linkable_proxy))}" ) # Prompt user to relink or rerender if not Confirm.ask( - f"\n[yellow][bold]{len(existing_unlinked)} source files have existing but unlinked proxy media.\n" + f"\n[yellow][bold]{len(self.existing_unlinked)} source files have existing but unlinked proxy media.\n" "[/bold]Would you like to link them? If not they will be re-rendered." ): # Mark all as requeued and carry on - self.existing_link_requeued_count = len(existing_unlinked) + self.existing_link_requeued_count = len(self.existing_unlinked) if settings.proxy.overwrite: logger.debug("[magenta] * Existing proxies set to be overwritten") @@ -209,8 +209,8 @@ def handle_existing_unlinked(self): # Handle linking from rich.progress import track - for job in track( - existing_unlinked, description="[cyan]Linking...", transient=True + for job in progress.track( + self.existing_unlinked, description="[cyan]Linking...", transient=True ): if not job.newest_linkable_proxy: continue @@ -218,36 +218,39 @@ def handle_existing_unlinked(self): try: job.link_proxy(job.newest_linkable_proxy) except exceptions.ResolveLinkMismatchError: - mismatch_fail.append(job) + self.mismatch_fail.append(job) logger.error( f"[red]Failed to link '{os.path.basename(job.newest_linkable_proxy)}' - proxy does not match source!" ) else: - link_success.append(job) + self.link_success.append(job) self.batch.remove(job) # Mark any successful links - self.existing_link_success_count = len(link_success) + self.existing_link_success_count = len(self.link_success) # Prompt to requeue any failed links - if mismatch_fail: - if not Confirm.ask( - f"[yellow]{len(mismatch_fail)} existing proxies failed to link.\n" - "They may be corrupt or incomplete. Re-render them?" - ): - # Mark failed links as failed and remove - [self.batch.remove(x) for x in mismatch_fail] - self.existing_link_failed_count = len(mismatch_fail) - return + if not self.mismatch_fail: + return + if not Confirm.ask( + f"[yellow]{len(self.mismatch_fail)} existing proxies failed to link.\n" + "They may be corrupt or incomplete. Re-render them?" + ): + # Mark failed links as failed and remove + [self.batch.remove(x) for x in self.mismatch_fail] + self.existing_link_failed_count = len(self.mismatch_fail) + return - # Mark failed links as requeued, not offline - self.existing_link_requeued_count += len(mismatch_fail) + # Mark failed links as requeued, not offline + self.existing_link_requeued_count += len(self.mismatch_fail) def handle_offline_proxies(self): - """Prompt to rerender proxies that are 'linked' but their media does not exist. + """ + Prompt to rerender 'linked' but offline proxies. 
- Resolve refers to proxies that are linked but inaccessible as 'offline'. - This prompt can warn users to find that media if it's missing, or rerender if intentionally unavailable. + Resolve refers to linked, inaccessible proxy media as 'offline'. + This prompt warns that media is missing + and facilitiates rerendering if desirable. """ logger.info("[cyan]Checking for offline proxies...") From 466fb39bcad7b49588cd19127918917ab4a0b01a Mon Sep 17 00:00:00 2001 From: in03 Date: Tue, 21 Feb 2023 15:08:07 +1000 Subject: [PATCH 3/6] fix: Link handler not linking --- src/proxima/types/batch.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/proxima/types/batch.py b/src/proxima/types/batch.py index 879a064..ad67a97 100644 --- a/src/proxima/types/batch.py +++ b/src/proxima/types/batch.py @@ -206,8 +206,11 @@ def get_existing_unlinked(self): # also need to test for oplock issues" return - # Handle linking - from rich.progress import track + self.link_existing_unlinked() + + def link_existing_unlinked(self): + self.mismatch_fail = [] + self.link_success = [] for job in progress.track( self.existing_unlinked, description="[cyan]Linking...", transient=True From 1b15cb1a57c6f6a8663a32511a5fd69f2c11e653 Mon Sep 17 00:00:00 2001 From: in03 Date: Wed, 22 Feb 2023 10:30:12 +1000 Subject: [PATCH 4/6] fix(settings): Updates overwrite user_settings.toml --- .gitignore | 1 + src/proxima/settings/__init__.py | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/.gitignore b/.gitignore index 1a2deb4..5d56de8 100644 --- a/.gitignore +++ b/.gitignore @@ -117,3 +117,4 @@ dmypy.json # Pyre type checker .pyre/ +src/proxima/settings/user_settings.toml diff --git a/src/proxima/settings/__init__.py b/src/proxima/settings/__init__.py index 0920b5d..103f732 100644 --- a/src/proxima/settings/__init__.py +++ b/src/proxima/settings/__init__.py @@ -1,6 +1,13 @@ +import os +import shutil from pathlib import Path +from rich import print settings_dir = Path(__file__).parent default_settings_file = str(Path(settings_dir, "default_settings.toml").absolute()) user_settings_file = str(Path(settings_dir, "user_settings.toml")) dotenv_settings_file = str(Path(settings_dir, ".env").absolute()) + +if not os.path.exists(user_settings_file): + print("[magenta][Initialising 'user_settings.toml']") + shutil.copy(default_settings_file, user_settings_file) From f30a0bf4098ad871fbafe5658c295dc847ec97d1 Mon Sep 17 00:00:00 2001 From: in03 Date: Thu, 23 Feb 2023 15:36:54 +1000 Subject: [PATCH 5/6] wip(): Functional segment encoding! Good - We've got segment encoding working. Hurray! - New user configurable settings for split and stitch, segment size. Not Yet Good - Concatenation is broken, only concats first segment. 
- No output subfolder to keep things tidy - Progress reporting is broken (expected) - No tidy up afterwards - No handling for jobs smaller than segment size - Celery chord is not durable - fails if a segment fails - Existing segments are not handled, just overwritten --- poetry.lock | 33 ++- pyproject.toml | 1 + src/proxima/celery/ffmpeg/ffmpeg_process.py | 4 +- src/proxima/celery/tasks.py | 227 ++++++++++++++++++++ src/proxima/cli/queue.py | 52 ++++- src/proxima/settings/default_settings.toml | 2 + src/proxima/settings/manager.py | 8 + src/proxima/settings/user_settings.toml | 2 + src/proxima/types/batch.py | 87 ++++++-- src/proxima/types/job.py | 7 +- 10 files changed, 396 insertions(+), 27 deletions(-) diff --git a/poetry.lock b/poetry.lock index d5114bd..9307de7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1443,6 +1443,21 @@ url = "https://github.com/in03/pydavinci" reference = "HEAD" resolved_reference = "59fad01d302f3628bfe9979a90da59196216e2b4" +[[package]] +name = "pyee" +version = "9.0.4" +description = "A port of node.js's EventEmitter to python." +category = "main" +optional = false +python-versions = "*" +files = [ + {file = "pyee-9.0.4-py2.py3-none-any.whl", hash = "sha256:9f066570130c554e9cc12de5a9d86f57c7ee47fece163bbdaa3e9c933cfbdfa5"}, + {file = "pyee-9.0.4.tar.gz", hash = "sha256:2770c4928abc721f46b705e6a72b0c59480c4a69c9a83ca0b00bb994f1ea4b32"}, +] + +[package.dependencies] +typing-extensions = "*" + [[package]] name = "pyfiglet" version = "0.8.post1" @@ -1565,6 +1580,22 @@ files = [ [package.extras] cli = ["click (>=5.0)"] +[[package]] +name = "python-ffmpeg" +version = "2.0.2" +description = "A python binding for FFmpeg which provides sync and async APIs" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "python-ffmpeg-2.0.2.tar.gz", hash = "sha256:56929580b3a2f4b55cd6f1a6928e9e689d525e0198b5194185c144b58e2de06a"}, + {file = "python_ffmpeg-2.0.2-py3-none-any.whl", hash = "sha256:c7528eba0d8dfcb511bab08946fc14a90bd0fa41c456b4621768392295d45ba2"}, +] + +[package.dependencies] +pyee = "*" +typing-extensions = "*" + [[package]] name = "python-gitlab" version = "3.13.0" @@ -2391,4 +2422,4 @@ testing = ["flake8 (<5)", "func-timeout", "jaraco.functools", "jaraco.itertools" [metadata] lock-version = "2.0" python-versions = ">=3.10.0,<4.0.0" -content-hash = "57f293f2ed27706e83b5259ccf29e5932592e46579a01de26151f50293db5dfe" +content-hash = "d7e04778ca8a1d37ec654e6b44d2fd1dbab567f405e8e322432c2fd38d78f386" diff --git a/pyproject.toml b/pyproject.toml index 6ffd1ed..586396b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ notify-py = "^0.3.42" pydavinci = {git = "https://github.com/in03/pydavinci"} pydantic = {extras = ["dotenv"], version = "^1.10.4"} rtoml = "^0.9.0" +python-ffmpeg = "^2.0.2" [tool.poetry.group.dev.dependencies] isort = "^5.10.1" diff --git a/src/proxima/celery/ffmpeg/ffmpeg_process.py b/src/proxima/celery/ffmpeg/ffmpeg_process.py index c701dd6..6cb6890 100644 --- a/src/proxima/celery/ffmpeg/ffmpeg_process.py +++ b/src/proxima/celery/ffmpeg/ffmpeg_process.py @@ -148,10 +148,10 @@ def run(self, celery_task_object, logfile=None): progress_bar.stop() process.kill() print("[yellow][KeyboardInterrupt] FFmpeg process killed. 
Exiting...[/]") - core.app_exit(0) + return except Exception as e: progress_bar.stop() process.kill() logger.critical(f"[red][Error] {e}\nExiting...[/]") - core.app_exit(1, -1) + return diff --git a/src/proxima/celery/tasks.py b/src/proxima/celery/tasks.py index bb2e3f5..623d64d 100644 --- a/src/proxima/celery/tasks.py +++ b/src/proxima/celery/tasks.py @@ -1,8 +1,12 @@ import logging import os +import subprocess from dataclasses import dataclass, fields +from pathlib import Path +from celery import chord, group from celery.exceptions import Reject +from ffmpeg import FFmpeg, progress from rich import print from rich.console import Console @@ -79,6 +83,33 @@ def __post_init__(self): ) +@dataclass(frozen=True, init=True) +class SplitTaskJob(TaskJob): + segment_number: int + segment_range_in: int + segment_range_out: int + + def __post_init__(self): + # TODO: Custom exceptions for task job validation + + if not os.path.exists(self.source.file_path): # SOURCE ACCESSIBLE + raise FileNotFoundError( + f"Provided source file '{self.source.file_path}' does not exist" + ) + + # if os.path.exists(self.): # NO OVERWRITE + # raise FileExistsError( + # f"File already exists at provided output path {self.output_file_path}" + # ) + if self.input_level not in [ + "in_range=full", + "in_range=limited", + ]: # CHECK VALID VIDEO LEVELS + raise ValueError( + f"Calculated video levels are invalid: '{self.input_level}'" + ) + + def ffmpeg_video_flip(job: TaskJob): flip_string = "" if job.source.h_flip: @@ -223,3 +254,199 @@ def encode_proxy(self, job_dict: dict) -> str: logger.exception(f"[red] :warning: Couldn't encode proxy.[/]\n{e}") return f"{job.source.file_name} encoded successfully" + + +@celery_app.task( + bind=True, + acks_late=True, + track_started=True, + prefetch_limit=1, + soft_time_limit=60, + reject_on_worker_lost=True, + queue=celery_queue, +) +def encode_segment(self, job_dict: dict): + """ + Celery task to encode proxy media using parameters in job argument + and user-defined settings + """ + + logger.debug(f"[magenta]Received job dict {job_dict}") + + project_metadata = class_from_args(ProjectMetadata, job_dict["project"]) + source_metadata = class_from_args(SourceMetadata, job_dict["source"]) + + job = SplitTaskJob( + settings=TaskSettings(**job_dict["settings"]), + project=project_metadata, + source=source_metadata, + output_file_path=job_dict["job"]["output_file_path"], + output_file_name=job_dict["job"]["output_file_name"], + output_directory=job_dict["job"]["output_directory"], + input_level=job_dict["job"]["input_level"], + segment_number=job_dict["job"]["segment_number"], + segment_range_in=job_dict["job"]["segment_range_in"], + segment_range_out=job_dict["job"]["segment_range_out"], + ) + + print(job) + + ps = job.settings.proxy + + full_output_path = os.path.join( + job.output_directory, + f"{job.output_file_name}_{job.segment_number}{ps.ext}", + ) + + # Create proxy output directory + os.makedirs(job.output_directory, exist_ok=True) + + # Print new job header ############################################ + + print("\n") + console.rule("[green]Received segment encode job :clapper:[/]", align="left") + print("\n") + + logger.info( + f"[magenta bold]Job: [/]{self.request.id}\n" + f"Input File: '{job.source.file_path}'" + ) + + ################################################################### + + # Log job details + logger.info(f"Temp Segment File: '{full_output_path}'") + logger.info(f"Final Output File: '{job.output_file_path}'\n") + logger.info( + f"Source Resolution: 
{job.source.resolution[0]} x {job.source.resolution[1]}" + ) + logger.info( + f"Horizontal Flip: {job.source.h_flip}\n" f"Vertical Flip: {job.source.v_flip}" + ) + logger.info(f"Starting Timecode: {job.source.start_tc}") + + # Get FFmpeg Command + + ffmpeg_command = [ + # INPUT + "ffmpeg", + "-y", # Never prompt! + *ps.misc_args, + "-i", + # Segment range + job.source.file_path, + "-ss", + # TODO: When should we convert these to str? + str(job.segment_range_in), + "-to", + str(job.segment_range_out), + # VIDEO + "-c:v", + ps.codec, + "-profile:v", + ps.profile, + "-vsync", + "-1", # Necessary to match VFR + # TODO: Format this better + # It's hard to format this. Every arg behind the -vf flag + # should be separated by a literal comma and NO SPACES to string them together as per ffmpeg syntax. + # Any optional args must provide their own literal commas so as not to leave them stray + # if disabled... Inline functions here are also confusing and "magical". + # But we don't want to run them queuer side, only on final queueables. + # labels: enhancement + # VIDEO FILTERS + "-vf", + f"scale=-2:{ps.vertical_res}," + f"scale={job.input_level}:out_range=limited, " + f"{ffmpeg_video_flip(job)}" + f"format={ps.pix_fmt}" + if ps.pix_fmt + else "", + # AUDIO + "-c:a", + ps.audio_codec, + "-ar", + ps.audio_samplerate, + # TIMECODE + "-timecode", + job.source.start_tc, + # FLAGS + "-movflags", + "+write_colr", + # OUTPUT + full_output_path, + ] + + print() # Newline + logger.debug(f"[magenta]Running! FFmpeg command:[/]\n{' '.join(ffmpeg_command)}\n") + + try: + process = FfmpegProcess( + task_id=self.request.id, + channel_id=self.request.group, + command=[*ffmpeg_command], + ffmpeg_loglevel=ps.ffmpeg_loglevel, + ) + except Exception as e: + logger.error(f"[red]Error: {e}\nRejecting task to prevent requeuing.") + raise Reject(e, requeue=False) + + # Create logfile + encode_log_dir = job.settings.paths.ffmpeg_logfile_dir + os.makedirs(encode_log_dir, exist_ok=True) + logfile_path = os.path.normpath( + os.path.join(encode_log_dir, job.output_file_name + ".txt") + ) + logger.debug(f"[magenta]Encoder logfile path: {logfile_path}[/]") + + # Run encode job + logger.info("[yellow]Encoding...[/]") + + try: + process.run(self, logfile=logfile_path) + + except Exception as e: + logger.exception(f"[red] :warning: Couldn't encode proxy.[/]\n{e}") + return False + + return full_output_path + + +@celery_app.task( + bind=True, + acks_late=True, + track_started=True, + prefetch_limit=1, + soft_time_limit=60, + reject_on_worker_lost=True, + queue=celery_queue, +) +def concat_segments(self, segment_file_paths: list[str]): + logger.debug(f"[magenta]Received segments: '{segment_file_paths}'") + + # Sort ascending + segment_file_paths = sorted(segment_file_paths) + + # Get output file name + sample = segment_file_paths[0] + directory = os.path.dirname(sample) + name, ext = os.path.splitext(os.path.basename(sample)) + + prefix, _, _ = name.rpartition("_") + concat_file_path = os.path.join(directory, prefix + ext) + + # As string + concat_list = "|".join(segment_file_paths) + + logger.info("[yellow]Concatenating...[/]") + subprocess.check_output( + [ + "ffmpeg", + *settings.proxy.misc_args, + "-i", + f'"concat:{concat_list}"', + "-c", + "copy", + concat_file_path, + ] + ) diff --git a/src/proxima/cli/queue.py b/src/proxima/cli/queue.py index 259d131..1b380c8 100644 --- a/src/proxima/cli/queue.py +++ b/src/proxima/cli/queue.py @@ -1,6 +1,6 @@ import logging -from celery import group +from celery import chord, group from 
pydavinci import davinci from pydavinci.exceptions import TimelineNotFound from rich import print @@ -9,8 +9,9 @@ from proxima import ProxyLinker, core, shared from proxima.app import resolve from proxima.app.checks import AppStatus -from proxima.celery.tasks import encode_proxy +from proxima.celery.tasks import concat_segments, encode_proxy, encode_segment from proxima.settings.manager import settings +from proxima.types.job import Job core.install_rich_tracebacks() @@ -18,7 +19,7 @@ logger.setLevel(settings.app.loglevel) -def queue_batch(batch: list): +def queue_standard_batch(batch: list[dict]): """Block until all queued tasks finish, notify results.""" logger.info("[cyan]Queuing batch...") @@ -40,6 +41,41 @@ def queue_batch(batch: list): return final_results +def queue_split_batch(batch: list[dict]): + """Block until all queued tasks finish, notify results.""" + + logger.info("[cyan]Queuing split batch...") + + unique_job_file_names = {job_dict["job"]["output_file_path"] for job_dict in batch} + + nested_jobs = [] + for job_dict in batch: + current_sort = [] + + for unique_file_path in unique_job_file_names: + if job_dict["job"]["output_file_path"] == unique_file_path: + current_sort.append(job_dict) + + nested_jobs.append(group(current_sort)) + + # Wrap task objects in Celery task function + callable_tasks = chord((encode_segment.s(x) for x in batch), concat_segments.s()) + + # Create task group to retrieve job results as batch + task_group = group(callable_tasks) + print(task_group) + + progress = shared.ProgressTracker() + + # Queue job + results = task_group.apply_async(expires=settings.broker.job_expires) + logger.debug(f"[magenta] * Queued batch with ID {results}[/]") + + # report progress is blocking! + final_results = progress.report_progress(results) + return final_results + + def main(): """Main function""" @@ -62,6 +98,7 @@ def main(): batch.get_existing_unlinked() batch.remove_healthy() batch.handle_offline_proxies() + app_status = AppStatus("proxima") print( @@ -88,7 +125,12 @@ def main(): core.notify(f"Started encoding job '{r_.project.name} - {r_.active_timeline.name}'") # Queue tasks to workers and track task progress - results = queue_batch(batch.hashable) + if settings.proxy.split_and_stitch_encoding: + batch.split_jobs() + results = queue_split_batch(batch.hashable) + + else: + results = queue_standard_batch(batch.hashable) if results.failed(): fail_message = "Some videos failed to encode!" 
@@ -104,7 +146,7 @@ def main(): _ = results.join() # Must always call join, or results don't expire - proxy_linker = ProxyLinker(batch.batch) + proxy_linker = ProxyLinker(batch.job_list) try: proxy_linker.batch_link() diff --git a/src/proxima/settings/default_settings.toml b/src/proxima/settings/default_settings.toml index 10fe8d0..effc4f5 100644 --- a/src/proxima/settings/default_settings.toml +++ b/src/proxima/settings/default_settings.toml @@ -24,6 +24,8 @@ misc_args = [ "-hide_banner", "-stats" ] ext = ".mov" overwrite = true + split_and_stitch_encoding = true + segment_duration = 30 # in seconds [filters] # Remove elements from lists to disable filter diff --git a/src/proxima/settings/manager.py b/src/proxima/settings/manager.py index d6ab2e3..07f7be0 100644 --- a/src/proxima/settings/manager.py +++ b/src/proxima/settings/manager.py @@ -106,6 +106,14 @@ class Proxy(BaseModel): ..., description="Whether or not to overwrite any existing proxy files on collision", ) + split_and_stitch_encoding: bool = Field( + ..., + description="Whether or not to use 'split and stitch' style encoding for better parallelisation", + ) + segment_duration: int = Field( + ..., + description="Duration between each split", + ) class Filters(BaseModel): diff --git a/src/proxima/settings/user_settings.toml b/src/proxima/settings/user_settings.toml index 4cf38a2..088e9c1 100644 --- a/src/proxima/settings/user_settings.toml +++ b/src/proxima/settings/user_settings.toml @@ -24,6 +24,8 @@ misc_args = [ "-hide_banner", "-stats" ] ext = ".mov" overwrite = true + split_and_stitch_encoding = true + segment_duration = 30 # in seconds [filters] # Remove elements from lists to disable filter diff --git a/src/proxima/types/batch.py b/src/proxima/types/batch.py index ad67a97..e75c348 100644 --- a/src/proxima/types/batch.py +++ b/src/proxima/types/batch.py @@ -1,6 +1,7 @@ import json import logging import os +from copy import deepcopy from dataclasses import asdict from functools import cached_property @@ -26,7 +27,7 @@ def __init__(self, batch: list[Job]): self.existing_link_success_count = 0 self.existing_link_failed_count = 0 self.existing_link_requeued_count = 0 - self.batch = batch + self.job_list = batch # instantiate cached properties self.project @@ -45,7 +46,7 @@ def project(self): the job refers to """ try: - return self.batch[0].project.project_name + return self.job_list[0].project.project_name except (KeyError, AttributeError) as e: logger.error(f"[red]Can't derive project from batch:\n{e}") return None @@ -63,7 +64,7 @@ def timeline(self): the job refers to """ try: - return self.batch[0].project.timeline_name + return self.job_list[0].project.timeline_name except (KeyError, AttributeError) as e: logger.error(f"[red]Can't derive project from batch:\n{e}") return None @@ -94,7 +95,7 @@ def batch_info(self) -> str: f"[cyan]{self.project} | {self.timeline}[/]\n" f"[green]Linked {els} | [yellow]Requeued {elr} | [red]Failed {elf}\n" f"{settings.proxy.nickname} | {overwrite_warning}\n" - f"\n[bold][white]Total queueable now:[/bold] {len(self.batch)}\n" + f"\n[bold][white]Total queueable now:[/bold] {len(self.job_list)}\n" ) @property @@ -139,7 +140,7 @@ def as_dict(dataclass_): ) data = [] - for x in self.batch: + for x in self.job_list: job_attributes = { "output_file_path": x.output_file_path, "output_file_name": x.output_file_name, @@ -148,6 +149,9 @@ def as_dict(dataclass_): "is_offline": x.is_offline, "newest_linkable_proxy": x.newest_linkable_proxy, "input_level": x.input_level, + "segment_number": 
x.segment_number, + "segment_range_in": x.segment_range_in, + "segment_range_out": x.segment_range_out, } data.append( @@ -163,7 +167,7 @@ def as_dict(dataclass_): def remove_healthy(self): """Remove linked and online source media, i.e. \"healthy\" """ - self.batch = [x for x in self.batch if not x.is_linked or x.is_offline] + self.job_list = [x for x in self.job_list if not x.is_linked or x.is_offline] def get_existing_unlinked(self): """ @@ -173,7 +177,7 @@ def get_existing_unlinked(self): logger.info("[cyan]Checking for existing, unlinked media...") self.existing_unlinked = [ - x for x in self.batch if not x.is_linked and x.newest_linkable_proxy + x for x in self.job_list if not x.is_linked and x.newest_linkable_proxy ] # Exit early if none @@ -182,7 +186,7 @@ def get_existing_unlinked(self): return # 'Online' handled media so the offline handler doesn't catch it - for x in self.batch: + for x in self.job_list: if x in self.existing_unlinked: x.is_offline = False @@ -227,7 +231,7 @@ def link_existing_unlinked(self): ) else: self.link_success.append(job) - self.batch.remove(job) + self.job_list.remove(job) # Mark any successful links self.existing_link_success_count = len(self.link_success) @@ -240,7 +244,7 @@ def link_existing_unlinked(self): "They may be corrupt or incomplete. Re-render them?" ): # Mark failed links as failed and remove - [self.batch.remove(x) for x in self.mismatch_fail] + [self.job_list.remove(x) for x in self.mismatch_fail] self.existing_link_failed_count = len(self.mismatch_fail) return @@ -260,8 +264,8 @@ def handle_offline_proxies(self): offline_proxies = [] - if self.batch: - offline_proxies = [x for x in self.batch if x.is_offline] + if self.job_list: + offline_proxies = [x for x in self.job_list if x.is_offline] if len(offline_proxies) > 0: logger.warning(f"[yellow]Offline proxies: {len(offline_proxies)}[/]") @@ -279,11 +283,11 @@ def handle_offline_proxies(self): print() if choice == "rerender": - self.batch = self.batch + self.job_list = self.job_list return if choice == "skip": - return [x for x in self.batch if not x.is_offline] + return [x for x in self.job_list if not x.is_offline] new_jobs = [] for offline_proxy in offline_proxies: @@ -300,12 +304,12 @@ def handle_offline_proxies(self): else: print(f"[yellow]Skipping '{offline_proxy.source.file_name}'...") - self.batch.remove(offline_proxy) + self.job_list.remove(offline_proxy) print() self.action_taken = True - self.batch = new_jobs + self.job_list = new_jobs def prompt_queue(self): """ @@ -313,10 +317,10 @@ def prompt_queue(self): """ logger.debug( - f"[magenta]Final queueable:[/]\n{[x.source.file_name for x in self.batch]}\n" + f"[magenta]Final queueable:[/]\n{[x.source.file_name for x in self.job_list]}\n" ) - if not self.batch: + if not self.job_list: if not self.action_taken: print( "[green]No new media to link.[/]\n" @@ -333,3 +337,50 @@ def prompt_queue(self): return False return True + + def split_jobs(self): + segmented_job_list: list[Job] = [] + for job in self.job_list: + seg_dur = settings.proxy.segment_duration + dur_secs = int(job.source.frames / job.source.fps) + + logger.debug( + f"[cyan]Splitting job '{job.output_file_name}' into segments..." 
+ ) + logger.debug(f"[magenta] * Duration in seconds: {dur_secs}") + + remainder_len = int(dur_secs % seg_dur) + seg_count = int(dur_secs // seg_dur + int(bool(remainder_len))) + + pointer = 0 + + # Create duplicate jobs, set 'segment' attribute + for i in range(seg_count): + # Prevent changing all instances + job_copy = deepcopy(job) + # Add segment number + logger.debug(f"[magenta] * New seg: {i + 1}") + job_copy.segment_number = i + 1 + + # Increment each seg start + job_copy.segment_range_in = pointer + pointer += seg_dur + + # Partial segment is remainder + if i == seg_count: + job_copy.segment_range_out = remainder_len + + # Full segment end is seg_dur + else: + job_copy.segment_range_out = pointer + + segmented_job_list.append(job_copy) + + logger.debug("[magenta]Final segments") + [ + logger.debug( + f"[magenta] * Num: {x.segment_number}, In: {x.segment_range_in}, Out: {x.segment_range_out}" + ) + for x in segmented_job_list + ] + self.job_list = segmented_job_list diff --git a/src/proxima/types/job.py b/src/proxima/types/job.py index 63a7265..436279e 100644 --- a/src/proxima/types/job.py +++ b/src/proxima/types/job.py @@ -3,8 +3,9 @@ import pathlib import re from dataclasses import dataclass -from functools import cached_property +from functools import cached_property, lru_cache from glob import glob +from math import floor from proxima.app import core, exceptions from proxima.celery import ffmpeg @@ -60,6 +61,10 @@ def __init__( self.proxy_offline_status: bool = ( True if self.source.proxy_status == "Offline" else False ) + # Segments + self.segment_number: int | None = None + self.segment_range_in: int | None = None + self.segment_range_out: int | None = None def __repr__(self): status = "linked" if self.is_linked and not self.is_offline else "unlinked" From dc2749422809bce03f4c5b1a758c565f9d5e7233 Mon Sep 17 00:00:00 2001 From: in03 Date: Thu, 23 Feb 2023 05:45:31 +0000 Subject: [PATCH 6/6] build: Export updated requirements.txt --- requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/requirements.txt b/requirements.txt index 7b6bb62..8e92e60 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,11 +29,13 @@ prompt-toolkit==3.0.36 ; python_full_version >= "3.10.0" and python_full_version pydantic==1.10.4 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0" pydantic[dotenv]==1.10.4 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0" pydavinci @ git+https://github.com/in03/pydavinci@HEAD ; python_full_version >= "3.10.0" and python_full_version < "4.0.0" +pyee==9.0.4 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0" pyfiglet==0.8.post1 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0" pygments==2.14.0 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0" pymediainfo==5.1.0 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0" pytest==7.2.1 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0" python-dotenv==0.21.1 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0" +python-ffmpeg==2.0.2 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0" pytz==2022.7.1 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0" redis==3.5.3 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0" requests==2.28.2 ; python_full_version >= "3.10.0" and python_version < "4"
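
Illustrative sketch (not part of the commits above): PATCH 5 introduces split-and-stitch encoding, and its split_jobs() derives per-segment in/out points from the source duration. The helper below is a hypothetical, self-contained way to compute those ranges in whole seconds so that the trailing partial segment, and clips shorter than one segment (one of the listed known issues), still yield a segment. The function name, units, and return shape are assumptions for illustration, not taken from the patch.

import math


def segment_ranges(duration_secs: float, segment_secs: int) -> list[tuple[float, float]]:
    """Hypothetical helper: (start, end) pairs in seconds covering a whole clip."""
    if duration_secs <= 0 or segment_secs <= 0:
        return []
    # Round up so the trailing partial segment is not dropped, and clips
    # shorter than one segment still produce exactly one range.
    seg_count = max(1, math.ceil(duration_secs / segment_secs))
    return [
        (i * segment_secs, min((i + 1) * segment_secs, duration_secs))
        for i in range(seg_count)
    ]


# A 70 s clip with 30 s segments -> [(0, 30), (30, 60), (60, 70)];
# a 12 s clip -> [(0, 12)], so jobs smaller than the segment size are still covered.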
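
Illustrative sketch (also not part of the commits above): the PATCH 5 notes say "Concatenation is broken, only concats first segment." The concat_segments task in that patch joins segments with ffmpeg's concat protocol ("concat:a|b|c"), which only works for bitstream-concatenable containers such as MPEG-TS, and that may be why only the first .mov segment comes through. A commonly used alternative is ffmpeg's concat demuxer, sketched below with a hypothetical helper; the function name and list-file handling are illustrative assumptions, and paths are assumed not to contain single quotes.

import os
import subprocess
import tempfile


def concat_with_demuxer(segment_file_paths: list[str], concat_file_path: str) -> str:
    """Hypothetical helper: losslessly join pre-encoded segments via the concat demuxer."""
    # The demuxer reads a text file listing one segment per line: file '/abs/path.mov'
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
        for path in sorted(segment_file_paths):
            f.write(f"file '{os.path.abspath(path)}'\n")
        list_file = f.name
    try:
        subprocess.run(
            [
                "ffmpeg", "-y",
                "-f", "concat",
                "-safe", "0",       # allow absolute paths in the list file
                "-i", list_file,
                "-c", "copy",       # remux only, no re-encode
                concat_file_path,
            ],
            check=True,
        )
    finally:
        os.remove(list_file)
    return concat_file_path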