Skip to content

Commit

Permalink
DDGS.text: remove multithreading, add delay between requests
Browse files Browse the repository at this point in the history
  • Loading branch information
deedy5 committed Dec 21, 2024
1 parent 6ade79a commit 41ea30b
Showing 1 changed file with 72 additions and 139 deletions.
211 changes: 72 additions & 139 deletions duckduckgo_search/duckduckgo_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from functools import cached_property
from itertools import cycle, islice
from random import choice
from threading import Event
from time import sleep, time
from types import TracebackType
from typing import cast

Expand Down Expand Up @@ -88,10 +88,10 @@ def __init__(
follow_redirects=False,
verify=verify,
)
self._exception_event = Event()
self._chat_messages: list[dict[str, str]] = []
self._chat_tokens_count = 0
self._chat_vqd: str = ""
self.sleep_timestamp = 0.0

def __enter__(self) -> DDGS:
return self
Expand All @@ -109,6 +109,12 @@ def parser(self) -> LHTMLParser:
"""Get HTML parser."""
return LHTMLParser(remove_blank_text=True, remove_comments=True, remove_pis=True, collect_ids=False)

def _sleep(self, sleeptime: float = 0.75) -> None:
"""Sleep between API requests."""
delay = 0.0 if not self.sleep_timestamp else 0.0 if time() - self.sleep_timestamp >= 20 else sleeptime
self.sleep_timestamp = time()
sleep(delay)

