Skip to content

Commit

Permalink
now a per-minute chunking strategy for video
Browse files Browse the repository at this point in the history
  • Loading branch information
emcf committed Apr 28, 2024
1 parent cc5c466 commit 9384682
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 3 deletions.
4 changes: 3 additions & 1 deletion requirements_local.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ llmlingua
PyMuPDF
pdf2image
python-magic
python-dotenv
python-dotenv
openai-whisper
moviepy
Binary file added tests/files/example.mp3
Binary file not shown.
Binary file added tests/files/example.mp4
Binary file not shown.
24 changes: 24 additions & 0 deletions tests/test_thepipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,30 @@ def tearDown(self):
os.remove(os.path.join(self.outputs_directory, file))
os.rmdir(self.outputs_directory)

def test_extract_video(self):
    """Extract example.mp4 and check that frames and a transcript come back."""
    # build the path the same way the other tests in this file do
    video_path = os.path.join(self.files_directory, "example.mp4")
    chunks = extractor.extract_from_source(source=video_path)
    # verify it extracted the video file into a non-empty list of chunks
    self.assertIsInstance(chunks, list)
    self.assertGreater(len(chunks), 0)
    self.assertIsInstance(chunks[0], core.Chunk)
    # verify it extracted visual data
    self.assertTrue(any(chunk.image for chunk in chunks))
    # verify it extracted audio data
    self.assertTrue(any(chunk.text for chunk in chunks))
    # verify it transcribed the audio correctly, i.e., 'citizens' is in the extracted text
    # (joined with a space so a failure prints the transcript that was actually produced)
    transcript = " ".join(chunk.text for chunk in chunks if chunk.text is not None).lower()
    self.assertIn('citizens', transcript)

def test_extract_audio(self):
    """Extract example.mp3 and check that a transcript comes back."""
    # build the path the same way the other tests in this file do
    audio_path = os.path.join(self.files_directory, "example.mp3")
    chunks = extractor.extract_from_source(source=audio_path)
    # verify it extracted the audio file into a non-empty list of chunks
    self.assertIsInstance(chunks, list)
    self.assertGreater(len(chunks), 0)
    self.assertIsInstance(chunks[0], core.Chunk)
    # verify it extracted audio data
    self.assertTrue(any(chunk.text for chunk in chunks))
    # verify it transcribed the audio correctly, i.e., 'citizens' is in the extracted text
    # (joined with a space so a failure prints the transcript that was actually produced)
    transcript = " ".join(chunk.text for chunk in chunks if chunk.text is not None).lower()
    self.assertIn('citizens', transcript)

def test_image_to_base64(self):
image = Image.open(os.path.join(self.files_directory, 'example.jpg'))
image.load() # needed to close the file
Expand Down
2 changes: 2 additions & 0 deletions thepipe_api/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class SourceTypes(Enum):
URL = "website"
GITHUB = "github repository"
ZIP = "zip"
VIDEO = "video"
AUDIO = "audio"

