add screenshot scraper

This commit is contained in:
Marco Vinciguerra 2024-08-18 19:39:49 +02:00
parent 8b8d8f09b7
commit 8e3d5deaaa
7 changed files with 248 additions and 4 deletions

View File

@ -0,0 +1,38 @@
"""
Basic example of scraping pipeline using SmartScraper
"""
import os
import json
from dotenv import load_dotenv
from scrapegraphai.graphs import ScreenshotScraperGraph
from scrapegraphai.utils import prettify_exec_info
load_dotenv()

# ------------------------------------------------
# Graph configuration: LLM credentials plus runtime flags
# ------------------------------------------------
llm_settings = {
    "api_key": os.getenv("OPENAI_API_KEY"),
    "model": "gpt-4o",
}
graph_config = {"llm": llm_settings, "verbose": True, "headless": False}

# ------------------------------------------------
# Build the ScreenshotScraperGraph, run it and print the result as JSON
# ------------------------------------------------
screenshot_scraper_graph = ScreenshotScraperGraph(
    prompt="List me the email of the company",
    source="https://scrapegraphai.com/",
    config=graph_config,
)
answer = screenshot_scraper_graph.run()
print(json.dumps(answer, indent=4))

View File

@ -2,10 +2,12 @@
Basic example of scraping pipeline using SmartScraper
"""
import os, json
import os
import json
from dotenv import load_dotenv
from scrapegraphai.graphs import SmartScraperGraph
from scrapegraphai.utils import prettify_exec_info
from dotenv import load_dotenv
load_dotenv()
# ************************************************
@ -16,7 +18,7 @@ load_dotenv()
graph_config = {
"llm": {
"api_key": os.getenv("OPENAI_API_KEY"),
"model": "gpt-3.5-turbo",
"model": "gpt-4o",
},
"verbose": True,
"headless": False,

View File

@ -24,3 +24,4 @@ from .script_creator_multi_graph import ScriptCreatorMultiGraph
from .markdown_scraper_graph import MDScraperGraph
from .markdown_scraper_multi_graph import MDScraperMultiGraph
from .search_link_graph import SearchLinkGraph
from .screenshot_scraper_graph import ScreenshotScraperGraph

View File

@ -0,0 +1,71 @@
"""
ScreenshotScraperGraph Module
"""
from typing import Optional
import logging
from pydantic import BaseModel
from .base_graph import BaseGraph
from .abstract_graph import AbstractGraph
from ..nodes import (
FetchScreenNode,
GenerateAnswerFromImageNode,
)
class ScreenshotScraperGraph(AbstractGraph):
    """
    Scraping pipeline that answers a prompt from screenshots of a web page.

    The graph chains two nodes: ``FetchScreenNode``, configured with the
    source URL, followed by ``GenerateAnswerFromImageNode``, configured with
    the graph configuration.

    Args:
        prompt (str): The question to answer about the page.
        source (str): The URL of the page to capture.
        config (dict): Graph configuration; passed to the answer node.
        schema (Optional[BaseModel]): Optional output schema, forwarded to
            ``AbstractGraph``.

    Example:
        >>> screenshot_scraper = ScreenshotScraperGraph(
        ...     "List me the email of the company",
        ...     "https://scrapegraphai.com/",
        ...     {"llm": {"api_key": "...", "model": "gpt-4o"}},
        ... )
        >>> result = screenshot_scraper.run()
    """

    def __init__(self, prompt: str, source: str, config: dict, schema: Optional[BaseModel] = None):
        super().__init__(prompt, config, source, schema)

    def _create_graph(self) -> BaseGraph:
        """
        Creates the graph of nodes representing the screenshot scraping workflow.

        Returns:
            BaseGraph: A graph whose entry point is the screenshot node,
            connected by a single edge to the answer node.
        """
        # The declared keys mirror the state keys the nodes actually use:
        # FetchScreenNode stores its captures under "screenshots", and
        # GenerateAnswerFromImageNode reads them and stores its result
        # under "answer".
        fetch_screen_node = FetchScreenNode(
            input="url",
            output=["screenshots"],
            node_config={
                "link": self.source
            }
        )
        generate_answer_from_image_node = GenerateAnswerFromImageNode(
            input="screenshots",
            output=["answer"],
            node_config={
                "config": self.config
            }
        )

        return BaseGraph(
            nodes=[
                fetch_screen_node,
                generate_answer_from_image_node,
            ],
            edges=[
                (fetch_screen_node, generate_answer_from_image_node),
            ],
            entry_point=fetch_screen_node,
            graph_name=self.__class__.__name__
        )

    def run(self) -> str:
        """
        Executes the scraping process and returns the answer to the prompt.

        Returns:
            The value stored under ``"answer"`` in the final state, or the
            string ``"No answer found."`` when that key is absent.

            NOTE(review): ``GenerateAnswerFromImageNode`` stores a list of
            ``{"analysis": ...}`` dicts under ``"answer"``, so the value is
            not necessarily a ``str`` despite the annotation.
        """
        inputs = {"user_prompt": self.prompt}
        self.final_state, self.execution_info = self.graph.execute(inputs)

        return self.final_state.get("answer", "No answer found.")

View File

@ -19,4 +19,6 @@ from .generate_answer_pdf_node import GenerateAnswerPDFNode
from .graph_iterator_node import GraphIteratorNode
from .merge_answers_node import MergeAnswersNode
from .generate_answer_omni_node import GenerateAnswerOmniNode
from .merge_generated_scripts import MergeGeneratedScriptsNode
from .merge_generated_scripts import MergeGeneratedScriptsNode
from .fetch_screen_node import FetchScreenNode
from .generate_answer_from_image_node import GenerateAnswerFromImageNode

View File

@ -0,0 +1,56 @@
from typing import List, Optional
from playwright.sync_api import sync_playwright
from .base_node import BaseNode
class FetchScreenNode(BaseNode):
    """
    FetchScreenNode captures screenshots from a given URL and stores the image data as bytes.

    Args:
        input (str): Input key expression, forwarded to ``BaseNode``.
        output (List[str]): Output keys, forwarded to ``BaseNode``.
        node_config (Optional[dict]): Node configuration; its ``"link"``
            entry is the URL to capture.
        node_name (str): Name of the node. Defaults to "FetchScreenNode".
    """

    def __init__(
        self,
        input: str,
        output: List[str],
        node_config: Optional[dict] = None,
        node_name: str = "FetchScreenNode",
    ):
        super().__init__(node_name, "node", input, output, 2, node_config)
        # node_config defaults to None, so guard before reading the link.
        self.url = (node_config or {}).get("link")

    @staticmethod
    def _capture_at(page, scroll_position) -> bytes:
        """Scroll the page to the given vertical offset and return a screenshot."""
        page.evaluate(f"window.scrollTo(0, {scroll_position});")
        return page.screenshot()

    def execute(self, state: dict) -> dict:
        """
        Captures screenshots from the configured URL and stores them in the state.

        Two screenshots are taken: one at the top of the page and one after
        scrolling down by one viewport height.

        Args:
            state (dict): The current graph state.

        Returns:
            dict: The same state, with ``"link"`` set to the URL and
            ``"screenshots"`` set to the list of captured images as bytes.

        Raises:
            ValueError: If no ``"link"`` was provided in ``node_config``.
        """
        if not self.url:
            raise ValueError("FetchScreenNode requires a 'link' entry in node_config")

        with sync_playwright() as p:
            browser = p.chromium.launch()
            try:
                page = browser.new_page()
                page.goto(self.url)
                viewport_height = page.viewport_size["height"]
                screenshots = [
                    self._capture_at(page, offset)
                    for offset in (0, viewport_height)
                ]
            finally:
                # Close the browser even when navigation or capture fails.
                browser.close()

        state["link"] = self.url
        state['screenshots'] = screenshots

        return state

View File

@ -0,0 +1,74 @@
from typing import List, Optional
from .base_node import BaseNode
import base64
import requests
class GenerateAnswerFromImageNode(BaseNode):
    """
    GenerateAnswerFromImageNode analyzes images from the state dictionary using the OpenAI API
    and updates the state with the generated answers.

    Args:
        input (str): Input key expression, forwarded to ``BaseNode``.
        output (List[str]): Output keys, forwarded to ``BaseNode``.
        node_config (Optional[dict]): Node configuration; the API key is read
            from ``node_config["config"]["llm"]["api_key"]``.
        node_name (str): Name of the node. Defaults to
            "GenerateAnswerFromImageNode".
    """

    _API_URL = "https://api.openai.com/v1/chat/completions"
    # Leading bytes of every PNG file; used to pick the data-URL media type.
    _PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"
    # Seconds to wait for the API before giving up instead of hanging forever.
    _REQUEST_TIMEOUT = 120

    def __init__(
        self,
        input: str,
        output: List[str],
        node_config: Optional[dict] = None,
        node_name: str = "GenerateAnswerFromImageNode",
    ):
        super().__init__(node_name, "node", input, output, 2, node_config)

    @classmethod
    def _to_data_url(cls, image_data: bytes) -> str:
        """Encode raw image bytes as a base64 data URL with a matching media type."""
        if image_data.startswith(cls._PNG_SIGNATURE):
            mime_type = "image/png"
        else:
            mime_type = "image/jpeg"
        base64_image = base64.b64encode(image_data).decode('utf-8')
        return f"data:{mime_type};base64,{base64_image}"

    def execute(self, state: dict) -> dict:
        """
        Processes images from the state, generates answers, and updates the state.

        One chat-completion request is sent per image found under
        ``state["screenshots"]``, using ``state["user_prompt"]`` as the text
        part of the message.

        Args:
            state (dict): The current graph state.

        Returns:
            dict: The same state, with ``"answer"`` set to a list holding one
            ``{"analysis": <text>}`` dict per image. The text is
            ``"No response"`` when the API reply carries no message content.
        """
        images = state.get('screenshots', [])
        user_prompt = state.get("user_prompt", "Extract information from the image")

        # Resolve the key once, outside any f-string: reusing double quotes
        # inside a double-quoted f-string is a syntax error before Python 3.12.
        llm_config = ((self.node_config or {}).get("config") or {}).get("llm") or {}
        api_key = llm_config.get("api_key")
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}"
        }

        results = []
        for image_data in images:
            payload = {
                "model": "gpt-4o-mini",
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": user_prompt
                            },
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": self._to_data_url(image_data)
                                }
                            }
                        ]
                    }
                ],
                "max_tokens": 300
            }

            response = requests.post(
                self._API_URL,
                headers=headers,
                json=payload,
                timeout=self._REQUEST_TIMEOUT,
            )
            result = response.json()

            # An absent or empty "choices" list falls back to "No response".
            choices = result.get('choices') or [{}]
            response_text = choices[0].get('message', {}).get('content', 'No response')

            results.append({
                "analysis": response_text
            })

        state['answer'] = results

        return state