fix(node-logging): use centralized logger in each node for logging

This commit is contained in:
Federico Minutoli 2024-05-24 01:09:49 +02:00
parent 4348d4f4db
commit c251cc45d3
18 changed files with 406 additions and 242 deletions

View File

@ -3,21 +3,22 @@ BlocksIndentifier Module
""" """
from typing import List, Optional from typing import List, Optional
from langchain_community.document_loaders import AsyncChromiumLoader from langchain_community.document_loaders import AsyncChromiumLoader
from langchain_core.documents import Document from langchain_core.documents import Document
from .base_node import BaseNode
from .base_node import BaseNode
class BlocksIndentifier(BaseNode): class BlocksIndentifier(BaseNode):
""" """
A node responsible to identify the blocks in the HTML content of a specified HTML content A node responsible to identify the blocks in the HTML content of a specified HTML content
e.g products in a E-commerce, flights in a travel website etc. e.g products in a E-commerce, flights in a travel website etc.
Attributes: Attributes:
headless (bool): A flag indicating whether the browser should run in headless mode. headless (bool): A flag indicating whether the browser should run in headless mode.
verbose (bool): A flag indicating whether to print verbose output during execution. verbose (bool): A flag indicating whether to print verbose output during execution.
Args: Args:
input (str): Boolean expression defining the input keys needed from the state. input (str): Boolean expression defining the input keys needed from the state.
output (List[str]): List of output keys to be updated in the state. output (List[str]): List of output keys to be updated in the state.
@ -25,11 +26,21 @@ class BlocksIndentifier(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "BlocksIndentifier". node_name (str): The unique identifier name for the node, defaulting to "BlocksIndentifier".
""" """
def __init__(self, input: str, output: List[str], node_config: Optional[dict], node_name: str = "BlocksIndentifier"): def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict],
node_name: str = "BlocksIndentifier",
):
super().__init__(node_name, "node", input, output, 1) super().__init__(node_name, "node", input, output, 1)
self.headless = True if node_config is None else node_config.get("headless", True) self.headless = (
self.verbose = True if node_config is None else node_config.get("verbose", False) True if node_config is None else node_config.get("headless", True)
)
self.verbose = (
True if node_config is None else node_config.get("verbose", False)
)
def execute(self, state): def execute(self, state):
""" """
@ -47,8 +58,7 @@ class BlocksIndentifier(BaseNode):
KeyError: If the input key is not found in the state, indicating that the KeyError: If the input key is not found in the state, indicating that the
necessary information to perform the operation is missing. necessary information to perform the operation is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
print(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)

View File

@ -3,17 +3,18 @@ FetchNode Module
""" """
import json import json
import requests
from typing import List, Optional from typing import List, Optional
import pandas as pd import pandas as pd
import requests
from langchain_community.document_loaders import PyPDFLoader from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents import Document from langchain_core.documents import Document
from ..docloaders import ChromiumLoader from ..docloaders import ChromiumLoader
from .base_node import BaseNode
from ..utils.cleanup_html import cleanup_html from ..utils.cleanup_html import cleanup_html
from ..utils.logging import get_logger from ..utils.logging import get_logger
from .base_node import BaseNode
class FetchNode(BaseNode): class FetchNode(BaseNode):
""" """
@ -51,7 +52,7 @@ class FetchNode(BaseNode):
False if node_config is None else node_config.get("verbose", False) False if node_config is None else node_config.get("verbose", False)
) )
self.useSoup = ( self.useSoup = (
False if node_config is None else node_config.get("useSoup", False) False if node_config is None else node_config.get("useSoup", False)
) )
self.loader_kwargs = ( self.loader_kwargs = (
{} if node_config is None else node_config.get("loader_kwargs", {}) {} if node_config is None else node_config.get("loader_kwargs", {})
@ -73,8 +74,8 @@ class FetchNode(BaseNode):
KeyError: If the input key is not found in the state, indicating that the KeyError: If the input key is not found in the state, indicating that the
necessary information to perform the operation is missing. necessary information to perform the operation is missing.
""" """
logger.info(f"--- Executing {self.node_name} Node ---") self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -92,7 +93,7 @@ class FetchNode(BaseNode):
] ]
state.update({self.output[0]: compressed_document}) state.update({self.output[0]: compressed_document})
return state return state
# handling for pdf # handling for pdf
elif input_keys[0] == "pdf": elif input_keys[0] == "pdf":
loader = PyPDFLoader(source) loader = PyPDFLoader(source)
@ -108,7 +109,7 @@ class FetchNode(BaseNode):
] ]
state.update({self.output[0]: compressed_document}) state.update({self.output[0]: compressed_document})
return state return state
elif input_keys[0] == "json": elif input_keys[0] == "json":
f = open(source) f = open(source)
compressed_document = [ compressed_document = [
@ -116,7 +117,7 @@ class FetchNode(BaseNode):
] ]
state.update({self.output[0]: compressed_document}) state.update({self.output[0]: compressed_document})
return state return state
elif input_keys[0] == "xml": elif input_keys[0] == "xml":
with open(source, "r", encoding="utf-8") as f: with open(source, "r", encoding="utf-8") as f:
data = f.read() data = f.read()
@ -125,25 +126,29 @@ class FetchNode(BaseNode):
] ]
state.update({self.output[0]: compressed_document}) state.update({self.output[0]: compressed_document})
return state return state
elif self.input == "pdf_dir": elif self.input == "pdf_dir":
pass pass
elif not source.startswith("http"): elif not source.startswith("http"):
title, minimized_body, link_urls, image_urls = cleanup_html(source, source) title, minimized_body, link_urls, image_urls = cleanup_html(source, source)
parsed_content = f"Title: {title}, Body: {minimized_body}, Links: {link_urls}, Images: {image_urls}" parsed_content = f"Title: {title}, Body: {minimized_body}, Links: {link_urls}, Images: {image_urls}"
compressed_document = [Document(page_content=parsed_content, compressed_document = [
metadata={"source": "local_dir"} Document(page_content=parsed_content, metadata={"source": "local_dir"})
)] ]
elif self.useSoup: elif self.useSoup:
response = requests.get(source) response = requests.get(source)
if response.status_code == 200: if response.status_code == 200:
title, minimized_body, link_urls, image_urls = cleanup_html(response.text, source) title, minimized_body, link_urls, image_urls = cleanup_html(
response.text, source
)
parsed_content = f"Title: {title}, Body: {minimized_body}, Links: {link_urls}, Images: {image_urls}" parsed_content = f"Title: {title}, Body: {minimized_body}, Links: {link_urls}, Images: {image_urls}"
compressed_document = [Document(page_content=parsed_content)] compressed_document = [Document(page_content=parsed_content)]
else: else:
self.logger.warning(f"Failed to retrieve contents from the webpage at url: {source}") self.logger.warning(
f"Failed to retrieve contents from the webpage at url: {source}"
)
else: else:
loader_kwargs = {} loader_kwargs = {}
@ -153,14 +158,22 @@ class FetchNode(BaseNode):
loader = ChromiumLoader([source], headless=self.headless, **loader_kwargs) loader = ChromiumLoader([source], headless=self.headless, **loader_kwargs)
document = loader.load() document = loader.load()
title, minimized_body, link_urls, image_urls = cleanup_html(str(document[0].page_content), source) title, minimized_body, link_urls, image_urls = cleanup_html(
str(document[0].page_content), source
)
parsed_content = f"Title: {title}, Body: {minimized_body}, Links: {link_urls}, Images: {image_urls}" parsed_content = f"Title: {title}, Body: {minimized_body}, Links: {link_urls}, Images: {image_urls}"
compressed_document = [ compressed_document = [
Document(page_content=parsed_content, metadata={"source": source}) Document(page_content=parsed_content, metadata={"source": source})
] ]
state.update({self.output[0]: compressed_document, self.output[1]: link_urls, self.output[2]: image_urls}) state.update(
{
self.output[0]: compressed_document,
self.output[1]: link_urls,
self.output[2]: image_urls,
}
)
return state return state

