import json
import time
import requests
import streamlit as st
from contextlib import contextmanager
from typing import Optional, List, Literal
# import server.logger as logger
# ------------------------------------------------------------------------------
# Page Config:
# ------------------------------------------------------------------------------
# Configure page with professional branding
# Configure page with professional branding.
st.set_page_config(
    page_title="Chat with your data | Knowledge Assistant",
    page_icon="š¦",
    layout='wide',
    initial_sidebar_state='expanded',
    menu_items={
        'Get Help': 'https://github.com/sanchit-shaleen/chat-with-your-data',
        'Report a bug': 'https://github.com/sanchit-shaleen/chat-with-your-data/issues',
        'About': '# Chat with your data\nA production-grade document intelligence system'
    }
)
# Modern CSS styling for professional appearance.
# NOTE(review): the original <style> block appears to have been stripped by the
# HTML-removing extraction that mangled this file — restore the CSS here.
st.markdown("""
""", unsafe_allow_html=True)
# ------------------------------------------------------------------------------
# Page consistent settings and initializations:
# ------------------------------------------------------------------------------
class Message:
    """A single chat message rendered in the Streamlit chat UI.

    Attributes:
        type: Role of the sender — ``'assistant'`` or ``'human'``.
        content: Raw markdown/text body of the message.
        filenames: Original names of files attached to the message.
            These are the user-supplied names and may differ from the
            names actually saved on the server, so the chat UI and the
            sidebar can show the same file under different names.
    """

    type: Literal['assistant', 'human']
    content: str
    filenames: Optional[List[str]]

    def __init__(
        self, type: Literal['assistant', 'human'],
        content: str, filenames: Optional[List[str]] = None
    ):
        self.type = type
        self.content = content
        self.filenames = filenames
# Get user_id: show the landing/auth page until a session exists, then st.stop()
# so the rest of the script only runs for authenticated users.
if "session_id" not in st.session_state:
    # Enhanced CSS for modern landing page.
    # NOTE(review): original <style> content was stripped by extraction — restore.
    st.markdown("""
""", unsafe_allow_html=True)
    # Add header banner
    st.markdown("""
š¬
Chat with your data
Your Personal Document Intelligence Assistant
""", unsafe_allow_html=True)
    # Check server availability with retries before rendering the auth forms.
    max_retries = 5
    retry_delay = 2  # seconds
    server_ready = False
    for attempt in range(max_retries):
        try:
            if requests.get(f"{st.secrets.server.ip_address}/", timeout=5).status_code == 200:
                server_ready = True
                break
        except requests.RequestException:  # was a bare `except:` — narrowed
            if attempt < max_retries - 1:
                st.info(f"š Initializing services... ({attempt + 1}/{max_retries})")
                # `time` is already imported at module top; the inner
                # `import time` duplicate has been removed.
                time.sleep(retry_delay)
            else:
                st.error("š« Server is not reachable. Please check your connection or server status.", icon="š")
                st.stop()
    # Create two-column layout with proper sizing
    col_left, col_right = st.columns([1, 1.2], gap="large")
    with col_left:
        st.markdown("### š¬ Why Chat with Your Data?", help="Key features")
        # Feature cards (surrounding HTML markup was stripped by extraction).
        st.markdown("""
š¤ Upload Any Data
Support for CSV, Excel, JSON, and more
š¬ Natural Conversations
Ask questions in plain English, get instant answers
ā AI-Powered Insights
Discover patterns and trends automatically
š Lightning Fast
Optimized caching for instant responses
""", unsafe_allow_html=True)
    with col_right:
        # Determine which tab is active
        if "auth_tab" not in st.session_state:
            st.session_state.auth_tab = "login"
        st.markdown("""
### š Sign In / Create Account
""")
        # Tab selection
        tab1, tab2 = st.tabs(["š Login", "š Register"])
        with tab1:
            st.write("Welcome back! Sign in to your account to continue.")
            ip_user_id = st.text_input(
                "User ID",
                placeholder="Enter your user ID",
                key="login_user_id"
            )
            ip_user_pw = st.text_input(
                "Password",
                placeholder="Enter your password",
                type="password",
                key="login_user_pw"
            )
            remember = st.checkbox("Remember me", key="login_remember", value=False)
            if st.button("š Login", type="primary", use_container_width=True, key="login_btn"):
                # Normalize the id the same way registration does (spaces -> "_").
                ip_user_id = "_".join(ip_user_id.strip().lower().split(" "))
                ip_user_pw = ip_user_pw.strip()
                if not ip_user_id or not ip_user_pw:
                    st.error("ā Please fill all the fields.")
                else:
                    try:
                        resp = requests.post(
                            f"{st.secrets.server.ip_address}/login",
                            json={"login_id": ip_user_id, "password": ip_user_pw},
                            timeout=15  # avoid hanging the UI on a dead server
                        )
                        if resp.status_code == 200:
                            session_id = resp.json().get("user_id")
                            st.session_state.session_id = session_id
                            name_of_user = resp.json().get("name", session_id)
                            st.session_state.name_of_user = name_of_user
                            st.success("ā Login successful! Redirecting...", icon="āØ")
                            time.sleep(1)
                            st.rerun()
                        else:
                            st.error("ā " + resp.json().get("error", "Login failed."))
                    except requests.RequestException as e:
                        st.error(f"ā Error connecting to server: {e}")
        with tab2:
            st.write("Join us and start chatting with your data today!")
            ip_user_name = st.text_input(
                "Full Name",
                placeholder="John Doe",
                key="register_user_name"
            )
            ip_user_id = st.text_input(
                "User ID",
                placeholder="john_doe",
                key="register_user_id"
            )
            st.caption("Use: lowercase, numbers, `-`, `_` only")
            ip_user_pw = st.text_input(
                "Password",
                placeholder="Create a strong password",
                type="password",
                key="register_user_pw"
            )
            if st.button("⨠Create Account", type="primary", use_container_width=True, key="register_btn"):
                ip_user_id = "_".join(ip_user_id.strip().lower().split(" "))
                ip_user_pw = ip_user_pw.strip()
                if not ip_user_name or not ip_user_id or not ip_user_pw:
                    st.error("ā Please fill all the fields.")
                else:
                    try:
                        resp = requests.post(
                            f"{st.secrets.server.ip_address}/register",
                            json={
                                "name": ip_user_name,
                                "user_id": ip_user_id,
                                "password": ip_user_pw
                            },
                            timeout=15
                        )
                        if resp.status_code == 201:
                            st.success("ā Registration successful! You can now login.", icon="š")
                            st.info("š Switch to the Login tab to sign in.", icon="ā¹ļø")
                        else:
                            st.error("ā " + resp.json().get("error", "Registration failed."))
                    except requests.RequestException as e:
                        st.error(f"ā Error connecting to server: {e}")
    # Halt here: nothing below runs until the user is authenticated.
    st.stop()
