Commit a93abf7
updated api client
emcf committed Jul 12, 2024
1 parent 48277e3 commit a93abf7
Showing 4 changed files with 203 additions and 31 deletions.
1 change: 1 addition & 0 deletions .github/workflows/python-ci.yml
@@ -49,6 +49,7 @@ jobs:
- name: Test with unittest and generate coverage report
env:
THEPIPE_API_KEY: ${{ secrets.THEPIPE_API_KEY }}
OPENROUTER_API_KEY: ${{ secrets.OPENROUTER_API_KEY }}
run: |
coverage run -m unittest discover
coverage xml -i
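The workflow now exposes both secrets to the test step. For a local run, the same two variables would need to be exported; the guard below is a minimal illustrative sketch (the test class and skip logic are hypothetical, only the two variable names come from the workflow):

    import os
    import unittest

    # Keys mirrored from the CI step above; skip API-dependent tests when they are absent.
    THEPIPE_API_KEY = os.getenv("THEPIPE_API_KEY")
    OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")

    @unittest.skipUnless(THEPIPE_API_KEY and OPENROUTER_API_KEY, "API keys not configured")
    class ApiSmokeTest(unittest.TestCase):
        def test_keys_present(self):
            self.assertTrue(THEPIPE_API_KEY and OPENROUTER_API_KEY)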
4 changes: 3 additions & 1 deletion thepipe/core.py
@@ -10,7 +10,9 @@
from PIL import Image
from llama_index.core.schema import Document, ImageDocument

HOST_URL = os.getenv("THEPIPE_API_URL", "http://localhost:5000")
HOST_IMAGES = os.getenv("HOST_IMAGES", "false").lower() == "true"
HOST_URL = os.getenv("THEPIPE_API_URL", "https://thepipe-api.up.railway.app")
THEPIPE_API_KEY = os.getenv("THEPIPE_API_KEY", None)

class Chunk:
def __init__(self, path: Optional[str] = None, texts: Optional[List[str]] = [], images: Optional[List[Image.Image]] = [], audios: Optional[List] = [], videos: Optional[List] = []):
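Because these constants are evaluated when thepipe.core is imported, any overrides have to be in place beforehand. A minimal sketch, assuming the defaults shown in the diff and a placeholder key:

    import os

    # Set before importing thepipe.core; the module reads these at import time.
    os.environ["THEPIPE_API_URL"] = "https://thepipe-api.up.railway.app"  # new default endpoint
    os.environ["HOST_IMAGES"] = "false"
    os.environ["THEPIPE_API_KEY"] = "sk-..."  # placeholder, not a real key

    from thepipe.core import HOST_URL, HOST_IMAGES, THEPIPE_API_KEY
    print(HOST_URL, HOST_IMAGES, THEPIPE_API_KEY is not None)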
151 changes: 138 additions & 13 deletions thepipe/extract.py
@@ -1,9 +1,13 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import re
from typing import List, Dict, Optional, Tuple
from .core import Chunk, calculate_tokens
from typing import List, Dict, Union, Optional, Tuple, Callable
from thepipe.core import HOST_URL, THEPIPE_API_KEY, Chunk, calculate_tokens
from thepipe.scraper import scrape_url, scrape_file
from thepipe.chunker import chunk_by_document
import requests
import os
from openai import OpenAI

DEFAULT_EXTRACTION_PROMPT = "Extract structured information from the above document according to the following schema: {schema}. Immediately return valid JSON formatted data. If there is missing data, you may use null, but use your reasoning to always fill in every column as best you can. Always immediately return valid JSON."

@@ -12,7 +16,6 @@ def clean_response_text(llm_response: str) -> str:
return llm_response.encode('utf-8', 'ignore').decode('utf-8')

llm_response = llm_response.strip()
# Extract JSON from code block
code_block_pattern = r'^```(?:json)?\s*([\s\S]*?)\s*```$'
match = re.match(code_block_pattern, llm_response, re.MULTILINE | re.DOTALL)
if match:
@@ -21,24 +24,21 @@ def clean_response_text(llm_response: str) -> str:
try:
parsed_json = json.loads(llm_response)
return parsed_json
except json.JSONDecodeError as e:
# Try to extract JSON by matching syntax
except json.JSONDecodeError:
json_pattern = r'(\[[\s\S]*\]|\{[\s\S]*\})'
match = re.search(json_pattern, llm_response)
if match:
try:
parsed_json = json.loads(match.group(1))
return parsed_json
except json.JSONDecodeError as e:
except json.JSONDecodeError:
pass

# Additional fallback: Try to extract individual JSON objects
objects = re.findall(r'\{[^{}]*\}', llm_response)
if objects:
valid_objects = []
for obj in objects:
try:
# Replace escaped backslashes and quotes
obj = obj.replace('\\', '').replace('\\"', '"')
valid_objects.append(json.loads(obj))
except json.JSONDecodeError:
@@ -50,8 +50,6 @@ def clean_response_text(llm_response: str) -> str:
return None

def extract_from_chunk(chunk: Chunk, chunk_index: int, schema: str, ai_model: str, source: str, multiple_extractions: bool, extraction_prompt: str, host_images: bool) -> Tuple[Dict, int]:
from openai import OpenAI # only import if needed

response_dict = {"chunk_index": chunk_index, "source": source}
tokens_used = 0
try:
@@ -103,7 +101,7 @@ def extract_from_chunk(chunk: Chunk, chunk_index: int, schema: str, ai_model: st

return response_dict, tokens_used

def extract(chunks: List[Chunk], schema: str, ai_model: str = 'google/gemma-2-9b-it', multiple_extractions: bool = False, extraction_prompt: str = DEFAULT_EXTRACTION_PROMPT, host_images: bool = False) -> Tuple[List[Dict], int]:
def extract(chunks: List[Chunk], schema: str, ai_model: str, multiple_extractions: bool, extraction_prompt: str, host_images: bool) -> Tuple[List[Dict], int]:
results = []
total_tokens_used = 0

@@ -134,7 +132,134 @@ def extract(chunks: List[Chunk], schema: str, ai_model: str = 'google/gemma-2-9b
"error": str(e)
})

# Sort results by chunk_index to maintain original order
results.sort(key=lambda x: x["chunk_index"])
return results, total_tokens_used

def extract_from_url(
url: str,
schema: Union[str, Dict],
ai_model: str = 'google/gemma-2-9b-it',
multiple_extractions: bool = False,
extraction_prompt: str = DEFAULT_EXTRACTION_PROMPT,
host_images: bool = False,
text_only: bool = False,
ai_extraction: bool = False,
verbose: bool = False,
chunking_method: Callable[[List[Chunk]], List[Chunk]] = chunk_by_document,
local: bool = False
) -> List[Dict]: #Tuple[List[Dict], int]:
if isinstance(schema, dict):
schema = json.dumps(schema)
if local:
chunks = scrape_url(url, text_only=text_only, ai_extraction=ai_extraction, verbose=verbose, local=local)
chunked_content = chunking_method(chunks)
return extract(chunked_content, schema, ai_model, multiple_extractions, extraction_prompt, host_images)
else:
headers = {
"Authorization": f"Bearer {THEPIPE_API_KEY}"
}
data = {
'urls': [url],
'schema': schema,
'ai_model': ai_model,
'multiple_extractions': str(multiple_extractions).lower(),
'extraction_prompt': extraction_prompt,
'host_images': str(host_images).lower(),
'text_only': str(text_only).lower(),
'ai_extraction': str(ai_extraction).lower(),
'chunking_method': chunking_method.__name__
}
response = requests.post(f"{HOST_URL}/extract", headers=headers, data=data)
if response.status_code != 200:
raise Exception(f"API request failed with status code {response.status_code}: {response.text}")

results = []
total_tokens_used = 0
for line in response.iter_lines(decode_unicode=True):
if line:
data = json.loads(line)
result = data['result']
if 'error' in result:
results.append(result)
else:
extracted_data = {
'chunk_index': result['chunk_index'],
'source': result['source']
}
if multiple_extractions:
extracted_data['extraction'] = result.get('extraction', [])
else:
extracted_data.update(result)
schema_keys = json.loads(schema).keys()
for key in schema_keys:
if key not in extracted_data:
extracted_data[key] = None
results.append(extracted_data)
total_tokens_used += data['tokens_used']

return results#, total_tokens_used

return results, total_tokens_used
def extract_from_file(
file_path: str,
schema: Union[str, Dict],
ai_model: str = 'google/gemma-2-9b-it',
multiple_extractions: bool = False,
extraction_prompt: str = DEFAULT_EXTRACTION_PROMPT,
host_images: bool = False,
text_only: bool = False,
ai_extraction: bool = False,
verbose: bool = False,
chunking_method: Callable[[List[Chunk]], List[Chunk]] = chunk_by_document,
local: bool = False
) -> List[Dict]:#Tuple[List[Dict], int]:
if isinstance(schema, dict):
schema = json.dumps(schema)
if local:
chunks = scrape_file(file_path, ai_extraction=ai_extraction, text_only=text_only, verbose=verbose)
chunked_content = chunking_method(chunks)
return extract(chunked_content, schema, ai_model, multiple_extractions, extraction_prompt, host_images)
else:
headers = {
"Authorization": f"Bearer {THEPIPE_API_KEY}"
}
data = {
'schema': schema,
'ai_model': ai_model,
'multiple_extractions': str(multiple_extractions).lower(),
'extraction_prompt': extraction_prompt,
'host_images': str(host_images).lower(),
'text_only': str(text_only).lower(),
'ai_extraction': str(ai_extraction).lower(),
'chunking_method': chunking_method.__name__
}
files = {'files': (os.path.basename(file_path), open(file_path, 'rb'))}

response = requests.post(f"{HOST_URL}/extract", headers=headers, data=data, files=files)
if response.status_code != 200:
raise Exception(f"API request failed with status code {response.status_code}: {response.text}")

results = []
total_tokens_used = 0
for line in response.iter_lines(decode_unicode=True):
if line:
data = json.loads(line)
result = data['result']
if 'error' in result:
results.append(result)
else:
extracted_data = {
'chunk_index': result['chunk_index'],
'source': result['source']
}
if multiple_extractions:
extracted_data['extraction'] = result.get('extraction', [])
else:
extracted_data.update(result)
schema_keys = json.loads(schema).keys()
for key in schema_keys:
if key not in extracted_data:
extracted_data[key] = None
results.append(extracted_data)
total_tokens_used += data['tokens_used']

return results#, total_tokens_used
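A usage sketch for the two new entry points. The URL, file name, and schema below are illustrative only; the hosted path assumes THEPIPE_API_KEY is set, and note that, as written in the diff, local=True returns the (results, tokens_used) pair from extract() while the hosted path returns only the list:

    from thepipe.extract import extract_from_url, extract_from_file

    schema = {"title": "string", "author": "string", "year": "int"}  # illustrative schema

    # Hosted path (default): POSTs to {HOST_URL}/extract and parses the streamed results.
    rows = extract_from_url("https://example.com/article", schema=schema)
    for row in rows:
        print(row.get("chunk_index"), {key: row.get(key) for key in schema})

    # Local path: scrape, chunk, and extract on this machine.
    local_rows, tokens_used = extract_from_file("report.pdf", schema=schema, local=True)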
78 changes: 61 additions & 17 deletions thepipe/scraper.py
@@ -13,21 +13,19 @@
from PIL import Image
import requests
import json
from .core import HOST_URL, Chunk
from .core import HOST_URL, THEPIPE_API_KEY, HOST_IMAGES, Chunk, make_image_url
import tempfile
import mimetypes
import dotenv
import shutil
from magika import Magika
from .core import make_image_url, Chunk
dotenv.load_dotenv()

from typing import List, Dict, Tuple, Optional

FOLDERS_TO_IGNORE = ['*node_modules.*', '.*venv.*', '.*\.git.*', '.*\.vscode.*', '.*pycache.*']
FILES_TO_IGNORE = ['package-lock.json', '.gitignore', '.*\.bin', '.*\.pyc', '.*\.pyo', '.*\.exe', '.*\.dll', '.*\.ipynb_checkpoints']
GITHUB_TOKEN: str = os.getenv("GITHUB_TOKEN", None)
THEPIPE_API_KEY: str = os.getenv("THEPIPE_API_KEY", None)
USER_AGENT_STRING: str = os.getenv("USER_AGENT_STRING", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3")
MAX_WHISPER_DURATION = 1200 # 20 minutes
TWITTER_DOMAINS = ['https://twitter.com', 'https://www.twitter.com', 'https://x.com', 'https://www.x.com']
@@ -60,14 +58,28 @@ def scrape_file(filepath: str, ai_extraction: bool = False, text_only: bool = Fa
with open(filepath, 'rb') as f:
response = requests.post(
url=f"{HOST_URL}/scrape",
files={'file': (filepath, f)},
data={'ai_extraction': ai_extraction, 'text_only': text_only}
headers={"Authorization": f"Bearer {THEPIPE_API_KEY}"},
files={'files': (os.path.basename(filepath), f)},
data={
'text_only': str(text_only).lower(),
'ai_extraction': str(ai_extraction).lower(),
'chunking_method': 'chunk_by_document'
}
)
response_json = response.json()
if 'error' in response_json:
raise ValueError(f"{response_json['error']}")
chunks = [Chunk.from_json(chunk_json) for chunk_json in response_json['chunks']]
return chunks
chunks = []
for line in response.iter_lines():
if line:
data = json.loads(line)
if 'result' in data:
chunk = Chunk(
path=data['result']['source'],
texts=[content['text'] for content in data['result']['content'] if content['type'] == 'text'],
images=[Image.open(BytesIO(base64.b64decode(content['image_url'].split(',')[1])))
for content in data['result']['content'] if content['type'] == 'image_url']
)
chunks.append(chunk)
return chunks

# returns chunks of scraped content from any source (file, URL, etc.)
extraction = []
source_type = detect_source_type(filepath)
@@ -302,7 +314,7 @@ def ai_extract_webpage_content(url: str, text_only: bool = False, verbose: bool
{
"role": "user",
"content": [
{"type": "image_url", "image_url": make_image_url(stacked_image)},
{"type": "image_url", "image_url": make_image_url(stacked_image, host_images=HOST_IMAGES)},
{"type": "text", "text": EXTRACTION_PROMPT},
]
},
@@ -427,17 +439,49 @@ def traverse_and_extract(element):
traverse_and_extract(body)
return ''.join(markdown_content)

# TODO: deprecate this in favor of Chunk.from_json or Chunk.from_message
def create_chunk_from_data(result: Dict, host_images: bool) -> Chunk:
texts = [content['text'] for content in result['content'] if content['type'] == 'text']

images = []
for content in result['content']:
if content['type'] == 'image_url':
if host_images:
# If images are hosted, we keep the URL as is
images.append(content['image_url'])
else:
# If images are not hosted, we decode the base64 string
image_data = content['image_url'].split(',')[1]
image = Image.open(BytesIO(base64.b64decode(image_data)))
images.append(image)

return Chunk(
path=result['source'],
texts=texts,
images=images
)

def scrape_url(url: str, text_only: bool = False, ai_extraction: bool = False, verbose: bool = False, local: bool = False) -> List[Chunk]:
if not local:
response = requests.post(
url=f"{HOST_URL}/scrape",
data={'url': url, 'ai_extraction': ai_extraction, 'text_only': text_only}
headers={"Authorization": f"Bearer {THEPIPE_API_KEY}"},
data={
'urls': [url],
'text_only': str(text_only).lower(),
'ai_extraction': str(ai_extraction).lower(),
'chunking_method': 'chunk_by_document'
}
)
response_json = response.json()
if 'error' in response_json:
raise ValueError(f"{response_json['error']}")
chunks = [Chunk.from_json(chunk_json) for chunk_json in response_json['chunks']]
chunks = []
for line in response.iter_lines():
if line:
data = json.loads(line)
if 'result' in data:
chunk = create_chunk_from_data(data['result'], host_images=HOST_IMAGES)
chunks.append(chunk)
return chunks

if any(url.startswith(domain) for domain in TWITTER_DOMAINS):
extraction = scrape_tweet(url=url, text_only=text_only)
return extraction
@@ -472,7 +516,7 @@ def scrape_url(url: str, text_only: bool = False, ai_extraction: bool = False, v
else:
chunk = extract_page_content(url=url, text_only=text_only, verbose=verbose)
return [chunk]

def format_timestamp(seconds, chunk_index, chunk_duration):
# helper function to format the timestamp.
total_seconds = chunk_index * chunk_duration + seconds
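Finally, a corresponding sketch for the scraper entry points after this change. The URL and file name are placeholders, the hosted path assumes THEPIPE_API_KEY is set, and attribute access on the returned chunks assumes Chunk stores its constructor arguments as attributes:

    from thepipe.scraper import scrape_url, scrape_file

    # Hosted scraping: authenticates with THEPIPE_API_KEY and rebuilds Chunk objects
    # from the streamed response lines.
    chunks = scrape_url("https://example.com")

    # File scraping; keyword arguments limited to those visible in the hunk above.
    file_chunks = scrape_file("slides.pdf", text_only=True)

    for chunk in chunks:
        print(chunk.path, len(chunk.texts), len(chunk.images))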
