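"""Generate_Speech tool: text-to-speech with Supertonic (ONNX, default) or Kokoro-82M.

Contains the Supertonic inference helpers (text processing, ONNX sessions, voice
styles), the Kokoro pipeline setup, the Generate_Speech entry point, and the
Gradio interface builder.
"""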
from __future__ import annotations

import json
import os
import time
from contextlib import contextmanager
from typing import Optional, Annotated
from unicodedata import normalize
import re
import uuid
import io
import wave

import numpy as np
import onnxruntime as ort
import scipy.io.wavfile
import gradio as gr

from .File_System import ROOT_DIR
from app import _log_call_end, _log_call_start, _truncate_for_log
from ._docstrings import autodoc
try:
    import torch  # type: ignore
except Exception:  # pragma: no cover
    torch = None  # type: ignore

try:
    from kokoro import KModel, KPipeline  # type: ignore
except Exception:  # pragma: no cover
    KModel = None  # type: ignore
    KPipeline = None  # type: ignore

try:
    from huggingface_hub import snapshot_download, list_repo_files
except ImportError:
    snapshot_download = None
    list_repo_files = None
# --- Supertonic Helper Classes & Functions ---

class UnicodeProcessor:
    def __init__(self, unicode_indexer_path: str):
        with open(unicode_indexer_path, "r") as f:
            self.indexer = json.load(f)

    def _preprocess_text(self, text: str) -> str:
        # TODO: add more preprocessing
        text = normalize("NFKD", text)
        return text

    def _get_text_mask(self, text_ids_lengths: np.ndarray) -> np.ndarray:
        text_mask = length_to_mask(text_ids_lengths)
        return text_mask

    def _text_to_unicode_values(self, text: str) -> np.ndarray:
        unicode_values = np.array(
            [ord(char) for char in text], dtype=np.uint16
        )  # 2 bytes
        return unicode_values

    def __call__(self, text_list: list[str]) -> tuple[np.ndarray, np.ndarray]:
        text_list = [self._preprocess_text(t) for t in text_list]
        text_ids_lengths = np.array([len(text) for text in text_list], dtype=np.int64)
        text_ids = np.zeros((len(text_list), text_ids_lengths.max()), dtype=np.int64)
        for i, text in enumerate(text_list):
            unicode_vals = self._text_to_unicode_values(text)
            text_ids[i, : len(unicode_vals)] = np.array(
                [self.indexer[val] for val in unicode_vals], dtype=np.int64
            )
        text_mask = self._get_text_mask(text_ids_lengths)
        return text_ids, text_mask
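# Illustrative usage of UnicodeProcessor (a sketch; the indexer JSON ships with the
# Supertonic ONNX assets):
#   processor = UnicodeProcessor("unicode_indexer.json")
#   text_ids, text_mask = processor(["Hello world"])  # (1, 11) int64 ids, (1, 1, 11) float32 mask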
class Style:
    def __init__(self, style_ttl_onnx: np.ndarray, style_dp_onnx: np.ndarray):
        self.ttl = style_ttl_onnx
        self.dp = style_dp_onnx
class TextToSpeech:
    def __init__(
        self,
        cfgs: dict,
        text_processor: UnicodeProcessor,
        dp_ort: ort.InferenceSession,
        text_enc_ort: ort.InferenceSession,
        vector_est_ort: ort.InferenceSession,
        vocoder_ort: ort.InferenceSession,
    ):
        self.cfgs = cfgs
        self.text_processor = text_processor
        self.dp_ort = dp_ort
        self.text_enc_ort = text_enc_ort
        self.vector_est_ort = vector_est_ort
        self.vocoder_ort = vocoder_ort
        self.sample_rate = cfgs["ae"]["sample_rate"]
        self.base_chunk_size = cfgs["ae"]["base_chunk_size"]
        self.chunk_compress_factor = cfgs["ttl"]["chunk_compress_factor"]
        self.ldim = cfgs["ttl"]["latent_dim"]

    def sample_noisy_latent(
        self, duration: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray]:
        bsz = len(duration)
        wav_len_max = duration.max() * self.sample_rate
        wav_lengths = (duration * self.sample_rate).astype(np.int64)
        chunk_size = self.base_chunk_size * self.chunk_compress_factor
        latent_len = ((wav_len_max + chunk_size - 1) / chunk_size).astype(np.int32)
        latent_dim = self.ldim * self.chunk_compress_factor
        noisy_latent = np.random.randn(bsz, latent_dim, latent_len).astype(np.float32)
        latent_mask = get_latent_mask(
            wav_lengths, self.base_chunk_size, self.chunk_compress_factor
        )
        noisy_latent = noisy_latent * latent_mask
        return noisy_latent, latent_mask

    def _infer(
        self, text_list: list[str], style: Style, total_step: int, speed: float = 1.05
    ) -> tuple[np.ndarray, np.ndarray]:
        assert (
            len(text_list) == style.ttl.shape[0]
        ), "Number of texts must match number of style vectors"
        bsz = len(text_list)
        text_ids, text_mask = self.text_processor(text_list)
        dur_onnx, *_ = self.dp_ort.run(
            None, {"text_ids": text_ids, "style_dp": style.dp, "text_mask": text_mask}
        )  # dur_onnx: [bsz]
        dur_onnx = dur_onnx / speed
        text_emb_onnx, *_ = self.text_enc_ort.run(
            None,
            {"text_ids": text_ids, "style_ttl": style.ttl, "text_mask": text_mask},
        )
        xt, latent_mask = self.sample_noisy_latent(dur_onnx)
        total_step_np = np.array([total_step] * bsz, dtype=np.float32)
        for step in range(total_step):
            current_step = np.array([step] * bsz, dtype=np.float32)
            xt, *_ = self.vector_est_ort.run(
                None,
                {
                    "noisy_latent": xt,
                    "text_emb": text_emb_onnx,
                    "style_ttl": style.ttl,
                    "text_mask": text_mask,
                    "latent_mask": latent_mask,
                    "current_step": current_step,
                    "total_step": total_step_np,
                },
            )
        wav, *_ = self.vocoder_ort.run(None, {"latent": xt})
        return wav, dur_onnx

    def __call__(
        self,
        text: str,
        style: Style,
        total_step: int,
        speed: float = 1.05,
        silence_duration: float = 0.3,
        max_len: int = 300,
    ) -> tuple[np.ndarray, np.ndarray]:
        assert (
            style.ttl.shape[0] == 1
        ), "Single speaker text to speech only supports single style"
        text_list = chunk_text(text, max_len=max_len)
        wav_cat = None
        dur_cat = None
        for text in text_list:
            wav, dur_onnx = self._infer([text], style, total_step, speed)
            if wav_cat is None:
                wav_cat = wav
                dur_cat = dur_onnx
            else:
                silence = np.zeros(
                    (1, int(silence_duration * self.sample_rate)), dtype=np.float32
                )
                wav_cat = np.concatenate([wav_cat, silence, wav], axis=1)
                dur_cat += dur_onnx + silence_duration
        return wav_cat, dur_cat

    def stream(
        self,
        text: str,
        style: Style,
        total_step: int,
        speed: float = 1.05,
        silence_duration: float = 0.3,
        max_len: int = 300,
    ):
        assert (
            style.ttl.shape[0] == 1
        ), "Single speaker text to speech only supports single style"
        text_list = chunk_text(text, max_len=max_len)
        for i, text in enumerate(text_list):
            wav, _ = self._infer([text], style, total_step, speed)
            yield wav.flatten()
            if i < len(text_list) - 1:
                silence = np.zeros(
                    (int(silence_duration * self.sample_rate),), dtype=np.float32
                )
                yield silence

    def batch(
        self, text_list: list[str], style: Style, total_step: int, speed: float = 1.05
    ) -> tuple[np.ndarray, np.ndarray]:
        return self._infer(text_list, style, total_step, speed)
def length_to_mask(lengths: np.ndarray, max_len: Optional[int] = None) -> np.ndarray:
    """
    Convert lengths to binary mask.

    Args:
        lengths: (B,)
        max_len: int
    Returns:
        mask: (B, 1, max_len)
    """
    max_len = max_len or lengths.max()
    ids = np.arange(0, max_len)
    mask = (ids < np.expand_dims(lengths, axis=1)).astype(np.float32)
    return mask.reshape(-1, 1, max_len)
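# Worked example: length_to_mask(np.array([2, 3])) has shape (2, 1, 3) with
#   mask[0] = [[1., 1., 0.]] and mask[1] = [[1., 1., 1.]]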
def get_latent_mask(
    wav_lengths: np.ndarray, base_chunk_size: int, chunk_compress_factor: int
) -> np.ndarray:
    latent_size = base_chunk_size * chunk_compress_factor
    latent_lengths = (wav_lengths + latent_size - 1) // latent_size
    latent_mask = length_to_mask(latent_lengths)
    return latent_mask


def load_onnx(
    onnx_path: str, opts: ort.SessionOptions, providers: list[str]
) -> ort.InferenceSession:
    return ort.InferenceSession(onnx_path, sess_options=opts, providers=providers)


def load_onnx_all(
    onnx_dir: str, opts: ort.SessionOptions, providers: list[str]
) -> tuple[
    ort.InferenceSession,
    ort.InferenceSession,
    ort.InferenceSession,
    ort.InferenceSession,
]:
    dp_onnx_path = os.path.join(onnx_dir, "duration_predictor.onnx")
    text_enc_onnx_path = os.path.join(onnx_dir, "text_encoder.onnx")
    vector_est_onnx_path = os.path.join(onnx_dir, "vector_estimator.onnx")
    vocoder_onnx_path = os.path.join(onnx_dir, "vocoder.onnx")
    dp_ort = load_onnx(dp_onnx_path, opts, providers)
    text_enc_ort = load_onnx(text_enc_onnx_path, opts, providers)
    vector_est_ort = load_onnx(vector_est_onnx_path, opts, providers)
    vocoder_ort = load_onnx(vocoder_onnx_path, opts, providers)
    return dp_ort, text_enc_ort, vector_est_ort, vocoder_ort


def load_cfgs(onnx_dir: str) -> dict:
    cfg_path = os.path.join(onnx_dir, "tts.json")
    with open(cfg_path, "r") as f:
        cfgs = json.load(f)
    return cfgs


def load_text_processor(onnx_dir: str) -> UnicodeProcessor:
    unicode_indexer_path = os.path.join(onnx_dir, "unicode_indexer.json")
    text_processor = UnicodeProcessor(unicode_indexer_path)
    return text_processor


def load_text_to_speech(onnx_dir: str, use_gpu: bool = False) -> TextToSpeech:
    opts = ort.SessionOptions()
    if use_gpu:
        raise NotImplementedError("GPU mode is not fully tested")
    else:
        providers = ["CPUExecutionProvider"]
        print("Using CPU for inference")
    cfgs = load_cfgs(onnx_dir)
    dp_ort, text_enc_ort, vector_est_ort, vocoder_ort = load_onnx_all(
        onnx_dir, opts, providers
    )
    text_processor = load_text_processor(onnx_dir)
    return TextToSpeech(
        cfgs, text_processor, dp_ort, text_enc_ort, vector_est_ort, vocoder_ort
    )
def load_voice_style(voice_style_paths: list[str], verbose: bool = False) -> Style:
    bsz = len(voice_style_paths)
    # Read first file to get dimensions
    with open(voice_style_paths[0], "r") as f:
        first_style = json.load(f)
    ttl_dims = first_style["style_ttl"]["dims"]
    dp_dims = first_style["style_dp"]["dims"]
    # Pre-allocate arrays with full batch size
    ttl_style = np.zeros([bsz, ttl_dims[1], ttl_dims[2]], dtype=np.float32)
    dp_style = np.zeros([bsz, dp_dims[1], dp_dims[2]], dtype=np.float32)
    # Fill in the data
    for i, voice_style_path in enumerate(voice_style_paths):
        with open(voice_style_path, "r") as f:
            voice_style = json.load(f)
        ttl_data = np.array(
            voice_style["style_ttl"]["data"], dtype=np.float32
        ).flatten()
        ttl_style[i] = ttl_data.reshape(ttl_dims[1], ttl_dims[2])
        dp_data = np.array(
            voice_style["style_dp"]["data"], dtype=np.float32
        ).flatten()
        dp_style[i] = dp_data.reshape(dp_dims[1], dp_dims[2])
    if verbose:
        print(f"Loaded {bsz} voice styles")
    return Style(ttl_style, dp_style)
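# End-to-end Supertonic sketch (illustrative paths; in this tool the assets are
# downloaded to assets/supertonic by _init_supertonic below):
#   tts = load_text_to_speech("assets/supertonic/onnx")
#   style = load_voice_style(["assets/supertonic/voice_styles/F1.json"])
#   wav, dur = tts("Hello there.", style, total_step=5, speed=1.0)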
@contextmanager
def timer(name: str):
    start = time.time()
    print(f"{name}...")
    yield
    print(f" -> {name} completed in {time.time() - start:.2f} sec")
def sanitize_filename(text: str, max_len: int) -> str:
    """Sanitize filename by replacing non-alphanumeric characters with underscores."""
    prefix = text[:max_len]
    return re.sub(r"[^a-zA-Z0-9]", "_", prefix)


def chunk_text(text: str, max_len: int = 300) -> list[str]:
    """
    Split text into chunks by paragraphs and sentences.

    Args:
        text: Input text to chunk
        max_len: Maximum length of each chunk (default: 300)
    Returns:
        List of text chunks
    """
    # Split by paragraph (two or more newlines)
    paragraphs = [p.strip() for p in re.split(r"\n\s*\n+", text.strip()) if p.strip()]
    chunks = []
    for paragraph in paragraphs:
        paragraph = paragraph.strip()
        if not paragraph:
            continue
        # Split on sentence boundaries (period, question mark, exclamation mark followed by whitespace),
        # excluding common abbreviations (Mr., Mrs., Dr., e.g., etc.) and single capital initials like "F."
        pattern = r"(?<!Mr\.)(?<!Mrs\.)(?<!Ms\.)(?<!Dr\.)(?<!Prof\.)(?<!Sr\.)(?<!Jr\.)(?<!Ph\.D\.)(?<!etc\.)(?<!e\.g\.)(?<!i\.e\.)(?<!vs\.)(?<!Inc\.)(?<!Ltd\.)(?<!Co\.)(?<!Corp\.)(?<!St\.)(?<!Ave\.)(?<!Blvd\.)(?<!\b[A-Z]\.)(?<=[.!?])\s+"
        sentences = re.split(pattern, paragraph)
        current_chunk = ""
        for sentence in sentences:
            if len(current_chunk) + len(sentence) + 1 <= max_len:
                current_chunk += (" " if current_chunk else "") + sentence
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = sentence
        if current_chunk:
            chunks.append(current_chunk.strip())
    return chunks
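# Example: chunk_text("First sentence. Second sentence.\n\nNew paragraph.", max_len=20)
# returns ["First sentence.", "Second sentence.", "New paragraph."]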
# --- Main Tool Logic ---

# --- Kokoro State ---
_KOKORO_STATE = {
    "initialized": False,
    "device": "cpu",
    "model": None,
    "pipelines": {},
}

# --- Supertonic State ---
_SUPERTONIC_STATE = {
    "initialized": False,
    "tts": None,
    "assets_dir": None,
}


def _audio_np_to_int16(audio_np: np.ndarray) -> np.ndarray:
    audio_clipped = np.clip(audio_np, -1.0, 1.0)
    return (audio_clipped * 32767.0).astype(np.int16)
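# Example: _audio_np_to_int16(np.array([0.0, 0.5, 1.5])) -> array([0, 16383, 32767], dtype=int16)
# (values outside [-1.0, 1.0] are clipped before scaling)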
# --- Kokoro Functions ---

def get_kokoro_voices() -> list[str]:
    try:
        if list_repo_files:
            files = list_repo_files("hexgrad/Kokoro-82M")
            voice_files = [file for file in files if file.endswith(".pt") and file.startswith("voices/")]
            voices = [file.replace("voices/", "").replace(".pt", "") for file in voice_files]
            return sorted(voices) if voices else _get_fallback_voices()
        return _get_fallback_voices()
    except Exception:
        return _get_fallback_voices()


def _get_fallback_voices() -> list[str]:
    return [
        "af_alloy", "af_aoede", "af_bella", "af_heart", "af_jessica", "af_kore", "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky",
        "am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam", "am_michael", "am_onyx", "am_puck", "am_santa",
        "bf_alice", "bf_emma", "bf_isabella", "bf_lily",
        "bm_daniel", "bm_fable", "bm_george", "bm_lewis",
        "ef_dora", "em_alex", "em_santa",
        "ff_siwis",
        "hf_alpha", "hf_beta", "hm_omega", "hm_psi",
        "if_sara", "im_nicola",
        "jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo",
        "pf_dora", "pm_alex", "pm_santa",
        "zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi",
        "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang",
    ]
def _init_kokoro() -> None:
    if _KOKORO_STATE["initialized"]:
        return
    if KModel is None or KPipeline is None:
        raise RuntimeError("Kokoro is not installed. Please install the 'kokoro' package (>=0.9.4).")
    device = "cpu"
    if torch is not None:
        try:
            if torch.cuda.is_available():
                device = "cuda"
        except Exception:
            device = "cpu"
    model = KModel(repo_id="hexgrad/Kokoro-82M").to(device).eval()
    pipelines = {"a": KPipeline(lang_code="a", model=False, repo_id="hexgrad/Kokoro-82M")}
    try:
        pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO"
    except Exception:
        pass
    _KOKORO_STATE.update({"initialized": True, "device": device, "model": model, "pipelines": pipelines})
# --- Supertonic Functions ---

def _init_supertonic() -> None:
    if _SUPERTONIC_STATE["initialized"]:
        return
    if snapshot_download is None:
        raise RuntimeError("huggingface_hub is not installed.")
    # Use a local assets directory within Nymbo-Tools
    # (assuming this file lives in Nymbo-Tools/Modules)
    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    assets_dir = os.path.join(base_dir, "assets", "supertonic")
    if not os.path.exists(assets_dir):
        print(f"Downloading Supertonic models to {assets_dir}...")
        snapshot_download(repo_id="Supertone/supertonic", local_dir=assets_dir)
    onnx_dir = os.path.join(assets_dir, "onnx")
    tts = load_text_to_speech(onnx_dir, use_gpu=False)
    _SUPERTONIC_STATE.update({"initialized": True, "tts": tts, "assets_dir": assets_dir})
def get_supertonic_voices() -> list[str]:
    # Listing voices requires the downloaded assets; avoid triggering a full init here.
    if not _SUPERTONIC_STATE["initialized"]:
        # Check whether the assets exist without downloading anything
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        assets_dir = os.path.join(base_dir, "assets", "supertonic")
        if not os.path.exists(assets_dir):
            # Assets not present yet; fall back to the known default voices
            return ["F1", "F2", "M1", "M2"]
    else:
        assets_dir = _SUPERTONIC_STATE["assets_dir"]
    voice_styles_dir = os.path.join(assets_dir, "voice_styles")
    if not os.path.exists(voice_styles_dir):
        return ["F1", "F2", "M1", "M2"]
    files = os.listdir(voice_styles_dir)
    voices = [f.replace('.json', '') for f in files if f.endswith('.json')]
    return sorted(voices)


def List_Kokoro_Voices() -> list[str]:
    return get_kokoro_voices()


def List_Supertonic_Voices() -> list[str]:
    return get_supertonic_voices()
# Single source of truth for the LLM-facing tool description
TOOL_SUMMARY = (
    "Synthesize speech from text using Supertonic-66M (default) or Kokoro-82M. "
    "Supertonic: faster, supports steps/silence/chunking. "
    "Kokoro: slower, supports many languages/accents. "
    "Return the generated media to the user in this format ``."
)
def Generate_Speech(
    text: Annotated[str, "The text to synthesize (English)."],
    model: Annotated[str, "The TTS model to use: 'Supertonic' or 'Kokoro'."] = "Supertonic",
    speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.3,
    steps: Annotated[int, "Supertonic only. Diffusion steps (1-50). Higher = better quality but slower."] = 5,
    voice: Annotated[str, "Voice identifier. Default 'F1' for Supertonic, 'af_heart' for Kokoro."] = "F1",
    silence_duration: Annotated[float, "Supertonic only. Silence duration between chunks (0.0-2.0s)."] = 0.3,
    max_chunk_size: Annotated[int, "Supertonic only. Max text chunk length (50-1000)."] = 300,
) -> str:
    _log_call_start("Generate_Speech", text=_truncate_for_log(text, 200), model=model, speed=speed, voice=voice)
    if not text or not text.strip():
        _log_call_end("Generate_Speech", "error=empty text")
        raise gr.Error("Please provide non-empty text to synthesize.")
    model_lower = model.lower()
    # Swap in the model-appropriate default voice if the caller left the other model's default
    if model_lower == "kokoro" and voice == "F1":
        voice = "af_heart"
    elif model_lower == "supertonic" and voice == "af_heart":
        voice = "F1"
    try:
        if model_lower == "kokoro":
            return _generate_kokoro(text, speed, voice)
        else:
            # Default to Supertonic
            return _generate_supertonic(text, speed, voice, steps, silence_duration, max_chunk_size)
    except gr.Error as exc:
        _log_call_end("Generate_Speech", f"gr_error={str(exc)}")
        raise
    except Exception as exc:  # pylint: disable=broad-except
        _log_call_end("Generate_Speech", f"error={str(exc)[:120]}")
        raise gr.Error(f"Error during speech generation: {exc}")
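# Typical call (returns the filepath of the generated WAV inside ROOT_DIR):
#   Generate_Speech("Hello there.", model="Supertonic", voice="F1", steps=5)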
def _generate_kokoro(text: str, speed: float, voice: str) -> str:
    _init_kokoro()
    model = _KOKORO_STATE["model"]
    pipelines = _KOKORO_STATE["pipelines"]
    pipeline = pipelines.get("a")
    if pipeline is None:
        raise gr.Error("Kokoro English pipeline not initialized.")
    audio_segments = []
    pack = pipeline.load_voice(voice)
    segments = list(pipeline(text, voice, speed))
    total_segments = len(segments)
    for segment_idx, (text_chunk, ps, _) in enumerate(segments):
        ref_s = pack[len(ps) - 1]
        try:
            audio = model(ps, ref_s, float(speed))
            audio_segments.append(audio.detach().cpu().numpy())
            if total_segments > 10 and (segment_idx + 1) % 5 == 0:
                print(f"Progress: Generated {segment_idx + 1}/{total_segments} segments...")
        except Exception as exc:
            raise gr.Error(f"Error generating audio for segment {segment_idx + 1}: {exc}")
    if not audio_segments:
        raise gr.Error("No audio was generated (empty synthesis result).")
    if len(audio_segments) == 1:
        final_audio = audio_segments[0]
    else:
        final_audio = np.concatenate(audio_segments, axis=0)
    if total_segments > 1:
        duration = len(final_audio) / 24_000
        print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio")
    # Save to file
    filename = f"speech_kokoro_{uuid.uuid4().hex[:8]}.wav"
    output_path = os.path.join(ROOT_DIR, filename)
    # Convert to 16-bit PCM (clip to [-1, 1] first to avoid int16 overflow)
    audio_int16 = _audio_np_to_int16(final_audio)
    scipy.io.wavfile.write(output_path, 24000, audio_int16)
    _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={len(final_audio)/24_000:.2f}")
    return output_path
def _generate_supertonic(text: str, speed: float, voice: str, steps: int, silence_duration: float, max_chunk_size: int) -> str:
    _init_supertonic()
    tts = _SUPERTONIC_STATE["tts"]
    assets_dir = _SUPERTONIC_STATE["assets_dir"]
    voice_path = os.path.join(assets_dir, "voice_styles", f"{voice}.json")
    if not os.path.exists(voice_path):
        raise gr.Error(f"Voice style {voice} not found for Supertonic.")
    style = load_voice_style([voice_path])
    sr = tts.sample_rate
    # Use __call__ (rather than stream()) to get the full concatenated waveform for saving;
    # it returns (wav_cat, dur_cat)
    wav_cat, _ = tts(text, style, steps, speed, silence_duration, max_chunk_size)
    if wav_cat is None or wav_cat.size == 0:
        raise gr.Error("No audio generated.")
    # wav_cat is (1, samples) float32
    final_audio = wav_cat.flatten()
    # Save to file
    filename = f"speech_supertonic_{uuid.uuid4().hex[:8]}.wav"
    output_path = os.path.join(ROOT_DIR, filename)
    audio_int16 = _audio_np_to_int16(final_audio)
    scipy.io.wavfile.write(output_path, sr, audio_int16)
    _log_call_end("Generate_Speech", f"saved_to={os.path.basename(output_path)} duration_sec={len(final_audio)/sr:.2f}")
    return output_path
def build_interface() -> gr.Interface:
    kokoro_voices = get_kokoro_voices()
    supertonic_voices = get_supertonic_voices()
    all_voices = sorted(set(kokoro_voices + supertonic_voices))
    return gr.Interface(
        fn=Generate_Speech,
        inputs=[
            gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4, info="The text to synthesize (English)"),
            gr.Dropdown(label="Model", choices=["Supertonic", "Kokoro"], value="Supertonic", info="The TTS model to use"),
            gr.Slider(minimum=0.5, maximum=2.0, value=1.3, step=0.1, label="Speed", info="Speech speed multiplier (1.0 = normal)"),
            gr.Slider(minimum=1, maximum=50, value=5, step=1, label="Steps", info="Supertonic only: Diffusion steps (1-50)"),
            gr.Dropdown(
                label="Voice",
                choices=all_voices,
                value="F1",
                info="Select voice (F1/F2/M1/M2 for Supertonic, others for Kokoro)",
            ),
            gr.Slider(minimum=0.0, maximum=2.0, value=0.3, step=0.1, label="Silence Duration", info="Supertonic only: Silence duration between chunks"),
            gr.Slider(minimum=50, maximum=1000, value=300, step=10, label="Max Chunk Size", info="Supertonic only: Max text chunk length"),
        ],
        outputs=gr.Audio(label="Audio", type="filepath", format="wav"),
        title="Generate Speech",
        description=(
            "<div style=\"text-align:center\">Generate speech with Supertonic-66M or Kokoro-82M. Runs on CPU.</div>"
        ),
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
    )


__all__ = ["Generate_Speech", "List_Kokoro_Voices", "List_Supertonic_Voices", "build_interface"]
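# Standalone launch sketch (an assumption; in normal use this module is imported by the
# main app, which provides _log_call_start/_log_call_end):
# if __name__ == "__main__":
#     build_interface().launch()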