# One-time per-session initialization: fetch chat history and the user's
# existing uploads from the server, then cache everything in session_state.
if "initialized" not in st.session_state:
    # Initialize Logger:
    # st.session_state.logger = logger.get_logger(name="Streamlit")
    # log = st.session_state.logger
    # log.info("Streamlit initialized.")
    # Initialize the session with server:
    st.session_state.server_ip = st.secrets.server.ip_address
    try:
        resp = requests.post(
            f"{st.session_state.server_ip}/chat_history",
            data={"user_id": st.session_state.session_id},
            timeout=15  # fail fast instead of hanging the first page load
        )
        if resp.status_code == 200:
            # Seed the chat with a greeting, then replay stored history.
            st.session_state.chat_history = [Message('assistant', "š, How may I help you today?")]
            chat_hist = resp.json().get("chat_history", [])
            for msg in chat_hist:
                st.session_state.chat_history.append(Message(msg['role'], msg['content']))
        else:
            # log.error(f"Failed to initialize chat history: {resp.json().get('error', 'Unknown error')}")
            st.error(
                "Failed to initialize chat history. Please try again later.",
                icon="š«"
            )
            st.stop()
    except requests.RequestException as e:
        # log.error(f"Error initializing server session: {e}")
        st.error(
            "Failed to connect to the server. Please check your connection or server status.",
            icon="š«"
        )
        st.stop()
    # User's existing uploads (server-side file names).
    st.session_state.user_uploads = requests.get(
        f"{st.session_state.server_ip}/uploads",
        params={"user_id": st.session_state.session_id},
        timeout=15
    ).json().get("files", [])
    # Documents retrieved for the most recent answer (for the Sources panel).
    st.session_state.last_retrieved_docs = []
    # Set flag so this block only runs once per session.
    st.session_state.initialized = True
