Nexari-Research committed
Commit 1c76a46 · verified · 1 Parent(s): 54d8f9b

Update app.py

Files changed (1)
  1. app.py +116 -64
app.py CHANGED
@@ -1,3 +1,4 @@
+# app.py -- robust SSE status handling and chunk sanitization
 import os
 import json
 import logging
@@ -5,31 +6,31 @@ import asyncio
 from fastapi import FastAPI
 from fastapi.responses import StreamingResponse
 from pydantic import BaseModel
+from typing import Any, Dict

-# separated modules
+# Local model modules (expect these to exist in your project)
 import router_model
 import coder_model
 import chat_model

 logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger("nexari")
+logger = logging.getLogger("nexari.app")

 app = FastAPI()

-# Primary MODEL_DIR
 MODEL_DIR = "./models"

 def ensure_model_dir_or_fail():
     try:
         os.makedirs(MODEL_DIR, exist_ok=True)
-        logger.info(f"Model dir ensured: {MODEL_DIR}")
+        logger.info("Model directory ensured: %s", MODEL_DIR)
     except Exception as e:
-        logger.critical(f"CRITICAL: Unable to create model directory '{MODEL_DIR}': {e}")
+        logger.critical("Unable to create model dir: %s", e)
         raise

 @app.on_event("startup")
 async def startup_event():
-    logger.info("⏳ STARTUP: ensuring model dir and loading models...")
+    logger.info("Startup: ensure model dir and set base dirs...")
     ensure_model_dir_or_fail()

     router_model.BASE_DIR = os.path.join(MODEL_DIR, "router")
@@ -42,10 +43,10 @@ async def startup_event():
         asyncio.create_task(chat_model.load_model_async()),
     ]
     results = await asyncio.gather(*tasks, return_exceptions=True)
-    for idx, res in enumerate(results):
-        if isinstance(res, Exception):
-            logger.error(f"Model loader {idx} failed during startup: {res}")
-    logger.info("Startup complete.")
+    for i, r in enumerate(results):
+        if isinstance(r, Exception):
+            logger.error("Model loader %d failed: %s", i, r)
+    logger.info("Startup complete.")

 class Message(BaseModel):
     role: str
@@ -57,108 +58,159 @@ class ChatRequest(BaseModel):
     temperature: float = 0.7

 def get_intent(last_user_message: str):
-    # Fallback if router not loaded
-    if not router_model.model:
-        logger.warning("Router model not loaded — using rule-based fallback.")
-        text = last_user_message.lower()
-        if any(tok in text for tok in ["code", "function", "bug", "error", "fix", "html", "css", "python", "js"]):
+    # If router model missing, use a simple rule
+    if not getattr(router_model, "model", None):
+        text = (last_user_message or "").lower()
+        if any(tok in text for tok in ["code", "bug", "fix", "error", "function", "python", "js", "html", "css"]):
             return "coding", "neutral"
+        if any(tok in text for tok in ["why", "how", "prove", "reason", "think"]):
+            return "reasoning", "neutral"
         return "chat", "neutral"

-    sys_prompt = "Analyze intent. Return JSON: {'intent': 'coding' or 'chat' or 'reasoning', 'sentiment': 'neutral' or 'sad'}"
+    sys_prompt = "Analyze intent. Return JSON like {'intent':'coding'|'chat'|'reasoning', 'sentiment':'neutral'|'sad'}"
     try:
         res = router_model.model.create_chat_completion(
-            messages=[{"role": "system", "content": sys_prompt}, {"role": "user", "content": last_user_message}],
+            messages=[{"role":"system","content":sys_prompt},{"role":"user","content": last_user_message}],
             temperature=0.1, max_tokens=50
         )
-        content = res['choices'][0]['message']['content'].lower()
-
+        content = ""
+        try:
+            content = res['choices'][0]['message']['content'].lower()
+        except Exception:
+            try:
+                content = res['choices'][0]['text'].lower()
+            except Exception:
+                content = ""
         if "coding" in content:
             return "coding", "neutral"
-        # Reasoning intent detection
         if "reasoning" in content or "think" in content or "solve" in content:
             return "reasoning", "neutral"
         if "sad" in content:
             return "chat", "sad"
         return "chat", "neutral"
     except Exception as e:
-        logger.exception(f"Router intent detection failed: {e}")
+        logger.exception("Router failure: %s", e)
        return "chat", "neutral"

+def sanitize_chunk(chunk: Any) -> Dict[str, Any]:
+    """
+    Ensure chunk is a JSON-serializable mapping for SSE.
+    Remove any 'status' fields so we never send an unintended status overwrite.
+    """
+    # If chunk is already a dict-like
+    if isinstance(chunk, dict):
+        # shallow copy to avoid mutating model internals
+        out = {}
+        for k, v in chunk.items():
+            if k == "status":
+                # drop status fields from model-chunks; log for diagnostics
+                logger.debug("Dropping status field from model chunk: %s", v)
+                continue
+            # try to keep strings and numbers; for complex objects convert to str
+            if isinstance(v, (str, int, float, bool, type(None))):
+                out[k] = v
+            else:
+                try:
+                    json.dumps(v)
+                    out[k] = v
+                except Exception:
+                    out[k] = str(v)
+        return out
+    else:
+        # Not a dict: coerce into a safe dict with text key
+        try:
+            # if it's bytes or similar, convert
+            txt = str(chunk)
+            return {"text": txt}
+        except Exception:
+            return {"text": "[UNSERIALIZABLE_CHUNK]"}
+
 @app.post("/v1/chat/completions")
 async def chat_endpoint(request: ChatRequest):
-    messages = [m.dict() for m in request.messages]
+    # Validate incoming
+    messages = [m.dict() for m in request.messages] if request.messages else []
     if not messages:
         return {"error": "No messages provided."}
-    last_msg = messages[-1]['content']
+    last = messages[-1]['content']

-    # 1. Detect Intent & Sentiment
-    intent, sentiment = get_intent(last_msg)
+    intent, sentiment = get_intent(last)

     selected_model = None
-    sys_msg = "You are a helpful AI."
-
-    # 2. Set Status Indicator Text
-    status_indicator = "Thinking..." # Default
+    sys_msg = "You are a helpful assistant."
+    status_indicator = "Thinking..." # default if not changed below

     if intent == "coding":
-        if not coder_model.model:
-            logger.error("Client requested coding intent but coder model is not loaded.")
-            return {"error": "Coder model not available."}
+        if not getattr(coder_model, "model", None):
+            logger.error("Coder model not available.")
+            return {"error":"Coder model not available."}
         selected_model = coder_model.model
-        sys_msg = "You are an expert Coding Assistant. Write clean, efficient code."
+        sys_msg = "You are a coding expert. Provide clean code."
         status_indicator = "Coding..."
-        logger.info("Using: CODER")
-
+        logger.info("Intent: CODING")
     elif intent == "reasoning":
-        # Currently using Chat model for reasoning, but indicating "Reasoning..."
-        # If you have a specific reasoning model (e.g. DeepSeek-R1), add logic here.
-        if not chat_model.model:
-            return {"error": "Model not available."}
+        if not getattr(chat_model, "model", None):
+            logger.error("Chat model not available for reasoning.")
+            return {"error":"Model not available."}
         selected_model = chat_model.model
-        status_indicator = "Reasoning..."
-        logger.info("Using: REASONING (via Chat Model)")
-
+        status_indicator = "Reasoning..."
+        logger.info("Intent: REASONING")
     else:
-        if not chat_model.model:
-            logger.error("Chat model not loaded.")
-            return {"error": "Chat model not available."}
+        if not getattr(chat_model, "model", None):
+            logger.error("Chat model missing.")
+            return {"error":"Chat model not available."}
         selected_model = chat_model.model
-        logger.info(f"Using: CHAT (Sentiment: {sentiment})")
+        logger.info("Intent: CHAT (%s)", sentiment)
         if sentiment == "sad":
-            sys_msg = "You are an empathetic friend."
+            sys_msg = "You are empathic and calm."
             status_indicator = "Empathizing..."

-    # Ensure system prompt
-    if messages[0]['role'] != "system":
-        messages.insert(0, {"role": "system", "content": sys_msg})
+    # ensure system prompt is present
+    if messages[0].get("role") != "system":
+        messages.insert(0, {"role":"system","content": sys_msg})
     else:
-        messages[0]['content'] = sys_msg
+        messages[0]["content"] = sys_msg

-    # 3. Stream Response with Status Packet
+    # Streaming generator
     def iter_response():
         try:
-            # Send status packet first (frontend expects this to update its indicator)
-            yield f"data: {json.dumps({'status': status_indicator})}\n\n"
-
-            # Start model streaming generator
+            # 1) Send a single authoritative SSE status event (event: status)
+            #    Use event field so client can handle it separately from data token stream.
+            status_payload = json.dumps({"status": status_indicator})
+            event_payload = f"event: status\n"
+            event_payload += f"data: {status_payload}\n\n"
+            logger.info("Sending authoritative status event: %s", status_indicator)
+            yield event_payload
+
+            # 2) small flush hint to reduce buffering and help client parse promptly
+            yield ":\n\n"
+
+            # 3) Start streaming model output
             stream = selected_model.create_chat_completion(
                 messages=messages,
                 temperature=request.temperature,
                 stream=True
             )

-            # **CRITICAL: send the status packet again immediately after starting the stream**
-            # This guarantees the frontend will receive the server's authoritative status even
-            # if it creates the indicator DOM slightly later (race condition on the client).
-            yield f"data: {json.dumps({'status': status_indicator})}\n\n"
-
+            # Iterate model generator and sanitize every chunk so it cannot inject a status
             for chunk in stream:
-                yield f"data: {json.dumps(chunk)}\n\n"
+                safe = sanitize_chunk(chunk)
+                try:
+                    yield f"data: {json.dumps(safe)}\n\n"
+                except Exception:
+                    # fallback to a safe string representation
+                    yield f"data: {json.dumps({'text': str(safe)})}\n\n"
+
+            # 4) final done marker
             yield "data: [DONE]\n\n"
+            logger.info("Stream finished for request (status was: %s)", status_indicator)
+
         except Exception as e:
-            logger.exception(f"Error while streaming: {e}")
-            yield f"data: {json.dumps({'error': str(e)})}\n\n"
+            logger.exception("Streaming error: %s", e)
+            # send explicit error object
+            try:
+                yield f"data: {json.dumps({'error': str(e)})}\n\n"
+            except Exception:
+                yield "data: {\"error\":\"streaming failure\"}\n\n"
             yield "data: [DONE]\n\n"

     return StreamingResponse(iter_response(), media_type="text/event-stream")
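Note on consuming the new stream: iter_response() now emits one named SSE event (event: status) before any tokens, a ":" comment line as a flush hint, then plain data: events carrying sanitized chunks, and finally data: [DONE]. A minimal client sketch that separates the status event from the token stream (illustrative only, not part of this commit; the host, port and function name are assumed):

# sse_client_sketch.py - illustrative only, not part of the commit.
# Assumes the app runs locally on port 7860 (adjust to your deployment).
import json
import requests

def stream_chat(prompt: str, url: str = "http://localhost:7860/v1/chat/completions"):
    payload = {"messages": [{"role": "user", "content": prompt}], "temperature": 0.7}
    with requests.post(url, json=payload, stream=True) as resp:
        resp.raise_for_status()
        current_event = "message"  # SSE default event name
        for raw in resp.iter_lines(decode_unicode=True):
            if not raw or raw.startswith(":"):
                continue  # blank separator or the ":" flush hint
            if raw.startswith("event:"):
                current_event = raw[len("event:"):].strip()
                continue
            if raw.startswith("data:"):
                data = raw[len("data:"):].strip()
                if data == "[DONE]":
                    break
                obj = json.loads(data)
                if current_event == "status":
                    print(f"[status] {obj.get('status')}")
                else:
                    print(obj)  # sanitized model chunk or {'error': ...}
                current_event = "message"  # each event here carries one data line

if __name__ == "__main__":
    stream_chat("Write a Python function that reverses a string.")

Because sanitize_chunk() strips any status key from model chunks, the only status the client ever sees is the single authoritative event sent before streaming starts.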
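The commit does not include router_model, coder_model or chat_model. From the way app.py uses them, each module is expected to expose a BASE_DIR string, a module-level model that stays None until loading finishes, an async load_model_async(), and a create_chat_completion() method on the loaded model (llama-cpp-python style). A hypothetical minimal sketch of one such module, with the backend and file name assumed rather than taken from this repo:

# chat_model.py - hypothetical sketch of the interface app.py expects; not from this repo.
import asyncio
import logging
import os

from llama_cpp import Llama  # assumed backend; anything with create_chat_completion() works

logger = logging.getLogger("nexari.chat_model")

BASE_DIR = "./models/chat"   # overwritten by app.py at startup
MODEL_FILE = "model.gguf"    # illustrative file name
model = None                 # stays None until load_model_async() succeeds

async def load_model_async():
    """Load the weights off the event loop so startup can gather() several loads."""
    global model
    path = os.path.join(BASE_DIR, MODEL_FILE)
    if not os.path.exists(path):
        raise FileNotFoundError(f"Model file not found: {path}")
    # Llama() blocks while loading weights, so run it in a worker thread.
    model = await asyncio.to_thread(Llama, model_path=path)
    logger.info("Chat model loaded from %s", path)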