import hvplot.streamz  # noqa: F401 — imported for its side effect: registers .hvplot on streamz DataFrames
import pandas as pd
from streamz import Stream
from streamz.dataframe import DataFrame
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message
import queue
import threading
import time
import os
import json
from huggingface_hub import CommitScheduler, HfApi, hf_hub_download
import uuid
from pathlib import Path
import panel as pn

pn.extension(design="material")
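# Overview: one background thread counts posts from the AT Protocol firehose,
# a second thread emits per-second counts into a streamz pipeline, and Panel
# serves the live hvplot that watches that pipeline.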
# Queue for cross-thread communication (created but unused; the threads share
# the post_count global instead)
post_queue = queue.Queue()

# Counter for posts, incremented by the firehose handler
post_count = 0
# Create the streaming dataframe; the example frame tells streamz the schema
# (column names and dtypes) of the batches that will be emitted
stream = Stream()

example = pd.DataFrame(
    {"timestamp": [pd.Timestamp.now()], "post_count": [post_count]}, index=[0]
)
df = DataFrame(stream, example=example)
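# Each pd.DataFrame later passed to stream.emit() is appended to df, and any
# hvplot built on df re-renders as new rows arrive.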
# Backlog for 24 hours of one-second samples
DAY_IN_SECONDS = 24 * 60 * 60  # 24 hours * 60 minutes * 60 seconds

# Configuration via environment variables
REPO_ID = os.getenv("HF_REPO_ID", "davanstrien/bluesky-counts")
REPO_TYPE = os.getenv("HF_REPO_TYPE", "dataset")
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN")  # Required for Hugging Face API access

DATA_FOLDER = Path("bluesky_data")
DATA_FILE = f"bluesky_counts_{uuid.uuid4()}.json"
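# Note: the fresh UUID gives each run its own file, so scheduled uploads from
# restarts or concurrent replicas never overwrite one another.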
# def load_hub_data():
#     """Load the most recent data from the Hub"""
#     try:
#         api = HfApi(token=HF_TOKEN)
#         # List files in the repository
#         files = api.list_repo_files(REPO_ID, repo_type=REPO_TYPE)
#         data_files = [f for f in files if f.startswith("data/bluesky_counts_")]
#         if not data_files:
#             return []
#         # Get the most recent file
#         latest_file = sorted(data_files)[-1]
#         # Download the file
#         local_path = hf_hub_download(
#             repo_id=REPO_ID, filename=latest_file, repo_type=REPO_TYPE, token=HF_TOKEN
#         )
#         # Load and parse the data
#         data = []
#         with open(local_path, "r") as f:
#             data.extend(json.loads(line.strip()) for line in f)
#         # Keep only the last 24 hours of data
#         return data[-DAY_IN_SECONDS:]
#     except Exception as e:
#         print(f"Error loading data from Hub: {e}")
#         return []

# Initialize storage and Hub connection (currently disabled)
# DATA_FOLDER.mkdir(exist_ok=True)
# scheduler = CommitScheduler(
#     repo_id=REPO_ID,
#     repo_type=REPO_TYPE,
#     folder_path=DATA_FOLDER,
#     path_in_repo="data",
#     every=10,  # Upload every 10 minutes (CommitScheduler's `every` is in minutes)
#     token=HF_TOKEN,  # Token for authentication
# )
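# If re-enabled, CommitScheduler mirrors DATA_FOLDER to the Hub repo from a
# background thread; it requires HUGGINGFACE_TOKEN to be set.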
def on_message_handler(message):
    global post_count
    commit = parse_subscribe_repos_message(message)
    # Only count newly created posts (not likes, reposts, etc.)
    if hasattr(commit, "ops"):
        for op in commit.ops:
            if op.action == "create" and "app.bsky.feed.post" in op.path:
                post_count += 1
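# Note: post_count is incremented here and read/reset by emit_counts() without
# a lock, so a post landing between the read and the reset can be dropped;
# fine for a rough rate plot, but a threading.Lock would make it exact.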
def emit_counts():
    """Emit post counts every second"""
    global post_count

    # if saved_data := load_hub_data():
    #     print(f"Loaded {len(saved_data)} historical data points from Hub")
    #     # Emit the last 100 historical points to initialize the plot
    #     for point in saved_data[-100:]:
    #         df = pd.DataFrame(
    #             {
    #                 "timestamp": [pd.Timestamp(point["timestamp"])],
    #                 "post_count": [point["post_count"]],
    #             }
    #         )
    #         stream.emit(df)

    # Let the first second of data accumulate
    time.sleep(1)
    while True:
        # Emit a one-row DataFrame with the current timestamp and count
        now = pd.Timestamp.now()
        df = pd.DataFrame({"timestamp": [now], "post_count": [post_count]})
        stream.emit(df)
        # Reset the counter for the next one-second window
        post_count = 0
        time.sleep(1)
# Create the live plot with a 24-hour backlog
plot = df.hvplot.line(
    "timestamp",
    "post_count",
    title="Bluesky Posts per Second (Last 24 Hours)",
    width=800,
    height=400,
    backlog=DAY_IN_SECONDS,  # Keep the last 24 hours of points
)
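# At one row per second, backlog=86400 caps the plot's buffer at roughly a
# day of samples; older rows fall off the left edge.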
# Start the Firehose client in a background daemon thread
def run_firehose():
    client = FirehoseSubscribeReposClient()
    client.start(on_message_handler)


firehose_thread = threading.Thread(target=run_firehose, daemon=True)
firehose_thread.start()

# Start emitting counts in another daemon thread
emit_thread = threading.Thread(target=emit_counts, daemon=True)
emit_thread.start()
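# Both worker threads are daemons, so they exit automatically when the main
# Panel server process stops.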
# Serve the dashboard when run as a script
if __name__ == "__main__":
    dashboard = pn.Column(pn.pane.HoloViews(plot))
    # Bind to all interfaces on port 7860, the port a Docker-based
    # Hugging Face Space expects the app to listen on
    pn.serve(
        dashboard,
        address="0.0.0.0",
        port=7860,
        websocket_origin=["*"],
        show=False,
    )
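# Usage sketch (assuming this file is saved as app.py):
#     python app.py
# then open http://localhost:7860 in a browser.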