View File

@ -2,14 +2,16 @@
gg gg
Module for generating the answer node Module for generating the answer node
""" """
# Imports from standard library # Imports from standard library
from typing import List, Optional from typing import List, Optional
from tqdm import tqdm
# Imports from Langchain # Imports from Langchain
from langchain.prompts import PromptTemplate from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableParallel from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
from ..utils.logging import get_logger from ..utils.logging import get_logger
# Imports from the library # Imports from the library
@ -25,15 +27,15 @@ class GenerateAnswerCSVNode(BaseNode):
Attributes: Attributes:
llm_model: An instance of a language model client, configured for generating answers. llm_model: An instance of a language model client, configured for generating answers.
node_name (str): The unique identifier name for the node, defaulting node_name (str): The unique identifier name for the node, defaulting
to "GenerateAnswerNodeCsv". to "GenerateAnswerNodeCsv".
node_type (str): The type of the node, set to "node" indicating a node_type (str): The type of the node, set to "node" indicating a
standard operational node. standard operational node.
Args: Args:
llm_model: An instance of the language model client (e.g., ChatOpenAI) used llm_model: An instance of the language model client (e.g., ChatOpenAI) used
for generating answers. for generating answers.
node_name (str, optional): The unique identifier name for the node. node_name (str, optional): The unique identifier name for the node.
Defaults to "GenerateAnswerNodeCsv". Defaults to "GenerateAnswerNodeCsv".
Methods: Methods:
@ -41,8 +43,13 @@ class GenerateAnswerCSVNode(BaseNode):
updating the state with the generated answer under the 'answer' key. updating the state with the generated answer under the 'answer' key.
""" """
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None, def __init__(
node_name: str = "GenerateAnswer"): self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "GenerateAnswer",
):
""" """
Initializes the GenerateAnswerNodeCsv with a language model client and a node name. Initializes the GenerateAnswerNodeCsv with a language model client and a node name.
Args: Args:
@ -51,8 +58,9 @@ class GenerateAnswerCSVNode(BaseNode):
""" """
super().__init__(node_name, "node", input, output, 2, node_config) super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"] self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get( self.verbose = (
"verbose", False) False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state): def execute(self, state):
""" """
@ -73,8 +81,7 @@ class GenerateAnswerCSVNode(BaseNode):
that the necessary information for generating an answer is missing. that the necessary information for generating an answer is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -122,21 +129,27 @@ class GenerateAnswerCSVNode(BaseNode):
chains_dict = {} chains_dict = {}
# Use tqdm to add progress bar # Use tqdm to add progress bar
for i, chunk in enumerate(tqdm(doc, desc="Processing chunks", disable=not self.verbose)): for i, chunk in enumerate(
tqdm(doc, desc="Processing chunks", disable=not self.verbose)
):
if len(doc) == 1: if len(doc) == 1:
prompt = PromptTemplate( prompt = PromptTemplate(
template=template_no_chunks, template=template_no_chunks,
input_variables=["question"], input_variables=["question"],
partial_variables={"context": chunk.page_content, partial_variables={
"format_instructions": format_instructions}, "context": chunk.page_content,
"format_instructions": format_instructions,
},
) )
else: else:
prompt = PromptTemplate( prompt = PromptTemplate(
template=template_chunks, template=template_chunks,
input_variables=["question"], input_variables=["question"],
partial_variables={"context": chunk.page_content, partial_variables={
"chunk_id": i + 1, "context": chunk.page_content,
"format_instructions": format_instructions}, "chunk_id": i + 1,
"format_instructions": format_instructions,
},
) )
# Dynamically name the chains based on their index # Dynamically name the chains based on their index
@ -155,8 +168,7 @@ class GenerateAnswerCSVNode(BaseNode):
partial_variables={"format_instructions": format_instructions}, partial_variables={"format_instructions": format_instructions},
) )
merge_chain = merge_prompt | self.llm_model | output_parser merge_chain = merge_prompt | self.llm_model | output_parser
answer = merge_chain.invoke( answer = merge_chain.invoke({"context": answer, "question": user_prompt})
{"context": answer, "question": user_prompt})
else: else:
# Chain # Chain
single_chain = list(chains_dict.values())[0] single_chain = list(chains_dict.values())[0]

View File