# Convenience aliases for values that the rest of the script uses constantly.
user_id = st.session_state.session_id
chat_history = st.session_state.chat_history
server_ip = st.session_state.server_ip
# Debug: Display current user_id
with st.sidebar:
    st.caption(f"š¤ Logged in as: `{user_id}`")
# ------------------------------------------------------------------------------
# Helper functions:
# ------------------------------------------------------------------------------
def write_as_ai(text):
    """Render *text* as markdown inside an assistant chat bubble."""
    with st.chat_message(name='assistant', avatar='assistant'):
        st.markdown(text)
def write_as_human(text: str, filenames: Optional[List[str]] = None):
    """Render *text* as a user chat bubble, listing any attached file names.

    Args:
        text: Markdown/text content of the user's message.
        filenames: Original names of files attached to the message, if any.
    """
    with st.chat_message(name='user', avatar='user'):
        st.markdown(text)
        if filenames:
            files = ", ".join([f"`'{file}'`" for file in filenames])
            st.caption(f"š Attached file(s): {files}.")
def upload_file(uploaded_file) -> tuple[bool, str]:
    """Upload the st attachment/uploaded file to the server and save it.

    Args:
        uploaded_file: The file object uploaded by the user (Streamlit
            `UploadedFile`; must expose ``.name`` and ``.getvalue()``).

    Returns:
        tuple: A tuple containing:
            - bool: True if the file was uploaded successfully, False otherwise.
            - str: The server file name on success, or an error message.
    """
    try:
        # POST the raw bytes to the FastAPI /upload endpoint.
        files = {"file": (uploaded_file.name, uploaded_file.getvalue())}
        data = {"user_id": user_id}
        # Generous timeout: large files may take a while to transfer.
        response = requests.post(f"{server_ip}/upload", files=files, data=data, timeout=60)
        if response.status_code == 200:
            # Server returns the (possibly sanitized) stored file name.
            return True, response.json().get("message", "")
        return False, response.json().get("error", "Unknown error")
    except Exception as e:
        # Best-effort: surface the failure to the caller instead of raising.
        return False, str(e)
def embed_file(file_name: str) -> tuple[bool, str, dict]:
    """Embed the content of the file into the RAG system with multimodal support.

    Args:
        file_name: The server-side name of the file to embed.

    Returns:
        tuple: A tuple containing:
            - bool: True if the file was embedded successfully, False otherwise.
            - str: Success message or error message.
            - dict: Additional metadata (items_embedded, text_chunks,
              images_extracted, image_paths); empty on failure.
    """
    try:
        # No timeout here on purpose: embedding large documents can be slow.
        response = requests.post(
            f"{server_ip}/embed",
            json={
                "user_id": user_id,
                "file_name": file_name
            }
        )
        if response.status_code == 200:
            resp_data = response.json()
            message = resp_data.get("message", "File embedded successfully.")
            # Build a detailed message with multimodal counts when available.
            detailed_message = message
            items = resp_data.get("items_embedded", 0)
            text_chunks = resp_data.get("text_chunks", 0)
            images = resp_data.get("images_extracted", 0)
            if items > 0:
                detailed_message = f"ā Ingested {items} items ({text_chunks} text chunks + {images} š¼ļø images)"
            # Return with metadata for frontend enhancement.
            metadata = {
                "items_embedded": items,
                "text_chunks": text_chunks,
                "images_extracted": images,
                "image_paths": resp_data.get("image_paths", [])
            }
            return True, detailed_message, metadata
        else:
            error_message = response.json().get("error", "Unknown error")
            return False, error_message, {}
    except Exception as e:
        return False, str(e), {}
