fix(node-logging): use centralized logger in each node for logging

This commit is contained in:
Federico Minutoli 2024-05-24 01:09:49 +02:00
parent 4348d4f4db
commit c251cc45d3
18 changed files with 406 additions and 242 deletions

View File

@ -3,21 +3,22 @@ BlocksIndentifier Module
"""
from typing import List, Optional
from langchain_community.document_loaders import AsyncChromiumLoader
from langchain_core.documents import Document
from .base_node import BaseNode
from .base_node import BaseNode
class BlocksIndentifier(BaseNode):
"""
A node responsible for identifying the blocks in the specified HTML content,
e.g products in a E-commerce, flights in a travel website etc.
e.g products in a E-commerce, flights in a travel website etc.
Attributes:
headless (bool): A flag indicating whether the browser should run in headless mode.
verbose (bool): A flag indicating whether to print verbose output during execution.
Args:
input (str): Boolean expression defining the input keys needed from the state.
output (List[str]): List of output keys to be updated in the state.
@ -25,11 +26,21 @@ class BlocksIndentifier(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "BlocksIndentifier".
"""
def __init__(self, input: str, output: List[str], node_config: Optional[dict], node_name: str = "BlocksIndentifier"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict],
node_name: str = "BlocksIndentifier",
):
super().__init__(node_name, "node", input, output, 1)
self.headless = True if node_config is None else node_config.get("headless", True)
self.verbose = True if node_config is None else node_config.get("verbose", False)
self.headless = (
True if node_config is None else node_config.get("headless", True)
)
self.verbose = (
True if node_config is None else node_config.get("verbose", False)
)
def execute(self, state):
"""
@ -47,8 +58,7 @@ class BlocksIndentifier(BaseNode):
KeyError: If the input key is not found in the state, indicating that the
necessary information to perform the operation is missing.
"""
if self.verbose:
print(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)

View File

@ -3,17 +3,18 @@ FetchNode Module
"""
import json
import requests
from typing import List, Optional
import pandas as pd
import requests
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents import Document
from ..docloaders import ChromiumLoader
from .base_node import BaseNode
from ..utils.cleanup_html import cleanup_html
from ..utils.logging import get_logger
from .base_node import BaseNode
class FetchNode(BaseNode):
"""
@ -51,7 +52,7 @@ class FetchNode(BaseNode):
False if node_config is None else node_config.get("verbose", False)
)
self.useSoup = (
False if node_config is None else node_config.get("useSoup", False)
False if node_config is None else node_config.get("useSoup", False)
)
self.loader_kwargs = (
{} if node_config is None else node_config.get("loader_kwargs", {})
@ -73,8 +74,8 @@ class FetchNode(BaseNode):
KeyError: If the input key is not found in the state, indicating that the
necessary information to perform the operation is missing.
"""
logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)
@ -92,7 +93,7 @@ class FetchNode(BaseNode):
]
state.update({self.output[0]: compressed_document})
return state
# handling for pdf
elif input_keys[0] == "pdf":
loader = PyPDFLoader(source)
@ -108,7 +109,7 @@ class FetchNode(BaseNode):
]
state.update({self.output[0]: compressed_document})
return state
elif input_keys[0] == "json":
f = open(source)
compressed_document = [
@ -116,7 +117,7 @@ class FetchNode(BaseNode):
]
state.update({self.output[0]: compressed_document})
return state
elif input_keys[0] == "xml":
with open(source, "r", encoding="utf-8") as f:
data = f.read()
@ -125,25 +126,29 @@ class FetchNode(BaseNode):
]
state.update({self.output[0]: compressed_document})
return state
elif self.input == "pdf_dir":
pass
elif not source.startswith("http"):
title, minimized_body, link_urls, image_urls = cleanup_html(source, source)
parsed_content = f"Title: {title}, Body: {minimized_body}, Links: {link_urls}, Images: {image_urls}"
compressed_document = [Document(page_content=parsed_content,
metadata={"source": "local_dir"}
)]
compressed_document = [
Document(page_content=parsed_content, metadata={"source": "local_dir"})
]
elif self.useSoup:
response = requests.get(source)
if response.status_code == 200:
title, minimized_body, link_urls, image_urls = cleanup_html(response.text, source)
title, minimized_body, link_urls, image_urls = cleanup_html(
response.text, source
)
parsed_content = f"Title: {title}, Body: {minimized_body}, Links: {link_urls}, Images: {image_urls}"
compressed_document = [Document(page_content=parsed_content)]
else:
self.logger.warning(f"Failed to retrieve contents from the webpage at url: {source}")
else:
self.logger.warning(
f"Failed to retrieve contents from the webpage at url: {source}"
)
else:
loader_kwargs = {}
@ -153,14 +158,22 @@ class FetchNode(BaseNode):
loader = ChromiumLoader([source], headless=self.headless, **loader_kwargs)
document = loader.load()
title, minimized_body, link_urls, image_urls = cleanup_html(str(document[0].page_content), source)
title, minimized_body, link_urls, image_urls = cleanup_html(
str(document[0].page_content), source
)
parsed_content = f"Title: {title}, Body: {minimized_body}, Links: {link_urls}, Images: {image_urls}"
compressed_document = [
Document(page_content=parsed_content, metadata={"source": source})
]
state.update({self.output[0]: compressed_document, self.output[1]: link_urls, self.output[2]: image_urls})
state.update(
{
self.output[0]: compressed_document,
self.output[1]: link_urls,
self.output[2]: image_urls,
}
)
return state