@ -4,12 +4,13 @@ GenerateAnswerNode Module
# Imports from standard library # Imports from standard library
from typing import List, Optional from typing import List, Optional
from tqdm import tqdm
# Imports from Langchain # Imports from Langchain
from langchain.prompts import PromptTemplate from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableParallel from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
from ..utils.logging import get_logger from ..utils.logging import get_logger
# Imports from the library # Imports from the library
@ -34,13 +35,19 @@ class GenerateAnswerNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer". node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer".
""" """
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None, def __init__(
node_name: str = "GenerateAnswer"): self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "GenerateAnswer",
):
super().__init__(node_name, "node", input, output, 2, node_config) super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"] self.llm_model = node_config["llm_model"]
self.verbose = True if node_config is None else node_config.get( self.verbose = (
"verbose", False) True if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
""" """
@ -59,8 +66,7 @@ class GenerateAnswerNode(BaseNode):
that the necessary information for generating an answer is missing. that the necessary information for generating an answer is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -108,21 +114,27 @@ class GenerateAnswerNode(BaseNode):
chains_dict = {} chains_dict = {}
# Use tqdm to add progress bar # Use tqdm to add progress bar
for i, chunk in enumerate(tqdm(doc, desc="Processing chunks", disable=not self.verbose)): for i, chunk in enumerate(
tqdm(doc, desc="Processing chunks", disable=not self.verbose)
):
if len(doc) == 1: if len(doc) == 1:
prompt = PromptTemplate( prompt = PromptTemplate(
template=template_no_chunks, template=template_no_chunks,
input_variables=["question"], input_variables=["question"],
partial_variables={"context": chunk.page_content, partial_variables={
"format_instructions": format_instructions}, "context": chunk.page_content,
"format_instructions": format_instructions,
},
) )
else: else:
prompt = PromptTemplate( prompt = PromptTemplate(
template=template_chunks, template=template_chunks,
input_variables=["question"], input_variables=["question"],
partial_variables={"context": chunk.page_content, partial_variables={
"chunk_id": i + 1, "context": chunk.page_content,
"format_instructions": format_instructions}, "chunk_id": i + 1,
"format_instructions": format_instructions,
},
) )
# Dynamically name the chains based on their index # Dynamically name the chains based on their index
@ -141,8 +153,7 @@ class GenerateAnswerNode(BaseNode):
partial_variables={"format_instructions": format_instructions}, partial_variables={"format_instructions": format_instructions},
) )
merge_chain = merge_prompt | self.llm_model | output_parser merge_chain = merge_prompt | self.llm_model | output_parser
answer = merge_chain.invoke( answer = merge_chain.invoke({"context": answer, "question": user_prompt})
{"context": answer, "question": user_prompt})
else: else:
# Chain # Chain
single_chain = list(chains_dict.values())[0] single_chain = list(chains_dict.values())[0]

View File

@ -4,12 +4,12 @@ GenerateAnswerNode Module
# Imports from standard library # Imports from standard library
from typing import List, Optional from typing import List, Optional
from tqdm import tqdm
# Imports from Langchain # Imports from Langchain
from langchain.prompts import PromptTemplate from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableParallel from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
# Imports from the library # Imports from the library
from .base_node import BaseNode from .base_node import BaseNode
@ -33,13 +33,19 @@ class GenerateAnswerOmniNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer". node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer".
""" """
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None, def __init__(
node_name: str = "GenerateAnswerOmni"): self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "GenerateAnswerOmni",
):
super().__init__(node_name, "node", input, output, 3, node_config) super().__init__(node_name, "node", input, output, 3, node_config)
self.llm_model = node_config["llm_model"] self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get( self.verbose = (
"verbose", False) False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
""" """
@ -58,8 +64,7 @@ class GenerateAnswerOmniNode(BaseNode):
that the necessary information for generating an answer is missing. that the necessary information for generating an answer is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
print(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -112,22 +117,28 @@ class GenerateAnswerOmniNode(BaseNode):
chains_dict = {} chains_dict = {}
# Use tqdm to add progress bar # Use tqdm to add progress bar
for i, chunk in enumerate(tqdm(doc, desc="Processing chunks", disable=not self.verbose)): for i, chunk in enumerate(
tqdm(doc, desc="Processing chunks", disable=not self.verbose)
):
if len(doc) == 1: if len(doc) == 1:
prompt = PromptTemplate( prompt = PromptTemplate(
template=template_no_chunks, template=template_no_chunks,
input_variables=["question"], input_variables=["question"],
partial_variables={"context": chunk.page_content, partial_variables={
"format_instructions": format_instructions, "context": chunk.page_content,
"img_desc": imag_desc}, "format_instructions": format_instructions,
"img_desc": imag_desc,
},
) )
else: else:
prompt = PromptTemplate( prompt = PromptTemplate(
template=template_chunks, template=template_chunks,
input_variables=["question"], input_variables=["question"],
partial_variables={"context": chunk.page_content, partial_variables={
"chunk_id": i + 1, "context": chunk.page_content,
"format_instructions": format_instructions}, "chunk_id": i + 1,
"format_instructions": format_instructions,
},
) )
# Dynamically name the chains based on their index # Dynamically name the chains based on their index
@ -149,8 +160,7 @@ class GenerateAnswerOmniNode(BaseNode):
}, },
) )
merge_chain = merge_prompt | self.llm_model | output_parser merge_chain = merge_prompt | self.llm_model | output_parser
answer = merge_chain.invoke( answer = merge_chain.invoke({"context": answer, "question": user_prompt})
{"context": answer, "question": user_prompt})
else: else:
# Chain # Chain
single_chain = list(chains_dict.values())[0] single_chain = list(chains_dict.values())[0]

View File