class Chunk:
def __init__(self, path: str, text: Optional[str] = None, image: Optional[Image.Image] = None, source_type: Optional[SourceTypes] = None):
Expand Down
55 changes: 53 additions & 2 deletions thepipe_api/extractor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
import math
import re
from typing import Dict, List, Optional
import glob
Expand Down Expand Up @@ -32,6 +33,8 @@ def default(self, obj):
# File-extension sets used to classify a source by its suffix.
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png'}
SPREADSHEET_EXTENSIONS = {'.csv', '.xls', '.xlsx'}
DOCUMENT_EXTENSIONS = {'.pdf', '.docx', '.pptx'}
# Suffixes that detect_type maps to SourceTypes.VIDEO.
VIDEO_EXTENSIONS = {'.mp4', '.avi', '.mov', '.mkv'}
# Suffixes that detect_type maps to SourceTypes.AUDIO.
AUDIO_EXTENSIONS = {'.mp3', '.wav'}
OTHER_EXTENSIONS = {'.zip', '.ipynb'}
# Union of the extension sets listed below.
# NOTE(review): VIDEO_EXTENSIONS and AUDIO_EXTENSIONS are not part of this union
# (and IMAGE_EXTENSIONS appears twice) — confirm whether callers that filter on
# KNOWN_EXTENSIONS are meant to skip video/audio files.
KNOWN_EXTENSIONS = IMAGE_EXTENSIONS.union(CODE_EXTENSIONS).union(CTAGS_CODE_EXTENSIONS).union(PLAINTEXT_EXTENSIONS).union(IMAGE_EXTENSIONS).union(SPREADSHEET_EXTENSIONS).union(DOCUMENT_EXTENSIONS).union(OTHER_EXTENSIONS)
GITHUB_TOKEN: str = os.getenv("GITHUB_TOKEN")
Expand All @@ -50,8 +53,6 @@ def extract_from_source(source: str, match: Optional[str] = None, ignore: Option
return extract_github(github_url=source, file_path='', match=match, ignore=ignore, text_only=text_only, verbose=verbose, ai_extraction=ai_extraction, branch='master')
elif source_type == SourceTypes.URL:
return extract_url(url=source, text_only=text_only, local=local)
elif source_type == SourceTypes.ZIP:
return extract_zip(file_path=source, match=match, ignore=ignore, verbose=verbose, ai_extraction=ai_extraction, text_only=text_only)
return extract_from_file(file_path=source, source_type=source_type, verbose=verbose, ai_extraction=ai_extraction, text_only=text_only, local=local)

def extract_from_file(file_path: str, source_type: str, verbose: bool = False, ai_extraction: bool = False, text_only: bool = False, local: bool = True, limit: int = None) -> List[Chunk]:
Expand Down Expand Up @@ -97,6 +98,12 @@ def extract_from_file(file_path: str, source_type: str, verbose: bool = False, a
extraction = [Chunk(path=e.path, text=e.text, image=None, source_type=SourceTypes.COMPRESSIBLE_CODE) for e in extraction]
elif source_type == SourceTypes.IPYNB:
extraction = extract_from_ipynb(file_path=file_path, verbose=verbose, ai_extraction=ai_extraction, text_only=text_only)
elif source_type == SourceTypes.ZIP:
extraction = extract_zip(file_path=file_path, verbose=verbose, ai_extraction=ai_extraction, text_only=text_only)
elif source_type == SourceTypes.VIDEO:
extraction = extract_video(file_path=file_path, verbose=verbose, text_only=text_only)
elif source_type == SourceTypes.AUDIO:
extraction = extract_audio(file_path=file_path, verbose=verbose)
else:
extraction = [extract_plaintext(file_path=file_path)]
if verbose: print_status(f"Extracted from {file_path}", status='success')
Expand Down Expand Up @@ -146,6 +153,12 @@ def detect_type(source: str) -> Optional[SourceTypes]:
return SourceTypes.DOCX
elif extension == '.pptx':
return SourceTypes.PPTX
elif extension == '.zip':
return SourceTypes.ZIP
elif extension in VIDEO_EXTENSIONS:
return SourceTypes.VIDEO
elif extension in AUDIO_EXTENSIONS:
return SourceTypes.AUDIO
elif extension in PLAINTEXT_EXTENSIONS:
return SourceTypes.PLAINTEXT
return None
Expand Down Expand Up @@ -315,6 +328,44 @@ def extract_url(url: str, text_only: bool = False, local: bool = True, limit: in
raise ValueError("No content extracted from URL.")
return chunks

def extract_video(file_path: str, verbose: bool = False, text_only: bool = False) -> List[Chunk]:
    """Split a video into one-minute chunks, each with a transcript and a frame.

    For every 60-second window of the video (the last window may be shorter),
    the audio is written to a temporary WAV file and transcribed with the
    Whisper "small" model, and the frame at the window's midpoint is captured.

    Args:
        file_path: Path of the video file to open with moviepy.
        verbose: Forwarded to ``model.transcribe`` as its ``verbose`` argument.
        text_only: When True, no frame is decoded and every chunk has
            ``image=None``.

    Returns:
        One ``Chunk`` per window, in chronological order, with
        ``source_type=SourceTypes.VIDEO``. ``text`` is ``None`` for a window
        whose clip has no audio track.
    """
    from moviepy.editor import VideoFileClip # import only if needed
    import whisper # import only if needed
    chunk_duration = 60  # seconds covered by each chunk
    model = whisper.load_model("small")
    video = VideoFileClip(file_path)
    chunks = []
    try:
        num_chunks = math.ceil(video.duration / chunk_duration)
        # A private scratch directory keeps concurrent extractions from
        # overwriting each other's audio files and is removed even on error.
        with tempfile.TemporaryDirectory() as temp_dir:
            for i in range(num_chunks):
                # calculate start and end time for the current chunk
                start_time = i * chunk_duration
                end_time = min(start_time + chunk_duration, video.duration)
                # extract frame at the middle of the chunk (skipped when only text is wanted)
                image = None
                if not text_only:
                    frame = video.get_frame((start_time + end_time) / 2)
                    image = Image.fromarray(frame)
                # extract and transcribe audio for the current chunk
                transcription = None
                audio = video.subclip(start_time, end_time).audio
                if audio is not None:  # silent videos have no audio track to transcribe
                    audio_path = os.path.join(temp_dir, f"temp_audio_{i}.wav")
                    audio.write_audiofile(audio_path, codec='pcm_s16le')
                    try:
                        result = model.transcribe(audio_path, verbose=verbose)
                    finally:
                        # drop each WAV as soon as it is consumed to bound disk usage
                        os.remove(audio_path)
                    transcription = result['text']
                # add chunk
                chunks.append(Chunk(path=file_path, text=transcription, image=image, source_type=SourceTypes.VIDEO))
    finally:
        # release the underlying reader processes and file handles
        video.close()
    return chunks

def extract_audio(file_path: str, verbose: bool = False) -> List[Chunk]:
    """Transcribe an audio file into a single text chunk.

    Args:
        file_path: Path of the audio file handed to Whisper.
        verbose: Forwarded to ``model.transcribe`` as its ``verbose`` argument.

    Returns:
        A one-element list holding a ``Chunk`` whose ``text`` is the full
        transcription, with ``image=None`` and ``source_type=SourceTypes.AUDIO``.
    """
    import whisper # import only if needed
    transcriber = whisper.load_model("small")
    transcript = transcriber.transcribe(file_path, verbose=verbose)['text']
    audio_chunk = Chunk(path=file_path, text=transcript, image=None, source_type=SourceTypes.AUDIO)
    return [audio_chunk]

def extract_github(github_url: str, file_path: str = '', match: Optional[str] = None, ignore: Optional[str] = None, text_only: bool = False, ai_extraction: bool = False, branch: str = 'main', verbose: bool = False) -> List[Chunk]:
files_contents = []
if not GITHUB_TOKEN:
Expand Down

0 comments on commit 9384682

Please sign in to comment.