View File

@ -2,14 +2,16 @@
Module for generating the answer node
"""
# Imports from standard library
from typing import List, Optional
from tqdm import tqdm
# Imports from Langchain
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
from ..utils.logging import get_logger
# Imports from the library
@ -25,15 +27,15 @@ class GenerateAnswerCSVNode(BaseNode):
Attributes:
llm_model: An instance of a language model client, configured for generating answers.
node_name (str): The unique identifier name for the node, defaulting
node_name (str): The unique identifier name for the node, defaulting
to "GenerateAnswer".
node_type (str): The type of the node, set to "node" indicating a
node_type (str): The type of the node, set to "node" indicating a
standard operational node.
Args:
llm_model: An instance of the language model client (e.g., ChatOpenAI) used
llm_model: An instance of the language model client (e.g., ChatOpenAI) used
for generating answers.
node_name (str, optional): The unique identifier name for the node.
node_name (str, optional): The unique identifier name for the node.
Defaults to "GenerateAnswer".
Methods:
@ -41,8 +43,13 @@ class GenerateAnswerCSVNode(BaseNode):
updating the state with the generated answer under the 'answer' key.
"""
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None,
node_name: str = "GenerateAnswer"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "GenerateAnswer",
):
"""
Initializes the GenerateAnswerNodeCsv with a language model client and a node name.
Args:
@ -51,8 +58,9 @@ class GenerateAnswerCSVNode(BaseNode):
"""
super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get(
"verbose", False)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state):
"""
@ -73,8 +81,7 @@ class GenerateAnswerCSVNode(BaseNode):
that the necessary information for generating an answer is missing.
"""
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)
@ -122,21 +129,27 @@ class GenerateAnswerCSVNode(BaseNode):
chains_dict = {}
# Use tqdm to add progress bar
for i, chunk in enumerate(tqdm(doc, desc="Processing chunks", disable=not self.verbose)):
for i, chunk in enumerate(
tqdm(doc, desc="Processing chunks", disable=not self.verbose)
):
if len(doc) == 1:
prompt = PromptTemplate(
template=template_no_chunks,
input_variables=["question"],
partial_variables={"context": chunk.page_content,
"format_instructions": format_instructions},
partial_variables={
"context": chunk.page_content,
"format_instructions": format_instructions,
},
)
else:
prompt = PromptTemplate(
template=template_chunks,
input_variables=["question"],
partial_variables={"context": chunk.page_content,
"chunk_id": i + 1,
"format_instructions": format_instructions},
partial_variables={
"context": chunk.page_content,
"chunk_id": i + 1,
"format_instructions": format_instructions,
},
)
# Dynamically name the chains based on their index
@ -155,8 +168,7 @@ class GenerateAnswerCSVNode(BaseNode):
partial_variables={"format_instructions": format_instructions},
)
merge_chain = merge_prompt | self.llm_model | output_parser
answer = merge_chain.invoke(
{"context": answer, "question": user_prompt})
answer = merge_chain.invoke({"context": answer, "question": user_prompt})
else:
# Chain
single_chain = list(chains_dict.values())[0]

View File

