fetching first level

This commit is contained in:
Matteo Vedovati 2024-09-30 12:42:26 +02:00
parent 57bf572ab4
commit d80b792e15
2 changed files with 170 additions and 2 deletions

View File

@ -3,10 +3,17 @@ FetchNodeLevelK Module
"""
from typing import List, Optional
from .base_node import BaseNode
from ..docloaders import ChromiumLoader
from ..utils.cleanup_html import cleanup_html
from ..utils.convert_to_md import convert_to_md
from langchain_core.documents import Document
class FetchNodeLevelK(BaseNode):
"""
A node responsible for fetching all the pages at a certain level of hyperlink the graph.
A node responsible for fetching the HTML content of a specified URL and, recursively, of all
its sub-links up to a given hyperlink depth. This content is then used to update
the graph's state. It uses ChromiumLoader to fetch the content from a web page asynchronously
(with proxy protection).
Attributes:
llm_model: An instance of a language model client, configured for generating answers.
@ -29,11 +36,80 @@ class FetchNodeLevelK(BaseNode):
super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"]
self.embedder_model = node_config.get("embedder_model", None)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
self.cache_path = node_config.get("cache_path", False)
self.headless = (
True if node_config is None else node_config.get("headless", True)
)
self.loader_kwargs = (
{} if node_config is None else node_config.get("loader_kwargs", {})
)
self.browser_base = (
None if node_config is None else node_config.get("browser_base", None)
)
def execute(self, state: dict) -> dict:
    """
    Fetch the HTML content of the source referenced by the node's first input key.

    The source is fetched through Browserbase when a ``browser_base`` configuration
    is set on the node, otherwise through ChromiumLoader.

    Args:
        state (dict): The current state of the graph. The input keys will be used
            to fetch the correct data types from the state.

    Returns:
        dict: Annotated as the updated state.
            NOTE(review): the visible body stops after extracting the page content
            and neither updates ``state`` nor returns it - confirm against the rest
            of the method.

    Raises:
        KeyError: If an input key is not found in the state.
        ImportError: If the Browserbase loader is requested but cannot be imported.
        ValueError: If the fetched document is missing or has only blank content.
    """
    self.logger.info(f"--- Executing {self.node_name} Node ---")

    # Interpret input keys based on the provided input expression
    input_keys = self.get_input_keys(state)
    # Fetching data from the state based on the input keys
    input_data = [state[key] for key in input_keys]

    # Only the first input value is used as the source to fetch.
    source = input_data[0]

    self.logger.info(f"--- (Fetching HTML from: {source}) ---")

    loader_kwargs = {}
    if self.node_config is not None:
        loader_kwargs = self.node_config.get("loader_kwargs", {})

    if self.browser_base is not None:
        # Imported lazily so the dependency is only needed when Browserbase is configured.
        try:
            from ..docloaders.browser_base import browser_base_fetch
        except ImportError as exc:
            # Chain the original error so the real import failure stays visible.
            raise ImportError("""The browserbase module is not installed.
Please install it using `pip install browserbase`.""") from exc

        data = browser_base_fetch(self.browser_base.get("api_key"),
                                  self.browser_base.get("project_id"), [source])

        document = [Document(page_content=content,
                             metadata={"source": source}) for content in data]
    else:
        loader = ChromiumLoader([source], headless=self.headless, **loader_kwargs)
        document = loader.load()

    # Both branches must yield at least one document with non-blank content.
    if not document or not document[0].page_content.strip():
        raise ValueError("""No HTML body content found in
the document fetched by ChromiumLoader.""")

    # NOTE(review): parsed_content is not used by any visible line - confirm intent.
    parsed_content = document[0].page_content

View File

@ -0,0 +1,92 @@
import requests
import logging
import time
from urllib.parse import quote, urljoin
from typing import Optional
from bs4 import BeautifulSoup
from dotenv import load_dotenv
import os
import json
import markdownify
# Load variables from a .env file (if one is found) into the process environment;
# the script entry point reads TOKEN from the environment.
load_dotenv()
# Configure the root logger: INFO and above, prefixed with timestamp and level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def fetch_content(
    token: str,
    target_url: str,
    max_retries: int = 5,
    retry_delay: int = 3,
    timeout: float = 60.0,
) -> Optional[str]:
    """Fetch the rendered HTML of ``target_url`` through the scrape.do API.

    The request asks scrape.do to render the page (``render=true``) and wait for
    ``networkidle0``. Any non-200 response or ``requests`` error counts as a failed
    attempt and is retried.

    Args:
        token: scrape.do API token, sent as the ``token`` query parameter.
        target_url: Page to fetch; it is percent-encoded before being embedded.
        max_retries: Maximum number of attempts. Zero or less performs no request.
        retry_delay: Seconds to wait between two consecutive attempts.
        timeout: Seconds ``requests`` may wait on the connection/response before the
            attempt is treated as failed. Without it a stalled request blocks forever.

    Returns:
        The response body of the first 200 response, or ``None`` when every attempt
        failed.
    """
    encoded_url = quote(target_url)
    url = f"http://api.scrape.do?url={encoded_url}&token={token}&render=true&waitUntil=networkidle0"

    for attempt in range(1, max_retries + 1):
        try:
            response = requests.get(url, timeout=timeout)
        except requests.RequestException as e:
            logging.error(f"Error fetching {target_url}: {e}. Retrying in {retry_delay}s...")
        else:
            if response.status_code == 200:
                logging.info(f"Successfully fetched content from {target_url}")
                return response.text
            logging.warning(f"Failed with status {response.status_code}. Retrying in {retry_delay}s...")

        # Pause only when another attempt follows; sleeping after the last one
        # merely delays the failure report.
        if attempt < max_retries:
            time.sleep(retry_delay)

    logging.error(f"Failed to fetch {target_url} after {max_retries} attempts.")
    return None
