feat: 'Split and Stitch' Encoding #257

Draft · wants to merge 7 commits into base: main
1 change: 1 addition & 0 deletions .gitignore
@@ -117,3 +117,4 @@ dmypy.json

# Pyre type checker
.pyre/
src/proxima/settings/user_settings.toml
33 changes: 32 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -31,6 +31,7 @@ notify-py = "^0.3.42"
pydavinci = {git = "https://github.com/in03/pydavinci"}
pydantic = {extras = ["dotenv"], version = "^1.10.4"}
rtoml = "^0.9.0"
python-ffmpeg = "^2.0.2"

[tool.poetry.group.dev.dependencies]
isort = "^5.10.1"
2 changes: 2 additions & 0 deletions requirements.txt
@@ -29,11 +29,13 @@ prompt-toolkit==3.0.36 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
pydantic==1.10.4 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
pydantic[dotenv]==1.10.4 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
pydavinci @ git+https://github.com/in03/pydavinci@HEAD ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
pyee==9.0.4 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
pyfiglet==0.8.post1 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
pygments==2.14.0 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
pymediainfo==5.1.0 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
pytest==7.2.1 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
python-dotenv==0.21.1 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
python-ffmpeg==2.0.2 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
pytz==2022.7.1 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
redis==3.5.3 ; python_full_version >= "3.10.0" and python_full_version < "4.0.0"
requests==2.28.2 ; python_full_version >= "3.10.0" and python_version < "4"
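For context, `python-ffmpeg` is the new dependency behind the `from ffmpeg import FFmpeg, progress` import added to tasks.py, and `pyee` is that library's event-emitter backend. A minimal sketch of the python-ffmpeg 2.x fluent API as I understand it — file names and codec here are placeholders, not values from this PR:

```python
from ffmpeg import FFmpeg, Progress

# Build an ffmpeg invocation fluently; .option("y") overwrites without prompting.
ffmpeg = (
    FFmpeg()
    .option("y")
    .input("source.mov")
    .output("proxy.mov", {"codec:v": "prores_ks"})  # placeholder output settings
)


@ffmpeg.on("progress")
def on_progress(progress: Progress):
    # Progress events are emitted via pyee as ffmpeg reports frame/time/speed.
    print(progress)


ffmpeg.execute()
```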
4 changes: 2 additions & 2 deletions src/proxima/celery/ffmpeg/ffmpeg_process.py
@@ -148,10 +148,10 @@ def run(self, celery_task_object, logfile=None):
progress_bar.stop()
process.kill()
print("[yellow][KeyboardInterrupt] FFmpeg process killed. Exiting...[/]")
- core.app_exit(0)
+ return

except Exception as e:
progress_bar.stop()
process.kill()
logger.critical(f"[red][Error] {e}\nExiting...[/]")
- core.app_exit(1, -1)
+ return
227 changes: 227 additions & 0 deletions src/proxima/celery/tasks.py
@@ -1,8 +1,12 @@
import logging
import os
import subprocess
from dataclasses import dataclass, fields
from pathlib import Path

from celery import chord, group
from celery.exceptions import Reject
from ffmpeg import FFmpeg, progress
from rich import print
from rich.console import Console

@@ -79,6 +83,33 @@ def __post_init__(self):
)


@dataclass(frozen=True, init=True)
class SplitTaskJob(TaskJob):
segment_number: int
segment_range_in: int
segment_range_out: int

def __post_init__(self):
# TODO: Custom exceptions for task job validation

if not os.path.exists(self.source.file_path): # SOURCE ACCESSIBLE
raise FileNotFoundError(
f"Provided source file '{self.source.file_path}' does not exist"
)

# if os.path.exists(self.): # NO OVERWRITE
# raise FileExistsError(
# f"File already exists at provided output path {self.output_file_path}"
# )
if self.input_level not in [
"in_range=full",
"in_range=limited",
]: # CHECK VALID VIDEO LEVELS
raise ValueError(
f"Calculated video levels are invalid: '{self.input_level}'"
)


def ffmpeg_video_flip(job: TaskJob):
flip_string = ""
if job.source.h_flip:
@@ -223,3 +254,199 @@ def encode_proxy(self, job_dict: dict) -> str:
logger.exception(f"[red] :warning: Couldn't encode proxy.[/]\n{e}")

return f"{job.source.file_name} encoded successfully"


@celery_app.task(
bind=True,
acks_late=True,
track_started=True,
prefetch_limit=1,
soft_time_limit=60,
reject_on_worker_lost=True,
queue=celery_queue,
)
def encode_segment(self, job_dict: dict):
"""
Celery task to encode proxy media using parameters in job argument
and user-defined settings
"""

logger.debug(f"[magenta]Received job dict {job_dict}")

project_metadata = class_from_args(ProjectMetadata, job_dict["project"])
source_metadata = class_from_args(SourceMetadata, job_dict["source"])

job = SplitTaskJob(
settings=TaskSettings(**job_dict["settings"]),
project=project_metadata,
source=source_metadata,
output_file_path=job_dict["job"]["output_file_path"],
output_file_name=job_dict["job"]["output_file_name"],
output_directory=job_dict["job"]["output_directory"],
input_level=job_dict["job"]["input_level"],
segment_number=job_dict["job"]["segment_number"],
segment_range_in=job_dict["job"]["segment_range_in"],
segment_range_out=job_dict["job"]["segment_range_out"],
)

print(job)

ps = job.settings.proxy

full_output_path = os.path.join(
job.output_directory,
f"{job.output_file_name}_{job.segment_number}{ps.ext}",
)

# Create proxy output directory
os.makedirs(job.output_directory, exist_ok=True)

# Print new job header ############################################

print("\n")
console.rule("[green]Received segment encode job :clapper:[/]", align="left")
print("\n")

logger.info(
f"[magenta bold]Job: [/]{self.request.id}\n"
f"Input File: '{job.source.file_path}'"
)

###################################################################

# Log job details
logger.info(f"Temp Segment File: '{full_output_path}'")
logger.info(f"Final Output File: '{job.output_file_path}'\n")
logger.info(
f"Source Resolution: {job.source.resolution[0]} x {job.source.resolution[1]}"
)
logger.info(
f"Horizontal Flip: {job.source.h_flip}\n" f"Vertical Flip: {job.source.v_flip}"
)
logger.info(f"Starting Timecode: {job.source.start_tc}")

# Get FFmpeg Command

ffmpeg_command = [
# INPUT
"ffmpeg",
"-y", # Never prompt!
*ps.misc_args,
"-i",
# Segment range
job.source.file_path,
"-ss",
# TODO: When should we convert these to str?
str(job.segment_range_in),
"-to",
str(job.segment_range_out),
# VIDEO
"-c:v",
ps.codec,
"-profile:v",
ps.profile,
"-vsync",
"-1", # Necessary to match VFR
# TODO: Format this better
# It's hard to format this. Every arg behind the -vf flag
# should be separated by a literal comma and NO SPACES to string them together as per ffmpeg syntax.
# Any optional args must provide their own literal commas so as not to leave them stray
# if disabled... Inline functions here are also confusing and "magical".
# But we don't want to run them queuer side, only on final queueables.
# labels: enhancement
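# (A possible comma-joining helper is sketched just after this function.)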
# VIDEO FILTERS
"-vf",
f"scale=-2:{ps.vertical_res},"
f"scale={job.input_level}:out_range=limited, "
f"{ffmpeg_video_flip(job)}"
f"format={ps.pix_fmt}"
if ps.pix_fmt
else "",
# AUDIO
"-c:a",
ps.audio_codec,
"-ar",
ps.audio_samplerate,
# TIMECODE
"-timecode",
job.source.start_tc,
# FLAGS
"-movflags",
"+write_colr",
# OUTPUT
full_output_path,
]

print() # Newline
logger.debug(f"[magenta]Running! FFmpeg command:[/]\n{' '.join(ffmpeg_command)}\n")

try:
process = FfmpegProcess(
task_id=self.request.id,
channel_id=self.request.group,
command=[*ffmpeg_command],
ffmpeg_loglevel=ps.ffmpeg_loglevel,
)
except Exception as e:
logger.error(f"[red]Error: {e}\nRejecting task to prevent requeuing.")
raise Reject(e, requeue=False)

# Create logfile
encode_log_dir = job.settings.paths.ffmpeg_logfile_dir
os.makedirs(encode_log_dir, exist_ok=True)
logfile_path = os.path.normpath(
os.path.join(encode_log_dir, job.output_file_name + ".txt")
)
logger.debug(f"[magenta]Encoder logfile path: {logfile_path}[/]")

# Run encode job
logger.info("[yellow]Encoding...[/]")

try:
process.run(self, logfile=logfile_path)

except Exception as e:
logger.exception(f"[red] :warning: Couldn't encode proxy.[/]\n{e}")
return False

return full_output_path
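The TODO above the `-vf` block notes how awkward it is to splice optional filter components together with literal commas. Purely as an illustration — this `join_filters` helper is not part of the PR — the components could be joined centrally so disabled options never leave stray separators behind:

```python
def join_filters(*components: str) -> str:
    """Join non-empty ffmpeg filter components with commas.

    Optional components can be passed as "" and are simply dropped,
    so no caller needs to carry its own trailing comma.
    """
    return ",".join(c.strip(" ,") for c in components if c and c.strip(" ,"))


# Hypothetical usage inside the command builder:
# "-vf", join_filters(
#     f"scale=-2:{ps.vertical_res}",
#     f"scale={job.input_level}:out_range=limited",
#     ffmpeg_video_flip(job),                        # may be ""
#     f"format={ps.pix_fmt}" if ps.pix_fmt else "",  # optional
# ),
```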


@celery_app.task(
bind=True,
acks_late=True,
track_started=True,
prefetch_limit=1,
soft_time_limit=60,
reject_on_worker_lost=True,
queue=celery_queue,
)
def concat_segments(self, segment_file_paths: list[str]):
logger.debug(f"[magenta]Received segments: '{segment_file_paths}'")

# Sort ascending by numeric segment suffix (lexicographic order would put "_10" before "_2")
segment_file_paths = sorted(
segment_file_paths, key=lambda p: int(os.path.splitext(p)[0].rpartition("_")[2])
)

# Get output file name
sample = segment_file_paths[0]
directory = os.path.dirname(sample)
name, ext = os.path.splitext(os.path.basename(sample))

prefix, _, _ = name.rpartition("_")
concat_file_path = os.path.join(directory, prefix + ext)

# As string
concat_list = "|".join(segment_file_paths)

logger.info("[yellow]Concatenating...[/]")
subprocess.check_output(
[
"ffmpeg",
*settings.proxy.misc_args,
"-i",
f'"concat:{concat_list}"',
"-c",
"copy",
concat_file_path,
]
)
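The diff imports `chord` and `group` but the dispatch side is not part of this changeset. As a rough sketch only — the module path and call site below are assumed, not taken from this PR — the queuer could fan segment jobs out to `encode_segment` and hand the collected results to `concat_segments` as the chord callback:

```python
from celery import chord, group

from proxima.celery.tasks import concat_segments, encode_segment  # assumed module path


def queue_split_and_stitch(segment_job_dicts: list[dict]):
    """Encode every segment of one source in parallel, then stitch.

    The chord callback receives the list of temp segment paths returned by
    the encode_segment tasks, which is what concat_segments expects.
    """
    return chord(
        group(encode_segment.s(job) for job in segment_job_dicts)
    )(concat_segments.s())
```

Whether Proxima actually wires the tasks this way is not visible here; the sketch only shows how the two new tasks compose into the "split and stitch" flow.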