mirror of
https://github.com/VinciGit00/Scrapegraph-ai.git
synced 2026-06-12 21:01:54 +08:00
codebeaver/pre/beta-963 - .
This commit is contained in:
parent
d6fd1bbd0c
commit
0aad469362
@ -1,6 +1,6 @@
|
||||
import asyncio
|
||||
import sys
|
||||
from unittest.mock import AsyncMock, patch
|
||||
from unittest.mock import ANY, AsyncMock, patch
|
||||
|
||||
import aiohttp
|
||||
import pytest
|
||||
@ -864,3 +864,541 @@ async def test_ascrape_playwright_browser_config(monkeypatch):
|
||||
result = await loader.ascrape_playwright("http://example.com")
|
||||
assert captured_kwargs.get("extra") == extra_kwarg_value
|
||||
assert "<html>Config Tested</html>" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_scrape_method_js_support(monkeypatch):
    """Check that ``scrape`` returns the JS-support scraper's output when ``requires_js_support=True``."""
    target = "http://example.com"

    async def dummy_js(url):
        # Stand-in coroutine patched over the loader's ascrape_with_js_support.
        return f"<html>JS supported content for {url}</html>"

    loader = ChromiumLoader([target], backend="playwright", requires_js_support=True)
    monkeypatch.setattr(loader, "ascrape_with_js_support", dummy_js)

    html = await loader.scrape(target)

    assert "JS supported content" in html
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_ascrape_playwright_scroll_retry_failure(monkeypatch):
    """Expect ``ascrape_playwright_scroll`` to return an error message after ``retry_limit`` failed attempts."""

    class TimingOutPage:
        """Page double whose navigation and script evaluation always time out."""

        evaluate = AsyncMock(
            side_effect=asyncio.TimeoutError("Simulated timeout in evaluate")
        )
        mouse = AsyncMock()

        async def goto(self, url, wait_until):
            raise asyncio.TimeoutError("Simulated timeout in goto")

        async def wait_for_load_state(self, state):
            return None

        async def content(self):
            return "<html>No Content</html>"

    class StubContext:
        async def new_page(self):
            return TimingOutPage()

    class StubBrowser:
        async def new_context(self, **kwargs):
            return StubContext()

        async def close(self):
            return None

    class StubEngine:
        """Launcher shared by the ``chromium`` and ``firefox`` attributes below."""

        @staticmethod
        async def launch(headless, proxy, **kwargs):
            return StubBrowser()

    class StubPlaywright:
        """Async context manager patched in for ``async_playwright()``."""

        chromium = StubEngine
        firefox = StubEngine

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

    # Calling the class yields a fresh stub, just like calling async_playwright().
    monkeypatch.setattr("playwright.async_api.async_playwright", StubPlaywright)

    target = "http://example.com"
    loader = ChromiumLoader([target], backend="playwright", retry_limit=2, timeout=1)

    # Scroll value just above the minimum (per the original test note) and a sleep > 0.
    outcome = await loader.ascrape_playwright_scroll(target, scroll=5000, sleep=1)

    assert "Error: Network error after 2 attempts" in outcome
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_alazy_load_order(monkeypatch):
    """Verify that ``alazy_load`` yields documents in the same order as the input URLs.

    The stub scraper sleeps for a URL-dependent time so that, if the loader
    runs the scrapes concurrently, they do not all finish in submission order.
    """
    urls = [
        "http://example.com/first",
        "http://example.com/second",
        "http://example.com/third",
    ]
    loader = ChromiumLoader(urls, backend="playwright")

    async def delayed_scraper(url):
        # The delay shrinks as len(url) % 3 grows, which scrambles finish order.
        # Uses the module-level asyncio import; no local re-import is needed.
        delay = 0.3 - 0.1 * (len(url) % 3)
        await asyncio.sleep(delay)
        return f"<html>Content for {url}</html>"

    monkeypatch.setattr(loader, "ascrape_playwright", delayed_scraper)

    docs = [doc async for doc in loader.alazy_load()]

    # zip() stops at the shorter sequence, so check the count first; otherwise
    # a loader that yields too few documents would pass the loop vacuously.
    assert len(docs) == len(urls)
    for doc, url in zip(docs, urls):
        assert doc.metadata["source"] == url
        assert f"Content for {url}" in doc.page_content
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_ascrape_with_js_support_calls_close(monkeypatch):
    """Check that a successful ``ascrape_with_js_support`` call also closes the browser."""
    browser_closed = False

    class StubPage:
        async def goto(self, url, wait_until):
            return None

        async def wait_for_load_state(self, state):
            return None

        async def content(self):
            return "<html>Dummy Content</html>"

    class StubContext:
        async def new_page(self):
            return StubPage()

    class RecordingBrowser:
        """Browser double that records whether ``close`` was awaited."""

        async def new_context(self, **kwargs):
            return StubContext()

        async def close(self):
            nonlocal browser_closed
            browser_closed = True

    class StubEngine:
        @staticmethod
        async def launch(headless, proxy, **kwargs):
            return RecordingBrowser()

    class StubPlaywright:
        """Async context manager patched in for ``async_playwright()``."""

        chromium = StubEngine
        firefox = StubEngine

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

    monkeypatch.setattr("playwright.async_api.async_playwright", StubPlaywright)

    target = "http://example.com"
    loader = ChromiumLoader(
        [target],
        backend="playwright",
        requires_js_support=True,
        retry_limit=1,
        timeout=5,
    )

    html = await loader.ascrape_with_js_support(target)

    assert html == "<html>Dummy Content</html>"
    assert browser_closed is True
