mirror of
https://github.com/VinciGit00/Scrapegraph-ai.git
synced 2026-06-28 21:01:55 +08:00
feat: Add configurable timeout to FetchNode
- Add timeout parameter to FetchNode (default: 30 seconds)
- Apply timeout to requests.get() calls to prevent indefinite hangs
- Implement timeout for PDF parsing using ThreadPoolExecutor
- Propagate timeout to ChromiumLoader via loader_kwargs
- Add comprehensive unit tests for timeout functionality
- Fully backward compatible (timeout can be disabled with None)
Fixes issue with requests.get() and PDF parsing blocking indefinitely
on slow/unresponsive servers or large documents.
Usage:
node_config={'timeout': 30} # Custom timeout
node_config={'timeout': None} # Disable timeout
node_config={} # Use default 30s timeout
This commit is contained in:
parent
365761a678
commit
e81a4ed745
@ -4,6 +4,7 @@ FetchNode Module
|
|||||||
|
|
||||||
import json
|
import json
|
||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
|
import concurrent.futures
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
from langchain_community.document_loaders import PyPDFLoader
|
from langchain_community.document_loaders import PyPDFLoader
|
||||||
@ -68,6 +69,10 @@ class FetchNode(BaseNode):
|
|||||||
else node_config.get("openai_md_enabled", False)
|
else node_config.get("openai_md_enabled", False)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Timeout in seconds for blocking operations (HTTP requests, PDF parsing, etc.).
|
||||||
|
# If set to None, no timeout will be applied.
|
||||||
|
self.timeout = None if node_config is None else node_config.get("timeout", 30)
|
||||||
|
|
||||||
self.cut = False if node_config is None else node_config.get("cut", True)
|
self.cut = False if node_config is None else node_config.get("cut", True)
|
||||||
|
|
||||||
self.browser_base = (
|
self.browser_base = (
|
||||||
@ -174,7 +179,19 @@ class FetchNode(BaseNode):
|
|||||||
|
|
||||||
if input_type == "pdf":
|
if input_type == "pdf":
|
||||||
loader = PyPDFLoader(source)
|
loader = PyPDFLoader(source)
|
||||||
return loader.load()
|
# PyPDFLoader.load() can be blocking for large PDFs. Run it in a thread and
|
||||||
|
# enforce the configured timeout if provided.
|
||||||
|
if self.timeout is None:
|
||||||
|
return loader.load()
|
||||||
|
else:
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||||
|
future = executor.submit(loader.load)
|
||||||
|
try:
|
||||||
|
return future.result(timeout=self.timeout)
|
||||||
|
except concurrent.futures.TimeoutError:
|
||||||
|
raise TimeoutError(
|
||||||
|
f"PDF parsing exceeded timeout of {self.timeout} seconds"
|
||||||
|
)
|
||||||
elif input_type == "csv":
|
elif input_type == "csv":
|
||||||
try:
|
try:
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
@ -260,7 +277,12 @@ class FetchNode(BaseNode):
|
|||||||
|
|
||||||
self.logger.info(f"--- (Fetching HTML from: {source}) ---")
|
self.logger.info(f"--- (Fetching HTML from: {source}) ---")
|
||||||
if self.use_soup:
|
if self.use_soup:
|
||||||
response = requests.get(source)
|
# Apply configured timeout to blocking HTTP requests. If timeout is None,
|
||||||
|
# don't pass the timeout argument (requests will block until completion).
|
||||||
|
if self.timeout is None:
|
||||||
|
response = requests.get(source)
|
||||||
|
else:
|
||||||
|
response = requests.get(source, timeout=self.timeout)
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
if not response.text.strip():
|
if not response.text.strip():
|
||||||
raise ValueError("No HTML body content found in the response.")
|
raise ValueError("No HTML body content found in the response.")
|
||||||
@ -286,6 +308,11 @@ class FetchNode(BaseNode):
|
|||||||
if self.node_config:
|
if self.node_config:
|
||||||
loader_kwargs = self.node_config.get("loader_kwargs", {})
|
loader_kwargs = self.node_config.get("loader_kwargs", {})
|
||||||
|
|
||||||
|
# If a global timeout is configured on the node and no loader-specific timeout
|
||||||
|
# was provided, propagate it to ChromiumLoader so it can apply the same limit.
|
||||||
|
if "timeout" not in loader_kwargs and self.timeout is not None:
|
||||||
|
loader_kwargs["timeout"] = self.timeout
|
||||||
|
|
||||||
if self.browser_base:
|
if self.browser_base:
|
||||||
try:
|
try:
|
||||||
from ..docloaders.browser_base import browser_base_fetch
|
from ..docloaders.browser_base import browser_base_fetch
|
||||||
|
|||||||
241
tests/test_fetch_node_timeout.py
Normal file
241
tests/test_fetch_node_timeout.py
Normal file
@ -0,0 +1,241 @@
|
|||||||
|
"""
|
||||||
|
Unit tests for FetchNode timeout functionality.
|
||||||
|
|
||||||
|
These tests verify that:
|
||||||
|
1. The timeout configuration is properly read and stored
|
||||||
|
2. HTTP requests use the configured timeout
|
||||||
|
3. PDF parsing respects the timeout
|
||||||
|
4. Timeout is propagated to ChromiumLoader via loader_kwargs
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import Mock, patch, MagicMock
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# Add the project root to path to import modules
|
||||||
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||||
|
|
||||||
|
|
||||||
|
class TestFetchNodeTimeout(unittest.TestCase):
|
||||||
|
"""Test suite for FetchNode timeout configuration and usage."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
"""Set up test fixtures."""
|
||||||
|
# Mock all the heavy external dependencies at import time
|
||||||
|
self.mock_modules = {}
|
||||||
|
for module in ['langchain_core', 'langchain_core.documents',
|
||||||
|
'langchain_community', 'langchain_community.document_loaders',
|
||||||
|
'langchain_openai', 'minify_html', 'pydantic',
|
||||||
|
'langchain', 'langchain.prompts']:
|
||||||
|
if module not in sys.modules:
|
||||||
|
sys.modules[module] = MagicMock()
|
||||||
|
|
||||||
|
# Create mock Document class
|
||||||
|
class MockDocument:
|
||||||
|
def __init__(self, page_content, metadata=None):
|
||||||
|
self.page_content = page_content
|
||||||
|
self.metadata = metadata or {}
|
||||||
|
|
||||||
|
sys.modules['langchain_core.documents'].Document = MockDocument
|
||||||
|
|
||||||
|
# Create mock PyPDFLoader
|
||||||
|
class MockPyPDFLoader:
|
||||||
|
def __init__(self, source):
|
||||||
|
self.source = source
|
||||||
|
|
||||||
|
def load(self):
|
||||||
|
time.sleep(0.1) # Simulate some work
|
||||||
|
return [MockDocument(page_content=f"PDF content from {self.source}")]
|
||||||
|
|
||||||
|
sys.modules['langchain_community.document_loaders'].PyPDFLoader = MockPyPDFLoader
|
||||||
|
|
||||||
|
# Now import FetchNode
|
||||||
|
from scrapegraphai.nodes.fetch_node import FetchNode
|
||||||
|
self.FetchNode = FetchNode
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
"""Clean up after tests."""
|
||||||
|
# Remove mocked modules
|
||||||
|
for module in list(sys.modules.keys()):
|
||||||
|
if 'langchain' in module or module in ['minify_html', 'pydantic']:
|
||||||
|
if module in self.mock_modules or module.startswith('langchain'):
|
||||||
|
sys.modules.pop(module, None)
|
||||||
|
|
||||||
|
def test_timeout_default_value(self):
|
||||||
|
"""Test that default timeout is set to 30 seconds."""
|
||||||
|
node = self.FetchNode(
|
||||||
|
input="url",
|
||||||
|
output=["doc"],
|
||||||
|
node_config={}
|
||||||
|
)
|
||||||
|
self.assertEqual(node.timeout, 30)
|
||||||
|
|
||||||
|
def test_timeout_custom_value(self):
|
||||||
|
"""Test that custom timeout value is properly stored."""
|
||||||
|
node = self.FetchNode(
|
||||||
|
input="url",
|
||||||
|
output=["doc"],
|
||||||
|
node_config={"timeout": 10}
|
||||||
|
)
|
||||||
|
self.assertEqual(node.timeout, 10)
|
||||||
|
|
||||||
|
def test_timeout_none_value(self):
|
||||||
|
"""Test that timeout can be disabled by setting to None."""
|
||||||
|
node = self.FetchNode(
|
||||||
|
input="url",
|
||||||
|
output=["doc"],
|
||||||
|
node_config={"timeout": None}
|
||||||
|
)
|
||||||
|
self.assertIsNone(node.timeout)
|
||||||
|
|
||||||
|
def test_timeout_no_config(self):
|
||||||
|
"""Test that timeout defaults to 30 when no node_config provided."""
|
||||||
|
node = self.FetchNode(
|
||||||
|
input="url",
|
||||||
|
output=["doc"],
|
||||||
|
node_config=None
|
||||||
|
)
|
||||||
|
self.assertEqual(node.timeout, 30)
|
||||||
|
|
||||||
|
@patch('scrapegraphai.nodes.fetch_node.requests')
|
||||||
|
def test_requests_get_with_timeout(self, mock_requests):
|
||||||
|
"""Test that requests.get is called with timeout when use_soup=True."""
|
||||||
|
mock_response = Mock()
|
||||||
|
mock_response.status_code = 200
|
||||||
|
mock_response.text = "<html><body>Test content</body></html>"
|
||||||
|
mock_requests.get.return_value = mock_response
|
||||||
|
|
||||||
|
node = self.FetchNode(
|
||||||
|
input="url",
|
||||||
|
output=["doc"],
|
||||||
|
node_config={"use_soup": True, "timeout": 15}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Execute with a URL
|
||||||
|
state = {"url": "https://example.com"}
|
||||||
|
node.execute(state)
|
||||||
|
|
||||||
|
# Verify requests.get was called with timeout
|
||||||
|
mock_requests.get.assert_called_once()
|
||||||
|
call_args = mock_requests.get.call_args
|
||||||
|
self.assertEqual(call_args[1].get('timeout'), 15)
|
||||||
|
|
||||||
|
@patch('scrapegraphai.nodes.fetch_node.requests')
|
||||||
|
def test_requests_get_without_timeout_when_none(self, mock_requests):
|
||||||
|
"""Test that requests.get is called without timeout argument when timeout=None."""
|
||||||
|
mock_response = Mock()
|
||||||
|
mock_response.status_code = 200
|
||||||
|
mock_response.text = "<html><body>Test content</body></html>"
|
||||||
|
mock_requests.get.return_value = mock_response
|
||||||
|
|
||||||
|
node = self.FetchNode(
|
||||||
|
input="url",
|
||||||
|
output=["doc"],
|
||||||
|
node_config={"use_soup": True, "timeout": None}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Execute with a URL
|
||||||
|
state = {"url": "https://example.com"}
|
||||||
|
node.execute(state)
|
||||||
|
|
||||||
|
# Verify requests.get was called without timeout
|
||||||
|
mock_requests.get.assert_called_once()
|
||||||
|
call_args = mock_requests.get.call_args
|
||||||
|
self.assertNotIn('timeout', call_args[1])
|
||||||
|
|
||||||
|
def test_pdf_parsing_with_timeout(self):
|
||||||
|
"""Test that PDF parsing completes within timeout."""
|
||||||
|
node = self.FetchNode(
|
||||||
|
input="pdf",
|
||||||
|
output=["doc"],
|
||||||
|
node_config={"timeout": 5}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Execute with a PDF file
|
||||||
|
state = {"pdf": "test.pdf"}
|
||||||
|
result = node.execute(state)
|
||||||
|
|
||||||
|
# Should complete successfully
|
||||||
|
self.assertIn("doc", result)
|
||||||
|
self.assertIsNotNone(result["doc"])
|
||||||
|
|
||||||
|
def test_pdf_parsing_timeout_exceeded(self):
|
||||||
|
"""Test that PDF parsing raises TimeoutError when timeout is exceeded."""
|
||||||
|
# Create a mock loader that takes longer than timeout
|
||||||
|
class SlowPyPDFLoader:
|
||||||
|
def __init__(self, source):
|
||||||
|
self.source = source
|
||||||
|
|
||||||
|
def load(self):
|
||||||
|
time.sleep(2) # Sleep longer than timeout
|
||||||
|
return []
|
||||||
|
|
||||||
|
with patch('scrapegraphai.nodes.fetch_node.PyPDFLoader', SlowPyPDFLoader):
|
||||||
|
node = self.FetchNode(
|
||||||
|
input="pdf",
|
||||||
|
output=["doc"],
|
||||||
|
node_config={"timeout": 0.5} # Very short timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
# Execute should raise TimeoutError
|
||||||
|
state = {"pdf": "slow.pdf"}
|
||||||
|
with self.assertRaises(TimeoutError) as context:
|
||||||
|
node.execute(state)
|
||||||
|
|
||||||
|
self.assertIn("PDF parsing exceeded timeout", str(context.exception))
|
||||||
|
|
||||||
|
@patch('scrapegraphai.nodes.fetch_node.ChromiumLoader')
|
||||||
|
def test_timeout_propagated_to_chromium_loader(self, mock_loader_class):
|
||||||
|
"""Test that timeout is propagated to ChromiumLoader via loader_kwargs."""
|
||||||
|
mock_loader = Mock()
|
||||||
|
mock_doc = Mock()
|
||||||
|
mock_doc.page_content = "<html>Test</html>"
|
||||||
|
mock_loader.load.return_value = [mock_doc]
|
||||||
|
mock_loader_class.return_value = mock_loader
|
||||||
|
|
||||||
|
node = self.FetchNode(
|
||||||
|
input="url",
|
||||||
|
output=["doc"],
|
||||||
|
node_config={"timeout": 20, "headless": True}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Execute with a URL (not using soup, so ChromiumLoader is used)
|
||||||
|
state = {"url": "https://example.com"}
|
||||||
|
node.execute(state)
|
||||||
|
|
||||||
|
# Verify ChromiumLoader was instantiated with timeout in kwargs
|
||||||
|
mock_loader_class.assert_called_once()
|
||||||
|
call_kwargs = mock_loader_class.call_args[1]
|
||||||
|
self.assertEqual(call_kwargs.get('timeout'), 20)
|
||||||
|
|
||||||
|
@patch('scrapegraphai.nodes.fetch_node.ChromiumLoader')
|
||||||
|
def test_timeout_not_overridden_in_loader_kwargs(self, mock_loader_class):
|
||||||
|
"""Test that existing timeout in loader_kwargs is not overridden."""
|
||||||
|
mock_loader = Mock()
|
||||||
|
mock_doc = Mock()
|
||||||
|
mock_doc.page_content = "<html>Test</html>"
|
||||||
|
mock_loader.load.return_value = [mock_doc]
|
||||||
|
mock_loader_class.return_value = mock_loader
|
||||||
|
|
||||||
|
node = self.FetchNode(
|
||||||
|
input="url",
|
||||||
|
output=["doc"],
|
||||||
|
node_config={
|
||||||
|
"timeout": 20,
|
||||||
|
"loader_kwargs": {"timeout": 50} # Explicit loader timeout
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Execute with a URL
|
||||||
|
state = {"url": "https://example.com"}
|
||||||
|
node.execute(state)
|
||||||
|
|
||||||
|
# Verify ChromiumLoader got the loader_kwargs timeout, not node timeout
|
||||||
|
mock_loader_class.assert_called_once()
|
||||||
|
call_kwargs = mock_loader_class.call_args[1]
|
||||||
|
self.assertEqual(call_kwargs.get('timeout'), 50)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
||||||
Loading…
Reference in New Issue
Block a user