def handle_uploaded_files(uploaded_files) -> bool:
    """Upload and embed each attached file, showing step-by-step progress.

    Args:
        uploaded_files: Iterable of Streamlit `UploadedFile` objects.

    Returns:
        bool: True if every file was processed; on failure the exception is
        shown and the script is stopped, so False is effectively unreachable.
    """
    progress_status = ""
    with st.chat_message(name='assistant', avatar='./assets/settings_3.png'):
        with st.spinner("Processing files..."):
            container = st.empty()
            # Found out later that all this can be done with `st.status()`,
            # but it does not allow this much customization.

            @contextmanager
            def write_progress(msg: str):
                """Render *msg* as an in-progress step, then mark done/failed."""
                nonlocal progress_status  # shared log across steps
                # Start with ā³ to show the step is in progress.
                curr = progress_status + f"- ā³ {msg}\n"
                container.container(border=True).markdown(curr)
                try:
                    # Run the actual step (body of the `with`).
                    yield
                    # Step finished — mark it done.
                    progress_status += f"\n- ā {msg}\n"
                    curr = progress_status
                except Exception as e:
                    progress_status += f"\n- ā {msg}: {e}\n"
                    # Fix: refresh `curr` so the failure line is actually
                    # rendered in `finally` (original kept the stale ā³ line).
                    curr = progress_status
                    raise
                finally:
                    container.container(border=True).markdown(curr)

            try:
                for i, file in enumerate(uploaded_files):
                    progress_status += f"\nš Processing file {i+1} of {len(uploaded_files)}...\n"
                    # Upload file:
                    with write_progress("Uploading file..."):
                        status, message = upload_file(file)
                        if not status:
                            raise RuntimeError(f"Upload failed for file: {file.name}")
                        server_file_name = message
                        time.sleep(st.secrets.llm.per_step_delay)
                    # Embed the file:
                    with write_progress("Embedding content..."):
                        status, message, embed_metadata = embed_file(server_file_name)
                        if not status:
                            raise RuntimeError(f"Embedding failed for file: {file.name}")
                        # Show multimodal embedding results if available.
                        if embed_metadata.get("images_extracted", 0) > 0:
                            st.success(f"š {message}", icon="ā")
                        time.sleep(st.secrets.llm.per_step_delay)
                    # Any last steps like finalizing or cleanup:
                    with write_progress("Finalizing the process..."):
                        # Refresh the cached upload list with the new file.
                        st.session_state.user_uploads = requests.get(
                            f"{st.session_state.server_ip}/uploads",
                            params={"user_id": user_id},
                            timeout=15
                        ).json().get("files", [])
                time.sleep(st.secrets.llm.end_delay)
                return True
            except Exception as e:
                st.exception(exception=e)
                st.stop()
            return False
@st.cache_data(ttl=60 * 10, show_spinner=False)
def get_iframe(file_name: str, num_pages: int = 5) -> tuple[bool, str]:
    """Get the iframe HTML for previewing the first pages of a PDF file.

    NOTE(review): `st.cache_data` keys only on the arguments, and `user_id`
    is read from module scope — two users previewing the same file name may
    share a cached iframe. Consider passing `user_id` as a parameter.

    Args:
        file_name: Server-side name of the file to preview.
        num_pages: Number of pages to include in the preview.

    Returns:
        tuple: (True, iframe HTML) on success, (False, error message) otherwise.
    """
    try:
        response = requests.post(
            f"{st.session_state.server_ip}/iframe",
            json={
                "user_id": user_id,
                "file_name": file_name,
                "num_pages": num_pages
            },
            timeout=30  # rendering a preview should never hang the UI
        )
        if response.status_code == 200:
            return True, response.json().get("iframe", "")
        return False, response.json().get("error", "Unknown error")
    except requests.RequestException as e:
        return False, str(e)
# ------------------------------------------------------------------------------
# Sidebar:
# ------------------------------------------------------------------------------
# User Profile:
with st.sidebar.container(border=True):
    col1, col2, col3 = st.columns([1, 5, 1])
    col1.write("š¤")
    col2.markdown(f"**{st.session_state.get('name_of_user', 'User')}**")
    col3.write("āØ")
st.sidebar.divider()
# Files Preview Section:
st.sidebar.subheader("š Document Management")
selected_file = st.sidebar.selectbox(
    label="Choose Document",
    index=0,
    options=st.session_state.user_uploads,
    help="Select a document to preview"
)
# Tried to show pdf persistently, but it re-renders on each run and the page
# hangs in streaming response.
if not st.session_state.user_uploads:
    st.sidebar.info("š No documents uploaded yet.\n\nStart by uploading your first document!", icon="ā¹ļø")