|
||||
|
||||
|
||||
def test_lazy_load_invalid_backend(monkeypatch):
    """Test that lazy_load raises AttributeError when the backend has no matching scraping method.

    This is a plain synchronous test: ``lazy_load`` is a synchronous generator
    that drives its scraper with ``asyncio.run``, so it should not be invoked
    from inside an already running event loop.
    """
    # "nonexistent" has no corresponding scraping method on the loader.
    loader = ChromiumLoader(["http://example.com"], backend="nonexistent")

    with pytest.raises(AttributeError):
        # Consuming the generator triggers the lookup of the scraping method.
        list(loader.lazy_load())
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_ascrape_undetected_chromedriver_failure(monkeypatch):
    """Expect ``ascrape_undetected_chromedriver`` to return an error string when every ``driver.get`` fails."""
    import types

    class AlwaysFailingDriver:
        """Driver double whose ``get`` raises on every call."""

        def __init__(self, options):
            self.options = options
            self.quit_called = False

        def get(self, url):
            # Every page fetch fails.
            raise aiohttp.ClientError("Forced failure in get")

        @property
        def page_source(self):
            return "<html>This should not be reached</html>"

        def quit(self):
            self.quit_called = True

    # Register a throwaway module so that any later
    # ``import undetected_chromedriver`` resolves to the double.
    fake_uc = types.ModuleType("undetected_chromedriver")
    fake_uc.Chrome = lambda options: AlwaysFailingDriver(options)
    monkeypatch.setitem(sys.modules, "undetected_chromedriver", fake_uc)

    target = "http://example.com"
    loader = ChromiumLoader([target], backend="selenium", retry_limit=2, timeout=1)
    loader.browser_name = "chromium"

    outcome = await loader.ascrape_undetected_chromedriver(target)

    # The message must mention the configured number of attempts.
    assert "Error: Network error after 2 attempts" in outcome
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_ascrape_playwright_scroll_constant_height(mock_playwright):
    """Check that ``ascrape_playwright_scroll`` returns the page content when the page height never changes."""
    _pw, _browser, _context, page = mock_playwright

    # Every evaluate() call reports the same height, simulating a page that stops growing.
    page.evaluate.return_value = 1000
    # Content handed back once the scroll loop is left.
    page.content.return_value = "<html>Constant height content</html>"

    target = "http://example.com"
    loader = ChromiumLoader([target], backend="playwright")

    # A very short sleep keeps the scroll cycles quick.
    html = await loader.ascrape_playwright_scroll(target, scroll=6000, sleep=0.1)

    assert "Constant height content" in html
