use std::{ cmp::min, io::Read, sync::{Arc, Mutex}, }; use cpal::{ traits::{DeviceTrait, HostTrait, StreamTrait}, Device, Stream, StreamConfig, SupportedStreamConfig, }; use crate::{args::Run, device::virtual_device::VirtualDevice}; fn reshape_audio_data(input: &[T], channels: usize) -> Vec> where T: Clone, { let mut output = vec![vec![]; channels]; for frame in input.chunks(channels) { for (i, sample) in frame.iter().enumerate() { output[i].push(sample.clone()); } } output } fn to_flat_audio_data(input: &[Vec]) -> Vec where T: Clone, { let channels = input.len(); let frames = input[0].len(); let mut output = vec![]; for i in 0..frames { (0..channels).for_each(|j| { output.push(input[j][i].clone()); }); } output } pub fn run(run: Run) { let config = std::path::Path::new(&run.config); if !config.exists() { eprintln!("Config file does not exists."); return; } let mut buf = String::default(); let mut config = if let Ok(config) = std::fs::File::open(config) { config } else { eprintln!("Cannot open config file."); return; }; config.read_to_string(&mut buf).unwrap(); let config: crate::config::Config = serde_yaml::from_str(&buf).unwrap(); let host = cpal::default_host(); let input_devices: Vec = host.input_devices().unwrap().collect(); let output_devices: Vec = host.output_devices().unwrap().collect(); // create virtual devices let mut virtual_devices = Vec::new(); for virtual_device in config.virtual_devices { let device = crate::device::virtual_device::VirtualDevice::new( virtual_device.name, virtual_device.channels, virtual_device.sample_rate, ); virtual_devices.push(Arc::new(Mutex::new(device))); } for input_route in &config.routes.input { println!("Input: {}", input_route.name); println!(" Virtual device: {}", input_route.virtual_device); if virtual_devices .iter() .any(|device| device.lock().unwrap().name == input_route.virtual_device) { println!(" Found"); } else { println!(" Not found"); } match &input_route.device { crate::config::Device::Local { local } => { println!(" Local device: {}", local.name); if input_devices .iter() .any(|device| device.name().unwrap() == local.name) { println!(" Found"); } else { println!(" Not found"); } } crate::config::Device::Remote { remote } => { println!(" Remote device: {}", remote.address); println!(" Port: {}", remote.port); println!(" Protocol: {}", remote.protocol); println!(" Buffer: {}", remote.buffer); println!(" Channels: {}", remote.channels); } } } for output_route in &config.routes.output { println!("Output: {}", output_route.name); println!(" Virtual device: {}", output_route.input.virtual_device); if virtual_devices .iter() .any(|device| device.lock().unwrap().name == output_route.input.virtual_device) { println!(" Found"); } else { println!(" Not found"); } match &output_route.device { crate::config::Device::Local { local } => { println!(" Local device: {}", local.name); if output_devices .iter() .any(|device| device.name().unwrap() == local.name) { println!(" Found"); } else { println!(" Not found"); } } crate::config::Device::Remote { remote } => { println!(" Remote device: {}", remote.address); println!(" Port: {}", remote.port); println!(" Protocol: {}", remote.protocol); println!(" Buffer: {}", remote.buffer); println!(" Channels: {}", remote.channels); } } } let mut streams = vec![]; for input_route in &config.routes.input { match &input_route.device { crate::config::Device::Local { local } => { let device = input_devices .iter() .find(|device| device.name().unwrap() == local.name) .unwrap(); let config = device.default_input_config().unwrap(); let channels = config.channels(); let sample_rate = config.sample_rate().0; let virtual_devices = virtual_devices .iter() .filter(|device| device.lock().unwrap().name == input_route.virtual_device) .cloned() .collect::>()[0] .clone(); let stream = match config.sample_format() { cpal::SampleFormat::I8 => device.build_input_stream( &config.into(), { move |data: &[i8], _| { input_callback( data, channels.into(), sample_rate, virtual_devices.clone(), ) } }, move |err| eprintln!("An error occurred on the input stream: {}", err), None, ), cpal::SampleFormat::I16 => device.build_input_stream( &config.into(), { move |data: &[i16], _| { input_callback( data, channels.into(), sample_rate, virtual_devices.clone(), ) } }, move |err| eprintln!("An error occurred on the input stream: {}", err), None, ), cpal::SampleFormat::I32 => device.build_input_stream( &config.into(), { move |data: &[i32], _| { input_callback( data, channels.into(), sample_rate, virtual_devices.clone(), ) } }, move |err| eprintln!("An error occurred on the input stream: {}", err), None, ), cpal::SampleFormat::F32 => device.build_input_stream( &config.into(), { move |data: &[f32], _| { input_callback( data, channels.into(), sample_rate, virtual_devices.clone(), ) } }, move |err| eprintln!("An error occurred on the input stream: {}", err), None, ), sample_format => { eprintln!("Unsupported sample format: {:?}", sample_format); return; } } .unwrap(); stream.play().unwrap(); streams.push(stream); } crate::config::Device::Remote { remote } => { unimplemented!(); } } } let mut threads = vec![]; for output_route in &config.routes.output { match &output_route.device { crate::config::Device::Local { local } => { let device = output_devices .iter() .find(|device| device.name().unwrap() == local.name) .unwrap(); let config = device.default_output_config().unwrap(); let sample_rate = config.sample_rate().0; let virtual_device = virtual_devices .iter() .filter(|device| { device.lock().unwrap().name == output_route.input.virtual_device }) .cloned() .collect::>()[0] .clone(); let thread = std::thread::spawn({ let device = device.clone(); move || { let stream = create_virtual_device_input_stream( config, device.clone(), virtual_device, ); stream.play().unwrap(); loop { std::thread::sleep(std::time::Duration::from_secs(1000)); } } }); threads.push(thread); } crate::config::Device::Remote { remote } => { unimplemented!(); } } } loop { std::thread::sleep(std::time::Duration::from_secs(1000)); } } fn create_virtual_device_input_stream( config: SupportedStreamConfig, device: Device, virtual_device: Arc>, ) -> Stream { let sample_rate = config.sample_rate().0; let channels = config.channels(); let index = virtual_device.lock().unwrap().add_output(sample_rate); match config.sample_format() { cpal::SampleFormat::I8 => device.build_output_stream( &config.into(), { move |data: &mut [i8], _| { output_callback( data, channels.into(), sample_rate, virtual_device.clone(), index, ) } }, move |err| eprintln!("An error occurred on the output stream: {}", err), None, ), cpal::SampleFormat::I16 => device.build_output_stream( &config.into(), { move |data: &mut [i16], _| { output_callback( data, channels.into(), sample_rate, virtual_device.clone(), index, ) } }, move |err| eprintln!("An error occurred on the output stream: {}", err), None, ), cpal::SampleFormat::I32 => device.build_output_stream( &config.into(), { move |data: &mut [i32], _| { output_callback( data, channels.into(), sample_rate, virtual_device.clone(), index, ) } }, move |err| eprintln!("An error occurred on the output stream: {}", err), None, ), cpal::SampleFormat::F32 => device.build_output_stream( &config.into(), { move |data: &mut [f32], _| { output_callback( data, channels.into(), sample_rate, virtual_device.clone(), index, ) } }, move |err| eprintln!("An error occurred on the output stream: {}", err), None, ), sample_format => { panic!("Unsupported sample format: {:?}", sample_format); } } .unwrap() } fn input_callback( data: &[T], channels: usize, sample_rate: u32, virtual_device: Arc>, ) where T: Clone + num::cast::ToPrimitive, { let data: Vec = data.iter().map(|d| d.to_f32().unwrap()).collect(); let audio_data = reshape_audio_data(&data, channels); virtual_device .lock() .unwrap() .write_input_multiple_channels(&audio_data); } fn output_callback( data: &mut [T], channels: usize, sample_rate: u32, virtual_device: Arc>, index: usize, ) where T: Clone + num::cast::FromPrimitive, { let mut virtual_device = virtual_device.lock().unwrap(); let vd_channels = virtual_device.channels; let mut audio_data = virtual_device.take_output( index, min(channels as u8, vd_channels), sample_rate, data.len() / channels, ); let mut count = 0; while audio_data.is_none() && count < 1 { audio_data = virtual_device.take_output( index, min(channels as u8, vd_channels), sample_rate, data.len() / channels, ); std::thread::sleep(std::time::Duration::from_millis(10)); count += 1; } if audio_data.is_none() { println!("audio_data is none"); return; } let mut audio_data = to_flat_audio_data(&audio_data.unwrap()); let data_len = data.len(); let audio_data_len = audio_data.len(); if data_len > audio_data_len { audio_data.extend(vec![0.0; data_len - audio_data_len]); } let audio_data: Vec = audio_data .iter() .map(|d| T::from_f32(*d).unwrap()) .collect(); data.clone_from_slice(&audio_data); }