Nexari-G1.1 / app.py
Nexari-Research's picture
Update app.py
941017d verified
# app.py β€” Nexari G1 (Advanced Intent Analysis & Confidence Gating)
import os
import json
import logging
import asyncio
from datetime import datetime
import pytz
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Any, Dict, List
# Local model modules
import coder_model
import chat_model
# === SAFE IMPORT FOR NEW LIBRARIES ===
try:
from sentence_transformers import SentenceTransformer, util
from duckduckgo_search import DDGS
NEURAL_AVAILABLE = True
except ImportError:
NEURAL_AVAILABLE = False
print("⚠️ WARNING: sentence-transformers or duckduckgo-search not found.")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("nexari.app")
app = FastAPI()
MODEL_DIR = "./models"
NEURAL_DIR = os.path.join(MODEL_DIR, "neural")
# === CONFIGURATION ===
NEURAL_MODEL_NAME = "all-MiniLM-L6-v2"
neural_classifier = None
encoded_anchors = {}
MAX_HISTORY_MESSAGES = 6
# Optimized Anchors for better Vector Separation
INTENT_ANCHORS = {
"coding": ["write python code", "fix bug", "create function", "script", "debug", "sql query", "html css", "java code"],
"reasoning": ["solve math", "explain logic", "why", "prove that", "analyze", "physics", "chemistry"],
"search": ["latest news", "price of gold", "weather today", "who is the ceo", "current stock price", "search google", "find info"],
"time": ["what time is it", "current time", "date today", "clock", "day is today"],
# New category to pull "Identity" questions away from Search
"identity": ["what is your name", "who are you", "who created you", "tell me about yourself", "are you ai"]
}
def ensure_model_dir_or_fail():
try:
os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(NEURAL_DIR, exist_ok=True)
except Exception as e:
logger.critical("Unable to create model dir: %s", e)
# === LOADERS ===
def load_neural_network():
global neural_classifier, encoded_anchors
if not NEURAL_AVAILABLE: return
try:
logger.info("⏳ Loading Neural Intent Model...")
model = SentenceTransformer(NEURAL_MODEL_NAME, cache_folder=NEURAL_DIR, device="cpu")
anchors = {}
for intent, texts in INTENT_ANCHORS.items():
anchors[intent] = model.encode(texts, convert_to_tensor=True)
neural_classifier = model
encoded_anchors = anchors
logger.info("βœ… Neural Intent Classifier Ready!")
except Exception as e:
logger.error(f"❌ Failed to load Neural Network: {e}")
async def load_neural_async():
await asyncio.to_thread(load_neural_network)
@app.on_event("startup")
async def startup_event():
ensure_model_dir_or_fail()
coder_model.BASE_DIR = os.path.join(MODEL_DIR, "coder")
chat_model.BASE_DIR = os.path.join(MODEL_DIR, "chat")
tasks = [
asyncio.create_task(coder_model.load_model_async()),
asyncio.create_task(chat_model.load_model_async()),
asyncio.create_task(load_neural_async()),
]
asyncio.gather(*tasks, return_exceptions=True)
logger.info("πŸš€ Server Startup Complete")
class Message(BaseModel):
role: str
content: str
class ChatRequest(BaseModel):
messages: list[Message]
stream: bool = True
temperature: float = 0.7
# === TOOLS ===
def get_real_time():
try:
ist = pytz.timezone('Asia/Kolkata')
return datetime.now(ist).strftime("%A, %d %B %Y, %I:%M %p (IST)")
except Exception:
return str(datetime.now())
def search_sync(query: str):
logger.info(f"πŸ”Ž Executing Search for: {query}")
try:
with DDGS() as ddgs:
results = list(ddgs.text(query, max_results=4))
if not results: return None
formatted_res = ""
for r in results:
formatted_res += f"Source: {r['title']}\nSnippet: {r['body']}\nLink: {r['href']}\n\n"
return formatted_res
except Exception as e:
logger.error(f"DDGS Error: {e}")
return None
async def perform_web_search(query: str):
if not NEURAL_AVAILABLE: return None
return await asyncio.to_thread(search_sync, query)
# === ADVANCED INTENT LOGIC (2025 Technique) ===
def analyze_deep_intent(text: str):
"""
Combines Neural Similarity with Confidence Gating & Token Chain Analysis.
Returns: (intent_name, confidence_score)
"""
# 1. Low-Level Token Analysis (The Chain Reaction Check)
text_lower = text.lower()
tokens = text_lower.split()
# GUARDRAIL: Self-Reference Override
# If user asks about "your name", "you", "yourself" -> Force Chat/Identity
self_tokens = {"your", "you", "yourself", "created", "made"}
if "name" in tokens and len(tokens) < 7 and any(t in tokens for t in self_tokens):
return "identity", 0.99
if not neural_classifier: return "chat", 0.0
try:
# 2. Neural Vector Search
user_embedding = neural_classifier.encode(text, convert_to_tensor=True)
scores = {}
for intent, anchor_embeddings in encoded_anchors.items():
cosine_scores = util.cos_sim(user_embedding, anchor_embeddings)
# We take the MAX score from the anchor group
scores[intent] = float(cosine_scores.max())
best_intent = max(scores, key=scores.get)
best_score = scores[best_intent]
# 3. Confidence Gating
# Search needs HIGH confidence to trigger (prevent false positives)
if best_intent == "search":
if best_score < 0.45: # Strict threshold for search
logger.info(f"⚠️ Search detected but low confidence ({best_score:.2f}). Fallback to chat.")
return "chat", best_score
# Identity maps to chat logic internally
if best_intent == "identity":
return "chat", best_score
# General Threshold
if best_score < 0.30:
return "chat", best_score
return best_intent, best_score
except Exception as e:
logger.error(f"Intent Error: {e}")
return "chat", 0.0
def sanitize_chunk(chunk: Any) -> Dict[str, Any]:
if isinstance(chunk, dict):
return {k: v for k, v in chunk.items() if k != "status"}
return {"text": str(chunk)}
SYSTEM_PREFIX = (
"You are Nexari G1, an advanced AI created by Piyush, the CEO of Nexari AI. "
"Your core directive is to be helpful, and accurate. "
"Always respond naturally, clarity, and Always respond with a friendly, positive tone, regardless of context. "
"You have internet access, web search ability, and server tools. Never deny these capabilities. "
# --- NEW FOCUS INSTRUCTION START ---
"ATTENTION PROTOCOL: "
"1. Always prioritize the user’s latest message above everything else. "
"2. Use previous messages only for context (names, continuity, or details). "
"3. If the user changes the topic, immediately switch and ignore the old topic. "
"4. Do not repeat previous answers unless the user clearly asks for repetition. "
# --- NEW FOCUS INSTRUCTION END ---
"Use emojis to make the conversation lively. "
)
def limit_context(messages: List[Dict]) -> List[Dict]:
if not messages: return []
sys = messages[0] if messages[0].get("role") == "system" else None
start = 1 if sys else 0
rem = messages[start:]
if len(rem) > MAX_HISTORY_MESSAGES: rem = rem[-MAX_HISTORY_MESSAGES:]
final = []
if sys: final.append(sys)
final.extend(rem)
return final
@app.post("/v1/chat/completions")
async def chat_endpoint(request: ChatRequest):
raw_msgs = [m.dict() for m in request.messages] if request.messages else []
if not raw_msgs: return {"error": "Empty messages"}
last_msg_text = raw_msgs[-1]['content']
# === ANALYZE INTENT ===
intent, confidence = analyze_deep_intent(last_msg_text)
logger.info(f"🧠 Analysis: Text='{last_msg_text}' | Intent='{intent}' | Conf={confidence:.2f}")
selected_model = chat_model.model
sys_msg = SYSTEM_PREFIX
status = "Thinking..."
injected_context = ""
# === ROUTING ===
if intent == "coding" and getattr(coder_model, "model", None):
selected_model = coder_model.model
sys_msg += " You are an Expert Coder. Provide clean, working code."
status = "Coding..."
elif intent == "reasoning" and getattr(chat_model, "model", None):
selected_model = chat_model.model
sys_msg += " Think step-by-step."
status = "Reasoning..."
elif intent == "time":
t = get_real_time()
injected_context = f"CURRENT DATE & TIME: {t}"
status = "Checking Time..."
elif intent == "search":
status = "Searching Web..."
clean_query = last_msg_text.replace("search", "").replace("google", "").strip()
search_q = clean_query if len(clean_query) > 2 else last_msg_text
res = await perform_web_search(search_q)
if res:
injected_context = (
f"### SEARCH RESULTS (REAL-TIME DATA):\n{res}\n"
"### INSTRUCTION:\n"
"Answer the user's question using ONLY the Search Results above."
)
else:
# Silent fallback if search fails
injected_context = ""
status = "Thinking..."
# === CONSTRUCT MESSAGE ===
if raw_msgs[0].get("role") != "system":
raw_msgs.insert(0, {"role":"system","content": sys_msg})
else:
raw_msgs[0]["content"] = sys_msg
if injected_context:
new_content = (
f"{injected_context}\n\n"
f"### USER QUESTION:\n{last_msg_text}"
)
raw_msgs[-1]['content'] = new_content
if not selected_model:
if chat_model.model: selected_model = chat_model.model
elif coder_model.model: selected_model = coder_model.model
else: return {"error": "System warming up..."}
final_msgs = limit_context(raw_msgs)
def iter_response():
try:
yield f"event: status\ndata: {json.dumps({'status': status})}\n\n"
yield ":\n\n"
stream = selected_model.create_chat_completion(
messages=final_msgs, temperature=request.temperature, stream=True
)
for chunk in stream:
yield f"data: {json.dumps(sanitize_chunk(chunk))}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Stream error: {e}")
yield f"data: {json.dumps({'error': str(e)})}\n\n"
return StreamingResponse(iter_response(), media_type="text/event-stream")