|
||||
|
||||
|
||||
def test_lazy_load_empty_content(monkeypatch):
    """Check that ``lazy_load`` yields one Document with empty content when the scraper returns ``""``."""
    from langchain_core.documents import Document

    async def dummy_scraper(url):
        return ""

    urls = ["http://example.com"]
    loader = ChromiumLoader(urls, backend="playwright", requires_js_support=False)
    monkeypatch.setattr(loader, "ascrape_playwright", dummy_scraper)

    docs = list(loader.lazy_load())

    assert len(docs) == 1
    only_doc = docs[0]
    assert isinstance(only_doc, Document)
    assert only_doc.page_content == ""
    assert only_doc.metadata["source"] in urls
|
||||
|
||||
|
||||
def test_lazy_load_scraper_returns_none(monkeypatch):
    """Test that lazy_load yields Document objects with page_content as None when the scraper returns None.

    This is a plain synchronous test: ``lazy_load`` drives the scraper with
    ``asyncio.run``, which raises ``RuntimeError`` when called from inside a
    running event loop (as an ``@pytest.mark.asyncio`` test would be).
    """
    # Imported once here rather than on every loop iteration.
    from langchain_core.documents import Document

    urls = ["http://example.com", "http://test.com"]
    loader = ChromiumLoader(urls, backend="playwright")

    async def dummy_none(url):
        return None

    monkeypatch.setattr(loader, "ascrape_playwright", dummy_none)

    docs = list(loader.lazy_load())

    assert len(docs) == 2
    for doc, url in zip(docs, urls):
        assert isinstance(doc, Document)
        assert doc.page_content is None
        assert doc.metadata["source"] == url
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_alazy_load_mixed_none_and_content(monkeypatch):
    """Check ``alazy_load`` ordering and contents when one scrape yields HTML and the other yields ``None``."""
    urls = ["http://example.com", "http://none.com"]

    async def mixed_scraper(url):
        # Only URLs containing "none" produce no content.
        return None if "none" in url else f"<html>Valid content for {url}</html>"

    loader = ChromiumLoader(urls, backend="playwright")
    monkeypatch.setattr(loader, "ascrape_playwright", mixed_scraper)

    docs = [doc async for doc in loader.alazy_load()]

    assert len(docs) == 2
    with_content, without_content = docs[0], docs[1]

    # Input order must be preserved.
    assert with_content.metadata["source"] == "http://example.com"
    assert (
        "<html>Valid content for http://example.com</html>"
        in with_content.page_content
    )
    assert without_content.metadata["source"] == "http://none.com"
    assert without_content.page_content is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_ascrape_with_js_support_exception_cleanup(monkeypatch):
    """Test that ascrape_with_js_support calls browser.close() after an exception occurs.

    Navigation always times out, so with ``retry_limit=1`` the loader is
    expected to raise ``RuntimeError``; the browser double records whether
    ``close`` was awaited so the cleanup can be asserted afterwards.
    """
    close_called_flag = {"called": False}

    class DummyPage:
        async def goto(self, url, wait_until):
            raise asyncio.TimeoutError("Forced timeout")

        async def wait_for_load_state(self, state):
            return

        async def content(self):
            return "<html>No Content</html>"

    class DummyContext:
        async def new_page(self):
            return DummyPage()

    class DummyBrowser:
        async def new_context(self, **kwargs):
            return DummyContext()

        async def close(self):
            close_called_flag["called"] = True
            return

    class DummyPW:
        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return

        class chromium:
            @staticmethod
            async def launch(headless, proxy, **kwargs):
                return DummyBrowser()

        class firefox:
            @staticmethod
            async def launch(headless, proxy, **kwargs):
                return DummyBrowser()

    monkeypatch.setattr("playwright.async_api.async_playwright", lambda: DummyPW())

    loader = ChromiumLoader(
        ["http://example.com"],
        backend="playwright",
        requires_js_support=True,
        retry_limit=1,
        timeout=1,
    )

    with pytest.raises(RuntimeError, match="Failed to scrape after 1 attempts"):
        await loader.ascrape_with_js_support("http://example.com")

    # The flag was previously recorded but never checked, so the cleanup named
    # in this test's purpose was not verified at all.
    assert close_called_flag["called"] is True
