[Enhancement] Overhaul indexing to be more efficient (#540)
* WIP - created methods for breaking on existing media

* WIP - got everything hooked up for POC

* Add some docs, tests

* Refactors

* Updated TODO
kieraneglin authored Jan 2, 2025
1 parent 09d1653 commit 9185f07
Showing 10 changed files with 236 additions and 48 deletions.
10 changes: 5 additions & 5 deletions lib/pinchflat/slow_indexing/media_collection_indexing_worker.ex
@@ -79,21 +79,21 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorker do
case {source.index_frequency_minutes, source.last_indexed_at} do
{index_freq, _} when index_freq > 0 ->
# If the indexing is on a schedule simply run indexing and reschedule
perform_indexing_and_notification(source)
perform_indexing_and_notification(source, was_forced: args["force"])
maybe_enqueue_fast_indexing_task(source)
reschedule_indexing(source)

{_, nil} ->
# If the source has never been indexed, index it once
# even if it's not meant to reschedule
perform_indexing_and_notification(source)
perform_indexing_and_notification(source, was_forced: args["force"])
:ok

_ ->
# If the source HAS been indexed and is not meant to reschedule,
# perform a no-op (unless forced)
if args["force"] do
perform_indexing_and_notification(source)
perform_indexing_and_notification(source, was_forced: true)
end

:ok
@@ -103,11 +103,11 @@ defmodule Pinchflat.SlowIndexing.MediaCollectionIndexingWorker do
Ecto.StaleEntryError -> Logger.info("#{__MODULE__} discarded: source #{source_id} stale")
end

defp perform_indexing_and_notification(source) do
defp perform_indexing_and_notification(source, indexing_opts) do
apprise_server = Settings.get!(:apprise_server)

SourceNotifications.wrap_new_media_notification(apprise_server, source, fn ->
SlowIndexingHelpers.index_and_enqueue_download_for_media_items(source)
SlowIndexingHelpers.index_and_enqueue_download_for_media_items(source, indexing_opts)
end)
end
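As a sketch of the flow this enables (job args and enqueue call assumed to follow the usual Oban pattern): forcing a one-off index of a source now threads the flag all the way through to the indexing helpers.

    # Hypothetical enqueue of a user-forced index; the worker's case
    # clauses above pick up args["force"] and pass was_forced: true on,
    # which (per the helpers below) skips the download archive.
    %{"id" => source.id, "force" => true}
    |> MediaCollectionIndexingWorker.new()
    |> Oban.insert()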

96 changes: 85 additions & 11 deletions lib/pinchflat/slow_indexing/slow_indexing_helpers.ex
@@ -5,6 +5,8 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
Many of these methods are made to kick off workers or to be consumed by workers.
"""

use Pinchflat.Media.MediaQuery

require Logger

alias Pinchflat.Repo
@@ -14,6 +16,7 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
alias Pinchflat.Sources.Source
alias Pinchflat.Media.MediaItem
alias Pinchflat.YtDlp.MediaCollection
alias Pinchflat.Utils.FilesystemUtils
alias Pinchflat.Downloading.DownloadingHelpers
alias Pinchflat.SlowIndexing.FileFollowerServer
alias Pinchflat.Downloading.DownloadOptionBuilder
@@ -22,13 +25,19 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
alias Pinchflat.YtDlp.Media, as: YtDlpMedia

@doc """
Starts tasks for indexing a source's media regardless of the source's indexing
frequency. It's assumed the caller will check for indexing frequency.
Kills old indexing tasks and starts a new task to index the media collection.
The job is delayed based on the source's `index_frequency_minutes` setting unless
one of the following is true:
- The `force` option is set to true
- The source has never been indexed before
- The source has been indexed before, but the last indexing job was more than
`index_frequency_minutes` ago
Returns {:ok, %Task{}}
"""
def kickoff_indexing_task(%Source{} = source, job_args \\ %{}, job_opts \\ []) do
job_offset_seconds = calculate_job_offset_seconds(source)
job_offset_seconds = if job_args[:force], do: 0, else: calculate_job_offset_seconds(source)

Tasks.delete_pending_tasks_for(source, "FastIndexingWorker")
Tasks.delete_pending_tasks_for(source, "MediaCollectionIndexingWorker", include_executing: true)
@@ -52,32 +61,42 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
@doc """
Given a media source, creates (indexes) the media by creating media_items for each
media ID in the source. Afterward, kicks off a download task for each pending media
item belonging to the source. You can't tell me the method name isn't descriptive!
Returns a list of media items or changesets (if the media item couldn't be created).
item belonging to the source. Returns a list of media items or changesets
(if the media item couldn't be created).
Indexing is slow and usually returns a list of all media data at once for record creation.
To help with this, we use a file follower to watch the file that yt-dlp writes to
so we can create media items as they come in. This parallelizes the process and adds
clarity to the user experience. This has a few things to be aware of which are documented
below in the file watcher setup method.
Additionally, in the case of a repeat index we create a download archive file that
contains some media IDs that we've indexed in the past. Note that this archive doesn't
contain the most recent IDs but rather a subset of IDs that are offset by some amount.
Practically, this means that we'll re-index a small handful of media that we've recently
indexed, but this is a good thing since it'll let us pick up on any recent changes to the
most recent media items.
We don't create a download archive for playlists (only channels), nor do we create one if
the indexing was forced by the user.
NOTE: downloads are only enqueued if the source is set to download media. Downloads are
also enqueued for ALL pending media items, not just the ones that were indexed in this
job run. This should ensure that any stragglers are caught if, for some reason, they
weren't enqueued or somehow got de-queued.
Since indexing returns all media data EVERY TIME, we use that opportunity to update
indexing metadata for media items that have already been created.
Available options:
- `was_forced`: Whether the indexing was forced by the user
Returns [%MediaItem{} | %Ecto.Changeset{}]
"""
def index_and_enqueue_download_for_media_items(%Source{} = source) do
def index_and_enqueue_download_for_media_items(%Source{} = source, opts \\ []) do
# The media_profile is needed to determine the quality options to _then_ determine a more
# accurate predicted filepath
source = Repo.preload(source, [:media_profile])
# See the method definition below for more info on how file watchers work
# (important reading if you're not familiar with it)
{:ok, media_attributes} = setup_file_watcher_and_kickoff_indexing(source)
{:ok, media_attributes} = setup_file_watcher_and_kickoff_indexing(source, opts)
# Reload because the source may have been updated during the (long-running) indexing process
# and important settings like `download_media` may have changed.
source = Repo.reload!(source)
@@ -109,14 +128,16 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
# It attempts a graceful shutdown of the file follower after the indexing is done,
# but the FileFollowerServer will also stop itself if it doesn't see any activity
# for a sufficiently long time.
defp setup_file_watcher_and_kickoff_indexing(source) do
defp setup_file_watcher_and_kickoff_indexing(source, opts) do
was_forced = Keyword.get(opts, :was_forced, false)
{:ok, pid} = FileFollowerServer.start_link()

handler = fn filepath -> setup_file_follower_watcher(pid, filepath, source) end

command_opts =
[output: DownloadOptionBuilder.build_output_path_for(source)] ++
DownloadOptionBuilder.build_quality_options_for(source)
DownloadOptionBuilder.build_quality_options_for(source) ++
build_download_archive_options(source, was_forced)

runner_opts = [file_listener_handler: handler, use_cookies: source.use_cookies]
result = MediaCollection.get_media_attributes_for_collection(source.original_url, command_opts, runner_opts)
@@ -166,4 +187,57 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do

max(0, index_frequency_seconds - offset_seconds)
end

# The download archive file works in tandem with --break-on-existing to stop
# yt-dlp once we've hit media items we've already indexed. But we generate
# this list with a bit of an offset so we do intentionally re-scan some media
# items to pick up any recent changes (see `get_media_items_for_download_archive`).
#
# From there, we format the media IDs in the way that yt-dlp expects (ie: "<extractor> <media_id>")
# and return the filepath to the caller.
defp create_download_archive_file(source) do
tmpfile = FilesystemUtils.generate_metadata_tmpfile(:txt)

archive_contents =
source
|> get_media_items_for_download_archive()
|> Enum.map_join("\n", fn media_item -> "youtube #{media_item.media_id}" end)

case File.write(tmpfile, archive_contents) do
:ok -> tmpfile
err -> err
end
end
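To make the archive format concrete, here is roughly what the Enum.map_join/3 call above produces — one "<extractor> <media_id>" line per entry, as yt-dlp's --download-archive expects (IDs hypothetical):

    iex> items = [%{media_id: "dQw4w9WgXcQ"}, %{media_id: "9bZkp7q19f0"}]
    iex> Enum.map_join(items, "\n", fn media_item -> "youtube #{media_item.media_id}" end)
    "youtube dQw4w9WgXcQ\nyoutube 9bZkp7q19f0"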

# Sorting by `uploaded_at` is important because we want to re-index the most recent
# media items first but there is no guarantee of any correlation between ID and uploaded_at.
#
# The offset is important because we want to re-index some media items that we've
# recently indexed to pick up on any changes. The limit is because we want this mechanism
# to work even if, for example, the video we were using as a stopping point was deleted.
# It's not a perfect system, but it should do well enough.
#
# The chosen limit and offset are arbitrary, independent, and vibes-based. Feel free to
# tweak as needed.
defp get_media_items_for_download_archive(source) do
MediaQuery.new()
|> where(^MediaQuery.for_source(source))
|> order_by(desc: :uploaded_at)
|> limit(50)
|> offset(20)
|> Repo.all()
end
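A small worked example of the window this query selects, assuming a source with 100 already-indexed media items sorted newest-first:

    ids = Enum.to_list(1..100)                        # 1 = newest upload
    archived = ids |> Enum.drop(20) |> Enum.take(50)  # items 21..70
    # The 20 newest items are deliberately left out of the archive, so
    # yt-dlp re-indexes them before --break-on-existing stops the run.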

# The download archive isn't useful for playlists (since those are ordered arbitrarily)
# and we don't want to use it if the indexing was forced by the user. In other words,
# only create an archive for channels that are being indexed as part of their regular
# indexing schedule
defp build_download_archive_options(%Source{collection_type: :playlist}, _was_forced), do: []
defp build_download_archive_options(_source, true), do: []

defp build_download_archive_options(source, _was_forced) do
archive_file = create_download_archive_file(source)

[:break_on_existing, download_archive: archive_file]
end
end
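Assuming CliUtils.parse_options/1 converts keyword options to flags in the usual way, a scheduled channel index would reach yt-dlp with something like --break-on-existing --download-archive <tmpfile>, while playlists and forced runs add no archive flags at all:

    # Sketch (source and path hypothetical)
    build_download_archive_options(channel_source, false)
    #=> [:break_on_existing, {:download_archive, "/tmp/pinchflat/ab/cd/abcd1234.txt"}]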
14 changes: 13 additions & 1 deletion lib/pinchflat/utils/filesystem_utils.ex
@@ -45,8 +45,20 @@ defmodule Pinchflat.Utils.FilesystemUtils do
Returns binary()
"""
def generate_metadata_tmpfile(type) do
filename = StringUtils.random_string(64)
# This "namespacing" is more to help with development since things get
# weird in my editor when there are thousands of files in a single directory
first_two = String.slice(filename, 0..1)
second_two = String.slice(filename, 2..3)
tmpfile_directory = Application.get_env(:pinchflat, :tmpfile_directory)
filepath = Path.join([tmpfile_directory, "#{StringUtils.random_string(64)}.#{type}"])

filepath =
Path.join([
tmpfile_directory,
first_two,
second_two,
"#{filename}.#{type}"
])

:ok = write_p!(filepath, "")
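A sketch of the resulting layout (directory and name hypothetical): a random filename like "a1b2c9..." now fans out into two levels of subdirectories instead of landing in one flat folder.

    iex> FilesystemUtils.generate_metadata_tmpfile(:txt)
    "/tmp/pinchflat/a1/b2/a1b2c9<rest-of-64-char-name>.txt"

Two characters per level keeps any single directory to a bounded number of entries while spreading thousands of tmpfiles across many buckets.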

8 changes: 7 additions & 1 deletion lib/pinchflat/yt_dlp/command_runner.ex
@@ -39,7 +39,13 @@ defmodule Pinchflat.YtDlp.CommandRunner do
formatted_command_opts = [url] ++ CliUtils.parse_options(all_opts)

case CliUtils.wrap_cmd(command, formatted_command_opts, stderr_to_stdout: true) do
{_, 0} ->
# yt-dlp exit codes:
# 0 = Everything is successful
# 100 = yt-dlp must restart for update to complete
# 101 = Download cancelled by --max-downloads etc
# 2 = Error in user-provided options
# 1 = Any other error
{_, status} when status in [0, 101] ->
# IDEA: consider deleting the file after reading it. It's in the tmp dir, so it's not
# a huge deal, but it's still a good idea to clean up after ourselves.
# (even on error? especially on error?)
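Why 101 now counts as success: the new --break-on-existing indexing flow makes yt-dlp abort on purpose once it reaches already-archived media, and break conditions surface as exit code 101. A hedged sketch of the newly accepted outcome:

    # An indexing run cut short by the download archive still produced
    # usable output; only the exit status differs from a full run.
    {output, 101} = CliUtils.wrap_cmd(command, formatted_command_opts, stderr_to_stdout: true)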
3 changes: 1 addition & 2 deletions lib/pinchflat/yt_dlp/media_collection.ex
@@ -23,7 +23,6 @@ defmodule Pinchflat.YtDlp.MediaCollection do
Returns {:ok, [map()]} | {:error, any, ...}.
"""
def get_media_attributes_for_collection(url, command_opts \\ [], addl_opts \\ []) do
runner = Application.get_env(:pinchflat, :yt_dlp_runner)
# `ignore_no_formats_error` is necessary because yt-dlp will error out if
# the first video has not released yet (ie: is a premier). We don't care about
# available formats since we're just getting the media details
@@ -39,7 +38,7 @@ defmodule Pinchflat.YtDlp.MediaCollection do
file_listener_handler.(output_filepath)
end

case runner.run(url, action, all_command_opts, output_template, runner_opts) do
case backend_runner().run(url, action, all_command_opts, output_template, runner_opts) do
{:ok, output} ->
parsed_lines =
output
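The removed module-attribute-style lookup is presumably replaced by a private helper elsewhere in this module, along the lines of:

    # Hypothetical shape of the helper the new call site uses; resolving
    # the runner at call time picks up per-env (e.g. test double) config.
    defp backend_runner do
      Application.get_env(:pinchflat, :yt_dlp_runner)
    end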
@@ -54,7 +54,7 @@
<.link
href={~p"/sources/#{@source}/force_index"}
method="post"
data-confirm="Are you sure you want to force an index of this source? This isn't normally needed."
data-confirm="Are you sure you want index all content from this source? This isn't normally needed."
>
Force Index
</.link>