else:
    # NOTE(review): `st.sidebar.button` ignores the enclosing `with colX:`
    # context, so these buttons stack vertically in the sidebar rather than
    # side by side. Use `col1.button(...)` if the two-column layout is wanted.
    col1, col2 = st.sidebar.columns([1, 1])
    with col1:
        preview_button = st.sidebar.button("šļø Preview", use_container_width=True)
    with col2:
        delete_button = st.sidebar.button("šļø Delete", use_container_width=True)
    if selected_file and preview_button:
        status, content = get_iframe(selected_file)
        if status:
            st.sidebar.markdown(content, unsafe_allow_html=True)
        else:
            st.sidebar.error(f"ā Error: {content}", icon="š«")
    if selected_file and delete_button:
        try:
            resp = requests.post(
                f"{st.session_state.server_ip}/delete_file",
                json={"user_id": user_id, "file_name": selected_file},
                timeout=15
            )
            if resp.status_code == 200:
                st.sidebar.success("ā Document deleted successfully!", icon="šļø")
                st.cache_data.clear()  # drop stale cached previews
                st.rerun()
            else:
                st.sidebar.error(resp.json().get("error", "Failed to delete document."), icon="š«")
        except requests.RequestException as e:
            st.sidebar.error(f"ā Error deleting document: {e}", icon="š")
st.sidebar.divider()
# Advanced Options:
st.sidebar.subheader("āļø Advanced Options")
col1, col2 = st.sidebar.columns([1, 1])
with col1:
    dummy_mode = st.sidebar.toggle(
        label="š Dummy Mode",
        value=False,
        key="dummy_mode",
        help="Use placeholder responses instead of LLM"
    )
with col2:
    cache_mode = st.sidebar.toggle(
        label="š¾ Cache Info",
        value=False,
        key="show_cache_info",
        help="Show cache performance metrics"
    )
st.sidebar.divider()
# Destructive Actions:
st.sidebar.subheader("š“ Danger Zone")
col1, col2 = st.sidebar.columns([1, 1])
with col1:
    if st.sidebar.button("šļø Clear Uploads", type="secondary", use_container_width=True):
        try:
            resp = requests.post(
                f"{st.session_state.server_ip}/clear_my_files",
                data={"user_id": user_id},
                timeout=15
            )
            if resp.status_code == 200:
                st.sidebar.success("ā All documents cleared!", icon="šļø")
                st.cache_data.clear()
                st.rerun()
            else:
                st.sidebar.error(resp.json().get("error", "Failed to clear documents."), icon="š«")
        except requests.RequestException as e:
            st.sidebar.error(f"ā Error: {e}", icon="š")
with col2:
    if st.sidebar.button("š¬ Clear Chat", type="secondary", use_container_width=True):
        resp = requests.post(
            f"{server_ip}/clear_chat_history",
            data={"user_id": user_id},
            timeout=15
        )
        if resp.status_code == 200:
            st.session_state.chat_history = [
                Message('assistant', "š Hello! How can I help you today?")
            ]
            st.session_state.last_retrieved_docs = []
            st.sidebar.success("ā Chat cleared!", icon="š¬")
            st.rerun()
        else:
            st.sidebar.error(resp.json().get("error", "Failed to clear chat."), icon="š«")
st.sidebar.divider()
# Footer
st.sidebar.markdown("""
---
š¬ Chat with your data
Production-grade Document Intelligence
""", unsafe_allow_html=True)
# with st.sidebar:
#     st.write(st.session_state)
# ------------------------------------------------------------------------------
# Page content:
# ------------------------------------------------------------------------------
# Page content with modern header.
col1, col2 = st.columns([0.8, 9.2], vertical_alignment='bottom', gap='small')
with col1:
    col1.markdown("š¦")
with col2:
    st.markdown("""
Chat with your data
Your intelligent document assistant powered by local AI inference
""", unsafe_allow_html=True)
st.divider()
# Display cache info if enabled via the sidebar toggle.
if st.session_state.get('show_cache_info', False):
    with st.expander("ā” Cache Performance & Statistics", expanded=True):
        try:
            cache_stats = requests.get(f"{server_ip}/cache-debug", timeout=5).json()
            col1, col2, col3 = st.columns(3)
            with col1:
                st.metric("š¦ Cache Size", f"{cache_stats.get('cache_size', 0)} entries",
                          help="Number of queries currently cached")
            with col2:
                st.metric("ā” Status", "š¢ Active",
                          help="Cache is operational and responding")
            with col3:
                st.metric("š¾ Memory", "Optimized",
                          help="Using LRU eviction and TTL expiration")
            if cache_stats.get('cache_keys'):
                st.caption(f"š Recent queries in cache: {', '.join([f'`{k[:15]}...`' for k in cache_stats.get('cache_keys', [])[:5]])}")
        except Exception:
            # Best-effort panel: any failure just shows a warning.
            st.warning("ā ļø Cache stats unavailable", icon="ā¹ļø")