|
||||
|
||||
|
||||
@patch("scrapegraphai.docloaders.chromium.dynamic_import")
def test_init_dynamic_import_called(mock_dynamic_import):
    """Check that constructing a ChromiumLoader calls ``dynamic_import`` for the chosen backend."""
    # Only the constructor's side effect matters; the instance itself is discarded.
    ChromiumLoader(["http://example.com"], backend="playwright")

    mock_dynamic_import.assert_called_with("playwright", ANY)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_alazy_load_selenium_backend(monkeypatch):
    """Test that alazy_load correctly yields Document objects when using selenium backend."""
    urls = ["http://example.com", "http://selenium.com"]
    loader = ChromiumLoader(urls, backend="selenium")

    async def dummy_selenium(url):
        return f"<html>dummy selenium backend content for {url}</html>"

    monkeypatch.setattr(loader, "ascrape_undetected_chromedriver", dummy_selenium)

    docs = [doc async for doc in loader.alazy_load()]

    # zip() truncates to the shorter input, so check the count explicitly.
    assert len(docs) == len(urls)
    for doc, url in zip(docs, urls):
        assert f"dummy selenium backend content for {url}" in doc.page_content
        assert doc.metadata["source"] == url
    # The stray `close_called_flag` assertion was removed: that name is not
    # defined in this test, and no browser double exists here to be closed.
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_ascrape_undetected_chromedriver_zero_retry(monkeypatch):
    """Expect ``ascrape_undetected_chromedriver`` to return ``""`` when ``retry_limit`` is 0."""
    import types

    # Placeholder module: Chrome exists as an attribute but is not expected to be used.
    fake_uc = types.ModuleType("undetected_chromedriver")
    fake_uc.Chrome = lambda options: None
    monkeypatch.setitem(sys.modules, "undetected_chromedriver", fake_uc)

    target = "http://example.com"
    loader = ChromiumLoader([target], backend="selenium", retry_limit=0, timeout=5)
    loader.browser_name = "chromium"

    # Per the original test note, with retry_limit=0 the retry loop never runs,
    # so the result stays an empty string.
    outcome = await loader.ascrape_undetected_chromedriver(target)

    assert outcome == ""
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_scrape_selenium_exception(monkeypatch):
    """Expect ``scrape`` on the selenium backend to raise ``ValueError`` when the underlying scraper fails."""
    target = "http://example.com"

    async def failing_scraper(url):
        raise Exception("dummy error")

    loader = ChromiumLoader([target], backend="selenium", retry_limit=1, timeout=5)
    loader.browser_name = "chromium"
    monkeypatch.setattr(loader, "ascrape_undetected_chromedriver", failing_scraper)

    # The original failure text must be carried into the ValueError message.
    expected_message = "Failed to scrape with undetected chromedriver: dummy error"
    with pytest.raises(ValueError, match=expected_message):
        await loader.scrape(target)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_ascrape_playwright_scroll_exception_cleanup(monkeypatch):
    """Check that ``ascrape_playwright_scroll`` closes the browser when page navigation raises."""
    browser_closed = False

    class TimingOutPage:
        """Page double whose ``goto`` always times out."""

        mouse = AsyncMock()
        mouse.wheel = AsyncMock()

        async def goto(self, url, wait_until):
            raise asyncio.TimeoutError("Simulated timeout in goto")

        async def wait_for_load_state(self, state):
            return None

        async def content(self):
            return "<html>Never reached</html>"

        async def evaluate(self, script):
            # Constant height: scrolling would make no progress.
            return 1000

    class StubContext:
        async def new_page(self):
            return TimingOutPage()

    class RecordingBrowser:
        """Browser double that records whether ``close`` was awaited."""

        async def new_context(self, **kwargs):
            return StubContext()

        async def close(self):
            nonlocal browser_closed
            browser_closed = True

    class StubEngine:
        @staticmethod
        async def launch(headless, proxy, **kwargs):
            return RecordingBrowser()

    class StubPlaywright:
        """Async context manager patched in for ``async_playwright()``."""

        chromium = StubEngine
        firefox = StubEngine

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

    monkeypatch.setattr("playwright.async_api.async_playwright", StubPlaywright)

    target = "http://example.com"
    loader = ChromiumLoader(
        [target],
        backend="playwright",
        retry_limit=2,
        timeout=1,
        headless=True,
    )

    outcome = await loader.ascrape_playwright_scroll(
        target, scroll=5000, sleep=0.1, scroll_to_bottom=True
    )

    assert "Error: Network error after" in outcome
    assert browser_closed is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_ascrape_with_js_support_non_timeout_retry(monkeypatch):
    """Check that ``ascrape_with_js_support`` retries after a non-timeout error and then succeeds."""
    already_failed = False

    class FlakyPage:
        """Page double whose first ``goto`` (across all instances) raises ValueError."""

        async def goto(self, url, wait_until):
            nonlocal already_failed
            if not already_failed:
                already_failed = True
                raise ValueError("Non-timeout error")

        async def wait_for_load_state(self, state):
            return None

        async def content(self):
            return "<html>Success after non-timeout retry</html>"

    class StubContext:
        async def new_page(self):
            return FlakyPage()

    class StubBrowser:
        async def new_context(self, **kwargs):
            return StubContext()

        async def close(self):
            return None

    class StubEngine:
        @staticmethod
        async def launch(headless, proxy, **kwargs):
            return StubBrowser()

    class StubPlaywright:
        """Async context manager patched in for ``async_playwright()``."""

        chromium = StubEngine
        firefox = StubEngine

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

    monkeypatch.setattr("playwright.async_api.async_playwright", StubPlaywright)

    target = "http://nontimeout.com"
    loader = ChromiumLoader(
        [target],
        backend="playwright",
        requires_js_support=True,
        retry_limit=2,
        timeout=1,
    )

    html = await loader.ascrape_with_js_support(target)

    assert "Success after non-timeout retry" in html
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_scrape_uses_js_support_flag(monkeypatch):
    """Check that ``scrape`` prefers ``ascrape_with_js_support`` over ``ascrape_playwright`` when ``requires_js_support=True``."""
    target = "http://example.com"

    async def dummy_js(url, browser_name="chromium"):
        return f"<html>JS flag content for {url}</html>"

    async def dummy_playwright(url, browser_name="chromium"):
        return f"<html>Playwright content for {url}</html>"

    loader = ChromiumLoader([target], backend="playwright", requires_js_support=True)
    # Patch both scrapers so the returned text reveals which one was called.
    for attr_name, replacement in (
        ("ascrape_with_js_support", dummy_js),
        ("ascrape_playwright", dummy_playwright),
    ):
        monkeypatch.setattr(loader, attr_name, replacement)

    html = await loader.scrape(target)

    assert "JS flag content" in html
