Skip to content

Commit

Permalink
Add logs for get_advisory_batches
Browse files Browse the repository at this point in the history
Signed-off-by: Tushar Goel <[email protected]>
  • Loading branch information
TG1999 committed Feb 26, 2025
1 parent 7c005fc commit 2a2bbee
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
29 changes: 21 additions & 8 deletions vulnerabilities/pipelines/recompute_content_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging
import multiprocessing
import os
import traceback
import warnings
from concurrent import futures

Expand Down Expand Up @@ -67,7 +68,7 @@ class InsufficientResourcesError(Exception):
def process_advisories(
advisories,
advisory_func,
progress_logger=None,
log=None,
batch_size=1000,
):
"""
Expand All @@ -83,23 +84,27 @@ def process_advisories(
"""
advisories_count = advisories.count()
logger.info(f"Process {advisories_count} advisories with {advisory_func.__name__}")
progress = LoopProgress(advisories_count, logger=progress_logger)
progress = LoopProgress(advisories_count, logger=log)
max_workers = get_max_workers(keep_available=4)

advisory_batches = get_advisory_batches(advisories, batch_size)
advisory_batches = get_advisory_batches(
advisories=advisories,
batch_size=batch_size,
log=log,
)

if max_workers <= 0:
for advisory_ids in progress.iter(advisory_batches):
progress.log_progress()
logger.debug(f"{advisory_func.__name__} len={len(advisory_ids)}")
advisory_func(advisory_ids=advisory_ids, logger=progress_logger)
advisory_func(advisory_ids=advisory_ids, logger=log)
return

logger.info(f"Starting ProcessPoolExecutor with {max_workers} max_workers")

with futures.ProcessPoolExecutor(max_workers) as executor:
future_to_advisories = {
executor.submit(advisory_func, advisory_ids, progress_logger): advisory_ids
executor.submit(advisory_func, advisory_ids, log): advisory_ids
for advisory_ids in advisory_batches
}

Expand All @@ -120,14 +125,22 @@ def process_advisories(
raise broken_pool_error from InsufficientResourcesError(message)


def get_advisory_batches(advisories, batch_size=1000, log=None):
    """
    Yield lists of advisory ids, each of up to ``batch_size`` length.

    ``advisories`` is a queryset (or any Paginator-compatible sequence).
    ``log`` is an optional callable accepting ``(message, level=...)`` used
    to report errors before they are re-raised; when ``None``, errors still
    propagate, just without the extra logging.
    """
    paginator = Paginator(advisories, per_page=batch_size)
    for page_number in paginator.page_range:
        page = paginator.page(page_number)
        try:
            advisory_ids = [obj.id for obj in page.object_list]
        except Exception:
            # The comprehension failed before any ids were collected, so
            # report the page/batch being processed (the id list never
            # exists at this point) along with the full traceback.
            if log:
                log(
                    f"Error getting advisory batch for page {page_number} "
                    f"(batch_size={batch_size}): {traceback.format_exc()}",
                    level=logging.ERROR,
                )
            raise
        yield advisory_ids


def recompute_content_ids(advisory_ids, logger):
Expand Down Expand Up @@ -193,6 +206,6 @@ def recompute_content_ids(self):
process_advisories(
advisories=advisories,
advisory_func=recompute_content_ids,
progress_logger=self.log,
log=self.log,
batch_size=1000,
)
2 changes: 1 addition & 1 deletion vulnerabilities/pipelines/remove_duplicate_advisories.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def remove_duplicates(self):
process_advisories(
advisories=advisories,
advisory_func=remove_duplicates_batch,
progress_logger=self.log,
log=self.log,
batch_size=self.BATCH_SIZE,
)

Expand Down

0 comments on commit 2a2bbee

Please sign in to comment.