@ -4,12 +4,13 @@ GenerateAnswerNode Module
# Imports from standard library
from typing import List, Optional
from tqdm import tqdm
# Imports from Langchain
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
from ..utils.logging import get_logger
# Imports from the library
@ -34,13 +35,19 @@ class GenerateAnswerNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer".
"""
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None,
node_name: str = "GenerateAnswer"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "GenerateAnswer",
):
super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"]
self.verbose = True if node_config is None else node_config.get(
"verbose", False)
self.verbose = (
True if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict:
"""
@ -59,8 +66,7 @@ class GenerateAnswerNode(BaseNode):
that the necessary information for generating an answer is missing.
"""
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)
@ -108,21 +114,27 @@ class GenerateAnswerNode(BaseNode):
chains_dict = {}
# Use tqdm to add progress bar
for i, chunk in enumerate(tqdm(doc, desc="Processing chunks", disable=not self.verbose)):
for i, chunk in enumerate(
tqdm(doc, desc="Processing chunks", disable=not self.verbose)
):
if len(doc) == 1:
prompt = PromptTemplate(
template=template_no_chunks,
input_variables=["question"],
partial_variables={"context": chunk.page_content,
"format_instructions": format_instructions},
partial_variables={
"context": chunk.page_content,
"format_instructions": format_instructions,
},
)
else:
prompt = PromptTemplate(
template=template_chunks,
input_variables=["question"],
partial_variables={"context": chunk.page_content,
"chunk_id": i + 1,
"format_instructions": format_instructions},
partial_variables={
"context": chunk.page_content,
"chunk_id": i + 1,
"format_instructions": format_instructions,
},
)
# Dynamically name the chains based on their index
@ -141,8 +153,7 @@ class GenerateAnswerNode(BaseNode):
partial_variables={"format_instructions": format_instructions},
)
merge_chain = merge_prompt | self.llm_model | output_parser
answer = merge_chain.invoke(
{"context": answer, "question": user_prompt})
answer = merge_chain.invoke({"context": answer, "question": user_prompt})
else:
# Chain
single_chain = list(chains_dict.values())[0]

View File

@ -4,12 +4,12 @@ GenerateAnswerNode Module
# Imports from standard library
from typing import List, Optional
from tqdm import tqdm
# Imports from Langchain
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
# Imports from the library
from .base_node import BaseNode
@ -33,13 +33,19 @@ class GenerateAnswerOmniNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswerOmni".
"""
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None,
node_name: str = "GenerateAnswerOmni"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "GenerateAnswerOmni",
):
super().__init__(node_name, "node", input, output, 3, node_config)
self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get(
"verbose", False)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict:
"""
@ -58,8 +64,7 @@ class GenerateAnswerOmniNode(BaseNode):
that the necessary information for generating an answer is missing.
"""
if self.verbose:
print(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)
@ -112,22 +117,28 @@ class GenerateAnswerOmniNode(BaseNode):
chains_dict = {}
# Use tqdm to add progress bar
for i, chunk in enumerate(tqdm(doc, desc="Processing chunks", disable=not self.verbose)):
for i, chunk in enumerate(
tqdm(doc, desc="Processing chunks", disable=not self.verbose)
):
if len(doc) == 1:
prompt = PromptTemplate(
template=template_no_chunks,
input_variables=["question"],
partial_variables={"context": chunk.page_content,
"format_instructions": format_instructions,
"img_desc": imag_desc},
partial_variables={
"context": chunk.page_content,
"format_instructions": format_instructions,
"img_desc": imag_desc,
},
)
else:
prompt = PromptTemplate(
template=template_chunks,
input_variables=["question"],
partial_variables={"context": chunk.page_content,
"chunk_id": i + 1,
"format_instructions": format_instructions},
partial_variables={
"context": chunk.page_content,
"chunk_id": i + 1,
"format_instructions": format_instructions,
},
)
# Dynamically name the chains based on their index
@ -149,8 +160,7 @@ class GenerateAnswerOmniNode(BaseNode):
},
)
merge_chain = merge_prompt | self.llm_model | output_parser
answer = merge_chain.invoke(
{"context": answer, "question": user_prompt})
answer = merge_chain.invoke({"context": answer, "question": user_prompt})
else:
# Chain
single_chain = list(chains_dict.values())[0]

View File