|
||||
|
||||
0
tests/test_csv_scraper_multi_graph.py
Normal file
0
tests/test_csv_scraper_multi_graph.py
Normal file
@ -1,6 +1,8 @@
|
||||
import urllib.parse
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from scrapegraphai.docloaders.scrape_do import scrape_do_fetch
|
||||
|
||||
|
||||
@ -29,3 +31,255 @@ def test_scrape_do_fetch_without_proxy():
|
||||
mock_get.assert_called_once_with(expected_url)
|
||||
|
||||
assert result == expected_response
|
||||
|
||||
|
||||
def test_scrape_do_fetch_with_proxy_no_geo():
    """
    Exercise scrape_do_fetch in proxy mode when no geoCode is supplied.

    Checks performed:
    - requests.get receives the target URL, proxies built from the
      "proxy.scrape.do:8080" endpoint, verify=False and an empty params dict.
    - The mocked response text is returned unchanged.
    """
    token = "test_token"
    target_url = "https://example.org"
    body = "Mocked proxy response"

    # Both schemes go through the same token-authenticated proxy URL.
    proxy_url = f"http://{token}:@proxy.scrape.do:8080"
    proxies = dict.fromkeys(("http", "https"), proxy_url)

    with patch("requests.get", return_value=Mock(text=body)) as mock_get:
        fetched = scrape_do_fetch(token, target_url, use_proxy=True)

        # Without a geoCode the params argument is expected to be empty.
        mock_get.assert_called_once_with(
            target_url, proxies=proxies, verify=False, params={}
        )
        assert fetched == body
