mirror of
https://github.com/VinciGit00/Scrapegraph-ai.git
synced 2026-06-06 21:13:22 +08:00
312 lines
11 KiB
Python
312 lines
11 KiB
Python
"""
|
|
fetch_node_level_k module
|
|
"""
|
|
|
|
from typing import List, Optional
from urllib.parse import urljoin, urlparse

from bs4 import BeautifulSoup
from langchain_core.documents import Document

from ..docloaders import ChromiumLoader
from .base_node import BaseNode
|
|
|
|
|
|
class FetchNodeLevelK(BaseNode):
    """
    A node responsible for fetching the HTML content of a specified URL and of the pages it
    links to, following hyperlinks up to a configurable depth. The fetched content is then
    used to update the graph's state. Pages are loaded with ChromiumLoader unless the
    ``browser_base`` or ``scrape_do`` option is configured, in which case that fetcher is
    used instead.

    Attributes:
        embedder_model: An optional model for embedding the fetched content.
        verbose (bool): A flag indicating whether to show print statements during execution.
        cache_path (str): Path to cache fetched content (``False`` when not configured).
        headless (bool): Whether to run the Chromium browser in headless mode.
        loader_kwargs (dict): Additional arguments for the content loader.
        browser_base (dict): Optional configuration for the browser base API
            (``api_key`` and ``project_id`` are read from it).
        scrape_do (dict): Optional configuration for the scrape.do fetcher
            (``api_key`` is read from it).
        storage_state: Optional value forwarded to ChromiumLoader as ``storage_state``.
        depth (int): Maximum depth of hyperlink graph traversal.
        only_inside_links (bool): Whether to fetch only internal links.
        min_input_len (int): Minimum required length of input data.

    Args:
        input (str): Boolean expression defining the input keys needed from the state.
        output (List[str]): List of output keys to be updated in the state.
        node_config (dict): Additional configuration for the node.
        node_name (str): The unique identifier name for the node, defaulting to "FetchLevelK".
    """

    # Lower-case URL scheme prefixes that never lead to a fetchable web page.
    # Kept as a tuple so it can be handed directly to ``str.startswith``.
    _INVALID_SCHEMES = (
        "mailto:",
        "tel:",
        "fax:",
        "sms:",
        "callto:",
        "wtai:",
        "javascript:",
        "data:",
        "file:",
        "ftp:",
        "irc:",
        "news:",
        "nntp:",
        "feed:",
        "webcal:",
        "skype:",
        "im:",
        "mtps:",
        "spotify:",
        "steam:",
        "teamspeak:",
        "udp:",
        "unreal:",
        "ut2004:",
        "ventrilo:",
        "view-source:",
        "ws:",
        "wss:",
    )

    def __init__(
        self,
        input: str,
        output: List[str],
        node_config: Optional[dict] = None,
        node_name: str = "FetchLevelK",
    ):
        """
        Initializes the FetchNodeLevelK instance.

        Args:
            input (str): Boolean expression defining the input keys needed from the state.
            output (List[str]): List of output keys to be updated in the state.
            node_config (Optional[dict]): Additional configuration for the node.
            node_name (str): The name of the node (default is "FetchLevelK").
        """
        super().__init__(node_name, "node", input, output, 2, node_config)

        # ``node_config`` defaults to None, so every option is read through an
        # empty-dict fallback; this lets the node be built without any config.
        config = node_config or {}

        self.embedder_model = config.get("embedder_model", None)
        self.verbose = config.get("verbose", False)
        self.cache_path = config.get("cache_path", False)
        self.headless = config.get("headless", True)
        self.loader_kwargs = config.get("loader_kwargs", {})
        self.browser_base = config.get("browser_base", None)
        self.scrape_do = config.get("scrape_do", None)
        self.storage_state = config.get("storage_state", None)
        self.depth = config.get("depth", 1)
        self.only_inside_links = config.get("only_inside_links", False)
        self.min_input_len = 1

    def execute(self, state: dict) -> dict:
        """
        Executes the node's logic to fetch the HTML content of a specified URL and its
        sub-links, then updates the graph's state with the fetched content.

        The crawl runs ``self.depth`` passes over the document list: each pass fetches every
        entry that has no content yet and appends the links discovered on those pages.
        Entries that are still unfetched after the last pass are dropped from the output.

        Args:
            state (dict): The current state of the graph.

        Returns:
            dict: The updated state, where the first output key holds a list of dicts with
            a "source" URL and the fetched "document".

        Raises:
            KeyError: If an input key is not found in the state.
        """
        self.logger.info(f"--- Executing {self.node_name} Node ---")

        input_keys = self.get_input_keys(state)
        input_data = [state[key] for key in input_keys]
        source = input_data[0]

        documents = [{"source": source}]
        loader_kwargs = (
            self.node_config.get("loader_kwargs", {}) if self.node_config else {}
        )

        for _ in range(self.depth):
            documents = self.obtain_content(documents, loader_kwargs)

        filtered_documents = [doc for doc in documents if "document" in doc]
        state.update({self.output[0]: filtered_documents})
        return state

    def fetch_content(self, source: str, loader_kwargs) -> List[Document]:
        """
        Fetches the HTML content of a given source URL.

        The fetcher is chosen from the node configuration: ``browser_base`` when it is set,
        otherwise ``scrape_do`` when it is set, otherwise ChromiumLoader.

        Args:
            source (str): The URL to fetch content from.
            loader_kwargs (dict): Additional keyword arguments forwarded to ChromiumLoader.

        Returns:
            List[Document]: The fetched content wrapped in Document objects. For the
            ChromiumLoader branch this is whatever ``loader.load()`` returns; callers in
            this class treat it as a list of documents.

        Raises:
            ImportError: If ``browser_base`` is configured but its fetcher cannot be imported.
        """
        self.logger.info(f"--- (Fetching HTML from: {source}) ---")

        if self.browser_base is not None:
            try:
                from ..docloaders.browser_base import browser_base_fetch
            except ImportError as exc:
                # Chain the original error so the real import failure stays visible.
                raise ImportError(
                    """The browserbase module is not installed.
                    Please install it using `pip install browserbase`."""
                ) from exc

            data = browser_base_fetch(
                self.browser_base.get("api_key"),
                self.browser_base.get("project_id"),
                [source],
            )
            document = [
                Document(page_content=content, metadata={"source": source})
                for content in data
            ]
        elif self.scrape_do:
            from ..docloaders.scrape_do import scrape_do_fetch

            data = scrape_do_fetch(self.scrape_do.get("api_key"), source)
            document = [Document(page_content=data, metadata={"source": source})]
        else:
            loader = ChromiumLoader(
                [source],
                headless=self.headless,
                storage_state=self.storage_state,
                **loader_kwargs,
            )
            document = loader.load()
        return document

    def extract_links(self, html_content: str) -> list:
        """
        Extracts all hyperlinks from the HTML content.

        Args:
            html_content (str): The HTML content to extract links from.

        Returns:
            list: The ``href`` value of every ``<a>`` tag that has one, in document order.
        """
        soup = BeautifulSoup(html_content, "html.parser")
        links = [link["href"] for link in soup.find_all("a", href=True)]
        self.logger.info(f"Extracted {len(links)} links.")
        return links

    def get_full_links(self, base_url: str, links: list) -> list:
        """
        Converts relative URLs to full URLs based on the base URL.
        Filters out non-web links (mailto:, tel:, javascript:, etc.).

        When ``only_inside_links`` is enabled, a link is kept only if its resolved host
        matches the host of ``base_url`` (case-insensitive). This keeps absolute links to
        the same site and rejects protocol-relative links to other hosts.

        Args:
            base_url (str): The base URL for resolving relative links.
            links (list): A list of links to convert.

        Returns:
            list: A list of valid full URLs starting with http:// or https://.
        """
        base_host = None
        if self.only_inside_links:
            try:
                base_host = urlparse(base_url).netloc.lower()
            except ValueError as e:
                # Without a parseable base no link can be shown to be internal.
                self.logger.warning(f"Failed to process link {base_url}: {str(e)}")
                return []

        full_links = []
        for link in links:
            # Skip if link starts with any invalid scheme
            if link.lower().startswith(self._INVALID_SCHEMES):
                continue

            try:
                # Convert relative URLs to absolute URLs
                full_link = (
                    link
                    if link.startswith(("http://", "https://"))
                    else urljoin(base_url, link)
                )

                # Ensure the final URL starts with http:// or https://
                if not full_link.startswith(("http://", "https://")):
                    continue

                # Skip external links: decide on the resolved host, not on whether
                # the href happened to be written in absolute form.
                if (
                    self.only_inside_links
                    and urlparse(full_link).netloc.lower() != base_host
                ):
                    continue

                full_links.append(full_link)
            except Exception as e:
                # Best effort: one malformed href must not abort the whole page.
                self.logger.warning(f"Failed to process link {link}: {str(e)}")

        return full_links

    def obtain_content(self, documents: List, loader_kwargs) -> List:
        """
        Runs one crawl pass: fetches every document that has no content yet and queues
        the links found on the newly fetched pages.

        ``documents`` is updated in place and also returned. Entries whose fetch raises
        are kept without a "document" key; entries whose fetch returns empty content are
        removed from the list.

        Args:
            documents (List): A list of dicts, each with a "source" URL and, once fetched,
                a "document" entry.
            loader_kwargs (dict): Additional arguments for the content loader.

        Returns:
            List: The same list, with fetched content attached and newly discovered
            sources appended as ``{"source": url}`` entries.
        """
        new_documents = []
        # Sources already present or queued; gives O(1) duplicate checks.
        known_sources = {d.get("source", "") for d in documents}

        # Iterate over a snapshot: failed entries are removed from ``documents``
        # inside the loop, and removing from the list being iterated would make
        # the loop skip the following entry.
        for doc in list(documents):
            if "document" in doc:
                continue

            source = doc["source"]
            try:
                document = self.fetch_content(source, loader_kwargs)
            except Exception as e:
                self.logger.warning(
                    f"Failed to fetch content for {source}: {str(e)}"
                )
                continue

            if not document or not document[0].page_content.strip():
                self.logger.warning(f"Failed to fetch content for {source}")
                documents.remove(doc)
                # The source is no longer listed, so a later page linking to it
                # may queue it again.
                known_sources.discard(source)
                continue

            doc["document"] = document
            links = self.extract_links(document[0].page_content)

            for link in self.get_full_links(source, links):
                if link not in known_sources:
                    known_sources.add(link)
                    new_documents.append({"source": link})

        documents.extend(new_documents)
        return documents

    def process_links(
        self,
        base_url: str,
        links: list,
        loader_kwargs,
        depth: int,
        current_depth: int = 1,
    ) -> dict:
        """
        Processes a list of links recursively up to a given depth.

        Args:
            base_url (str): The base URL for resolving relative links.
            links (list): A list of links to process.
            loader_kwargs (dict): Additional arguments for the content loader.
            depth (int): The maximum depth for recursion.
            current_depth (int): The current depth of recursion (default is 1).

        Returns:
            dict: A dictionary mapping each successfully fetched URL to the content
            returned by ``fetch_content`` for it.
        """
        content_dict = {}
        for idx, link in enumerate(links, start=1):
            full_link = link if link.startswith("http") else urljoin(base_url, link)
            if full_link in content_dict:
                # Already fetched during this call; do not load the page twice.
                continue

            self.logger.info(f"Processing link {idx}: {full_link}")
            link_content = self.fetch_content(full_link, loader_kwargs)

            if not link_content or not link_content[0].page_content.strip():
                self.logger.warning(f"Failed to fetch content for {full_link}")
                continue

            content_dict[full_link] = link_content

            if current_depth < depth:
                # extract_links expects the HTML text, not the document list.
                new_links = self.extract_links(link_content[0].page_content)
                content_dict.update(
                    self.process_links(
                        full_link, new_links, loader_kwargs, depth, current_depth + 1
                    )
                )
        return content_dict
|