""" fetch_node_level_k module """ from typing import List, Optional from urllib.parse import urljoin from bs4 import BeautifulSoup from langchain_core.documents import Document from ..docloaders import ChromiumLoader from .base_node import BaseNode class FetchNodeLevelK(BaseNode): """ A node responsible for fetching the HTML content of a specified URL and all its sub-links recursively up to a certain level of hyperlink the graph. This content is then used to update the graph's state. It uses ChromiumLoader to fetch the content from a web page asynchronously (with proxy protection). Attributes: embedder_model: An optional model for embedding the fetched content. verbose (bool): A flag indicating whether to show print statements during execution. cache_path (str): Path to cache fetched content. headless (bool): Whether to run the Chromium browser in headless mode. loader_kwargs (dict): Additional arguments for the content loader. browser_base (dict): Optional configuration for the browser base API. depth (int): Maximum depth of hyperlink graph traversal. only_inside_links (bool): Whether to fetch only internal links. min_input_len (int): Minimum required length of input data. Args: input (str): Boolean expression defining the input keys needed from the state. output (List[str]): List of output keys to be updated in the state. node_config (dict): Additional configuration for the node. node_name (str): The unique identifier name for the node, defaulting to "FetchLevelK". """ def __init__( self, input: str, output: List[str], node_config: Optional[dict] = None, node_name: str = "FetchLevelK", ): """ Initializes the FetchNodeLevelK instance. Args: input (str): Boolean expression defining the input keys needed from the state. output (List[str]): List of output keys to be updated in the state. node_config (Optional[dict]): Additional configuration for the node. node_name (str): The name of the node (default is "FetchLevelK"). """ super().__init__(node_name, "node", input, output, 2, node_config) self.embedder_model = node_config.get("embedder_model", None) self.verbose = node_config.get("verbose", False) if node_config else False self.cache_path = node_config.get("cache_path", False) self.headless = node_config.get("headless", True) if node_config else True self.loader_kwargs = node_config.get("loader_kwargs", {}) if node_config else {} self.browser_base = node_config.get("browser_base", None) self.scrape_do = node_config.get("scrape_do", None) self.storage_state = node_config.get("storage_state", None) self.depth = node_config.get("depth", 1) if node_config else 1 self.only_inside_links = ( node_config.get("only_inside_links", False) if node_config else False ) self.min_input_len = 1 def execute(self, state: dict) -> dict: """ Executes the node's logic to fetch the HTML content of a specified URL and its sub-links recursively, then updates the graph's state with the fetched content. Args: state (dict): The current state of the graph. Returns: dict: The updated state with a new output key containing the fetched HTML content. Raises: KeyError: If the input key is not found in the state. """ self.logger.info(f"--- Executing {self.node_name} Node ---") input_keys = self.get_input_keys(state) input_data = [state[key] for key in input_keys] source = input_data[0] documents = [{"source": source}] loader_kwargs = ( self.node_config.get("loader_kwargs", {}) if self.node_config else {} ) for _ in range(self.depth): documents = self.obtain_content(documents, loader_kwargs) filtered_documents = [doc for doc in documents if "document" in doc] state.update({self.output[0]: filtered_documents}) return state def fetch_content(self, source: str, loader_kwargs) -> Optional[str]: """ Fetches the HTML content of a given source URL. Args: source (str): The URL to fetch content from. loader_kwargs (dict): Additional arguments for the content loader. Returns: Optional[str]: The fetched HTML content or None if fetching failed. """ self.logger.info(f"--- (Fetching HTML from: {source}) ---") if self.browser_base is not None: try: from ..docloaders.browser_base import browser_base_fetch except ImportError: raise ImportError( """The browserbase module is not installed. Please install it using `pip install browserbase`.""" ) data = browser_base_fetch( self.browser_base.get("api_key"), self.browser_base.get("project_id"), [source], ) document = [ Document(page_content=content, metadata={"source": source}) for content in data ] elif self.scrape_do: from ..docloaders.scrape_do import scrape_do_fetch data = scrape_do_fetch(self.scrape_do.get("api_key"), source) document = [Document(page_content=data, metadata={"source": source})] else: loader = ChromiumLoader( [source], headless=self.headless, storage_state=self.storage_state, **loader_kwargs, ) document = loader.load() return document def extract_links(self, html_content: str) -> list: """ Extracts all hyperlinks from the HTML content. Args: html_content (str): The HTML content to extract links from. Returns: list: A list of extracted hyperlinks. """ soup = BeautifulSoup(html_content, "html.parser") links = [link["href"] for link in soup.find_all("a", href=True)] self.logger.info(f"Extracted {len(links)} links.") return links def get_full_links(self, base_url: str, links: list) -> list: """ Converts relative URLs to full URLs based on the base URL. Filters out non-web links (mailto:, tel:, javascript:, etc.). Args: base_url (str): The base URL for resolving relative links. links (list): A list of links to convert. Returns: list: A list of valid full URLs. """ # List of invalid URL schemes to filter out invalid_schemes = { "mailto:", "tel:", "fax:", "sms:", "callto:", "wtai:", "javascript:", "data:", "file:", "ftp:", "irc:", "news:", "nntp:", "feed:", "webcal:", "skype:", "im:", "mtps:", "spotify:", "steam:", "teamspeak:", "udp:", "unreal:", "ut2004:", "ventrilo:", "view-source:", "ws:", "wss:", } full_links = [] for link in links: # Skip if link starts with any invalid scheme if any(link.lower().startswith(scheme) for scheme in invalid_schemes): continue # Skip if it's an external link and only_inside_links is True if self.only_inside_links and link.startswith(("http://", "https://")): continue # Convert relative URLs to absolute URLs try: full_link = ( link if link.startswith(("http://", "https://")) else urljoin(base_url, link) ) # Ensure the final URL starts with http:// or https:// if full_link.startswith(("http://", "https://")): full_links.append(full_link) except Exception as e: self.logger.warning(f"Failed to process link {link}: {str(e)}") return full_links def obtain_content(self, documents: List, loader_kwargs) -> List: """ Iterates through documents, fetching and updating content recursively. Args: documents (List): A list of documents containing the source URLs. loader_kwargs (dict): Additional arguments for the content loader. Returns: List: The updated list of documents with fetched content. """ new_documents = [] for doc in documents: source = doc["source"] if "document" not in doc: try: document = self.fetch_content(source, loader_kwargs) except Exception as e: self.logger.warning( f"Failed to fetch content for {source}: {str(e)}" ) continue if not document or not document[0].page_content.strip(): self.logger.warning(f"Failed to fetch content for {source}") documents.remove(doc) continue doc["document"] = document links = self.extract_links(doc["document"][0].page_content) full_links = self.get_full_links(source, links) for link in full_links: if not any( d.get("source", "") == link for d in documents ) and not any(d.get("source", "") == link for d in new_documents): new_documents.append({"source": link}) documents.extend(new_documents) return documents def process_links( self, base_url: str, links: list, loader_kwargs, depth: int, current_depth: int = 1, ) -> dict: """ Processes a list of links recursively up to a given depth. Args: base_url (str): The base URL for resolving relative links. links (list): A list of links to process. loader_kwargs (dict): Additional arguments for the content loader. depth (int): The maximum depth for recursion. current_depth (int): The current depth of recursion (default is 1). Returns: dict: A dictionary containing processed link content. """ content_dict = {} for idx, link in enumerate(links, start=1): full_link = link if link.startswith("http") else urljoin(base_url, link) self.logger.info(f"Processing link {idx}: {full_link}") link_content = self.fetch_content(full_link, loader_kwargs) if current_depth < depth: new_links = self.extract_links(link_content) content_dict.update( self.process_links( full_link, new_links, loader_kwargs, depth, current_depth + 1 ) ) else: self.logger.warning(f"Failed to fetch content for {full_link}") return content_dict