|
||||
|
||||
|
||||
def test_scrape_do_fetch_with_proxy_with_geo():
    """
    Exercise scrape_do_fetch in proxy mode with a geoCode and super_proxy=True.

    Checks performed:
    - The proxies are built from the "proxy.scrape.do:8080" endpoint.
    - params carries the geoCode and "super" set to "true".
    - The mocked response text is returned unchanged.
    """
    token = "test_token"
    target_url = "https://example.net"
    body = "Mocked proxy response US"

    proxy_url = f"http://{token}:@proxy.scrape.do:8080"
    proxies = dict.fromkeys(("http", "https"), proxy_url)

    with patch("requests.get", return_value=Mock(text=body)) as mock_get:
        fetched = scrape_do_fetch(
            token, target_url, use_proxy=True, geoCode="US", super_proxy=True
        )

        mock_get.assert_called_once_with(
            target_url,
            proxies=proxies,
            verify=False,
            params={"geoCode": "US", "super": "true"},
        )
        assert fetched == body
|
||||
|
||||
|
||||
def test_scrape_do_fetch_without_proxy_custom_env():
    """
    Exercise scrape_do_fetch in API mode with API_SCRAPE_DO_URL overridden in the environment.
    """
    token = "custom_token"
    target_url = "https://custom-example.com"
    body = "Custom API response"

    env_override = {"API_SCRAPE_DO_URL": "custom.api.scrape.do"}
    quoted_target = urllib.parse.quote(target_url)
    api_url = f"http://custom.api.scrape.do?token={token}&url={quoted_target}"

    with patch.dict("os.environ", env_override), patch("requests.get") as mock_get:
        mock_get.return_value = Mock(text=body)

        fetched = scrape_do_fetch(token, target_url, use_proxy=False)

        # The request must go to the host taken from the environment variable.
        mock_get.assert_called_once_with(api_url)
        assert fetched == body
|
||||
|
||||
|
||||
def test_scrape_do_fetch_with_proxy_custom_env():
    """
    Exercise scrape_do_fetch in proxy mode with PROXY_SCRAPE_DO_URL overridden in the environment.
    """
    token = "custom_token"
    target_url = "https://custom-example.org"
    body = "Custom proxy response"

    env_override = {"PROXY_SCRAPE_DO_URL": "custom.proxy.scrape.do:8888"}
    proxy_url = f"http://{token}:@custom.proxy.scrape.do:8888"
    proxies = dict.fromkeys(("http", "https"), proxy_url)

    with patch.dict("os.environ", env_override), patch("requests.get") as mock_get:
        mock_get.return_value = Mock(text=body)

        fetched = scrape_do_fetch(token, target_url, use_proxy=True)

        # The proxies must point at the endpoint taken from the environment variable.
        mock_get.assert_called_once_with(
            target_url, proxies=proxies, verify=False, params={}
        )
        assert fetched == body