@ -1,14 +1,16 @@
"""
Module for generating the answer node
"""
# Imports from standard library
from typing import List, Optional
from tqdm import tqdm
# Imports from Langchain
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
from ..utils.logging import get_logger
# Imports from the library
@ -24,15 +26,15 @@ class GenerateAnswerPDFNode(BaseNode):
Attributes:
llm: An instance of a language model client, configured for generating answers.
node_name (str): The unique identifier name for the node, defaulting
node_name (str): The unique identifier name for the node, defaulting
to "GenerateAnswer".
node_type (str): The type of the node, set to "node" indicating a
node_type (str): The type of the node, set to "node" indicating a
standard operational node.
Args:
llm: An instance of the language model client (e.g., ChatOpenAI) used
llm: An instance of the language model client (e.g., ChatOpenAI) used
for generating answers.
node_name (str, optional): The unique identifier name for the node.
node_name (str, optional): The unique identifier name for the node.
Defaults to "GenerateAnswer".
Methods:
@ -40,8 +42,13 @@ class GenerateAnswerPDFNode(BaseNode):
updating the state with the generated answer under the 'answer' key.
"""
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None,
node_name: str = "GenerateAnswer"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "GenerateAnswer",
):
"""
Initializes the GenerateAnswerNodePDF with a language model client and a node name.
Args:
@ -50,8 +57,9 @@ class GenerateAnswerPDFNode(BaseNode):
"""
super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm"]
self.verbose = False if node_config is None else node_config.get(
"verbose", False)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state):
"""
@ -72,8 +80,7 @@ class GenerateAnswerPDFNode(BaseNode):
that the necessary information for generating an answer is missing.
"""
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)
@ -121,21 +128,27 @@ class GenerateAnswerPDFNode(BaseNode):
chains_dict = {}
# Use tqdm to add progress bar
for i, chunk in enumerate(tqdm(doc, desc="Processing chunks", disable=not self.verbose)):
for i, chunk in enumerate(
tqdm(doc, desc="Processing chunks", disable=not self.verbose)
):
if len(doc) == 1:
prompt = PromptTemplate(
template=template_no_chunks,
input_variables=["question"],
partial_variables={"context": chunk.page_content,
"format_instructions": format_instructions},
partial_variables={
"context": chunk.page_content,
"format_instructions": format_instructions,
},
)
else:
prompt = PromptTemplate(
template=template_chunks,
input_variables=["question"],
partial_variables={"context": chunk.page_content,
"chunk_id": i + 1,
"format_instructions": format_instructions},
partial_variables={
"context": chunk.page_content,
"chunk_id": i + 1,
"format_instructions": format_instructions,
},
)
# Dynamically name the chains based on their index
@ -154,8 +167,7 @@ class GenerateAnswerPDFNode(BaseNode):
partial_variables={"format_instructions": format_instructions},
)
merge_chain = merge_prompt | self.llm_model | output_parser
answer = merge_chain.invoke(
{"context": answer, "question": user_prompt})
answer = merge_chain.invoke({"context": answer, "question": user_prompt})
else:
# Chain
single_chain = list(chains_dict.values())[0]

View File

@ -4,12 +4,13 @@ GenerateScraperNode Module
# Imports from standard library
from typing import List, Optional
from tqdm import tqdm
# Imports from Langchain
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
from ..utils.logging import get_logger
# Imports from the library
@ -37,15 +38,24 @@ class GenerateScraperNode(BaseNode):
"""
def __init__(self, input: str, output: List[str], library: str, website: str,
node_config: Optional[dict]=None, node_name: str = "GenerateScraper"):
def __init__(
self,
input: str,
output: List[str],
library: str,
website: str,
node_config: Optional[dict] = None,
node_name: str = "GenerateScraper",
):
super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"]
self.library = library
self.source = website
self.verbose = False if node_config is None else node_config.get("verbose", False)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict:
"""
@ -63,8 +73,7 @@ class GenerateScraperNode(BaseNode):
that the necessary information for generating an answer is missing.
"""
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)
@ -93,17 +102,20 @@ class GenerateScraperNode(BaseNode):
"""
print("source:", self.source)
if len(doc) > 1:
raise NotImplementedError("Currently GenerateScraperNode cannot handle more than 1 context chunks")
raise NotImplementedError(
"Currently GenerateScraperNode cannot handle more than 1 context chunks"
)
else:
template = template_no_chunks
prompt = PromptTemplate(
template=template,
input_variables=["question"],
partial_variables={"context": doc[0],
"library": self.library,
"source": self.source
},
partial_variables={
"context": doc[0],
"library": self.library,
"source": self.source,
},
)
map_chain = prompt | self.llm_model | output_parser

View File

@ -3,16 +3,19 @@ GetProbableTagsNode Module
"""
from typing import List, Optional
from langchain.output_parsers import CommaSeparatedListOutputParser
from langchain.prompts import PromptTemplate
from .base_node import BaseNode
from ..utils.logging import get_logger
from .base_node import BaseNode
class GetProbableTagsNode(BaseNode):
"""
A node that utilizes a language model to identify probable HTML tags within a document that
A node that utilizes a language model to identify probable HTML tags within a document that
are likely to contain the information relevant to a user's query. This node generates a prompt
describing the task, submits it to the language model, and processes the output to produce a
describing the task, submits it to the language model, and processes the output to produce a
list of probable tags.
Attributes:
@ -25,17 +28,24 @@ class GetProbableTagsNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "GetProbableTags".
"""
def __init__(self, input: str, output: List[str], node_config: dict,
node_name: str = "GetProbableTags"):
def __init__(
self,
input: str,
output: List[str],
node_config: dict,
node_name: str = "GetProbableTags",
):
super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get("verbose", False)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict:
"""
Generates a list of probable HTML tags based on the user's input and updates the state
with this list. The method constructs a prompt for the language model, submits it, and
Generates a list of probable HTML tags based on the user's input and updates the state
with this list. The method constructs a prompt for the language model, submits it, and
parses the output to identify probable tags.
Args:
@ -50,8 +60,7 @@ class GetProbableTagsNode(BaseNode):
necessary information for generating tag predictions is missing.
"""
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)
@ -78,7 +87,9 @@ class GetProbableTagsNode(BaseNode):
template=template,
input_variables=["question"],
partial_variables={
"format_instructions": format_instructions, "webpage": url},
"format_instructions": format_instructions,
"webpage": url,
},
)
# Execute the chain to get probable tags

