Generate, translate & embed subtitles for all files in a directory #1557

Open
leefm0204 opened this issue Nov 20, 2024 · 0 comments
leefm0204 commented Nov 20, 2024

A bash script that generates subtitles for all files in a specified directory, with two options:

  1. translate the generated SRT with translate-shell (sudo apt install translate-shell),
  2. embed the SRT into the video.

Example command:
./gensub /folder/path/of/video --translate --source ja --target zh-CN --embed
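
For reference, the translation step boils down to one translate-shell call per subtitle text line, along these lines (the quoted string is a placeholder):

trans -brief ja:zh-CN "one subtitle text line"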

Below is my modified generate-subtitles.py. It uses SenseVoice and shows a progress bar with the remaining time.

#!/usr/bin/env python3
#
# Copyright (c)  2023  Xiaomi Corporation

"""
This file demonstrates how to use sherpa-onnx Python APIs to generate
subtitles.

Supported file formats are those supported by ffmpeg; for instance,
*.mov, *.mp4, *.wav, etc.

Note that you need a non-streaming model for this script.

Please visit
https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models/silero_vad.onnx
to download silero_vad.onnx

For instance,

wget https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models/silero_vad.onnx

For SenseVoice CTC models:

./python-api-examples/generate-subtitles.py  \
  --silero-vad-model=/path/to/silero_vad.onnx \
  --sense-voice=./sherpa-onnx-sense-voice-zh-en-ja-ko-yue-2024-07-17/model.onnx \
  --tokens=./sherpa-onnx-sense-voice-zh-en-ja-ko-yue-2024-07-17/tokens.txt \
  --num-threads=2 \
  /path/to/test.mp4
"""

import argparse
import datetime as dt
import shutil
import subprocess
import sys
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path

import numpy as np
import sherpa_onnx
from tqdm import tqdm


def get_args():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    parser.add_argument(
        "--silero-vad-model",
        type=str,
        required=True,
        help="Path to silero_vad.onnx",
    )

    parser.add_argument(
        "--tokens",
        type=str,
        help="Path to tokens.txt",
    )

    parser.add_argument(
        "--sense-voice",
        default="",
        type=str,
        help="Path to the model.onnx from SenseVoice",
    )

    parser.add_argument(
        "--num-threads",
        type=int,
        default=4,
        help="Number of threads for neural network computation",
    )

    parser.add_argument(
        "--decoding-method",
        type=str,
        default="greedy_search",
        help="""Valid values are greedy_search and modified_beam_search.
        modified_beam_search is valid only for transducer models.
        """,
    )

    parser.add_argument(
        "--debug",
        action="store_true",
        help="Show debug messages when loading models.",
    )

    parser.add_argument(
        "--sample-rate",
        type=int,
        default=16000,
        help="""Sample rate of the feature extractor. Must match the one
        expected by the model. Note: The input sound files can have a
        different sample rate from this argument.""",
    )

    parser.add_argument(
        "--feature-dim",
        type=int,
        default=80,
        help="Feature dimension. Must match the one expected by the model",
    )

    parser.add_argument(
        "sound_file",
        type=str,
        help="The input sound/video file to generate subtitles for",
    )

    return parser.parse_args()


def assert_file_exists(filename: str):
    assert Path(filename).is_file(), (
        f"{filename} does not exist!\n"
        "Please refer to "
        "https://k2-fsa.github.io/sherpa/onnx/pretrained_models/index.html to download it"
    )


def create_recognizer(args) -> sherpa_onnx.OfflineRecognizer:
    if args.sense_voice:
        assert_file_exists(args.sense_voice)
        recognizer = sherpa_onnx.OfflineRecognizer.from_sense_voice(
            model=args.sense_voice,
            tokens=args.tokens,
            num_threads=args.num_threads,
            use_itn=True,
            debug=args.debug,
        )
    else:
        raise ValueError("Please specify at least one model")
    return recognizer


@dataclass
class Segment:
    start: float
    duration: float
    text: str = ""

    @property
    def end(self):
        return self.start + self.duration

    def __str__(self):
        def fmt(seconds: float) -> str:
            # Format explicitly as "HH:MM:SS,mmm": str(timedelta) omits the
            # fractional part for whole-second values, so slicing its output
            # would produce broken timestamps in that case.
            ms = int(round(seconds * 1000))
            h, rem = divmod(ms, 3_600_000)
            m, rem = divmod(rem, 60_000)
            sec, ms = divmod(rem, 1000)
            return f"{h:02}:{m:02}:{sec:02},{ms:03}"

        return f"{fmt(self.start)} --> {fmt(self.end)}\n{self.text}"

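# A rendered Segment (hypothetical values) produces one SRT cue body like:
#
#   00:00:01,230 --> 00:00:03,450
#   some recognized text
#
# The cue index and the trailing blank line required by SRT are written in
# main() when the file is saved.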

def main():
    args = get_args()
    assert_file_exists(args.tokens)
    assert_file_exists(args.silero_vad_model)

    assert args.num_threads > 0, args.num_threads

    if not Path(args.sound_file).is_file():
        raise ValueError(f"{args.sound_file} does not exist")

    assert (
        args.sample_rate == 16000
    ), f"Only sample rate 16000 is supported. Given: {args.sample_rate}"

    recognizer = create_recognizer(args)

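    # Decode the input with ffmpeg into raw mono 16-bit PCM at the target
    # sample rate, streamed to stdout, so any container/codec that ffmpeg
    # understands works as input.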
    ffmpeg_cmd = [
        "ffmpeg",
        "-i",
        args.sound_file,
        "-f",
        "s16le",
        "-acodec",
        "pcm_s16le",
        "-ac",
        "1",
        "-ar",
        str(args.sample_rate),
        "-",
    ]

    process = subprocess.Popen(
        ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
    )

    frames_per_read = int(args.sample_rate * 100)  # read 100 seconds at a time

    config = sherpa_onnx.VadModelConfig()
    config.silero_vad.model = args.silero_vad_model
    config.silero_vad.threshold = 0.5
    config.silero_vad.min_silence_duration = 0.25  # seconds
    config.silero_vad.min_speech_duration = 0.25  # seconds
    config.silero_vad.max_speech_duration = 5  # seconds
    config.sample_rate = args.sample_rate

    window_size = config.silero_vad.window_size
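    # NOTE: silero-vad consumes fixed-size windows (typically 512 samples at
    # 16 kHz); the read loop below buffers the decoded audio and feeds it to
    # the VAD one window at a time.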

    buffer = []
    vad = sherpa_onnx.VoiceActivityDetector(config, buffer_size_in_seconds=100)

    segment_list = []

    # Get total duration of the audio file
    total_duration = float(
        subprocess.check_output(
            [
                "ffprobe",
                "-v",
                "quiet",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                args.sound_file,
            ]
        ).strip()
    )

    # Initialize progress bar
    pbar = tqdm(total=int(total_duration), unit="sec", desc="Processing")

    start_t = dt.datetime.now()
    num_processed_samples = 0
    last_update = 0
    is_eof = False

    while True:
        # *2 because int16_t has two bytes
        data = process.stdout.read(frames_per_read * 2)
        if not data:
            if is_eof:
                break
            is_eof = True
            # pad 1 second at the end of the file for the VAD
            data = np.zeros(1 * args.sample_rate, dtype=np.int16).tobytes()

        samples = np.frombuffer(data, dtype=np.int16)
        samples = samples.astype(np.float32) / 32768

        num_processed_samples += samples.shape[0]

        # Update progress bar
        current_time = num_processed_samples / args.sample_rate
        if current_time - last_update >= 1:  # Update every second
            pbar.update(int(current_time - last_update))
            last_update = current_time

        buffer = np.concatenate([buffer, samples])
        while len(buffer) > window_size:
            vad.accept_waveform(buffer[:window_size])
            buffer = buffer[window_size:]
        if is_eof:
            vad.flush()

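        # Drain the VAD: each detected speech chunk becomes a Segment (its
        # timing) plus an offline stream (its samples); the streams are then
        # decoded and the recognized text is attached to each Segment.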
        streams = []
        segments = []
        while not vad.empty():
            segment = Segment(
                start=vad.front.start / args.sample_rate,
                duration=len(vad.front.samples) / args.sample_rate,
            )
            segments.append(segment)

            stream = recognizer.create_stream()
            stream.accept_waveform(args.sample_rate, vad.front.samples)

            streams.append(stream)
            vad.pop()
        for s in streams:
            recognizer.decode_stream(s)
        for seg, stream in zip(segments, streams):
            seg.text = stream.result.text
            segment_list.append(seg)

    pbar.close()  # Close the progress bar
    end_t = dt.datetime.now()
    elapsed_seconds = (end_t - start_t).total_seconds()
    duration = num_processed_samples / args.sample_rate
    rtf = elapsed_seconds / duration  # real-time factor; < 1 is faster than real time

    srt_filename = Path(args.sound_file).with_suffix(".srt")
    with open(srt_filename, "w", encoding="utf-8") as f:
        for i, seg in enumerate(segment_list):
            print(i + 1, file=f)
            print(seg, file=f)
            print("", file=f)
    print(f"Saved to {srt_filename}")
    print(f"Audio duration:\t{duration:.3f} s")
    print(f"Elapsed:\t{elapsed_seconds:.3f} s")
    print(f"RTF = {elapsed_seconds:.3f}/{duration:.3f} = {rtf:.3f}")
    print("Done!")


if __name__ == "__main__":
    if shutil.which("ffmpeg") is None:
        print("Please install ffmpeg first!")
        sys.exit(-1)
    if shutil.which("ffprobe") is None:
        print("Please install ffprobe first!")
        sys.exit(-1)

    main()

And here is the bash script mentioned above:

#!/bin/bash

# Check if the directory is passed as an argument
if [ $# -lt 1 ]; then
  echo "Usage: $0 <directory> [--embed] [--translate] --source <source_lang> --target <target_lang>"
  exit 1
fi

# Parse the directory and optional flags
DIRECTORY="$1"
EMBED=false
TRANSLATE=false
SOURCE_LANG=""
TARGET_LANG=""

shift  # Shift to process additional flags
while [[ "$#" -gt 0 ]]; do
  case "$1" in
    --embed)
      EMBED=true
      ;;
    --translate)
      TRANSLATE=true
      ;;
    --source|-s)
      SOURCE_LANG="$2"
      shift
      ;;
    --target|-t)
      TARGET_LANG="$2"
      shift
      ;;
    *)
      echo "Unknown option: $1"
      exit 1
      ;;
  esac
  shift
done

# Validate source and target languages if translation is enabled
if [ "$TRANSLATE" = true ]; then
  if [[ -z "$SOURCE_LANG" || -z "$TARGET_LANG" ]]; then
    echo "Error: Source and target languages must be specified when using --translate."
    echo "Usage: $0 <directory> --translate --source <source_lang> --target <target_lang>"
    exit 1
  fi
fi

# Define the subtitle-generation command.
# NOTE: the paths below are placeholders; adjust them to your setup.
# An array is used so the command also works if a path contains spaces.
Gensub=(python3 /path/to/generate-subtitles.py
  --silero-vad-model=/path/to/silero_vad.onnx
  --sense-voice=/path/to/model.onnx
  --tokens=/path/to/tokens.txt
  --num-threads=4
  --sample-rate 16000)

# Check if the directory exists
if [ ! -d "$DIRECTORY" ]; then
  echo "Directory $DIRECTORY does not exist."
  exit 1
fi

# Function to translate subtitles using translate-shell
translate_subtitles() {
  local input_file=$1
  local output_file=$2

  echo "Translating subtitles from $SOURCE_LANG to $TARGET_LANG..."
  : > "$output_file"  # Clear or create the output file

  # Initialize progress bar
  total_lines=$(wc -l < "$input_file")
  current_line=0

  while IFS= read -r line || [[ -n "$line" ]]; do
    if [[ "$line" =~ ^[0-9]+$ || "$line" =~ "-->" ]]; then
      # Preserve index numbers and timecode lines
      echo "$line" >> "$output_file"
    elif [[ -z "$line" ]]; then
      # Preserve blank lines
      echo "" >> "$output_file"
    else
      # Translate text lines
      if ! translated_line=$(trans -brief "$SOURCE_LANG:$TARGET_LANG" "$line" 2>/dev/null); then
        echo "Error: Translation failed for '$line'. Keeping original."
        translated_line="$line"  # Fallback to original line if translation fails
      fi
      echo "$translated_line" >> "$output_file"
    fi

    # Update progress
    ((current_line++))
    progress=$((current_line * 100 / total_lines))
    printf "\rProgress: [%-50s] %d%%" "$(printf '=%.0s' $(seq 1 $((progress / 2))))" "$progress"
  done < "$input_file"

  echo  # New line after progress bar
  echo "Translation completed: $output_file"
}
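
# For example, in an SRT cue like the following (hypothetical content), only
# the text line is sent to trans; index, timecode and blank lines are copied
# through unchanged:
#
#   1
#   00:00:01,230 --> 00:00:03,450
#   one subtitle text line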

# Process files one by one
for file in "$DIRECTORY"/*; do
  if [ -f "$file" ]; then
    # Skip subtitle files and previously embedded outputs
    case "$file" in
      *.srt|*_embedded.mp4) continue ;;
    esac

    output_file="${file%.*}.srt"
    translated_file="${output_file%.srt}_${SOURCE_LANG}_to_${TARGET_LANG}.srt"
    embedded_file="${file%.*}_embedded.mp4"

    # Generate subtitles if not already done
    if [ -f "$output_file" ]; then
      echo "Subtitle file for $file already exists, skipping subtitle generation."
    else
      echo "Processing $file to generate subtitles..."
      if ! "${Gensub[@]}" "$file"; then
        echo "Error processing $file. Exiting."
        exit 1
      fi
    fi

    # Translate subtitles if required
    if [ "$TRANSLATE" = true ] && [ ! -f "$translated_file" ]; then
      translate_subtitles "$output_file" "$translated_file"
    fi

    # Embed subtitles if required
    if [ "$EMBED" = true ] && [ ! -f "$embedded_file" ]; then
      if [ -f "$translated_file" ]; then
        echo "Embedding translated subtitles into $file..."
        if ! ffmpeg -i "$file" -i "$translated_file" -c copy -c:s mov_text "$embedded_file"; then
          echo "Error embedding subtitles for $file. Exiting."
          exit 1
        fi
      else
        echo "Embedding original subtitles into $file..."
        if ! ffmpeg -i "$file" -i "$output_file" -c copy -c:s mov_text "$embedded_file"; then
          echo "Error embedding subtitles for $file. Exiting."
          exit 1
        fi
      fi
    fi
  fi
done

echo "All files have been processed."

👋👏👍✌️