@ -1,14 +1,16 @@
""" """
Module for generating the answer node Module for generating the answer node
""" """
# Imports from standard library # Imports from standard library
from typing import List, Optional from typing import List, Optional
from tqdm import tqdm
# Imports from Langchain # Imports from Langchain
from langchain.prompts import PromptTemplate from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableParallel from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
from ..utils.logging import get_logger from ..utils.logging import get_logger
# Imports from the library # Imports from the library
@ -24,15 +26,15 @@ class GenerateAnswerPDFNode(BaseNode):
Attributes: Attributes:
llm: An instance of a language model client, configured for generating answers. llm: An instance of a language model client, configured for generating answers.
node_name (str): The unique identifier name for the node, defaulting node_name (str): The unique identifier name for the node, defaulting
to "GenerateAnswerNodePDF". to "GenerateAnswerNodePDF".
node_type (str): The type of the node, set to "node" indicating a node_type (str): The type of the node, set to "node" indicating a
standard operational node. standard operational node.
Args: Args:
llm: An instance of the language model client (e.g., ChatOpenAI) used llm: An instance of the language model client (e.g., ChatOpenAI) used
for generating answers. for generating answers.
node_name (str, optional): The unique identifier name for the node. node_name (str, optional): The unique identifier name for the node.
Defaults to "GenerateAnswerNodePDF". Defaults to "GenerateAnswerNodePDF".
Methods: Methods:
@ -40,8 +42,13 @@ class GenerateAnswerPDFNode(BaseNode):
updating the state with the generated answer under the 'answer' key. updating the state with the generated answer under the 'answer' key.
""" """
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None, def __init__(
node_name: str = "GenerateAnswer"): self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "GenerateAnswer",
):
""" """
Initializes the GenerateAnswerNodePDF with a language model client and a node name. Initializes the GenerateAnswerNodePDF with a language model client and a node name.
Args: Args:
@ -50,8 +57,9 @@ class GenerateAnswerPDFNode(BaseNode):
""" """
super().__init__(node_name, "node", input, output, 2, node_config) super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm"] self.llm_model = node_config["llm"]
self.verbose = False if node_config is None else node_config.get( self.verbose = (
"verbose", False) False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state): def execute(self, state):
""" """
@ -72,8 +80,7 @@ class GenerateAnswerPDFNode(BaseNode):
that the necessary information for generating an answer is missing. that the necessary information for generating an answer is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -121,21 +128,27 @@ class GenerateAnswerPDFNode(BaseNode):
chains_dict = {} chains_dict = {}
# Use tqdm to add progress bar # Use tqdm to add progress bar
for i, chunk in enumerate(tqdm(doc, desc="Processing chunks", disable=not self.verbose)): for i, chunk in enumerate(
tqdm(doc, desc="Processing chunks", disable=not self.verbose)
):
if len(doc) == 1: if len(doc) == 1:
prompt = PromptTemplate( prompt = PromptTemplate(
template=template_no_chunks, template=template_no_chunks,
input_variables=["question"], input_variables=["question"],
partial_variables={"context": chunk.page_content, partial_variables={
"format_instructions": format_instructions}, "context": chunk.page_content,
"format_instructions": format_instructions,
},
) )
else: else:
prompt = PromptTemplate( prompt = PromptTemplate(
template=template_chunks, template=template_chunks,
input_variables=["question"], input_variables=["question"],
partial_variables={"context": chunk.page_content, partial_variables={
"chunk_id": i + 1, "context": chunk.page_content,
"format_instructions": format_instructions}, "chunk_id": i + 1,
"format_instructions": format_instructions,
},
) )
# Dynamically name the chains based on their index # Dynamically name the chains based on their index
@ -154,8 +167,7 @@ class GenerateAnswerPDFNode(BaseNode):
partial_variables={"format_instructions": format_instructions}, partial_variables={"format_instructions": format_instructions},
) )
merge_chain = merge_prompt | self.llm_model | output_parser merge_chain = merge_prompt | self.llm_model | output_parser
answer = merge_chain.invoke( answer = merge_chain.invoke({"context": answer, "question": user_prompt})
{"context": answer, "question": user_prompt})
else: else:
# Chain # Chain
single_chain = list(chains_dict.values())[0] single_chain = list(chains_dict.values())[0]

View File

@ -4,12 +4,13 @@ GenerateScraperNode Module
# Imports from standard library # Imports from standard library
from typing import List, Optional from typing import List, Optional
from tqdm import tqdm
# Imports from Langchain # Imports from Langchain
from langchain.prompts import PromptTemplate from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
from ..utils.logging import get_logger from ..utils.logging import get_logger
# Imports from the library # Imports from the library
@ -37,15 +38,24 @@ class GenerateScraperNode(BaseNode):
""" """
def __init__(self, input: str, output: List[str], library: str, website: str, def __init__(
node_config: Optional[dict]=None, node_name: str = "GenerateScraper"): self,
input: str,
output: List[str],
library: str,
website: str,
node_config: Optional[dict] = None,
node_name: str = "GenerateScraper",
):
super().__init__(node_name, "node", input, output, 2, node_config) super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"] self.llm_model = node_config["llm_model"]
self.library = library self.library = library
self.source = website self.source = website
self.verbose = False if node_config is None else node_config.get("verbose", False) self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
""" """
@ -63,8 +73,7 @@ class GenerateScraperNode(BaseNode):
that the necessary information for generating an answer is missing. that the necessary information for generating an answer is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -93,17 +102,20 @@ class GenerateScraperNode(BaseNode):
""" """
print("source:", self.source) print("source:", self.source)
if len(doc) > 1: if len(doc) > 1:
raise NotImplementedError("Currently GenerateScraperNode cannot handle more than 1 context chunks") raise NotImplementedError(
"Currently GenerateScraperNode cannot handle more than 1 context chunks"
)
else: else:
template = template_no_chunks template = template_no_chunks
prompt = PromptTemplate( prompt = PromptTemplate(
template=template, template=template,
input_variables=["question"], input_variables=["question"],
partial_variables={"context": doc[0], partial_variables={
"library": self.library, "context": doc[0],
"source": self.source "library": self.library,
}, "source": self.source,
},
) )
map_chain = prompt | self.llm_model | output_parser map_chain = prompt | self.llm_model | output_parser

View File

