From 768719cce80953fa6cbe283e442420116c438f16 Mon Sep 17 00:00:00 2001 From: Federico Minutoli Date: Fri, 10 May 2024 21:13:38 +0200 Subject: [PATCH] feat(safe-web-driver): enchanced the original `AsyncChromiumLoader` web driver with proxy protection and flexible kwargs and backend the original class prevents passing kwargs down to the playwright backend, making some config unfeasible, including passing a proxy server to the web driver. the new class has backward compatibility with the original, but 1) allows any kwarg to be passed down to the web driver, 2) allows specifying the web driver backend (only playwright is supported for now) in case more (e.g., selenium) will be supported in the future and 3) automatically fetches a suitable proxy if one is not passed already --- scrapegraphai/docloaders/__init__.py | 3 + scrapegraphai/docloaders/chromium.py | 125 +++++++++++++++++++++++++++ scrapegraphai/nodes/fetch_node.py | 92 +++++++++++--------- 3 files changed, 180 insertions(+), 40 deletions(-) create mode 100644 scrapegraphai/docloaders/__init__.py create mode 100644 scrapegraphai/docloaders/chromium.py diff --git a/scrapegraphai/docloaders/__init__.py b/scrapegraphai/docloaders/__init__.py new file mode 100644 index 00000000..a9e45407 --- /dev/null +++ b/scrapegraphai/docloaders/__init__.py @@ -0,0 +1,3 @@ +"""__init__.py file for docloaders folder""" + +from .chromium import ChromiumLoader diff --git a/scrapegraphai/docloaders/chromium.py b/scrapegraphai/docloaders/chromium.py new file mode 100644 index 00000000..0377f803 --- /dev/null +++ b/scrapegraphai/docloaders/chromium.py @@ -0,0 +1,125 @@ +import asyncio +import logging +from typing import Any, AsyncIterator, Iterator, List, Optional + +from langchain_core.documents import Document + +from ..utils import Proxy, dynamic_import, parse_or_search_proxy + + +logger = logging.getLogger(__name__) + + +class ChromiumLoader: + """scrapes HTML pages from URLs using a (headless) instance of the + Chromium web driver with proxy protection + + Attributes: + backend: The web driver backend library; defaults to 'playwright'. + browser_config: A dictionary containing additional browser kwargs. + headless: whether to run browser in headless mode. + proxy: A dictionary containing proxy settings; None disables protection. + urls: A list of URLs to scrape content from. + """ + + def __init__( + self, + urls: List[str], + *, + backend: str = "playwright", + headless: bool = True, + proxy: Optional[Proxy] = None, + **kwargs: Any, + ): + """Initialize the loader with a list of URL paths. + + Args: + backend: The web driver backend library; defaults to 'playwright'. + headless: whether to run browser in headless mode. + proxy: A dictionary containing proxy information; None disables protection. + urls: A list of URLs to scrape content from. + kwargs: A dictionary containing additional browser kwargs. + + Raises: + ImportError: If the required backend package is not installed. + """ + message = ( + f"{backend} is required for ChromiumLoader. " + f"Please install it with `pip install {backend}`." + ) + + dynamic_import(backend, message) + + self.backend = backend + self.browser_config = kwargs + self.headless = headless + self.proxy = parse_or_search_proxy(proxy) if proxy else None + self.urls = urls + + async def ascrape_playwright(self, url: str) -> str: + """ + Asynchronously scrape the content of a given URL using Playwright's async API. + + Args: + url (str): The URL to scrape. + + Returns: + str: The scraped HTML content or an error message if an exception occurs. + + """ + from playwright.async_api import async_playwright + + logger.info("Starting scraping...") + results = "" + async with async_playwright() as p: + browser = await p.chromium.launch( + headless=self.headless, proxy=self.proxy, **self.browser_config + ) + try: + page = await browser.new_page() + await page.goto(url) + results = await page.content() # Simply get the HTML content + logger.info("Content scraped") + except Exception as e: + results = f"Error: {e}" + await browser.close() + return results + + def lazy_load(self) -> Iterator[Document]: + """ + Lazily load text content from the provided URLs. + + This method yields Documents one at a time as they're scraped, + instead of waiting to scrape all URLs before returning. + + Yields: + Document: The scraped content encapsulated within a Document object. + + """ + scraping_fn = getattr(self, f"ascrape_{self.backend}") + + for url in self.urls: + html_content = asyncio.run(scraping_fn(url)) + metadata = {"source": url} + yield Document(page_content=html_content, metadata=metadata) + + async def alazy_load(self) -> AsyncIterator[Document]: + """ + Asynchronously load text content from the provided URLs. + + This method leverages asyncio to initiate the scraping of all provided URLs + simultaneously. It improves performance by utilizing concurrent asynchronous + requests. Each Document is yielded as soon as its content is available, + encapsulating the scraped content. + + Yields: + Document: A Document object containing the scraped content, along with its + source URL as metadata. + """ + scraping_fn = getattr(self, f"ascrape_{self.backend}") + + tasks = [scraping_fn(url) for url in self.urls] + results = await asyncio.gather(*tasks) + for url, content in zip(self.urls, results): + metadata = {"source": url} + yield Document(page_content=content, metadata=metadata) diff --git a/scrapegraphai/nodes/fetch_node.py b/scrapegraphai/nodes/fetch_node.py index 52266b42..51a66518 100644 --- a/scrapegraphai/nodes/fetch_node.py +++ b/scrapegraphai/nodes/fetch_node.py @@ -1,21 +1,24 @@ """ FetchNode Module """ -import pandas as pd + import json from typing import List, Optional -from langchain_community.document_loaders import AsyncChromiumLoader -from langchain_core.documents import Document + +import pandas as pd from langchain_community.document_loaders import PyPDFLoader -from .base_node import BaseNode +from langchain_core.documents import Document + +from ..docloaders import ChromiumLoader from ..utils.remover import remover +from .base_node import BaseNode class FetchNode(BaseNode): """ A node responsible for fetching the HTML content of a specified URL and updating - the graph's state with this content. It uses the AsyncChromiumLoader to fetch the - content asynchronously. + the graph's state with this content. It uses ChromiumLoader to fetch + the content from a web page asynchronously (with proxy protection). This node acts as a starting point in many scraping workflows, preparing the state with the necessary HTML content for further processing by subsequent nodes in the graph. @@ -31,13 +34,21 @@ class FetchNode(BaseNode): node_name (str): The unique identifier name for the node, defaulting to "Fetch". """ - def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None, node_name: str = "Fetch"): + def __init__( + self, + input: str, + output: List[str], + node_config: Optional[dict] = None, + node_name: str = "Fetch", + ): super().__init__(node_name, "node", input, output, 1) - self.headless = True if node_config is None else node_config.get( - "headless", True) - self.verbose = False if node_config is None else node_config.get( - "verbose", False) + self.headless = ( + True if node_config is None else node_config.get("headless", True) + ) + self.verbose = ( + False if node_config is None else node_config.get("verbose", False) + ) def execute(self, state): """ @@ -64,10 +75,14 @@ class FetchNode(BaseNode): input_data = [state[key] for key in input_keys] source = input_data[0] - if self.input == "json_dir" or self.input == "xml_dir" or self.input == "csv_dir": - compressed_document = [Document(page_content=source, metadata={ - "source": "local_dir" - })] + if ( + self.input == "json_dir" + or self.input == "xml_dir" + or self.input == "csv_dir" + ): + compressed_document = [ + Document(page_content=source, metadata={"source": "local_dir"}) + ] # if it is a local directory # handling for pdf @@ -76,45 +91,42 @@ class FetchNode(BaseNode): compressed_document = loader.load() elif self.input == "csv": - compressed_document = [Document(page_content=str(pd.read_csv(source)), metadata={ - "source": "csv" - })] + compressed_document = [ + Document( + page_content=str(pd.read_csv(source)), metadata={"source": "csv"} + ) + ] elif self.input == "json": f = open(source) - compressed_document = [Document(page_content=str(json.load(f)), metadata={ - "source": "json" - })] + compressed_document = [ + Document(page_content=str(json.load(f)), metadata={"source": "json"}) + ] elif self.input == "xml": - with open(source, 'r', encoding='utf-8') as f: + with open(source, "r", encoding="utf-8") as f: data = f.read() - compressed_document = [Document(page_content=data, metadata={ - "source": "xml" - })] + compressed_document = [ + Document(page_content=data, metadata={"source": "xml"}) + ] elif self.input == "pdf_dir": pass elif not source.startswith("http"): - compressed_document = [Document(page_content=remover(source), metadata={ - "source": "local_dir" - })] + compressed_document = [ + Document(page_content=remover(source), metadata={"source": "local_dir"}) + ] else: - if self.node_config is not None and self.node_config.get("endpoint") is not None: + loader_kwargs = {} - loader = AsyncChromiumLoader( - [source], - proxies={"http": self.node_config["endpoint"]}, - headless=self.headless, - ) - else: - loader = AsyncChromiumLoader( - [source], - headless=self.headless, - ) + if self.node_config is not None: + loader_kwargs = self.node_config.get("loader_kwargs", {}) + + loader = ChromiumLoader([source], headless=self.headless, **loader_kwargs) document = loader.load() compressed_document = [ - Document(page_content=remover(str(document[0].page_content)))] + Document(page_content=remover(str(document[0].page_content))) + ] state.update({self.output[0]: compressed_document}) return state