|
|
|
|
|
""" |
|
|
Wikipedia Tool for GAIA Agent System |
|
|
Handles Wikipedia searches, content extraction, and information retrieval |
|
|
""" |
|
|
|
|
|
import re |
|
|
import logging |
|
|
from typing import Dict, List, Optional, Any |
|
|
import wikipediaapi |
|
|
from urllib.parse import urlparse, unquote |
|
|
|
|
|
from tools import BaseTool |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class WikipediaSearchResult:
    """Lightweight record describing one Wikipedia lookup result."""

    def __init__(self, title: str, summary: str, url: str, content: str = ""):
        # Full article text is optional; summary-level lookups leave it empty.
        self.title = title
        self.summary = summary
        self.url = url
        self.content = content

    def to_dict(self) -> Dict[str, str]:
        """Serialize to a plain dict, trimming content past 1000 chars."""
        body = self.content
        if len(body) > 1000:
            body = body[:1000] + "..."
        return {
            "title": self.title,
            "summary": self.summary,
            "url": self.url,
            "content": body,
        }
|
|
|
|
|
class WikipediaTool(BaseTool):
    """
    Wikipedia tool for searching and extracting information.

    Accepts a plain query string, a Wikipedia article URL, or a dict of the
    form {"query": str, "action": "summary" | "content"} and returns a result
    dict. Handles disambiguation, missing pages, and content extraction.
    """

    def __init__(self):
        super().__init__("wikipedia")

        # English Wikipedia client; WIKI extract format yields plain text
        # (with "== Section ==" headings) rather than HTML.
        self.wiki = wikipediaapi.Wikipedia(
            language='en',
            extract_format=wikipediaapi.ExtractFormat.WIKI,
            user_agent='GAIA-Agent/1.0 (educational-purpose)'
        )

    def _execute_impl(self, input_data: Any, **kwargs) -> Dict[str, Any]:
        """
        Execute Wikipedia operations based on input type.

        Args:
            input_data: Can be:
                - str: Search query or Wikipedia URL
                - dict: {"query": str, "action": str, "limit": int}

        Returns:
            A result dict; on success it contains "found", "message" and
            (when the page exists) a serialized "result" entry.

        Raises:
            ValueError: unsupported input type or unknown action.
            Exception: the underlying Wikipedia lookup failed.
        """
        if isinstance(input_data, str):
            # URLs are resolved to their article title and fetched in full;
            # bare strings are treated as page titles (summary-level info).
            if self._is_wikipedia_url(input_data):
                return self._extract_from_url(input_data)
            return self._get_page_info(input_data)

        if isinstance(input_data, dict):
            query = input_data.get("query", "")
            action = input_data.get("action", "summary")

            if action == "summary":
                return self._get_summary(query)
            if action == "content":
                return self._get_full_content(query)
            raise ValueError(f"Unknown action: {action}")

        raise ValueError(f"Unsupported input type: {type(input_data)}")

    def _is_wikipedia_url(self, url: str) -> bool:
        """Check if URL is a Wikipedia URL."""
        return "wikipedia.org" in url.lower()

    def _extract_title_from_url(self, url: str) -> str:
        """Extract the article title from a Wikipedia URL ("" on failure)."""
        try:
            parsed = urlparse(url)
            if "/wiki/" in parsed.path:
                title = parsed.path.split("/wiki/", 1)[1]
                # URL-decode and restore spaces: "Machine_learning" -> "Machine learning"
                return unquote(title).replace("_", " ")
            return ""
        except Exception:
            # Deliberately best-effort: any malformed URL maps to "".
            return ""

    def _extract_from_url(self, url: str) -> Dict[str, Any]:
        """Extract full article information from a Wikipedia URL."""
        title = self._extract_title_from_url(url)
        if not title:
            raise ValueError(f"Could not extract title from URL: {url}")

        return self._get_full_content(title)

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Return *text* cut to *limit* chars with a '...' marker when trimmed."""
        return text[:limit] + "..." if len(text) > limit else text

    def _missing_page(self, key: str, value: str) -> Dict[str, Any]:
        """Standard payload for a page that does not exist.

        *key* is the field name the caller uses for the lookup term
        ("query" or "title"), kept for backward compatibility.
        """
        return {
            key: value,
            "found": False,
            "message": f"Wikipedia page '{value}' does not exist",
            "suggestions": self._get_suggestions(value),
        }

    def _get_page_info(self, query: str) -> Dict[str, Any]:
        """Get basic page information (summary-level, no full content)."""
        try:
            page = self.wiki.page(query)

            if not page.exists():
                return self._missing_page("query", query)

            result = WikipediaSearchResult(
                title=page.title,
                summary=self._truncate(page.summary, 500),
                url=page.fullurl,
                content=""
            )

            return {
                "query": query,
                "found": True,
                "result": result.to_dict(),
                "message": "Successfully retrieved Wikipedia page info"
            }

        except Exception as e:
            # Chain the cause so the real API error survives the re-wrap.
            raise Exception(f"Failed to get Wikipedia page info: {str(e)}") from e

    def _get_summary(self, title: str) -> Dict[str, Any]:
        """Get a summary of a specific Wikipedia article plus a few categories."""
        try:
            page = self.wiki.page(title)

            if not page.exists():
                return self._missing_page("title", title)

            result = WikipediaSearchResult(
                title=page.title,
                summary=self._truncate(page.summary, 800),
                url=page.fullurl
            )

            return {
                "title": title,
                "found": True,
                "result": result.to_dict(),
                "categories": list(page.categories.keys())[:5],
                "message": "Successfully retrieved Wikipedia summary"
            }

        except Exception as e:
            raise Exception(f"Failed to get Wikipedia summary: {str(e)}") from e

    def _get_full_content(self, title: str) -> Dict[str, Any]:
        """Get the full content of a Wikipedia article (sections, links, categories)."""
        try:
            page = self.wiki.page(title)

            if not page.exists():
                return self._missing_page("title", title)

            content_sections = self._parse_content_sections(page.text)

            result = WikipediaSearchResult(
                title=page.title,
                summary=self._truncate(page.summary, 800),
                url=page.fullurl,
                content=page.text
            )

            # Only the first 20 outgoing links; the full set can be huge.
            links = list(page.links.keys())[:20]

            return {
                "title": title,
                "found": True,
                "result": result.to_dict(),
                "sections": content_sections,
                "links": links,
                "categories": list(page.categories.keys())[:10],
                # NOTE(review): counting backlinks triggers an extra API fetch
                # that can be slow on popular articles — confirm it is needed.
                "backlinks_count": len(page.backlinks),
                "message": "Successfully retrieved full Wikipedia content"
            }

        except Exception as e:
            raise Exception(f"Failed to get Wikipedia content: {str(e)}") from e

    def _parse_content_sections(self, content: str) -> Dict[str, str]:
        """Parse plain-text Wikipedia content into {section_title: text}.

        Text before the first heading lands under "Introduction"; only the
        first 5 sections are returned to bound the payload size.
        """
        sections: Dict[str, str] = {}
        current_section = "Introduction"
        current_content: List[str] = []

        for line in content.split('\n'):
            line = line.strip()

            # Headings look like "== History ==" (deeper levels use more '=').
            if line.startswith('==') and line.endswith('==') and len(line) > 4:
                if current_content:
                    sections[current_section] = '\n'.join(current_content).strip()

                current_section = line.strip('= ').strip()
                current_content = []
            elif line:
                current_content.append(line)

        # Flush whatever followed the final heading.
        if current_content:
            sections[current_section] = '\n'.join(current_content).strip()

        return dict(list(sections.items())[:5])

    def _get_suggestions(self, query: str) -> List[str]:
        """Get search suggestions for a query (simplified casing variants)."""
        candidates = [
            query.lower(),
            query.title(),
            query.upper(),
            query.replace(' ', '_'),
        ]
        # dict.fromkeys dedupes while keeping insertion order; the original
        # list(set(...)) returned the three variants in arbitrary order.
        return list(dict.fromkeys(candidates))[:3]
|
|
|
|
|
def test_wikipedia_tool():
    """Smoke-test the Wikipedia tool against a mix of query styles."""
    tool = WikipediaTool()

    # Covers: plain title, direct URL, explicit summary/content actions,
    # and a page that should not exist.
    test_cases = [
        "Albert Einstein",
        "https://en.wikipedia.org/wiki/Machine_learning",
        {"query": "Python (programming language)", "action": "summary"},
        {"query": "Artificial Intelligence", "action": "content"},
        "NonexistentPageTest12345"
    ]

    print("🧪 Testing Wikipedia Tool...")

    for case_no, case in enumerate(test_cases, 1):
        print(f"\n--- Test {case_no}: {case} ---")
        try:
            outcome = tool.execute(case)

            if outcome.success:
                payload = outcome.result
                print(f"✅ Success: {payload.get('message', 'No message')}")
                if not payload.get('found'):
                    print(f" Not found: {payload.get('message', 'Unknown error')}")
                elif 'result' in payload:
                    print(f" Title: {payload['result'].get('title', 'No title')}")
                    print(f" Summary: {payload['result'].get('summary', 'No summary')[:100]}...")
            else:
                print(f"❌ Error: {outcome.error}")

            print(f" Execution time: {outcome.execution_time:.2f}s")

        except Exception as e:
            print(f"❌ Exception: {str(e)}")
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: hits the live Wikipedia API when run directly.
    test_wikipedia_tool()