@ -3,16 +3,19 @@ GetProbableTagsNode Module
""" """
from typing import List, Optional from typing import List, Optional
from langchain.output_parsers import CommaSeparatedListOutputParser from langchain.output_parsers import CommaSeparatedListOutputParser
from langchain.prompts import PromptTemplate from langchain.prompts import PromptTemplate
from .base_node import BaseNode
from ..utils.logging import get_logger from ..utils.logging import get_logger
from .base_node import BaseNode
class GetProbableTagsNode(BaseNode): class GetProbableTagsNode(BaseNode):
""" """
A node that utilizes a language model to identify probable HTML tags within a document that A node that utilizes a language model to identify probable HTML tags within a document that
are likely to contain the information relevant to a user's query. This node generates a prompt are likely to contain the information relevant to a user's query. This node generates a prompt
describing the task, submits it to the language model, and processes the output to produce a describing the task, submits it to the language model, and processes the output to produce a
list of probable tags. list of probable tags.
Attributes: Attributes:
@ -25,17 +28,24 @@ class GetProbableTagsNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "GetProbableTags". node_name (str): The unique identifier name for the node, defaulting to "GetProbableTags".
""" """
def __init__(self, input: str, output: List[str], node_config: dict, def __init__(
node_name: str = "GetProbableTags"): self,
input: str,
output: List[str],
node_config: dict,
node_name: str = "GetProbableTags",
):
super().__init__(node_name, "node", input, output, 2, node_config) super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"] self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get("verbose", False) self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
""" """
Generates a list of probable HTML tags based on the user's input and updates the state Generates a list of probable HTML tags based on the user's input and updates the state
with this list. The method constructs a prompt for the language model, submits it, and with this list. The method constructs a prompt for the language model, submits it, and
parses the output to identify probable tags. parses the output to identify probable tags.
Args: Args:
@ -50,8 +60,7 @@ class GetProbableTagsNode(BaseNode):
necessary information for generating tag predictions is missing. necessary information for generating tag predictions is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -78,7 +87,9 @@ class GetProbableTagsNode(BaseNode):
template=template, template=template,
input_variables=["question"], input_variables=["question"],
partial_variables={ partial_variables={
"format_instructions": format_instructions, "webpage": url}, "format_instructions": format_instructions,
"webpage": url,
},
) )
# Execute the chain to get probable tags # Execute the chain to get probable tags

View File

@ -5,9 +5,10 @@ GraphIterator Module
import asyncio import asyncio
import copy import copy
from typing import List, Optional from typing import List, Optional
from ..utils.logging import get_logger
from tqdm.asyncio import tqdm from tqdm.asyncio import tqdm
from ..utils.logging import get_logger
from .base_node import BaseNode from .base_node import BaseNode
@ -59,9 +60,9 @@ class GraphIteratorNode(BaseNode):
""" """
batchsize = self.node_config.get("batchsize", _default_batchsize) batchsize = self.node_config.get("batchsize", _default_batchsize)
if self.verbose: self.logger.info(
self.logger.info(f"--- Executing {self.node_name} Node with batchsize {batchsize} ---") f"--- Executing {self.node_name} Node with batchsize {batchsize} ---"
)
try: try:
eventloop = asyncio.get_event_loop() eventloop = asyncio.get_event_loop()

View File

@ -3,8 +3,9 @@ ImageToTextNode Module
""" """
from typing import List, Optional from typing import List, Optional
from .base_node import BaseNode
from ..utils.logging import get_logger from ..utils.logging import get_logger
from .base_node import BaseNode
class ImageToTextNode(BaseNode): class ImageToTextNode(BaseNode):
@ -23,16 +24,18 @@ class ImageToTextNode(BaseNode):
""" """
def __init__( def __init__(
self, self,
input: str, input: str,
output: List[str], output: List[str],
node_config: Optional[dict]=None, node_config: Optional[dict] = None,
node_name: str = "ImageToText", node_name: str = "ImageToText",
): ):
super().__init__(node_name, "node", input, output, 1, node_config) super().__init__(node_name, "node", input, output, 1, node_config)
self.llm_model = node_config["llm_model"] self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get("verbose", False) self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
self.max_images = 5 if node_config is None else node_config.get("max_images", 5) self.max_images = 5 if node_config is None else node_config.get("max_images", 5)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
@ -48,9 +51,8 @@ class ImageToTextNode(BaseNode):
dict: The updated state with the input key containing the text extracted from the image. dict: The updated state with the input key containing the text extracted from the image.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
input_data = [state[key] for key in input_keys] input_data = [state[key] for key in input_keys]
urls = input_data[0] urls = input_data[0]
@ -63,9 +65,9 @@ class ImageToTextNode(BaseNode):
# Skip the image-to-text conversion # Skip the image-to-text conversion
if self.max_images < 1: if self.max_images < 1:
return state return state
img_desc = [] img_desc = []
for url in urls[:self.max_images]: for url in urls[: self.max_images]:
try: try:
text_answer = self.llm_model.run(url) text_answer = self.llm_model.run(url)
except Exception as e: except Exception as e:

View File

@ -4,11 +4,13 @@ MergeAnswersNode Module
# Imports from standard library # Imports from standard library
from typing import List, Optional from typing import List, Optional
from tqdm import tqdm
from ..utils.logging import get_logger
# Imports from Langchain # Imports from Langchain
from langchain.prompts import PromptTemplate from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser from langchain_core.output_parsers import JsonOutputParser
from tqdm import tqdm
from ..utils.logging import get_logger
# Imports from the library # Imports from the library
from .base_node import BaseNode from .base_node import BaseNode
@ -29,17 +31,23 @@ class MergeAnswersNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer". node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer".
""" """
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None, def __init__(
node_name: str = "MergeAnswers"): self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "MergeAnswers",
):
super().__init__(node_name, "node", input, output, 2, node_config) super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"] self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get( self.verbose = (
"verbose", False) False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
""" """
Executes the node's logic to merge the answers from multiple graph instances into a Executes the node's logic to merge the answers from multiple graph instances into a
single answer. single answer.
Args: Args:
@ -54,8 +62,7 @@ class MergeAnswersNode(BaseNode):
that the necessary information for generating an answer is missing. that the necessary information for generating an answer is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.ogger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)

View File

