feat: Add configurable timeout to FetchNode

- Add timeout parameter to FetchNode (default: 30 seconds)
- Apply timeout to requests.get() calls to prevent indefinite hangs
- Implement timeout for PDF parsing using ThreadPoolExecutor
- Propagate timeout to ChromiumLoader via loader_kwargs
- Add comprehensive unit tests for timeout functionality
- Fully backward compatible (timeout can be disabled with None)

Fixes issue with requests.get() and PDF parsing blocking indefinitely
on slow/unresponsive servers or large documents.

Usage:
  node_config={'timeout': 30}  # Custom timeout
  node_config={'timeout': None}  # Disable timeout
  node_config={}  # Use default 30s timeout
This commit is contained in:
Harsh Abasaheb Chavan 2025-11-01 09:08:13 +00:00
parent 365761a678
commit e81a4ed745
2 changed files with 270 additions and 2 deletions

View File

@ -4,6 +4,7 @@ FetchNode Module
import json import json
from typing import List, Optional from typing import List, Optional
import concurrent.futures
import requests import requests
from langchain_community.document_loaders import PyPDFLoader from langchain_community.document_loaders import PyPDFLoader
@ -68,6 +69,10 @@ class FetchNode(BaseNode):
else node_config.get("openai_md_enabled", False) else node_config.get("openai_md_enabled", False)
) )
# Timeout in seconds for blocking operations (HTTP requests, PDF parsing, etc.).
# If set to None, no timeout will be applied.
self.timeout = None if node_config is None else node_config.get("timeout", 30)
self.cut = False if node_config is None else node_config.get("cut", True) self.cut = False if node_config is None else node_config.get("cut", True)
self.browser_base = ( self.browser_base = (
@ -174,7 +179,19 @@ class FetchNode(BaseNode):
if input_type == "pdf": if input_type == "pdf":
loader = PyPDFLoader(source) loader = PyPDFLoader(source)
return loader.load() # PyPDFLoader.load() can be blocking for large PDFs. Run it in a thread and
# enforce the configured timeout if provided.
if self.timeout is None:
return loader.load()
else:
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(loader.load)
try:
return future.result(timeout=self.timeout)
except concurrent.futures.TimeoutError:
raise TimeoutError(
f"PDF parsing exceeded timeout of {self.timeout} seconds"
)
elif input_type == "csv": elif input_type == "csv":
try: try:
import pandas as pd import pandas as pd
@ -260,7 +277,12 @@ class FetchNode(BaseNode):
self.logger.info(f"--- (Fetching HTML from: {source}) ---") self.logger.info(f"--- (Fetching HTML from: {source}) ---")
if self.use_soup: if self.use_soup:
response = requests.get(source) # Apply configured timeout to blocking HTTP requests. If timeout is None,
# don't pass the timeout argument (requests will block until completion).
if self.timeout is None:
response = requests.get(source)
else:
response = requests.get(source, timeout=self.timeout)
if response.status_code == 200: if response.status_code == 200:
if not response.text.strip(): if not response.text.strip():
raise ValueError("No HTML body content found in the response.") raise ValueError("No HTML body content found in the response.")
@ -286,6 +308,11 @@ class FetchNode(BaseNode):
if self.node_config: if self.node_config:
loader_kwargs = self.node_config.get("loader_kwargs", {}) loader_kwargs = self.node_config.get("loader_kwargs", {})
# If a global timeout is configured on the node and no loader-specific timeout
# was provided, propagate it to ChromiumLoader so it can apply the same limit.
if "timeout" not in loader_kwargs and self.timeout is not None:
loader_kwargs["timeout"] = self.timeout
if self.browser_base: if self.browser_base:
try: try:
from ..docloaders.browser_base import browser_base_fetch from ..docloaders.browser_base import browser_base_fetch

View File

@ -0,0 +1,241 @@
"""
Unit tests for FetchNode timeout functionality.
These tests verify that:
1. The timeout configuration is properly read and stored
2. HTTP requests use the configured timeout
3. PDF parsing respects the timeout
4. Timeout is propagated to ChromiumLoader via loader_kwargs
"""
import sys
import time
import unittest
from unittest.mock import Mock, patch, MagicMock
from pathlib import Path
# Add the project root to path to import modules
sys.path.insert(0, str(Path(__file__).parent.parent))
class TestFetchNodeTimeout(unittest.TestCase):
"""Test suite for FetchNode timeout configuration and usage."""
def setUp(self):
"""Set up test fixtures."""
# Mock all the heavy external dependencies at import time
self.mock_modules = {}
for module in ['langchain_core', 'langchain_core.documents',
'langchain_community', 'langchain_community.document_loaders',
'langchain_openai', 'minify_html', 'pydantic',
'langchain', 'langchain.prompts']:
if module not in sys.modules:
sys.modules[module] = MagicMock()
# Create mock Document class
class MockDocument:
def __init__(self, page_content, metadata=None):
self.page_content = page_content
self.metadata = metadata or {}
sys.modules['langchain_core.documents'].Document = MockDocument
# Create mock PyPDFLoader
class MockPyPDFLoader:
def __init__(self, source):
self.source = source
def load(self):
time.sleep(0.1) # Simulate some work
return [MockDocument(page_content=f"PDF content from {self.source}")]
sys.modules['langchain_community.document_loaders'].PyPDFLoader = MockPyPDFLoader
# Now import FetchNode
from scrapegraphai.nodes.fetch_node import FetchNode
self.FetchNode = FetchNode
def tearDown(self):
"""Clean up after tests."""
# Remove mocked modules
for module in list(sys.modules.keys()):
if 'langchain' in module or module in ['minify_html', 'pydantic']:
if module in self.mock_modules or module.startswith('langchain'):
sys.modules.pop(module, None)
def test_timeout_default_value(self):
"""Test that default timeout is set to 30 seconds."""
node = self.FetchNode(
input="url",
output=["doc"],
node_config={}
)
self.assertEqual(node.timeout, 30)
def test_timeout_custom_value(self):
"""Test that custom timeout value is properly stored."""
node = self.FetchNode(
input="url",
output=["doc"],
node_config={"timeout": 10}
)
self.assertEqual(node.timeout, 10)
def test_timeout_none_value(self):
"""Test that timeout can be disabled by setting to None."""
node = self.FetchNode(
input="url",
output=["doc"],
node_config={"timeout": None}
)
self.assertIsNone(node.timeout)
def test_timeout_no_config(self):
"""Test that timeout defaults to 30 when no node_config provided."""
node = self.FetchNode(
input="url",
output=["doc"],
node_config=None
)
self.assertEqual(node.timeout, 30)
@patch('scrapegraphai.nodes.fetch_node.requests')
def test_requests_get_with_timeout(self, mock_requests):
"""Test that requests.get is called with timeout when use_soup=True."""
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = "<html><body>Test content</body></html>"
mock_requests.get.return_value = mock_response
node = self.FetchNode(
input="url",
output=["doc"],
node_config={"use_soup": True, "timeout": 15}
)
# Execute with a URL
state = {"url": "https://example.com"}
node.execute(state)
# Verify requests.get was called with timeout
mock_requests.get.assert_called_once()
call_args = mock_requests.get.call_args
self.assertEqual(call_args[1].get('timeout'), 15)
@patch('scrapegraphai.nodes.fetch_node.requests')
def test_requests_get_without_timeout_when_none(self, mock_requests):
"""Test that requests.get is called without timeout argument when timeout=None."""
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = "<html><body>Test content</body></html>"
mock_requests.get.return_value = mock_response
node = self.FetchNode(
input="url",
output=["doc"],
node_config={"use_soup": True, "timeout": None}
)
# Execute with a URL
state = {"url": "https://example.com"}
node.execute(state)
# Verify requests.get was called without timeout
mock_requests.get.assert_called_once()
call_args = mock_requests.get.call_args
self.assertNotIn('timeout', call_args[1])
def test_pdf_parsing_with_timeout(self):
"""Test that PDF parsing completes within timeout."""
node = self.FetchNode(
input="pdf",
output=["doc"],
node_config={"timeout": 5}
)
# Execute with a PDF file
state = {"pdf": "test.pdf"}
result = node.execute(state)
# Should complete successfully
self.assertIn("doc", result)
self.assertIsNotNone(result["doc"])
def test_pdf_parsing_timeout_exceeded(self):
"""Test that PDF parsing raises TimeoutError when timeout is exceeded."""
# Create a mock loader that takes longer than timeout
class SlowPyPDFLoader:
def __init__(self, source):
self.source = source
def load(self):
time.sleep(2) # Sleep longer than timeout
return []
with patch('scrapegraphai.nodes.fetch_node.PyPDFLoader', SlowPyPDFLoader):
node = self.FetchNode(
input="pdf",
output=["doc"],
node_config={"timeout": 0.5} # Very short timeout
)
# Execute should raise TimeoutError
state = {"pdf": "slow.pdf"}
with self.assertRaises(TimeoutError) as context:
node.execute(state)
self.assertIn("PDF parsing exceeded timeout", str(context.exception))
@patch('scrapegraphai.nodes.fetch_node.ChromiumLoader')
def test_timeout_propagated_to_chromium_loader(self, mock_loader_class):
"""Test that timeout is propagated to ChromiumLoader via loader_kwargs."""
mock_loader = Mock()
mock_doc = Mock()
mock_doc.page_content = "<html>Test</html>"
mock_loader.load.return_value = [mock_doc]
mock_loader_class.return_value = mock_loader
node = self.FetchNode(
input="url",
output=["doc"],
node_config={"timeout": 20, "headless": True}
)
# Execute with a URL (not using soup, so ChromiumLoader is used)
state = {"url": "https://example.com"}
node.execute(state)
# Verify ChromiumLoader was instantiated with timeout in kwargs
mock_loader_class.assert_called_once()
call_kwargs = mock_loader_class.call_args[1]
self.assertEqual(call_kwargs.get('timeout'), 20)
@patch('scrapegraphai.nodes.fetch_node.ChromiumLoader')
def test_timeout_not_overridden_in_loader_kwargs(self, mock_loader_class):
"""Test that existing timeout in loader_kwargs is not overridden."""
mock_loader = Mock()
mock_doc = Mock()
mock_doc.page_content = "<html>Test</html>"
mock_loader.load.return_value = [mock_doc]
mock_loader_class.return_value = mock_loader
node = self.FetchNode(
input="url",
output=["doc"],
node_config={
"timeout": 20,
"loader_kwargs": {"timeout": 50} # Explicit loader timeout
}
)
# Execute with a URL
state = {"url": "https://example.com"}
node.execute(state)
# Verify ChromiumLoader got the loader_kwargs timeout, not node timeout
mock_loader_class.assert_called_once()
call_kwargs = mock_loader_class.call_args[1]
self.assertEqual(call_kwargs.get('timeout'), 50)
if __name__ == '__main__':
unittest.main()