diff --git a/scrapegraphai/docloaders/__init__.py b/scrapegraphai/docloaders/__init__.py new file mode 100644 index 00000000..a9e45407 --- /dev/null +++ b/scrapegraphai/docloaders/__init__.py @@ -0,0 +1,3 @@ +"""__init__.py file for docloaders folder""" + +from .chromium import ChromiumLoader diff --git a/scrapegraphai/docloaders/chromium.py b/scrapegraphai/docloaders/chromium.py new file mode 100644 index 00000000..0377f803 --- /dev/null +++ b/scrapegraphai/docloaders/chromium.py @@ -0,0 +1,125 @@ +import asyncio +import logging +from typing import Any, AsyncIterator, Iterator, List, Optional + +from langchain_core.documents import Document + +from ..utils import Proxy, dynamic_import, parse_or_search_proxy + + +logger = logging.getLogger(__name__) + + +class ChromiumLoader: + """scrapes HTML pages from URLs using a (headless) instance of the + Chromium web driver with proxy protection + + Attributes: + backend: The web driver backend library; defaults to 'playwright'. + browser_config: A dictionary containing additional browser kwargs. + headless: whether to run browser in headless mode. + proxy: A dictionary containing proxy settings; None disables protection. + urls: A list of URLs to scrape content from. + """ + + def __init__( + self, + urls: List[str], + *, + backend: str = "playwright", + headless: bool = True, + proxy: Optional[Proxy] = None, + **kwargs: Any, + ): + """Initialize the loader with a list of URL paths. + + Args: + backend: The web driver backend library; defaults to 'playwright'. + headless: whether to run browser in headless mode. + proxy: A dictionary containing proxy information; None disables protection. + urls: A list of URLs to scrape content from. + kwargs: A dictionary containing additional browser kwargs. + + Raises: + ImportError: If the required backend package is not installed. + """ + message = ( + f"{backend} is required for ChromiumLoader. " + f"Please install it with `pip install {backend}`." + ) + + dynamic_import(backend, message) + + self.backend = backend + self.browser_config = kwargs + self.headless = headless + self.proxy = parse_or_search_proxy(proxy) if proxy else None + self.urls = urls + + async def ascrape_playwright(self, url: str) -> str: + """ + Asynchronously scrape the content of a given URL using Playwright's async API. + + Args: + url (str): The URL to scrape. + + Returns: + str: The scraped HTML content or an error message if an exception occurs. + + """ + from playwright.async_api import async_playwright + + logger.info("Starting scraping...") + results = "" + async with async_playwright() as p: + browser = await p.chromium.launch( + headless=self.headless, proxy=self.proxy, **self.browser_config + ) + try: + page = await browser.new_page() + await page.goto(url) + results = await page.content() # Simply get the HTML content + logger.info("Content scraped") + except Exception as e: + results = f"Error: {e}" + await browser.close() + return results + + def lazy_load(self) -> Iterator[Document]: + """ + Lazily load text content from the provided URLs. + + This method yields Documents one at a time as they're scraped, + instead of waiting to scrape all URLs before returning. + + Yields: + Document: The scraped content encapsulated within a Document object. + + """ + scraping_fn = getattr(self, f"ascrape_{self.backend}") + + for url in self.urls: + html_content = asyncio.run(scraping_fn(url)) + metadata = {"source": url} + yield Document(page_content=html_content, metadata=metadata) + + async def alazy_load(self) -> AsyncIterator[Document]: + """ + Asynchronously load text content from the provided URLs. + + This method leverages asyncio to initiate the scraping of all provided URLs + simultaneously. It improves performance by utilizing concurrent asynchronous + requests. Each Document is yielded as soon as its content is available, + encapsulating the scraped content. + + Yields: + Document: A Document object containing the scraped content, along with its + source URL as metadata. + """ + scraping_fn = getattr(self, f"ascrape_{self.backend}") + + tasks = [scraping_fn(url) for url in self.urls] + results = await asyncio.gather(*tasks) + for url, content in zip(self.urls, results): + metadata = {"source": url} + yield Document(page_content=content, metadata=metadata) diff --git a/scrapegraphai/nodes/fetch_node.py b/scrapegraphai/nodes/fetch_node.py index 52266b42..51a66518 100644 --- a/scrapegraphai/nodes/fetch_node.py +++ b/scrapegraphai/nodes/fetch_node.py @@ -1,21 +1,24 @@ """ FetchNode Module """ -import pandas as pd + import json from typing import List, Optional -from langchain_community.document_loaders import AsyncChromiumLoader -from langchain_core.documents import Document + +import pandas as pd from langchain_community.document_loaders import PyPDFLoader -from .base_node import BaseNode +from langchain_core.documents import Document + +from ..docloaders import ChromiumLoader from ..utils.remover import remover +from .base_node import BaseNode class FetchNode(BaseNode): """ A node responsible for fetching the HTML content of a specified URL and updating - the graph's state with this content. It uses the AsyncChromiumLoader to fetch the - content asynchronously. + the graph's state with this content. It uses ChromiumLoader to fetch + the content from a web page asynchronously (with proxy protection). This node acts as a starting point in many scraping workflows, preparing the state with the necessary HTML content for further processing by subsequent nodes in the graph. @@ -31,13 +34,21 @@ class FetchNode(BaseNode): node_name (str): The unique identifier name for the node, defaulting to "Fetch". """ - def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None, node_name: str = "Fetch"): + def __init__( + self, + input: str, + output: List[str], + node_config: Optional[dict] = None, + node_name: str = "Fetch", + ): super().__init__(node_name, "node", input, output, 1) - self.headless = True if node_config is None else node_config.get( - "headless", True) - self.verbose = False if node_config is None else node_config.get( - "verbose", False) + self.headless = ( + True if node_config is None else node_config.get("headless", True) + ) + self.verbose = ( + False if node_config is None else node_config.get("verbose", False) + ) def execute(self, state): """ @@ -64,10 +75,14 @@ class FetchNode(BaseNode): input_data = [state[key] for key in input_keys] source = input_data[0] - if self.input == "json_dir" or self.input == "xml_dir" or self.input == "csv_dir": - compressed_document = [Document(page_content=source, metadata={ - "source": "local_dir" - })] + if ( + self.input == "json_dir" + or self.input == "xml_dir" + or self.input == "csv_dir" + ): + compressed_document = [ + Document(page_content=source, metadata={"source": "local_dir"}) + ] # if it is a local directory # handling for pdf @@ -76,45 +91,42 @@ class FetchNode(BaseNode): compressed_document = loader.load() elif self.input == "csv": - compressed_document = [Document(page_content=str(pd.read_csv(source)), metadata={ - "source": "csv" - })] + compressed_document = [ + Document( + page_content=str(pd.read_csv(source)), metadata={"source": "csv"} + ) + ] elif self.input == "json": f = open(source) - compressed_document = [Document(page_content=str(json.load(f)), metadata={ - "source": "json" - })] + compressed_document = [ + Document(page_content=str(json.load(f)), metadata={"source": "json"}) + ] elif self.input == "xml": - with open(source, 'r', encoding='utf-8') as f: + with open(source, "r", encoding="utf-8") as f: data = f.read() - compressed_document = [Document(page_content=data, metadata={ - "source": "xml" - })] + compressed_document = [ + Document(page_content=data, metadata={"source": "xml"}) + ] elif self.input == "pdf_dir": pass elif not source.startswith("http"): - compressed_document = [Document(page_content=remover(source), metadata={ - "source": "local_dir" - })] + compressed_document = [ + Document(page_content=remover(source), metadata={"source": "local_dir"}) + ] else: - if self.node_config is not None and self.node_config.get("endpoint") is not None: + loader_kwargs = {} - loader = AsyncChromiumLoader( - [source], - proxies={"http": self.node_config["endpoint"]}, - headless=self.headless, - ) - else: - loader = AsyncChromiumLoader( - [source], - headless=self.headless, - ) + if self.node_config is not None: + loader_kwargs = self.node_config.get("loader_kwargs", {}) + + loader = ChromiumLoader([source], headless=self.headless, **loader_kwargs) document = loader.load() compressed_document = [ - Document(page_content=remover(str(document[0].page_content)))] + Document(page_content=remover(str(document[0].page_content))) + ] state.update({self.output[0]: compressed_document}) return state