@ -3,17 +3,20 @@ ParseNode Module
""" """
from typing import List, Optional from typing import List, Optional
from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_transformers import Html2TextTransformer from langchain_community.document_transformers import Html2TextTransformer
from .base_node import BaseNode
from ..utils.logging import get_logger from ..utils.logging import get_logger
from .base_node import BaseNode
class ParseNode(BaseNode): class ParseNode(BaseNode):
""" """
A node responsible for parsing HTML content from a document. A node responsible for parsing HTML content from a document.
The parsed content is split into chunks for further processing. The parsed content is split into chunks for further processing.
This node enhances the scraping workflow by allowing for targeted extraction of This node enhances the scraping workflow by allowing for targeted extraction of
content, thereby optimizing the processing of large HTML documents. content, thereby optimizing the processing of large HTML documents.
Attributes: Attributes:
@ -26,13 +29,23 @@ class ParseNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "Parse". node_name (str): The unique identifier name for the node, defaulting to "Parse".
""" """
def __init__(self, input: str, output: List[str], node_config: Optional[dict]=None, node_name: str = "Parse"): def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "Parse",
):
super().__init__(node_name, "node", input, output, 1, node_config) super().__init__(node_name, "node", input, output, 1, node_config)
self.verbose = False if node_config is None else node_config.get("verbose", False) self.verbose = (
self.parse_html = True if node_config is None else node_config.get("parse_html", True) False if node_config is None else node_config.get("verbose", False)
)
self.parse_html = (
True if node_config is None else node_config.get("parse_html", True)
)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
""" """
Executes the node's logic to parse the HTML document content and split it into chunks. Executes the node's logic to parse the HTML document content and split it into chunks.
@ -48,8 +61,7 @@ class ParseNode(BaseNode):
necessary information for parsing the content is missing. necessary information for parsing the content is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -65,12 +77,11 @@ class ParseNode(BaseNode):
# Parse the document # Parse the document
docs_transformed = input_data[0] docs_transformed = input_data[0]
if self.parse_html: if self.parse_html:
docs_transformed = Html2TextTransformer( docs_transformed = Html2TextTransformer().transform_documents(input_data[0])
).transform_documents(input_data[0])
docs_transformed = docs_transformed[0] docs_transformed = docs_transformed[0]
chunks = text_splitter.split_text(docs_transformed.page_content) chunks = text_splitter.split_text(docs_transformed.page_content)
state.update({self.output[0]: chunks}) state.update({self.output[0]: chunks})
return state return state

View File

@ -3,13 +3,17 @@ RAGNode Module
""" """
from typing import List, Optional from typing import List, Optional
from langchain.docstore.document import Document from langchain.docstore.document import Document
from langchain.retrievers import ContextualCompressionRetriever from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import EmbeddingsFilter, DocumentCompressorPipeline from langchain.retrievers.document_compressors import (
DocumentCompressorPipeline,
EmbeddingsFilter,
)
from langchain_community.document_transformers import EmbeddingsRedundantFilter from langchain_community.document_transformers import EmbeddingsRedundantFilter
from langchain_community.vectorstores import FAISS from langchain_community.vectorstores import FAISS
from ..utils.logging import get_logger
from ..utils.logging import get_logger
from .base_node import BaseNode from .base_node import BaseNode
@ -32,13 +36,20 @@ class RAGNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "Parse". node_name (str): The unique identifier name for the node, defaulting to "Parse".
""" """
def __init__(self, input: str, output: List[str], node_config: Optional[dict]=None, node_name: str = "RAG"): def __init__(
self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "RAG",
):
super().__init__(node_name, "node", input, output, 2, node_config) super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"] self.llm_model = node_config["llm_model"]
self.embedder_model = node_config.get("embedder_model", None) self.embedder_model = node_config.get("embedder_model", None)
self.verbose = False if node_config is None else node_config.get( self.verbose = (
"verbose", False) False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
""" """
@ -57,8 +68,7 @@ class RAGNode(BaseNode):
necessary information for compressing the content is missing. necessary information for compressing the content is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -80,15 +90,15 @@ class RAGNode(BaseNode):
) )
chunked_docs.append(doc) chunked_docs.append(doc)
if self.verbose: self.logger.info("--- (updated chunks metadata) ---")
self.logger.info("--- (updated chunks metadata) ---")
# check if embedder_model is provided, if not use llm_model # check if embedder_model is provided, if not use llm_model
self.embedder_model = self.embedder_model if self.embedder_model else self.llm_model self.embedder_model = (
self.embedder_model if self.embedder_model else self.llm_model
)
embeddings = self.embedder_model embeddings = self.embedder_model
retriever = FAISS.from_documents( retriever = FAISS.from_documents(chunked_docs, embeddings).as_retriever()
chunked_docs, embeddings).as_retriever()
redundant_filter = EmbeddingsRedundantFilter(embeddings=embeddings) redundant_filter = EmbeddingsRedundantFilter(embeddings=embeddings)
# similarity_threshold could be set, now k=20 # similarity_threshold could be set, now k=20
@ -108,9 +118,7 @@ class RAGNode(BaseNode):
compressed_docs = compression_retriever.invoke(user_prompt) compressed_docs = compression_retriever.invoke(user_prompt)
if self.verbose: self.logger.info("--- (tokens compressed and vector stored) ---")
self.logger.info("--- (tokens compressed and vector stored) ---")
state.update({self.output[0]: compressed_docs}) state.update({self.output[0]: compressed_docs})
return state return state

View File