View File

@ -5,9 +5,10 @@ GraphIterator Module
import asyncio
import copy
from typing import List, Optional
from ..utils.logging import get_logger
from tqdm.asyncio import tqdm
from ..utils.logging import get_logger
from .base_node import BaseNode
@ -59,9 +60,9 @@ class GraphIteratorNode(BaseNode):
"""
batchsize = self.node_config.get("batchsize", _default_batchsize)
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node with batchsize {batchsize} ---")
self.logger.info(
f"--- Executing {self.node_name} Node with batchsize {batchsize} ---"
)
try:
eventloop = asyncio.get_event_loop()

View File

@ -3,8 +3,9 @@ ImageToTextNode Module
"""
from typing import List, Optional
from .base_node import BaseNode
from ..utils.logging import get_logger
from .base_node import BaseNode
class ImageToTextNode(BaseNode):
@ -23,16 +24,18 @@ class ImageToTextNode(BaseNode):
"""
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict]=None,
node_name: str = "ImageToText",
):
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "ImageToText",
):
super().__init__(node_name, "node", input, output, 1, node_config)
self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get("verbose", False)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
self.max_images = 5 if node_config is None else node_config.get("max_images", 5)
def execute(self, state: dict) -> dict:
@ -48,9 +51,8 @@ class ImageToTextNode(BaseNode):
dict: The updated state with the input key containing the text extracted from the image.
"""
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
input_keys = self.get_input_keys(state)
input_data = [state[key] for key in input_keys]
urls = input_data[0]
@ -63,9 +65,9 @@ class ImageToTextNode(BaseNode):
# Skip the image-to-text conversion
if self.max_images < 1:
return state
img_desc = []
for url in urls[:self.max_images]:
for url in urls[: self.max_images]:
try:
text_answer = self.llm_model.run(url)
except Exception as e:

View File