def _get_url(
self,
method: str,
Expand All @@ -117,20 +123,17 @@ def _get_url(
content: bytes | None = None,
data: dict[str, str] | bytes | None = None,
) -> bytes:
if self._exception_event.is_set():
raise DuckDuckGoSearchException("Exception occurred in previous call.")
self._sleep()
try:
resp = self.client.request(method, url, params=params, content=content, data=data)
except Exception as ex:
self._exception_event.set()
if "time" in str(ex).lower():
raise TimeoutException(f"{url} {type(ex).__name__}: {ex}") from ex
raise DuckDuckGoSearchException(f"{url} {type(ex).__name__}: {ex}") from ex
logger.debug(f"_get_url() {resp.url} {resp.status_code} {len(resp.content)}")
if resp.status_code == 200:
return cast(bytes, resp.content)
self._exception_event.set()
if resp.status_code in (202, 301, 403):
elif resp.status_code in (202, 301, 403):
raise RatelimitException(f"{resp.url} {resp.status_code} Ratelimit")
raise DuckDuckGoSearchException(f"{resp.url} return None. {params=} {content=} {data=}")

Expand Down Expand Up @@ -256,23 +259,6 @@ def _text_api(
timelimit: str | None = None,
max_results: int | None = None,
) -> list[dict[str, str]]:
"""DuckDuckGo text search. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query.
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
safesearch: on, moderate, off. Defaults to "moderate".
timelimit: d, w, m, y. Defaults to None.
max_results: max number of results. If None, returns results only from the first response. Defaults to None.
Returns:
List of dictionaries with search results.
Raises:
DuckDuckGoSearchException: Base exception for duckduckgo_search errors.
RatelimitException: Inherits from DuckDuckGoSearchException, raised for exceeding API request rate limits.
TimeoutException: Inherits from DuckDuckGoSearchException, raised for API request timeouts.
"""
assert keywords, "keywords is mandatory"

vqd = self._get_vqd(keywords)
Expand All @@ -281,56 +267,41 @@ def _text_api(
"q": keywords,
"kl": region,
"l": region,
"p": "",
"p": "1" if safesearch == "on" else "",
"s": "0",
"df": "",
"df": timelimit or "",
"vqd": vqd,
"bing_market": f"{region[3:]}-{region[:2].upper()}",
"ex": "",
"ex": "-1" if safesearch == "moderate" else "-2" if safesearch == "off" else "",
}
safesearch = safesearch.lower()
if safesearch == "moderate":
payload["ex"] = "-1"
elif safesearch == "off":
payload["ex"] = "-2"
elif safesearch == "on": # strict
payload["p"] = "1"
if timelimit:
payload["df"] = timelimit

cache = set()
results: list[dict[str, str]] = []

def _text_api_page(s: int) -> list[dict[str, str]]:
payload["s"] = f"{s}"
for _ in range(11):
resp_content = self._get_url("GET", "https://links.duckduckgo.com/d.js", params=payload)
page_data = _text_extract_json(resp_content, keywords)
page_results = []
for row in page_data:
href = row.get("u", None)
href = row.get("u")
if href and href not in cache and href != f"http://www.google.com/search?q={keywords}":
cache.add(href)
body = _normalize(row["a"])
if body:
result = {
"title": _normalize(row["t"]),
"href": _normalize_url(href),
"body": body,
}
page_results.append(result)
return page_results

slist = [0]
if max_results:
max_results = min(max_results, 190)
slist.extend(range(10, max_results, 15))
try:
for r in self._executor.map(_text_api_page, slist):
results.extend(r)
except Exception as e:
raise e

return list(islice(results, max_results))
results.append(
{
"title": _normalize(row["t"]),
"href": _normalize_url(href),
"body": body,
}
)
if max_results and len(results) >= max_results:
return results
else:
next_page_url = row.get("n")
if not next_page_url:
return results
payload["s"] = next_page_url.split("s=")[1].split("&")[0]
return results

def _text_html(
self,
Expand All @@ -339,22 +310,6 @@ def _text_html(
timelimit: str | None = None,
max_results: int | None = None,
) -> list[dict[str, str]]:
"""DuckDuckGo text search. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query.
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
timelimit: d, w, m, y. Defaults to None.
max_results: max number of results. If None, returns results only from the first response. Defaults to None.
Returns:
List of dictionaries with search results.
Raises:
DuckDuckGoSearchException: Base exception for duckduckgo_search errors.
RatelimitException: Inherits from DuckDuckGoSearchException, raised for exceeding API request rate limits.
TimeoutException: Inherits from DuckDuckGoSearchException, raised for API request timeouts.
"""
assert keywords, "keywords is mandatory"

payload = {
Expand All @@ -368,24 +323,20 @@ def _text_html(
}
if timelimit:
payload["df"] = timelimit
if max_results and max_results > 20:
vqd = self._get_vqd(keywords)
payload["vqd"] = vqd

cache = set()
results: list[dict[str, str]] = []

def _text_html_page(s: int) -> list[dict[str, str]]:
payload["s"] = f"{s}"
for _ in range(11):
resp_content = self._get_url("POST", "https://html.duckduckgo.com/html", data=payload)
if b"No results." in resp_content:
return []
return results

page_results = []
tree = document_fromstring(resp_content, self.parser)
elements = tree.xpath("//div[h2]")
if not isinstance(elements, list):
return []
return results

for e in elements:
if isinstance(e, _Element):
hrefxpath = e.xpath("./a/@href")
Expand All @@ -402,25 +353,27 @@ def _text_html_page(s: int) -> list[dict[str, str]]:
title = str(titlexpath[0]) if titlexpath and isinstance(titlexpath, list) else ""
bodyxpath = e.xpath("./a//text()")
body = "".join(str(x) for x in bodyxpath) if bodyxpath and isinstance(bodyxpath, list) else ""
result = {
"title": _normalize(title),
"href": _normalize_url(href),
"body": _normalize(body),
}
page_results.append(result)
return page_results

slist = [0]
if max_results:
max_results = min(max_results, 190)
slist.extend(range(10, max_results, 15))
try:
for r in self._executor.map(_text_html_page, slist):
results.extend(r)
except Exception as e:
raise e
results.append(
{
"title": _normalize(title),
"href": _normalize_url(href),
"body": _normalize(body),
}
)
if max_results and len(results) >= max_results:
return results

npx = tree.xpath('.//div[@class="nav-link"]')
if not npx:
return results
next_page = npx[-1] if isinstance(npx, list) else None
if isinstance(next_page, _Element):
names = next_page.xpath('.//input[@type="hidden"]/@name')
values = next_page.xpath('.//input[@type="hidden"]/@value')
if isinstance(names, list) and isinstance(values, list):
payload = {str(n): str(v) for n, v in zip(names, values)}

return list(islice(results, max_results))
return results

def _text_lite(
self,
Expand All @@ -429,22 +382,6 @@ def _text_lite(
timelimit: str | None = None,
max_results: int | None = None,
) -> list[dict[str, str]]:
"""DuckDuckGo text search. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query.
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
timelimit: d, w, m, y. Defaults to None.
max_results: max number of results. If None, returns results only from the first response. Defaults to None.
Returns:
List of dictionaries with search results.
Raises:
DuckDuckGoSearchException: Base exception for duckduckgo_search errors.
RatelimitException: Inherits from DuckDuckGoSearchException, raised for exceeding API request rate limits.
TimeoutException: Inherits from DuckDuckGoSearchException, raised for API request timeouts.
"""
assert keywords, "keywords is mandatory"

payload = {
Expand All @@ -462,17 +399,15 @@ def _text_lite(
cache = set()
results: list[dict[str, str]] = []

def _text_lite_page(s: int) -> list[dict[str, str]]:
payload["s"] = f"{s}"
for _ in range(11):
resp_content = self._get_url("POST", "https://lite.duckduckgo.com/lite/", data=payload)
if b"No more results." in resp_content:
return []
return results

page_results = []
tree = document_fromstring(resp_content, self.parser)
elements = tree.xpath("//table[last()]//tr")
if not isinstance(elements, list):
return []
return results

data = zip(cycle(range(1, 5)), elements)
for i, e in data:
Expand Down Expand Up @@ -500,25 +435,23 @@ def _text_lite_page(s: int) -> list[dict[str, str]]:
else ""
)
if href:
result = {
"title": _normalize(title),
"href": _normalize_url(href),
"body": _normalize(body),
}
page_results.append(result)
return page_results
results.append(
{
"title": _normalize(title),
"href": _normalize_url(href),
"body": _normalize(body),
}
)
if max_results and len(results) >= max_results:
return results

slist = [0]
if max_results:
max_results = min(max_results, 190)
slist.extend(range(10, max_results, 15))
try:
for r in self._executor.map(_text_lite_page, slist):
results.extend(r)
except Exception as e:
raise e
next_page_s = tree.xpath("//form[./input[contains(@value, 'ext')]]/input[@name='s']/@value")
if not next_page_s:
return results
elif isinstance(next_page_s, list):
payload["s"] = str(next_page_s[0])

return list(islice(results, max_results))
return results

def images(
self,
Expand Down

0 comments on commit 41ea30b

Please sign in to comment.