Spaces:
Sleeping
Sleeping
| import os | |
| from dotenv import load_dotenv | |
| from typing import List, Dict, Any, Optional | |
| import tempfile | |
| import pandas as pd | |
| import numpy as np | |
| import requests | |
| from pathlib import Path | |
| """LangGraph""" | |
| from langgraph.graph import START, StateGraph, MessagesState | |
| from langgraph.prebuilt import ToolNode, tools_condition | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_groq import ChatGroq | |
| from langchain_huggingface import ( | |
| ChatHuggingFace, | |
| HuggingFaceEndpoint, | |
| ) | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| from langchain_core.tools import tool | |
| from langchain_openai import ChatOpenAI | |
| from duckduckgo_search import DDGS | |
| load_dotenv() | |
| ### =============== SEARCH TOOLS =============== ### | |
def duckduckgo_search(query: str) -> Dict[str, str]:
    """Search the web using DuckDuckGo and return the top results as text.

    Args:
        query: The search query.

    Returns:
        A dict with a single "search_results" key. Its value is the numbered,
        formatted list of hits, a notice that nothing was found, or an error
        message if the search could not be performed.
    """
    # A blank query cannot yield useful hits, so answer without touching the
    # network.
    if not query or not query.strip():
        return {"search_results": "No search query provided."}

    # Browser-like User-Agent header sent with the search request.
    user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:124.0) "
        "Gecko/20100101 Firefox/124.0"
    )

    try:
        ddgs = DDGS(headers={"User-Agent": user_agent})
        # Only the first five hits are requested.
        results = list(ddgs.text(query, max_results=5))
        if not results:
            return {"search_results": "No search results found."}

        # One numbered entry per hit: title, URL, then the snippet.
        formatted_results = [
            f"{i}. {result.get('title', 'No title')}\n"
            f" URL: {result.get('href', 'No URL')}\n"
            f" {result.get('body', 'No description')}\n"
            for i, result in enumerate(results, 1)
        ]
        return {"search_results": "\n".join(formatted_results)}
    except Exception as e:
        # Best-effort tool: report the failure as text instead of raising, so
        # the caller always receives the same result shape.
        return {"search_results": f"Error performing search: {str(e)}"}
def _image_extension(content_type: str) -> str:
    """Map a Content-Type header value to an image file extension (default .jpg)."""
    lowered = content_type.lower()
    # Checked in order; the first marker found in the header wins.
    for marker, ext in (
        ("png", ".png"),
        ("jpeg", ".jpg"),
        ("jpg", ".jpg"),
        ("gif", ".gif"),
        ("webp", ".webp"),
    ):
        if marker in lowered:
            return ext
    return ".jpg"


def _image_filename(url: str, filename: Optional[str], ext: str) -> str:
    """Return a bare file name with an extension, from the given name or the URL path."""
    import time
    from urllib.parse import urlparse

    # urlparse drops the scheme, host, query and fragment, so a URL without a
    # path does not end up named after its host.
    candidate = filename or urlparse(url).path
    # Keep only the last path component so the result cannot point outside
    # the directory it is later joined to.
    name = Path(candidate).name
    if name in ("", ".", ".."):
        name = f"image_{int(time.time())}"
    if not Path(name).suffix:
        name = f"{name}{ext}"
    return name


def download_image(url: str, filename: Optional[str] = None) -> str:
    """Download an image from a URL and save it in the system temp directory.

    Args:
        url: The URL of the image to download.
        filename: Optional; The name to save the file as. If not provided, a name will be derived from the URL.

    Returns:
        A message containing the path of the saved file, or a message
        describing the error if the download or the write failed.
    """
    try:
        # The timeout keeps a stalled server from blocking forever; the
        # context manager releases the streamed connection when done.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()  # Raise an exception for HTTP errors
            ext = _image_extension(response.headers.get("Content-Type", ""))
            filepath = os.path.join(
                tempfile.gettempdir(), _image_filename(url, filename, ext)
            )
            # Stream to disk in chunks rather than holding the image in memory.
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return f"Image downloaded and saved to {filepath}"
    except Exception as e:
        # Best-effort tool: report the failure as text instead of raising.
        return f"Error downloading image: {str(e)}"