@ -4,12 +4,15 @@ RobotsNode Module
from typing import List, Optional from typing import List, Optional
from urllib.parse import urlparse from urllib.parse import urlparse
from langchain_community.document_loaders import AsyncChromiumLoader
from langchain.prompts import PromptTemplate
from langchain.output_parsers import CommaSeparatedListOutputParser from langchain.output_parsers import CommaSeparatedListOutputParser
from .base_node import BaseNode from langchain.prompts import PromptTemplate
from langchain_community.document_loaders import AsyncChromiumLoader
from ..helpers import robots_dictionary from ..helpers import robots_dictionary
from ..utils.logging import get_logger from ..utils.logging import get_logger
from .base_node import BaseNode
class RobotsNode(BaseNode): class RobotsNode(BaseNode):
""" """
@ -34,16 +37,21 @@ class RobotsNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "Robots". node_name (str): The unique identifier name for the node, defaulting to "Robots".
""" """
def __init__(self, input: str, output: List[str], node_config: Optional[dict]=None, def __init__(
self,
node_name: str = "Robots"): input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "Robots",
):
super().__init__(node_name, "node", input, output, 1) super().__init__(node_name, "node", input, output, 1)
self.llm_model = node_config["llm_model"] self.llm_model = node_config["llm_model"]
self.force_scraping = force_scraping self.force_scraping = force_scraping
self.verbose = True if node_config is None else node_config.get( self.verbose = (
"verbose", False) True if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
""" """
@ -65,8 +73,7 @@ class RobotsNode(BaseNode):
scraping is not enforced. scraping is not enforced.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -91,8 +98,7 @@ class RobotsNode(BaseNode):
""" """
if not source.startswith("http"): if not source.startswith("http"):
raise ValueError( raise ValueError("Operation not allowed")
"Operation not allowed")
else: else:
parsed_url = urlparse(source) parsed_url = urlparse(source)
@ -100,7 +106,9 @@ class RobotsNode(BaseNode):
loader = AsyncChromiumLoader(f"{base_url}/robots.txt") loader = AsyncChromiumLoader(f"{base_url}/robots.txt")
document = loader.load() document = loader.load()
if "ollama" in self.llm_model["model_name"]: if "ollama" in self.llm_model["model_name"]:
self.llm_model["model_name"] = self.llm_model["model_name"].split("/")[-1] self.llm_model["model_name"] = self.llm_model["model_name"].split("/")[
-1
]
model = self.llm_model["model_name"].split("/")[-1] model = self.llm_model["model_name"].split("/")[-1]
else: else:
@ -114,27 +122,25 @@ class RobotsNode(BaseNode):
prompt = PromptTemplate( prompt = PromptTemplate(
template=template, template=template,
input_variables=["path"], input_variables=["path"],
partial_variables={"context": document, partial_variables={"context": document, "agent": agent},
"agent": agent
},
) )
chain = prompt | self.llm_model | output_parser chain = prompt | self.llm_model | output_parser
is_scrapable = chain.invoke({"path": source})[0] is_scrapable = chain.invoke({"path": source})[0]
if "no" in is_scrapable: if "no" in is_scrapable:
if self.verbose: self.logger.warning(
self.logger.warning("\033[31m(Scraping this website is not allowed)\033[0m") "\033[31m(Scraping this website is not allowed)\033[0m"
)
if not self.force_scraping: if not self.force_scraping:
raise ValueError( raise ValueError("The website you selected is not scrapable")
'The website you selected is not scrapable')
else: else:
if self.verbose: self.logger.warning(
self.logger.warning("\033[33m(WARNING: Scraping this website is not allowed but you decided to force it)\033[0m") "\033[33m(WARNING: Scraping this website is not allowed but you decided to force it)\033[0m"
)
else: else:
if self.verbose: self.logger.warning("\033[32m(Scraping this website is allowed)\033[0m")
self.logger.warning("\033[32m(Scraping this website is allowed)\033[0m")
state.update({self.output[0]: is_scrapable}) state.update({self.output[0]: is_scrapable})
return state return state

View File

@ -3,11 +3,14 @@ SearchInternetNode Module
""" """
from typing import List, Optional from typing import List, Optional
from langchain.output_parsers import CommaSeparatedListOutputParser from langchain.output_parsers import CommaSeparatedListOutputParser
from langchain.prompts import PromptTemplate from langchain.prompts import PromptTemplate
from ..utils.logging import get_logger
from ..utils.research_web import search_on_web from ..utils.research_web import search_on_web
from .base_node import BaseNode from .base_node import BaseNode
from ..utils.logging import get_logger
class SearchInternetNode(BaseNode): class SearchInternetNode(BaseNode):
""" """
@ -27,13 +30,19 @@ class SearchInternetNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "SearchInternet". node_name (str): The unique identifier name for the node, defaulting to "SearchInternet".
""" """
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None, def __init__(
node_name: str = "SearchInternet"): self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "SearchInternet",
):
super().__init__(node_name, "node", input, output, 1, node_config) super().__init__(node_name, "node", input, output, 1, node_config)
self.llm_model = node_config["llm_model"] self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get( self.verbose = (
"verbose", False) False if node_config is None else node_config.get("verbose", False)
)
self.max_results = node_config.get("max_results", 3) self.max_results = node_config.get("max_results", 3)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
@ -55,8 +64,7 @@ class SearchInternetNode(BaseNode):
necessary information for generating the answer is missing. necessary information for generating the answer is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -87,12 +95,9 @@ class SearchInternetNode(BaseNode):
search_answer = search_prompt | self.llm_model | output_parser search_answer = search_prompt | self.llm_model | output_parser
search_query = search_answer.invoke({"user_prompt": user_prompt})[0] search_query = search_answer.invoke({"user_prompt": user_prompt})[0]
if self.verbose: self.logger.info(f"Search Query: {search_query}")
self.logger.info(f"Search Query: {search_query}")
answer = search_on_web(query=search_query, max_results=self.max_results)
answer = search_on_web(
query=search_query, max_results=self.max_results)
if len(answer) == 0: if len(answer) == 0:
# raise an exception if no answer is found # raise an exception if no answer is found

View File