@ -4,11 +4,13 @@ MergeAnswersNode Module
# Imports from standard library
from typing import List, Optional
from tqdm import tqdm
from ..utils.logging import get_logger
# Imports from Langchain
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from tqdm import tqdm
from ..utils.logging import get_logger
# Imports from the library
from .base_node import BaseNode
@ -29,17 +31,23 @@ class MergeAnswersNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "MergeAnswers".
"""
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None,
node_name: str = "MergeAnswers"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "MergeAnswers",
):
super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get(
"verbose", False)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict:
"""
Executes the node's logic to merge the answers from multiple graph instances into a
Executes the node's logic to merge the answers from multiple graph instances into a
single answer.
Args:
@ -54,8 +62,7 @@ class MergeAnswersNode(BaseNode):
that the necessary information for generating an answer is missing.
"""
if self.verbose:
self.ogger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)

View File

@ -3,17 +3,20 @@ ParseNode Module
"""
from typing import List, Optional
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_transformers import Html2TextTransformer
from .base_node import BaseNode
from ..utils.logging import get_logger
from .base_node import BaseNode
class ParseNode(BaseNode):
"""
A node responsible for parsing HTML content from a document.
A node responsible for parsing HTML content from a document.
The parsed content is split into chunks for further processing.
This node enhances the scraping workflow by allowing for targeted extraction of
This node enhances the scraping workflow by allowing for targeted extraction of
content, thereby optimizing the processing of large HTML documents.
Attributes:
@ -26,13 +29,23 @@ class ParseNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "Parse".
"""
def __init__(self, input: str, output: List[str], node_config: Optional[dict]=None, node_name: str = "Parse"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "Parse",
):
super().__init__(node_name, "node", input, output, 1, node_config)
self.verbose = False if node_config is None else node_config.get("verbose", False)
self.parse_html = True if node_config is None else node_config.get("parse_html", True)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
self.parse_html = (
True if node_config is None else node_config.get("parse_html", True)
)
def execute(self, state: dict) -> dict:
def execute(self, state: dict) -> dict:
"""
Executes the node's logic to parse the HTML document content and split it into chunks.
@ -48,8 +61,7 @@ class ParseNode(BaseNode):
necessary information for parsing the content is missing.
"""
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)
@ -65,12 +77,11 @@ class ParseNode(BaseNode):
# Parse the document
docs_transformed = input_data[0]
if self.parse_html:
docs_transformed = Html2TextTransformer(
).transform_documents(input_data[0])
docs_transformed = Html2TextTransformer().transform_documents(input_data[0])
docs_transformed = docs_transformed[0]
chunks = text_splitter.split_text(docs_transformed.page_content)
state.update({self.output[0]: chunks})
return state

View File

@ -3,13 +3,17 @@ RAGNode Module
"""
from typing import List, Optional
from langchain.docstore.document import Document
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import EmbeddingsFilter, DocumentCompressorPipeline
from langchain.retrievers.document_compressors import (
DocumentCompressorPipeline,
EmbeddingsFilter,
)
from langchain_community.document_transformers import EmbeddingsRedundantFilter
from langchain_community.vectorstores import FAISS
from ..utils.logging import get_logger
from ..utils.logging import get_logger
from .base_node import BaseNode
@ -32,13 +36,20 @@ class RAGNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "RAG".
"""
def __init__(self, input: str, output: List[str], node_config: Optional[dict]=None, node_name: str = "RAG"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "RAG",
):
super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"]
self.embedder_model = node_config.get("embedder_model", None)
self.verbose = False if node_config is None else node_config.get(
"verbose", False)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict:
"""
@ -57,8 +68,7 @@ class RAGNode(BaseNode):
necessary information for compressing the content is missing.
"""
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)
@ -80,15 +90,15 @@ class RAGNode(BaseNode):
)
chunked_docs.append(doc)
if self.verbose:
self.logger.info("--- (updated chunks metadata) ---")
self.logger.info("--- (updated chunks metadata) ---")
# check if embedder_model is provided, if not use llm_model
self.embedder_model = self.embedder_model if self.embedder_model else self.llm_model
self.embedder_model = (
self.embedder_model if self.embedder_model else self.llm_model
)
embeddings = self.embedder_model
retriever = FAISS.from_documents(
chunked_docs, embeddings).as_retriever()
retriever = FAISS.from_documents(chunked_docs, embeddings).as_retriever()
redundant_filter = EmbeddingsRedundantFilter(embeddings=embeddings)
# similarity_threshold could be set, now k=20
@ -108,9 +118,7 @@ class RAGNode(BaseNode):
compressed_docs = compression_retriever.invoke(user_prompt)
if self.verbose:
self.logger.info("--- (tokens compressed and vector stored) ---")
self.logger.info("--- (tokens compressed and vector stored) ---")
state.update({self.output[0]: compressed_docs})
return state

View File

