feat: chain scrapping & detection pipeline #5

Draft
Wants to merge 1 commit into base: main
123 changes: 36 additions & 87 deletions climateguard/pipeline.py
@@ -1,97 +1,46 @@
-from gdelt_scrapper import GDELTScrapper
-from news_scrapper import NewsScraper
 import asyncio
 from pathlib import Path
 import json
-import multiprocessing
-from functools import partial
+from typing import Literal, List, Dict

+from climateguard.scrapping.pipeline import ScrapFromGDelt
+from climateguard.detect_claims import detect_claims
+from climateguard.models import Article, Claims

 class Pipeline:
     def __init__(self):
-        self.gdelt_scraper = GDELTScrapper()
-        self.news_scraper = NewsScraper()

-    def run(self, keyword: str, years: list[int], output_dir: Path):
-        # Step 1: Find themes related to the keyword
-        themes = self.gdelt_scraper.find_themes_related_to_keyword(keyword)
-        print(f"Themes related to {keyword}: {themes}")

-        # Step 2: Find articles for these themes and years
-        articles_df = self.gdelt_scraper.find_articles(themes=themes, years=years)

-        # Step 3: Extract URLs from the DataFrame
-        urls = articles_df["url"].tolist()

-        # Save the list of URLs to a separate file
-        self._save_urls(urls, output_dir)

-        # Step 4: Scrape each URL using multiprocessing
-        scraped_articles, failed_urls = self._scrape_urls_parallel(urls)

-        # Step 5: Save results
-        self._save_results(scraped_articles, failed_urls, output_dir)

-    def _save_urls(self, urls: list, output_dir: Path):
-        output_dir.mkdir(parents=True, exist_ok=True)
-        urls_file = output_dir / 'all_urls.json'
-        with open(urls_file, 'w', encoding='utf-8') as f:
-            json.dump(urls, f, ensure_ascii=False, indent=4)
-        print(f"All URLs saved to {urls_file}")

-    def _scrape_urls_parallel(self, urls):
-        # Create a partial function with self.news_scraper
-        scrape_func = partial(self._scrape_single_url, news_scraper=self.news_scraper)
+        self.scrap_from_gdelt = ScrapFromGDelt()
+        self.data_dir = Path(__file__).parent / "data"

-        # Use all available cores
-        num_cores = multiprocessing.cpu_count()

-        # Create a multiprocessing pool
-        with multiprocessing.Pool(num_cores) as pool:
-            results = pool.map(scrape_func, urls)
+    def run(self, keyword: str, years: list[int], language: Literal["French", "English", "Latvian"]) -> List[Dict]:
+        scraped_articles_file = self.data_dir / 'scraped_articles.json'

-        # Process results
-        scraped_articles = []
-        failed_urls = []
-        for result in results:
-            if result['success']:
-                article = result['article']
-                scraped_articles.append(article)
-                print(f"Scraped: {article.title}")
-                print(f"Content length: {len(article.content)}")
-                print(f"Date: {article.date}")
-                print("---")
-            else:
-                failed_urls.append(result['url'])
-                print(f"Failed to scrape: {result['url']}")
-                print("---")

-        return scraped_articles, failed_urls

-    @staticmethod
-    def _scrape_single_url(url, news_scraper):
-        article = news_scraper.scrape_article(url)
-        if article:
-            return {'success': True, 'article': article}
+        if not scraped_articles_file.exists():
+            print("Scraped articles not found. Starting scraping process...")
+            articles_data = self.scrap_from_gdelt.run(keyword, years, self.data_dir)
         else:
-            return {'success': False, 'url': url}

-    def _save_results(self, scraped_articles, failed_urls, output_dir):
-        output_dir.mkdir(parents=True, exist_ok=True)

-        # Save successfully scraped articles to JSON
-        output_file = output_dir / 'scraped_articles.json'
-        with open(output_file, 'w', encoding='utf-8') as f:
-            json.dump([article.dict() for article in scraped_articles], f, ensure_ascii=False, indent=4)

-        print(f"\nSuccessfully scraped articles saved to {output_file}")

-        # Save failed URLs to a separate file
-        failed_file = output_dir / 'failed_urls.json'
-        with open(failed_file, 'w', encoding='utf-8') as f:
-            json.dump(failed_urls, f, ensure_ascii=False, indent=4)

-        print(f"Failed URLs saved to {failed_file}")
+            print("Scraped articles found. Loading from file...")
+            with open(scraped_articles_file, 'r', encoding='utf-8') as f:
+                articles_data = json.load(f)

+        # Process articles and detect claims
+        results = []
+        for article_data in articles_data:
+            article = Article(**article_data)
+            claims, n_tokens = detect_claims(article, language)
+            results.append({
+                "article": article.dict(),
+                "claims": claims.dict(),
+                "n_tokens": n_tokens
+            })

+        print(f"Processed {len(results)} articles with claims")
+        return results

+def main():
+    pipeline = Pipeline()
+    processed_articles = pipeline.run(keyword="CLIMATE", years=[2022, 2023, 2024], language="English")


 if __name__ == "__main__":
-    pipeline = Pipeline()
-    output_dir = Path(__file__).parent.parent / "data"
-    pipeline.run(keyword="CLIMATE", years=[2022, 2023, 2024], output_dir=output_dir)
+    main()
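
A minimal usage sketch of the chained pipeline above (not part of the diff). It assumes the repository root is on PYTHONPATH so that climateguard.pipeline is importable, that the GDELT and news-scraping dependencies are installed, and that detect_claims has whatever model credentials it needs; the dictionary keys mirror the ones built in Pipeline.run.

from climateguard.pipeline import Pipeline

pipeline = Pipeline()
results = pipeline.run(keyword="CLIMATE", years=[2024], language="English")

for entry in results:
    article = entry["article"]    # Article model serialized via .dict()
    claims = entry["claims"]      # Claims model serialized via .dict()
    print(f"{article['title']}: {entry['n_tokens']} tokens, claims: {claims}")

On the first run, when climateguard/data/scraped_articles.json does not exist, Pipeline.run assigns the return value of ScrapFromGDelt.run to articles_data; the sketch after the new scrapping module below shows the return value that assignment appears to expect. On later runs the cached file is loaded instead, so deleting climateguard/data/scraped_articles.json forces a fresh scrape.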
File renamed without changes.
97 changes: 97 additions & 0 deletions climateguard/scrapping/pipeline.py
@@ -0,0 +1,97 @@
from climateguard.scrapping.urls_from_gdelt import GDELTScrapper
from climateguard.scrapping.articles import NewsScraper
from pathlib import Path
import json
import multiprocessing
from functools import partial

class ScrapFromGDelt:
    def __init__(self):
        self.gdelt_scraper = GDELTScrapper()
        self.news_scraper = NewsScraper()

    def run(self, keyword: str, years: list[int], output_dir: Path):
        # Step 1: Find themes related to the keyword
        themes = self.gdelt_scraper.find_themes_related_to_keyword(keyword)
        print(f"Themes related to {keyword}: {themes}")

        # Step 2: Find articles for these themes and years
        articles_df = self.gdelt_scraper.find_articles(themes=themes, years=years)

        # Step 3: Extract URLs from the DataFrame
        urls = articles_df["url"].tolist()

        # Save the list of URLs to a separate file
        self._save_urls(urls, output_dir)

        # Step 4: Scrape each URL using multiprocessing
        scraped_articles, failed_urls = self._scrape_urls_parallel(urls)

        # Step 5: Save results
        self._save_results(scraped_articles, failed_urls, output_dir)

    def _save_urls(self, urls: list, output_dir: Path):
        output_dir.mkdir(parents=True, exist_ok=True)
        urls_file = output_dir / 'all_urls.json'
        with open(urls_file, 'w', encoding='utf-8') as f:
            json.dump(urls, f, ensure_ascii=False, indent=4)
        print(f"All URLs saved to {urls_file}")

    def _scrape_urls_parallel(self, urls):
        # Create a partial function with self.news_scraper
        scrape_func = partial(self._scrape_single_url, news_scraper=self.news_scraper)

        # Use all available cores
        num_cores = multiprocessing.cpu_count()

        # Create a multiprocessing pool
        with multiprocessing.Pool(num_cores) as pool:
            results = pool.map(scrape_func, urls)

        # Process results
        scraped_articles = []
        failed_urls = []
        for result in results:
            if result['success']:
                article = result['article']
                scraped_articles.append(article)
                print(f"Scraped: {article.title}")
                print(f"Content length: {len(article.content)}")
                print(f"Date: {article.date}")
                print("---")
            else:
                failed_urls.append(result['url'])
                print(f"Failed to scrape: {result['url']}")
                print("---")

        return scraped_articles, failed_urls

    @staticmethod
    def _scrape_single_url(url, news_scraper):
        article = news_scraper.scrape_article(url)
        if article:
            return {'success': True, 'article': article}
        else:
            return {'success': False, 'url': url}

    def _save_results(self, scraped_articles, failed_urls, output_dir):
        output_dir.mkdir(parents=True, exist_ok=True)

        # Save successfully scraped articles to JSON
        output_file = output_dir / 'scraped_articles.json'
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump([article.dict() for article in scraped_articles], f, ensure_ascii=False, indent=4)

        print(f"\nSuccessfully scraped articles saved to {output_file}")

        # Save failed URLs to a separate file
        failed_file = output_dir / 'failed_urls.json'
        with open(failed_file, 'w', encoding='utf-8') as f:
            json.dump(failed_urls, f, ensure_ascii=False, indent=4)

        print(f"Failed URLs saved to {failed_file}")

if __name__ == "__main__":
    pipeline = ScrapFromGDelt()
    output_dir = Path(__file__).parent.parent / "data"
    pipeline.run(keyword="CLIMATE", years=[2022, 2023, 2024], output_dir=output_dir)
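
One loose end worth noting, with a hedged sketch: Pipeline.run assigns articles_data = self.scrap_from_gdelt.run(...), but ScrapFromGDelt.run as diffed ends with self._save_results(...) and returns None, so a cold run (no cached scraped_articles.json) leaves articles_data as None before the claim-detection loop. Below is a minimal sketch of one way to reconcile the two, assuming the intended contract is to return the same list of dicts that _save_results writes to scraped_articles.json; ScrapFromGDeltReturning is a hypothetical name used only for illustration.

from pathlib import Path

from climateguard.scrapping.pipeline import ScrapFromGDelt


class ScrapFromGDeltReturning(ScrapFromGDelt):
    """Hypothetical subclass: same steps as ScrapFromGDelt, but run() also returns the data."""

    def run(self, keyword: str, years: list[int], output_dir: Path) -> list[dict]:
        # Steps 1-3: themes -> articles DataFrame -> URL list (same calls as the parent)
        themes = self.gdelt_scraper.find_themes_related_to_keyword(keyword)
        articles_df = self.gdelt_scraper.find_articles(themes=themes, years=years)
        urls = articles_df["url"].tolist()
        self._save_urls(urls, output_dir)

        # Steps 4-5: scrape in parallel and persist results (same calls as the parent)
        scraped_articles, failed_urls = self._scrape_urls_parallel(urls)
        self._save_results(scraped_articles, failed_urls, output_dir)

        # Return plain dicts so Pipeline.run sees the same shape whether it
        # scrapes fresh or loads the cached scraped_articles.json.
        return [article.dict() for article in scraped_articles]

Returning serialized dicts keeps both branches of Pipeline.run symmetric: the fresh-scrape path and the cached-file path then feed Article(**article_data) the same structure.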
File renamed without changes.
58 changes: 0 additions & 58 deletions climateguard/test.py

This file was deleted.