fix: improve downloader thread exit conditions
- Implement timeout mechanism for inactive downloads
- Add max_idle_time parameter to prevent infinite waiting
- Return download success status from download method

The downloader now exits when:
1. The number of downloads reaches max_num
2. No new downloads complete within max_idle_time seconds
3. The parser has exited and the task queue is empty

This fixes the issue where downloader threads would hang
indefinitely when fewer images were found than the max_num
specified.
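As a usage sketch (GoogleImageCrawler is one of the builtin crawlers updated below; the keyword, storage directory, and 30-second timeout are illustrative values, not part of this commit):

from icrawler.builtin import GoogleImageCrawler

# A query can return fewer images than max_num. Without max_idle_time the
# downloader threads would block on the task queue forever; with it, they
# give up once no new image has arrived for 30 seconds.
crawler = GoogleImageCrawler(storage={"root_dir": "images"})
crawler.crawl(keyword="cat", max_num=500, max_idle_time=30)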
ZhiyuanChen committed Jan 2, 2025
1 parent 2c3f6fd commit f7f6107
Showing 9 changed files with 66 additions and 29 deletions.
10 changes: 7 additions & 3 deletions icrawler/builtin/baidu.py
@@ -157,6 +157,7 @@ def crawl(
         max_size=None,
         file_idx_offset=0,
         overwrite=False,
+        max_idle_time=None,
     ):
         if offset + max_num > 1000:
             if offset > 1000:
@@ -170,10 +171,13 @@ def crawl(
                     "been automatically set to %d",
                     1000 - offset,
                 )
-        else:
-            pass
         feeder_kwargs = dict(keyword=keyword, offset=offset, max_num=max_num, filters=filters)
         downloader_kwargs = dict(
-            max_num=max_num, min_size=min_size, max_size=max_size, file_idx_offset=file_idx_offset, overwrite=overwrite
+            max_num=max_num,
+            min_size=min_size,
+            max_size=max_size,
+            file_idx_offset=file_idx_offset,
+            overwrite=overwrite,
+            max_idle_time=max_idle_time,
         )
         super().crawl(feeder_kwargs=feeder_kwargs, downloader_kwargs=downloader_kwargs)
8 changes: 7 additions & 1 deletion icrawler/builtin/bing.py
@@ -145,6 +145,7 @@ def crawl(
         max_size=None,
         file_idx_offset=0,
         overwrite=False,
+        max_idle_time=None,
     ):
         if offset + max_num > 1000:
             if offset > 1000:
@@ -160,6 +161,11 @@ def crawl(
                 )
         feeder_kwargs = dict(keyword=keyword, offset=offset, max_num=max_num, filters=filters)
         downloader_kwargs = dict(
-            max_num=max_num, min_size=min_size, max_size=max_size, file_idx_offset=file_idx_offset, overwrite=overwrite
+            max_num=max_num,
+            min_size=min_size,
+            max_size=max_size,
+            file_idx_offset=file_idx_offset,
+            overwrite=overwrite,
+            max_idle_time=max_idle_time,
         )
         super().crawl(feeder_kwargs=feeder_kwargs, downloader_kwargs=downloader_kwargs)
2 changes: 2 additions & 0 deletions icrawler/builtin/flickr.py
@@ -135,6 +135,7 @@ def crawl(
         max_size=None,
         file_idx_offset=0,
         overwrite=False,
+        max_idle_time=None,
         **kwargs,
     ):
         kwargs["apikey"] = self.apikey
@@ -170,5 +171,6 @@ def crawl(
                 max_size=max_size,
                 file_idx_offset=file_idx_offset,
                 overwrite=overwrite,
+                max_idle_time=max_idle_time,
             ),
         )
13 changes: 8 additions & 5 deletions icrawler/builtin/google.py
@@ -185,12 +185,11 @@ def crawl(
         language=None,
         file_idx_offset=0,
         overwrite=False,
+        max_idle_time=None,
     ):
         if offset + max_num > 1000:
             if offset > 1000:
-                self.logger.error(
-                    '"Offset" cannot exceed 1000, otherwise you will get ' "duplicated searching results."
-                )
+                self.logger.error("Offset cannot exceed 1000, otherwise you " "will get duplicated searching results.")
                 return
             elif max_num > 1000:
                 max_num = 1000 - offset
@@ -201,9 +200,13 @@ def crawl(
                     "can specify different date ranges.",
                     1000 - offset,
                 )
-
         feeder_kwargs = dict(keyword=keyword, offset=offset, max_num=max_num, language=language, filters=filters)
         downloader_kwargs = dict(
-            max_num=max_num, min_size=min_size, max_size=max_size, file_idx_offset=file_idx_offset, overwrite=overwrite
+            max_num=max_num,
+            min_size=min_size,
+            max_size=max_size,
+            file_idx_offset=file_idx_offset,
+            overwrite=overwrite,
+            max_idle_time=max_idle_time,
         )
         super().crawl(feeder_kwargs=feeder_kwargs, downloader_kwargs=downloader_kwargs)
8 changes: 6 additions & 2 deletions icrawler/builtin/greedy.py
@@ -77,7 +77,7 @@ def __init__(
     ):
         super().__init__(feeder_cls, parser_cls, downloader_cls, *args, **kwargs)

-    def crawl(self, domains, max_num=0, min_size=None, max_size=None, file_idx_offset=0):
+    def crawl(self, domains, max_num=0, min_size=None, max_size=None, file_idx_offset=0, max_idle_time=None):
         if isinstance(domains, str):
             domains = [domains]
         elif not isinstance(domains, list):
@@ -90,6 +90,10 @@ def crawl(self, domains, max_num=0, min_size=None, max_size=None, file_idx_offset=0):
             feeder_kwargs={"domains": domains},
             parser_kwargs={"domains": domains},
             downloader_kwargs=dict(
-                max_num=max_num, min_size=min_size, max_size=max_size, file_idx_offset=file_idx_offset
+                max_num=max_num,
+                min_size=min_size,
+                max_size=max_size,
+                file_idx_offset=file_idx_offset,
+                max_idle_time=max_idle_time,
             ),
         )
11 changes: 7 additions & 4 deletions icrawler/builtin/urllist.py
@@ -33,7 +33,10 @@ def __init__(
     ):
         super().__init__(feeder_cls, parser_cls, downloader_cls, *args, **kwargs)

-    def crawl(self, url_list, max_num=1000, file_idx_offset=0, overwrite=False):
-        feeder_kwargs = dict(url_list=url_list)
-        downloader_kwargs = dict(file_idx_offset=file_idx_offset, max_num=max_num, overwrite=overwrite)
-        super().crawl(feeder_kwargs=feeder_kwargs, downloader_kwargs=downloader_kwargs)
+    def crawl(self, url_list, max_num=1000, file_idx_offset=0, overwrite=False, max_idle_time=None):
+        super().crawl(
+            feeder_kwargs=dict(url_list=url_list),
+            downloader_kwargs=dict(
+                file_idx_offset=file_idx_offset, max_num=max_num, overwrite=overwrite, max_idle_time=max_idle_time
+            ),
+        )
2 changes: 1 addition & 1 deletion icrawler/crawler.py
@@ -139,7 +139,7 @@ def set_session(self, headers=None):
                     "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
                     " AppleWebKit/537.36 (KHTML, like Gecko) "
                     "Chrome/88.0.4324.104 Safari/537.36"
-                )
+                ),
             }
         elif not isinstance(headers, dict):
             raise TypeError('"headers" must be a dict object')
34 changes: 25 additions & 9 deletions icrawler/downloader.py
@@ -1,4 +1,5 @@
 import queue
+import time
 from io import BytesIO
 from threading import current_thread
 from urllib.parse import urlparse
@@ -163,42 +164,57 @@ def start(self, file_idx_offset=0, *args, **kwargs):
             worker.start()
             self.logger.debug("thread %s started", worker.name)

-    def worker_exec(self, max_num, default_ext="", queue_timeout=5, req_timeout=5, **kwargs):
+    def worker_exec(self, max_num, default_ext="", queue_timeout=5, req_timeout=5, max_idle_time=None, **kwargs):
         """Target method of workers.

         Get task from ``task_queue`` and then download files and process meta
         data. A downloader thread will exit in either of the following cases:

         1. All parser threads have exited and the task_queue is empty.
         2. Downloaded image number has reached required number(max_num).
+        3. No new downloads for max_idle_time seconds.

         Args:
             max_num (int): Maximum number of images to download
             queue_timeout (int): Timeout of getting tasks from ``task_queue``.
             req_timeout (int): Timeout of making requests for downloading pages.
+            max_idle_time (int): Maximum time (in seconds) to wait without receiving new images
             **kwargs: Arguments passed to the :func:`download` method.
         """
         self.max_num = max_num
+        last_download_time = time.time()
+
         while True:
             if self.signal.get("reach_max_num"):
-                self.logger.info(
-                    "downloaded images reach max num, thread %s" " is ready to exit", current_thread().name
-                )
+                self.logger.info("downloaded images reach max num, thread %s is ready to exit", current_thread().name)
                 break
+
+            current_time = time.time()
+            if max_idle_time is not None and current_time - last_download_time > max_idle_time and self.fetched_num > 0:
+                self.logger.info("no new images for %d seconds, thread %s exit", max_idle_time, current_thread().name)
+                break
+
             try:
                 task = self.in_queue.get(timeout=queue_timeout)
             except queue.Empty:
                 if self.signal.get("parser_exited"):
                     self.logger.info("no more download task for thread %s", current_thread().name)
                     break
-                else:
+                elif self.fetched_num == 0:
                     self.logger.info("%s is waiting for new download tasks", current_thread().name)
+                else:
+                    self.logger.info("no more images available, thread %s exit", current_thread().name)
+                    break
             except:
                 self.logger.error("exception in thread %s", current_thread().name)
             else:
-                self.download(task, default_ext, req_timeout, **kwargs)
+                success = self.download(task, default_ext, req_timeout, **kwargs)
+                if success:
+                    last_download_time = time.time()
                 self.process_meta(task)
                 self.in_queue.task_done()
-        self.logger.info(f"thread {current_thread().name} exit")
+
+        self.logger.info("thread %s exit", current_thread().name)

@@ -247,5 +263,5 @@ def get_filename(self, task, default_ext):
             file_idx = self.fetched_num + self.file_idx_offset
             return f"{file_idx:06d}.{extension}"

-    def worker_exec(self, max_num, default_ext="jpg", queue_timeout=5, req_timeout=5, **kwargs):
-        super().worker_exec(max_num, default_ext, queue_timeout, req_timeout, **kwargs)
+    def worker_exec(self, max_num, default_ext="jpg", queue_timeout=5, req_timeout=5, max_idle_time=None, **kwargs):
+        super().worker_exec(max_num, default_ext, queue_timeout, req_timeout, max_idle_time, **kwargs)
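Note that the matching change to the download method itself ("Return download success status from download method" in the message above) is collapsed out of this view. A minimal sketch of the contract worker_exec now relies on; the body below is illustrative, not the actual diff:

def download(self, task, default_ext, timeout=5, **kwargs):
    # Sketch only: report whether a file was actually written, so that
    # worker_exec can refresh last_download_time after each success.
    try:
        response = self.session.get(task["file_url"], timeout=timeout)
        response.raise_for_status()
        self.storage.write(self.get_filename(task, default_ext), response.content)
        return True
    except Exception:
        return False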
7 changes: 3 additions & 4 deletions icrawler/parser.py
@@ -68,9 +68,8 @@ def worker_exec(self, queue_timeout=2, req_timeout=5, max_retry=3, **kwargs):
                 if self.signal.get("feeder_exited"):
                     self.logger.info("no more page urls for thread %s to parse", current_thread().name)
                     break
-                else:
-                    self.logger.info("%s is waiting for new page urls", current_thread().name)
-                    continue
+                self.logger.info("%s is waiting for new page urls", current_thread().name)
+                continue
             except:
                 self.logger.error("exception in thread %s", current_thread().name)
                 continue
@@ -117,5 +116,5 @@ def worker_exec(self, queue_timeout=2, req_timeout=5, max_retry=3, **kwargs):
                 retry -= 1
         self.logger.info(f"thread {current_thread().name} exit")

-    def __exit__(self):
+    def __exit__(self, exc_type, exc_val, exc_tb):
         logging.info("all parser threads exited")
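The __exit__ signature fix aligns the parser with Python's context-manager protocol, which always invokes __exit__ with three arguments. A self-contained illustration (the Demo class is hypothetical, not from icrawler):

class Demo:
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # The with-statement always passes three values here; the old
        # one-argument __exit__(self) would raise TypeError on exit.
        print("all threads exited")

with Demo():
    pass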