# Chat history display. All but the last message are rendered plainly; the
# last assistant message additionally gets the thoughts popover and sources.
# NOTE(review): the "<think>"/"</think>" tag literals below were emptied by
# the tag-stripping extraction that mangled this file; they are reconstructed
# from context (the "Thoughts" popover and the `+8 == len("</think>")`
# offset) — confirm against the original source.
for ind, message in enumerate(st.session_state.chat_history):
    if ind < len(st.session_state.chat_history) - 1:  # all messages except last
        if message.type == 'human':
            write_as_human(message.content, message.filenames)
        elif message.type == 'assistant':
            answer = message.content
            # Strip the model's chain-of-thought from historical messages.
            if "</think>" in answer:
                answer = answer[answer.find("</think>") + len("</think>"):]
            write_as_ai(answer)
    else:  # Last message
        if message.type == 'human':
            # Fix: also show attachments here (was dropped for the last
            # message, unlike the branch above).
            write_as_human(message.content, message.filenames)
        elif message.type == 'assistant':
            # Split the message into thoughts and answer, and pull the docs
            # retrieved for this response from session state.
            full = message.content
            thoughts = full[
                full.find("<think>") + len("<think>"):full.find("</think>")
            ] if "</think>" in full else None
            answer = full[full.find("</think>") + len("</think>"):] if thoughts else full
            documents = st.session_state.last_retrieved_docs if st.session_state.last_retrieved_docs else None
            with st.chat_message(name='assistant', avatar='assistant'):
                with st.container(border=True):
                    # Thinking:
                    if thoughts:
                        cont_thoughts = st.popover(
                            "š Thoughts", use_container_width=False).markdown(thoughts)
                    # Answer:
                    st.markdown(answer)
                    # Documents:
                    if documents:
                        tabs = st.expander("šļø Sources", expanded=False).tabs(
                            tabs=[f"Document {i+1}" for i in range(len(documents))]
                        )
                        for i, doc in enumerate(documents):
                            with tabs[i]:
                                # Image documents get a rendered preview;
                                # text documents get content + metadata.
                                is_image = doc.get('metadata', {}).get('type') == 'image'
                                if is_image and 'image_path' in doc.get('metadata', {}):
                                    st.subheader("š¼ļø Image Source")
                                    image_path = doc['metadata']['image_path']
                                    try:
                                        from PIL import Image
                                        img = Image.open(image_path)
                                        st.image(img, caption=f"Image ID: {doc['metadata'].get('image_id', 'unknown')}", use_column_width=True)
                                        st.divider()
                                        st.subheader(":blue[Image Details:]")
                                        img_details = {
                                            "page_number": doc['metadata'].get('page_number'),
                                            "position": doc['metadata'].get('position'),
                                            "extractor": doc['metadata'].get('extractor'),
                                            "size": img.size if hasattr(img, 'size') else "unknown"
                                        }
                                        st.json(img_details, expanded=False)
                                    except FileNotFoundError:
                                        st.warning(f"ā ļø Image not found at: {image_path}")
                                    except Exception as e:
                                        st.error(f"ā Error loading image: {e}")
                                else:
                                    # Display text document (original behavior).
                                    st.subheader(":blue[Content:]")
                                    st.markdown(doc['page_content'])
                                    st.divider()
                                    st.subheader(":blue[Source Details:]")
                                    st.json(doc['metadata'], expanded=False)
# Metrics Display: quality scores for the most recent answer, if present.
last_metrics = st.session_state.get('last_metrics', {})
if last_metrics and "error" not in last_metrics:
    with st.expander("š Response Quality Metrics", expanded=True):
        st.markdown("**LLM-based evaluation using DeepEval + Ollama (Reference-Free)**")
        cols = st.columns(2)
        # Answer Relevancy
        with cols[0]:
            relevancy_score = last_metrics.get("answer_relevancy", 0.0)
            st.metric(
                label="šÆ Answer Relevancy",
                value=f"{relevancy_score:.2%}",
                help="Measures how relevant the answer is to your question (0-100%)"
            )
            if relevancy_score >= 0.7:
                st.success("ā Highly relevant", icon="ā")
            elif relevancy_score >= 0.5:
                st.warning("ā Moderate", icon="ā ļø")
            else:
                st.error("ā Low relevance", icon="ā")
        # Faithfulness
        with cols[1]:
            faithfulness_score = last_metrics.get("faithfulness", 0.0)
            st.metric(
                label="š Faithfulness",
                value=f"{faithfulness_score:.2%}",
                help="Measures how well the answer is grounded in the retrieved documents (0-100%)"
            )
            if faithfulness_score >= 0.7:
                st.success("ā Well-grounded", icon="ā")
            elif faithfulness_score >= 0.5:
                st.warning("ā Partial support", icon="ā ļø")
            else:
                st.error("ā Weak grounding", icon="ā")
        st.caption("š” Metrics use reference-free LLM-as-Judge approach - no ground truth needed")