@ -4,13 +4,14 @@ SearchLinkNode Module
# Imports from standard library # Imports from standard library
from typing import List, Optional from typing import List, Optional
from tqdm import tqdm
from ..utils.logging import get_logger
# Imports from Langchain # Imports from Langchain
from langchain.prompts import PromptTemplate from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableParallel from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
from ..utils.logging import get_logger
# Imports from the library # Imports from the library
from .base_node import BaseNode from .base_node import BaseNode
@ -33,13 +34,19 @@ class SearchLinkNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer". node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer".
""" """
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None, def __init__(
node_name: str = "GenerateLinks"): self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "GenerateLinks",
):
super().__init__(node_name, "node", input, output, 1, node_config) super().__init__(node_name, "node", input, output, 1, node_config)
self.llm_model = node_config["llm_model"] self.llm_model = node_config["llm_model"]
self.verbose = False if node_config is None else node_config.get( self.verbose = (
"verbose", False) False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
""" """
@ -58,8 +65,7 @@ class SearchLinkNode(BaseNode):
necessary information for generating the answer is missing. necessary information for generating the answer is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -93,7 +99,13 @@ class SearchLinkNode(BaseNode):
""" """
relevant_links = [] relevant_links = []
for i, chunk in enumerate(tqdm(parsed_content_chunks, desc="Processing chunks", disable=not self.verbose)): for i, chunk in enumerate(
tqdm(
parsed_content_chunks,
desc="Processing chunks",
disable=not self.verbose,
)
):
merge_prompt = PromptTemplate( merge_prompt = PromptTemplate(
template=prompt_relevant_links, template=prompt_relevant_links,
input_variables=["content", "user_prompt"], input_variables=["content", "user_prompt"],
@ -101,7 +113,8 @@ class SearchLinkNode(BaseNode):
merge_chain = merge_prompt | self.llm_model | output_parser merge_chain = merge_prompt | self.llm_model | output_parser
# merge_chain = merge_prompt | self.llm_model # merge_chain = merge_prompt | self.llm_model
answer = merge_chain.invoke( answer = merge_chain.invoke(
{"content": chunk.page_content, "user_prompt": user_prompt}) {"content": chunk.page_content, "user_prompt": user_prompt}
)
relevant_links += answer relevant_links += answer
state.update({self.output[0]: relevant_links}) state.update({self.output[0]: relevant_links})
return state return state

View File

@ -3,9 +3,11 @@ SearchInternetNode Module
""" """
from typing import List, Optional from typing import List, Optional
from tqdm import tqdm
from langchain.output_parsers import CommaSeparatedListOutputParser from langchain.output_parsers import CommaSeparatedListOutputParser
from langchain.prompts import PromptTemplate from langchain.prompts import PromptTemplate
from tqdm import tqdm
from .base_node import BaseNode from .base_node import BaseNode
@ -27,12 +29,18 @@ class SearchLinksWithContext(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer". node_name (str): The unique identifier name for the node, defaulting to "GenerateAnswer".
""" """
def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None, def __init__(
node_name: str = "GenerateAnswer"): self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "GenerateAnswer",
):
super().__init__(node_name, "node", input, output, 2, node_config) super().__init__(node_name, "node", input, output, 2, node_config)
self.llm_model = node_config["llm_model"] self.llm_model = node_config["llm_model"]
self.verbose = True if node_config is None else node_config.get( self.verbose = (
"verbose", False) True if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
""" """
@ -51,8 +59,7 @@ class SearchLinksWithContext(BaseNode):
that the necessary information for generating an answer is missing. that the necessary information for generating an answer is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
print(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)
@ -90,25 +97,30 @@ class SearchLinksWithContext(BaseNode):
result = [] result = []
# Use tqdm to add progress bar # Use tqdm to add progress bar
for i, chunk in enumerate(tqdm(doc, desc="Processing chunks", disable=not self.verbose)): for i, chunk in enumerate(
tqdm(doc, desc="Processing chunks", disable=not self.verbose)
):
if len(doc) == 1: if len(doc) == 1:
prompt = PromptTemplate( prompt = PromptTemplate(
template=template_no_chunks, template=template_no_chunks,
input_variables=["question"], input_variables=["question"],
partial_variables={"context": chunk.page_content, partial_variables={
"format_instructions": format_instructions}, "context": chunk.page_content,
"format_instructions": format_instructions,
},
) )
else: else:
prompt = PromptTemplate( prompt = PromptTemplate(
template=template_chunks, template=template_chunks,
input_variables=["question"], input_variables=["question"],
partial_variables={"context": chunk.page_content, partial_variables={
"chunk_id": i + 1, "context": chunk.page_content,
"format_instructions": format_instructions}, "chunk_id": i + 1,
"format_instructions": format_instructions,
},
) )
result.extend( result.extend(prompt | self.llm_model | output_parser)
prompt | self.llm_model | output_parser)
state["urls"] = result state["urls"] = result
return state return state

View File

@ -3,8 +3,10 @@ TextToSpeechNode Module
""" """
from typing import List, Optional from typing import List, Optional
from .base_node import BaseNode
from ..utils.logging import get_logger from ..utils.logging import get_logger
from .base_node import BaseNode
class TextToSpeechNode(BaseNode): class TextToSpeechNode(BaseNode):
""" """
@ -21,12 +23,19 @@ class TextToSpeechNode(BaseNode):
node_name (str): The unique identifier name for the node, defaulting to "TextToSpeech". node_name (str): The unique identifier name for the node, defaulting to "TextToSpeech".
""" """
def __init__(self, input: str, output: List[str], def __init__(
node_config: Optional[dict]=None, node_name: str = "TextToSpeech"): self,
input: str,
output: List[str],
node_config: Optional[dict] = None,
node_name: str = "TextToSpeech",
):
super().__init__(node_name, "node", input, output, 1, node_config) super().__init__(node_name, "node", input, output, 1, node_config)
self.tts_model = node_config["tts_model"] self.tts_model = node_config["tts_model"]
self.verbose = False if node_config is None else node_config.get("verbose", False) self.verbose = (
False if node_config is None else node_config.get("verbose", False)
)
def execute(self, state: dict) -> dict: def execute(self, state: dict) -> dict:
""" """
@ -35,7 +44,7 @@ class TextToSpeechNode(BaseNode):
Args: Args:
state (dict): The current state of the graph. The input keys will be used to fetch the state (dict): The current state of the graph. The input keys will be used to fetch the
correct data types from the state. correct data types from the state.
Returns: Returns:
dict: The updated state with the output key containing the audio generated from the text. dict: The updated state with the output key containing the audio generated from the text.
@ -44,8 +53,7 @@ class TextToSpeechNode(BaseNode):
necessary information for generating the audio is missing. necessary information for generating the audio is missing.
""" """
if self.verbose: self.logger.info(f"--- Executing {self.node_name} Node ---")
self.logger.info(f"--- Executing {self.node_name} Node ---")
# Interpret input keys based on the provided input expression # Interpret input keys based on the provided input expression
input_keys = self.get_input_keys(state) input_keys = self.get_input_keys(state)