def extract_links(html_content: str) -> list:
    """Collect the ``href`` value of every ``<a>`` tag that has one.

    Args:
        html_content: HTML markup to parse with BeautifulSoup's ``html.parser``.

    Returns:
        The raw ``href`` strings as they appear in the markup (not resolved
        against any base URL).
    """
    parsed_page = BeautifulSoup(html_content, 'html.parser')
    anchors = parsed_page.find_all('a', href=True)
    links = [anchor['href'] for anchor in anchors]
    logging.info(f"Extracted {len(links)} links.")
    return links
def process_links(token: str, base_url: str, links: list, depth: int, current_depth: int = 1) -> dict:
    """Fetch every link, convert the pages to Markdown and recurse up to ``depth``.

    Args:
        token: scrape.do API token forwarded to ``fetch_content``.
        base_url: URL of the page the links came from; used to resolve relative hrefs.
        links: Raw href values to process.
        depth: Maximum link depth to follow (links of the start page are depth 1).
        current_depth: Depth of ``links``; callers normally leave the default.

    Returns:
        A dict mapping each successfully fetched absolute URL to its Markdown
        content, including pages collected by the recursive calls.
    """
    content_dict = {}
    seen = set()

    for idx, link in enumerate(links, start=1):
        full_link = link if link.startswith("http") else urljoin(base_url, link)

        # mailto:, javascript:, tel: and similar hrefs are not web pages; sending
        # them to the scraper only wastes a request.
        if not full_link.lower().startswith(("http://", "https://")):
            continue

        # Pages commonly repeat the same href (navigation, footer); fetch each
        # resolved URL once per call.
        if full_link in seen:
            continue
        seen.add(full_link)

        logging.info(f"Processing link {idx}: {full_link}")
        link_content = fetch_content(token, full_link)
        if not link_content:
            logging.warning(f"Failed to fetch content for {full_link}")
            continue

        markdown_content = markdownify.markdownify(link_content, heading_style="ATX")
        content_dict[full_link] = markdown_content
        # NOTE(review): idx restarts at 1 in every recursive call, so deeper levels
        # overwrite snapshot files written by shallower ones - confirm this is intended.
        save_content_to_json(content_dict, idx)

        if current_depth < depth:
            new_links = extract_links(link_content)
            content_dict.update(process_links(token, full_link, new_links, depth, current_depth + 1))

    return content_dict
def save_content_to_json(content_dict: dict, idx: int):
    """Write ``content_dict`` to ``downloaded_pages/scraped_content_<idx>.json``.

    The directory is relative to the current working directory and is created when
    missing. An existing file with the same index is overwritten.

    Args:
        content_dict: JSON-serialisable mapping to dump.
        idx: Number used in the output file name.
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists() test.
    os.makedirs("downloaded_pages", exist_ok=True)

    file_name = f"scraped_content_{idx}.json"
    file_path = os.path.join("downloaded_pages", file_name)

    with open(file_path, "w", encoding="utf-8") as json_file:
        # ensure_ascii=False keeps non-ASCII page text readable in the dump.
        json.dump(content_dict, json_file, ensure_ascii=False, indent=4)

    logging.info(f"Content saved to {file_path}")
if __name__ == "__main__":
    # Script entry point: crawl the target page and the pages it links to.
    # Both settings come from the environment; TARGET_URL falls back to the
    # previously hard-coded site so existing setups keep working.
    token = os.getenv("TOKEN")
    target_url = os.getenv("TARGET_URL", "https://www.wired.com")
    depth = 2

    if not token or not target_url:
        logging.error("Please set the TOKEN and TARGET_URL environment variables.")
        # SystemExit works even when the interactive `exit` helper is not installed.
        raise SystemExit(1)

    html_content = fetch_content(token, target_url)

    if html_content:
        links = extract_links(html_content)
        logging.info("Links found:")
        for link in links:
            logging.info(link)

        content_dict = process_links(token, target_url, links, depth)
        for link, content in content_dict.items():
            logging.info(f"Link: {link}")
            # Only a preview of each page is logged.
            logging.info(f"Content: {content[:500]}...")
    else:
        logging.error("Failed to fetch the content.")