# Chat input: accept a question (optionally with attachments), process any
# files, then stream the RAG response from the server.
if user_message := st.chat_input(
    placeholder="š¬ Ask anything about your documents... Attach [pdf, txt, md] files with š",
    max_chars=2000,
    accept_file='multiple',
    file_type=['pdf', 'txt', 'md'],
):
    # Create a Message object from the user input and record it.
    new_message = Message(
        type="human",
        content=user_message.text,
        filenames=[file.name for file in user_message.files] if user_message.files else None
    )
    st.session_state.chat_history.append(new_message)
    # Echo it on screen immediately.
    write_as_human(new_message.content, new_message.filenames)
    # Clear the documents shown for the previous answer.
    st.session_state.last_retrieved_docs = []
    # Handle the files if any:
    if user_message.files:
        if handle_uploaded_files(user_message.files):
            st.toast("Files processed successfully!", icon="ā")
        else:
            st.error("Error processing files. Please try again.", icon="š«")
    # Get response and write it:
    with st.chat_message(name='assistant', avatar='assistant'):
        with st.spinner("Generating response..."):
            full = ""
            # Fix: initialize before branching — the dummy branch previously
            # never defined `documents`, crashing at the assignment below.
            documents = []
            if st.session_state.get("dummy_mode", False):
                # Dummy mode: placeholder stream, content chunks only.
                resp_holder = st.empty()
                response = requests.post(
                    f"{server_ip}/rag",
                    json={
                        "query": new_message.content,
                        "session_id": user_id,
                        "dummy": True
                    },
                    stream=True
                )
                for chunk in response.iter_content(chunk_size=None):
                    if chunk:
                        decoded = json.loads(chunk.decode("utf-8"))
                        if decoded["type"] == "content":
                            full += decoded["data"]
                        resp_holder.markdown(full + "ā")
            else:  # real RAG response from server
                response = requests.post(
                    f"{server_ip}/rag",
                    json={
                        "query": new_message.content,
                        "session_id": user_id,
                        "dummy": False
                    },
                    stream=True
                )
                metrics = {}
                resp_holder = st.container(border=True)
                # Stable placeholders so streaming updates replace in place.
                reply_container = resp_holder.container(border=True)
                reply_holder = reply_container.empty()
                document_container = resp_holder.container()
                document_holder = document_container.empty()
                for chunk in response.iter_content(chunk_size=None):
                    if chunk:
                        decoded = json.loads(chunk.decode("utf-8"))
                        if decoded["type"] == "metadata":
                            # Skip metadata for now.
                            continue
                        elif decoded["type"] == "context":
                            documents.append(decoded['data'])
                        elif decoded["type"] == "content":
                            full += decoded["data"]
                        elif decoded["type"] == "metrics":
                            metrics = decoded["data"]
                            st.session_state.last_metrics = metrics
                            st.toast("š Metrics received!", icon="ā")
                        else:
                            st.error(decoded['data'])
                            continue
                        # Re-render sources and the partial answer per chunk.
                        if documents:
                            docs = document_holder.expander("šļø Sources", expanded=True)
                            tabs = docs.tabs(
                                tabs=[f"Document {i+1}" for i in range(len(documents))])
                            for i, doc in enumerate(documents):
                                with tabs[i]:
                                    st.subheader(":blue[Content:]")
                                    st.markdown(doc['page_content'])
                                    st.divider()
                                    st.subheader(":blue[Source Details:]")
                                    st.json(doc['metadata'], expanded=False)
                        reply_holder.markdown(full + "ā")
                # Remove cursor after streaming completes.
                reply_holder.markdown(full)
            # Persist the answer and its sources, then redraw the page.
            st.session_state.last_retrieved_docs = documents
            st.session_state.chat_history.append(Message("assistant", full))
            st.rerun()