add mp3 streaming for voicevox

This commit is contained in:
mii443
2025-04-11 15:10:01 +09:00
parent 257c8511e3
commit f7e08b4e2e
7 changed files with 163 additions and 16 deletions

View File

@ -24,6 +24,10 @@ opentelemetry-semantic-conventions = "0.29.0"
opentelemetry-otlp = { version = "0.29.0", features = ["grpc-tonic"] }
opentelemetry-stdout = "0.29.0"
tracing-opentelemetry = "0.30.0"
symphonia-core = "0.5.4"
tokio-util = { version = "0.7.14", features = ["compat"] }
futures = "0.3.31"
bytes = "1.10.1"
[dependencies.uuid]
version = "0.8"

View File

@ -5,6 +5,7 @@ mod database;
mod event_handler;
mod events;
mod implement;
mod stream_input;
mod trace;
mod tts;

93
src/stream_input.rs Normal file
View File

@ -0,0 +1,93 @@
use async_trait::async_trait;
use futures::TryStreamExt;
use reqwest::{header::HeaderMap, Client};
use symphonia_core::{io::MediaSource, probe::Hint};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use songbird::input::{
AsyncAdapterStream, AsyncReadOnlySource, AudioStream, AudioStreamError, Compose, Input,
};
#[derive(Debug, Clone)]
pub struct Mp3Request {
client: Client,
request: String,
headers: HeaderMap,
}
impl Mp3Request {
#[must_use]
pub fn new(client: Client, request: String) -> Self {
Self::new_with_headers(client, request, HeaderMap::default())
}
#[must_use]
pub fn new_with_headers(client: Client, request: String, headers: HeaderMap) -> Self {
Mp3Request {
client,
request,
headers,
}
}
async fn create_stream_async(&self) -> Result<AsyncReadOnlySource, AudioStreamError> {
let request = self
.client
.get(&self.request)
.headers(self.headers.clone())
.build()
.map_err(|why| AudioStreamError::Fail(why.into()))?;
let response = self
.client
.execute(request)
.await
.map_err(|why| AudioStreamError::Fail(why.into()))?;
if !response.status().is_success() {
return Err(AudioStreamError::Fail(
format!("HTTP error: {}", response.status()).into(),
));
}
let byte_stream = response
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()));
let tokio_reader = byte_stream.into_async_read().compat();
Ok(AsyncReadOnlySource::new(tokio_reader))
}
}
#[async_trait]
impl Compose for Mp3Request {
fn create(&mut self) -> Result<AudioStream<Box<dyn MediaSource>>, AudioStreamError> {
Err(AudioStreamError::Fail(
"Mp3Request::create must be called in an async context via create_async".into(),
))
}
async fn create_async(
&mut self,
) -> Result<AudioStream<Box<dyn MediaSource>>, AudioStreamError> {
let input = self.create_stream_async().await?;
let stream = AsyncAdapterStream::new(Box::new(input), 64 * 1024);
let hint = Hint::new().with_extension("mp3").clone();
Ok(AudioStream {
input: Box::new(stream) as Box<dyn MediaSource>,
hint: Some(hint),
})
}
fn should_create_async(&self) -> bool {
true
}
}
impl From<Mp3Request> for Input {
fn from(val: Mp3Request) -> Self {
Input::Lazy(Box::new(val))
}
}

View File

@ -1,8 +1,8 @@
use std::num::NonZeroUsize;
use std::sync::RwLock;
use std::{num::NonZeroUsize, sync::Arc};
use lru::LruCache;
use songbird::{driver::Bitrate, input::cached::Compressed};
use songbird::{driver::Bitrate, input::cached::Compressed, tracks::Track};
use tracing::info;
use super::{
@ -20,7 +20,7 @@ use super::{
pub struct TTS {
pub voicevox_client: VOICEVOX,
gcp_tts_client: GCPTTS,
cache: RwLock<LruCache<CacheKey, Compressed>>,
cache: Arc<RwLock<LruCache<CacheKey, Compressed>>>,
}
#[derive(Hash, PartialEq, Eq)]
@ -34,7 +34,7 @@ impl TTS {
Self {
voicevox_client,
gcp_tts_client,
cache: RwLock::new(LruCache::new(NonZeroUsize::new(1000).unwrap())),
cache: Arc::new(RwLock::new(LruCache::new(NonZeroUsize::new(1000).unwrap()))),
}
}
@ -43,7 +43,7 @@ impl TTS {
&self,
text: &str,
speaker: i64,
) -> Result<Compressed, Box<dyn std::error::Error>> {
) -> Result<Track, Box<dyn std::error::Error>> {
let cache_key = CacheKey::Voicevox(text.to_string(), speaker);
let cached_audio = {
@ -53,24 +53,27 @@ impl TTS {
if let Some(audio) = cached_audio {
info!("Cache hit for VOICEVOX TTS");
return Ok(audio);
return Ok(audio.into());
}
info!("Cache miss for VOICEVOX TTS");
let audio = self
.voicevox_client
.synthesize(text.to_string(), speaker)
.synthesize_stream(text.to_string(), speaker)
.await?;
let compressed = Compressed::new(audio.into(), Bitrate::Auto).await?;
tokio::spawn({
let cache = self.cache.clone();
let audio = audio.clone();
async move {
info!("Compressing stream audio");
let compressed = Compressed::new(audio.into(), Bitrate::Auto).await.unwrap();
let mut cache_guard = cache.write().unwrap();
cache_guard.put(cache_key, compressed.clone());
}
});
{
let mut cache_guard = self.cache.write().unwrap();
cache_guard.put(cache_key, compressed.clone());
}
Ok(compressed)
Ok(audio.into())
}
#[tracing::instrument]

View File

@ -2,3 +2,4 @@ pub mod accent_phrase;
pub mod audio_query;
pub mod mora;
pub mod speaker;
pub mod stream;

View File

@ -0,0 +1,13 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TTSResponse {
pub success: bool,
pub is_api_key_valid: bool,
pub speaker_name: String,
pub audio_status_url: String,
pub wav_download_url: String,
pub mp3_download_url: String,
pub mp3_streaming_url: String,
}

View File

@ -1,4 +1,6 @@
use super::structs::speaker::Speaker;
use crate::stream_input::Mp3Request;
use super::structs::{speaker::Speaker, stream::TTSResponse};
const BASE_API_URL: &str = "https://deprecatedapis.tts.quest/v2/";
@ -76,4 +78,34 @@ impl VOICEVOX {
Err(err) => Err(Box::new(err)),
}
}
#[tracing::instrument]
pub async fn synthesize_stream(
&self,
text: String,
speaker: i64,
) -> Result<Mp3Request, Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
match client
.post("https://api.tts.quest/v3/voicevox/synthesis")
.query(&[
("speaker", speaker.to_string()),
("text", text),
("key", self.key.clone()),
])
.send()
.await
{
Ok(response) => {
let body = response.text().await.unwrap();
let response: TTSResponse = serde_json::from_str(&body).unwrap();
Ok(Mp3Request::new(
reqwest::Client::new(),
response.mp3_streaming_url,
))
}
Err(err) => Err(Box::new(err)),
}
}
}