| ### =============== DOCUMENT PROCESSING TOOLS =============== ### | |
| # File handling still requires external tools | |
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """Save content to a file in the system temp directory and return its path.

    Args:
        content: The content to save to the file.
        filename: Optional; The name of the file. If not provided, a randomly named file will be created.

    Returns:
        A message containing the path of the file that was written.
    """
    temp_dir = tempfile.gettempdir()

    # Keep only the last path component so the name cannot point outside the
    # temp directory (absolute paths, "../", ...).
    safe_name = Path(filename).name if filename else ""

    if safe_name in ("", ".", ".."):
        # No usable name: reserve a unique file, then close the descriptor so
        # no handle is left open before the file is written by path below.
        fd, filepath = tempfile.mkstemp(dir=temp_dir)
        os.close(fd)
    else:
        filepath = os.path.join(temp_dir, safe_name)

    # Explicit UTF-8 so the result does not depend on the platform's locale.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)

    return f"File saved to {filepath}. You can read this file to process its contents."
# Read the system prompt text from system_prompt.txt (resolved against the
# current working directory) and echo it once at import time.
system_prompt = Path("system_prompt.txt").read_text(encoding="utf-8")
print(system_prompt)

# The prompt wrapped as a system message.
sys_msg = SystemMessage(content=system_prompt)

# Plain functions offered to the model as tools.
tools = [duckduckgo_search, save_and_read_file, download_image]
| # Build graph function | |
def build_graph(provider: str = "openai"):
    """Build and compile the tool-calling agent graph.

    The graph has two nodes: "assistant", which calls the chat model with the
    module-level tools bound, and "tools", which runs the tools the model
    asked for. Control goes back to "assistant" after every tool run.

    Args:
        provider: Which chat model backend to use: "openai", "groq" or
            "huggingface".

    Returns:
        The compiled graph, ready to be invoked with a {"messages": [...]} state.

    Raises:
        ValueError: If provider is not one of the supported names.
    """
    if provider == "openai":
        llm = ChatOpenAI(model="gpt-4o", temperature=0)
    elif provider == "groq":
        llm = ChatGroq(model="qwen-qwq-32b", temperature=0)
    elif provider == "huggingface":
        llm = ChatHuggingFace(
            llm=HuggingFaceEndpoint(
                repo_id="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
                task="text-generation",
                max_new_tokens=1024,
                do_sample=False,
                repetition_penalty=1.03,
                temperature=0,
            ),
            verbose=True,
        )
    else:
        raise ValueError("Invalid provider. Choose 'openai', 'groq', or 'huggingface'.")

    # Bind tools to LLM so it can emit tool calls for them.
    llm_with_tools = llm.bind_tools(tools)

    def assistant(state: MessagesState):
        """Assistant node: call the model on the system prompt plus the history."""
        # The system message is prepended per call and never stored in state.
        response = llm_with_tools.invoke([sys_msg] + state["messages"])
        # MessagesState merges returned messages into the existing history,
        # so only the new message is returned; re-sending the whole history
        # is redundant.
        return {"messages": [response]}

    builder = StateGraph(MessagesState)
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(tools))
    builder.add_edge(START, "assistant")
    # tools_condition routes to "tools" when the latest assistant message
    # carries tool calls, and ends the run otherwise.
    builder.add_conditional_edges(
        "assistant",
        tools_condition,
    )
    builder.add_edge("tools", "assistant")

    # Compile graph
    return builder.compile()
| # test | |
if __name__ == "__main__":
    # Manual smoke test: send one question through the Hugging Face-backed
    # graph and print every message of the resulting conversation.
    question = "When was a picture of St. Thomas Aquinas first added to the Wikipedia page on the Principle of double effect?"
    agent = build_graph(provider="huggingface")
    final_state = agent.invoke({"messages": [HumanMessage(content=question)]})
    for message in final_state["messages"]:
        message.pretty_print()