@ -4,12 +4,15 @@ RobotsNode Module
from typing import List, Optional
from urllib.parse import urlparse
from langchain_community.document_loaders import AsyncChromiumLoader
from langchain.prompts import PromptTemplate
from langchain.output_parsers import CommaSeparatedListOutputParser
from .base_node import BaseNode
from langchain.prompts import PromptTemplate
from langchain_community.document_loaders import AsyncChromiumLoader
from ..helpers import robots_dictionary
from ..utils.logging import get_logger
from .base_node import BaseNode
class RobotsNode(BaseNode):
"""
@ -34,16 +37,21 @@ class RobotsNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "Robots".
"""
def __init__(self, input: str, output: List[str], node_config: Optional[dict]=None,
node_name: str = "Robots"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "Robots",
):
super().__init__(node_name, "node", input, output, 1)
self.llm_model = node_config["llm_model"]
self.force_scraping = force_scraping
self.verbose = True if node_config is None else node_config.get(
"verbose", False)
self.verbose = (
True if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict:
"""
@ -65,8 +73,7 @@ class RobotsNode(BaseNode):
scraping is not enforced.
"""
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)
@ -91,8 +98,7 @@ class RobotsNode(BaseNode):
"""
if not source.startswith("http"):
raise ValueError(
"Operation not allowed")
raise ValueError("Operation not allowed")
else:
parsed_url = urlparse(source)
@ -100,7 +106,9 @@ class RobotsNode(BaseNode):
loader = AsyncChromiumLoader(f"{base_url}/robots.txt")
document = loader.load()
if "ollama" in self.llm_model["model_name"]:
self.llm_model["model_name"] = self.llm_model["model_name"].split("/")[-1]
self.llm_model["model_name"] = self.llm_model["model_name"].split("/")[
-1
]
model = self.llm_model["model_name"].split("/")[-1]
else:
@ -114,27 +122,25 @@ class RobotsNode(BaseNode):
prompt = PromptTemplate(
template=template,
input_variables=["path"],
partial_variables={"context": document,
"agent": agent
},
partial_variables={"context": document, "agent": agent},
)
chain = prompt | self.llm_model | output_parser
is_scrapable = chain.invoke({"path": source})[0]
if "no" in is_scrapable:
if self.verbose:
self.logger.warning("\033[31m(Scraping this website is not allowed)\033[0m")
self.logger.warning(
"\033[31m(Scraping this website is not allowed)\033[0m"
)
if not self.force_scraping:
raise ValueError(
'The website you selected is not scrapable')
raise ValueError("The website you selected is not scrapable")
else:
if self.verbose:
self.logger.warning("\033[33m(WARNING: Scraping this website is not allowed but you decided to force it)\033[0m")
self.logger.warning(
"\033[33m(WARNING: Scraping this website is not allowed but you decided to force it)\033[0m"
)
else:
if self.verbose:
self.logger.warning("\033[32m(Scraping this website is allowed)\033[0m")
self.logger.warning("\033[32m(Scraping this website is allowed)\033[0m")
state.update({self.output[0]: is_scrapable})
return state

View File

@ -3,11 +3,14 @@ SearchInternetNode Module
"""
from typing import List, Optional
from langchain.output_parsers import CommaSeparatedListOutputParser
from langchain.prompts import PromptTemplate
from ..utils.logging import get_logger
from ..utils.research_web import search_on_web
from .base_node import BaseNode
from ..utils.logging import get_logger
class SearchInternetNode(BaseNode):
"""
@ -27,13 +30,19 @@ class SearchInternetNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "SearchInternet".
"""
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None,
node_name: str = "SearchInternet"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "SearchInternet",
):
super().__init__(node_name, "node", input, output, 1, node_config)
self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get(
"verbose", False)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
self.max_results = node_config.get("max_results", 3)
def execute(self, state: dict) -> dict:
@ -55,8 +64,7 @@ class SearchInternetNode(BaseNode):
necessary information for generating the answer is missing.
"""
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
input_keys = self.get_input_keys(state)
@ -87,12 +95,9 @@ class SearchInternetNode(BaseNode):
search_answer = search_prompt | self.llm_model | output_parser
search_query = search_answer.invoke({"user_prompt": user_prompt})[0]
if self.verbose:
self.logger.info(f"Search Query: {search_query}")
self.logger.info(f"Search Query: {search_query}")
answer = search_on_web(
query=search_query, max_results=self.max_results)
answer = search_on_web(query=search_query, max_results=self.max_results)
if len(answer) == 0:
# raise an exception if no answer is found

View File

@ -4,13 +4,14 @@ SearchLinkNode Module
# Imports from standard library
from typing import List, Optional
from tqdm import tqdm
from ..utils.logging import get_logger
# Imports from Langchain
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
from ..utils.logging import get_logger
# Imports from the library
from .base_node import BaseNode
@ -33,13 +34,19 @@ class SearchLinkNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "GenerateLinks".
"""
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None,
node_name: str = "GenerateLinks"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "GenerateLinks",
):
super().__init__(node_name, "node", input, output, 1, node_config)
self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get(
"verbose", False)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict:
"""
@ -58,8 +65,7 @@ class SearchLinkNode(BaseNode):
necessary information for generating the answer is missing.
"""
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)
@ -93,7 +99,13 @@ class SearchLinkNode(BaseNode):
"""
relevant_links = []
for i, chunk in enumerate(tqdm(parsed_content_chunks, desc="Processing chunks", disable=not self.verbose)):
for i, chunk in enumerate(
tqdm(
parsed_content_chunks,
desc="Processing chunks",
disable=not self.verbose,
)
):
merge_prompt = PromptTemplate(
template=prompt_relevant_links,
input_variables=["content", "user_prompt"],
@ -101,7 +113,8 @@ class SearchLinkNode(BaseNode):
merge_chain = merge_prompt | self.llm_model | output_parser
# merge_chain = merge_prompt | self.llm_model
answer = merge_chain.invoke(
{"content": chunk.page_content, "user_prompt": user_prompt})
{"content": chunk.page_content, "user_prompt": user_prompt}
)
relevant_links += answer
state.update({self.output[0]: relevant_links})
return state

View File

@ -3,9 +3,11 @@ SearchInternetNode Module
"""
from typing import List, Optional
from tqdm import tqdm
from langchain.output_parsers import CommaSeparatedListOutputParser
from langchain.prompts import PromptTemplate
from tqdm import tqdm
from .base_node import BaseNode
@ -27,12 +29,18 @@ class SearchLinksWithContext(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer".
"""
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None,
node_name: str = "GenerateAnswer"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "GenerateAnswer",
):
super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"]
self.verbose = True if node_config is None else node_config.get(
"verbose", False)
self.verbose = (
True if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict:
"""
@ -51,8 +59,7 @@ class SearchLinksWithContext(BaseNode):
that the necessary information for generating an answer is missing.
"""
if self.verbose:
print(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)
@ -90,25 +97,30 @@ class SearchLinksWithContext(BaseNode):
result = []
# Use tqdm to add progress bar
for i, chunk in enumerate(tqdm(doc, desc="Processing chunks", disable=not self.verbose)):
for i, chunk in enumerate(
tqdm(doc, desc="Processing chunks", disable=not self.verbose)
):
if len(doc) == 1:
prompt = PromptTemplate(
template=template_no_chunks,
input_variables=["question"],
partial_variables={"context": chunk.page_content,
"format_instructions": format_instructions},
partial_variables={
"context": chunk.page_content,
"format_instructions": format_instructions,
},
)
else:
prompt = PromptTemplate(
template=template_chunks,
input_variables=["question"],
partial_variables={"context": chunk.page_content,
"chunk_id": i + 1,
"format_instructions": format_instructions},
partial_variables={
"context": chunk.page_content,
"chunk_id": i + 1,
"format_instructions": format_instructions,
},
)
result.extend(
prompt | self.llm_model | output_parser)
result.extend(prompt | self.llm_model | output_parser)
state["urls"] = result
return state

View File

@ -3,8 +3,10 @@ TextToSpeechNode Module
"""
from typing import List, Optional
from .base_node import BaseNode
from ..utils.logging import get_logger
from .base_node import BaseNode
class TextToSpeechNode(BaseNode):
"""
@ -21,12 +23,19 @@ class TextToSpeechNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "TextToSpeech".
"""
def __init__(self, input: str, output: List[str],
node_config: Optional[dict]=None, node_name: str = "TextToSpeech"):
def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "TextToSpeech",
):
super().__init__(node_name, "node", input, output, 1, node_config)
self.tts_model = node_config["tts_model"]
self.verbose = False if node_config is None else node_config.get("verbose", False)
self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict:
"""
@ -35,7 +44,7 @@ class TextToSpeechNode(BaseNode):
Args:
state (dict): The current state of the graph. The input keys will be used to fetch the
correct data types from the state.
Returns:
dict: The updated state with the output key containing the audio generated from the text.
@ -44,8 +53,7 @@ class TextToSpeechNode(BaseNode):
necessary information for generating the audio is missing.
"""
if self.verbose:
self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state)