|
||||
|
||||
|
||||
def test_scrape_do_fetch_exception_propagation():
    """
    Check that an exception raised by requests.get escapes scrape_do_fetch unchanged.
    """
    failure = Exception("Network Error")

    with patch("requests.get", side_effect=failure):
        # match= searches str(exception), the same check as a substring assertion.
        with pytest.raises(Exception, match="Network Error"):
            scrape_do_fetch("test_token", "https://example.com", use_proxy=False)
|
||||
|
||||
|
||||
def test_scrape_do_fetch_with_proxy_with_geo_and_super_false():
    """
    Exercise scrape_do_fetch in proxy mode with a geoCode and super_proxy=False.

    Expects the "proxy.scrape.do:8080" proxy URL and params whose "super" entry is "false".
    """
    token = "test_token"
    target_url = "https://example.co"
    body = "Mocked proxy response UK no super"

    proxy_url = f"http://{token}:@proxy.scrape.do:8080"
    proxies = dict.fromkeys(("http", "https"), proxy_url)
    params = {"geoCode": "UK", "super": "false"}

    with patch("requests.get", return_value=Mock(text=body)) as mock_get:
        fetched = scrape_do_fetch(
            token, target_url, use_proxy=True, geoCode="UK", super_proxy=False
        )

        mock_get.assert_called_once_with(
            target_url, proxies=proxies, verify=False, params=params
        )
        assert fetched == body
|
||||
|
||||
|
||||
def test_scrape_do_fetch_empty_token_without_proxy():
    """
    Exercise scrape_do_fetch in API mode with an empty token.

    Even with token == "" the request URL is expected to follow the usual
    "token=...&url=..." layout.
    """
    token = ""
    target_url = "https://emptytoken.com"
    body = "Empty token response"

    quoted_target = urllib.parse.quote(target_url)
    api_url = f"http://api.scrape.do?token={token}&url={quoted_target}"

    with patch("requests.get", return_value=Mock(text=body)) as mock_get:
        fetched = scrape_do_fetch(token, target_url, use_proxy=False)

        mock_get.assert_called_once_with(api_url)
        assert fetched == body
|
||||
|
||||
|
||||
def test_scrape_do_fetch_with_proxy_with_empty_geo():
    """
    Exercise scrape_do_fetch in proxy mode with geoCode given as an empty string.

    An empty geoCode is expected to be treated as "not provided", so params
    stays empty even though super_proxy is True.
    """
    token = "test_token"
    target_url = "https://example.empty"
    body = "Mocked proxy response empty geo"

    proxy_url = f"http://{token}:@proxy.scrape.do:8080"
    proxies = dict.fromkeys(("http", "https"), proxy_url)

    with patch("requests.get", return_value=Mock(text=body)) as mock_get:
        fetched = scrape_do_fetch(
            token, target_url, use_proxy=True, geoCode="", super_proxy=True
        )

        # Falsy geoCode: no params should be sent.
        mock_get.assert_called_once_with(
            target_url, proxies=proxies, verify=False, params={}
        )
        assert fetched == body
|
||||
|
||||
|
||||
def test_scrape_do_fetch_api_encoding_special_characters():
    """
    Exercise scrape_do_fetch in API mode with a target URL containing a query string.

    The target is expected to appear in the request URL in the form produced
    by urllib.parse.quote.
    """
    token = "special_token"
    # "?", "&" and "=" in the target need percent-encoding inside the outer URL.
    target_url = "https://example.com/path?param=value&other=1"
    body = "Encoded API response"

    quoted_target = urllib.parse.quote(target_url)
    api_url = f"http://api.scrape.do?token={token}&url={quoted_target}"

    with patch("requests.get", return_value=Mock(text=body)) as mock_get:
        fetched = scrape_do_fetch(token, target_url, use_proxy=False)

        mock_get.assert_called_once_with(api_url)
        assert fetched == body
|
||||
|
||||
Loading…
Reference in New Issue
Block a user