Files
musicgen/src/effects.rs
2025-06-20 18:52:55 -06:00

823 lines
23 KiB
Rust

//! Audio effects processing module
//!
//! This module provides various audio effects that can be applied to synthesized audio,
//! including filters, delays, reverbs, distortion, and modulation effects.
use crate::{SAMPLE_RATE, lerp, low_pass_filter};
use std::collections::VecDeque;
use std::f32::consts::PI;
/// Trait for audio effects that process samples
pub trait AudioEffect {
/// Process a single audio sample
fn process_sample(&mut self, input: f32) -> f32;
/// Process a buffer of audio samples
fn process_buffer(&mut self, buffer: &mut [f32]) {
for sample in buffer.iter_mut() {
*sample = self.process_sample(*sample);
}
}
/// Reset the effect's internal state
fn reset(&mut self);
/// Set a parameter of the effect
fn set_parameter(&mut self, param: &str, value: f32);
}
/// Low-pass filter effect
#[derive(Debug, Clone)]
pub struct LowPassFilter {
pub cutoff_frequency: f32,
pub resonance: f32,
// Internal state
x1: f32,
x2: f32,
y1: f32,
y2: f32,
// Filter coefficients
a0: f32,
a1: f32,
a2: f32,
b1: f32,
b2: f32,
}
impl LowPassFilter {
/// Create a new low-pass filter
///
/// # Arguments
/// * `cutoff_frequency` - Cutoff frequency in Hz
/// * `resonance` - Resonance factor (0.1 to 10.0)
pub fn new(cutoff_frequency: f32, resonance: f32) -> Self {
let mut filter = Self {
cutoff_frequency,
resonance: resonance.clamp(0.1, 10.0),
x1: 0.0,
x2: 0.0,
y1: 0.0,
y2: 0.0,
a0: 1.0,
a1: 0.0,
a2: 0.0,
b1: 0.0,
b2: 0.0,
};
filter.update_coefficients();
filter
}
fn update_coefficients(&mut self) {
let omega = 2.0 * PI * self.cutoff_frequency / SAMPLE_RATE;
let sin_omega = omega.sin();
let cos_omega = omega.cos();
let alpha = sin_omega / (2.0 * self.resonance);
let b0 = (1.0 - cos_omega) / 2.0;
let b1 = 1.0 - cos_omega;
let b2 = (1.0 - cos_omega) / 2.0;
let a0 = 1.0 + alpha;
let a1 = -2.0 * cos_omega;
let a2 = 1.0 - alpha;
// Normalize coefficients
self.a0 = b0 / a0;
self.a1 = b1 / a0;
self.a2 = b2 / a0;
self.b1 = a1 / a0;
self.b2 = a2 / a0;
}
pub fn set_cutoff(&mut self, frequency: f32) {
self.cutoff_frequency = frequency.clamp(20.0, SAMPLE_RATE / 2.0);
self.update_coefficients();
}
pub fn set_resonance(&mut self, resonance: f32) {
self.resonance = resonance.clamp(0.1, 10.0);
self.update_coefficients();
}
}
impl AudioEffect for LowPassFilter {
fn process_sample(&mut self, input: f32) -> f32 {
let output = self.a0 * input + self.a1 * self.x1 + self.a2 * self.x2
- self.b1 * self.y1
- self.b2 * self.y2;
// Update delay line
self.x2 = self.x1;
self.x1 = input;
self.y2 = self.y1;
self.y1 = output;
output
}
fn reset(&mut self) {
self.x1 = 0.0;
self.x2 = 0.0;
self.y1 = 0.0;
self.y2 = 0.0;
}
fn set_parameter(&mut self, param: &str, value: f32) {
match param {
"cutoff" => self.set_cutoff(value),
"resonance" => self.set_resonance(value),
_ => {}
}
}
}
/// High-pass filter effect
#[derive(Debug, Clone)]
pub struct HighPassFilter {
pub cutoff_frequency: f32,
pub resonance: f32,
// Internal state (same as low-pass)
x1: f32,
x2: f32,
y1: f32,
y2: f32,
// Filter coefficients
a0: f32,
a1: f32,
a2: f32,
b1: f32,
b2: f32,
}
impl HighPassFilter {
pub fn new(cutoff_frequency: f32, resonance: f32) -> Self {
let mut filter = Self {
cutoff_frequency,
resonance: resonance.clamp(0.1, 10.0),
x1: 0.0,
x2: 0.0,
y1: 0.0,
y2: 0.0,
a0: 1.0,
a1: 0.0,
a2: 0.0,
b1: 0.0,
b2: 0.0,
};
filter.update_coefficients();
filter
}
fn update_coefficients(&mut self) {
let omega = 2.0 * PI * self.cutoff_frequency / SAMPLE_RATE;
let sin_omega = omega.sin();
let cos_omega = omega.cos();
let alpha = sin_omega / (2.0 * self.resonance);
let b0 = (1.0 + cos_omega) / 2.0;
let b1 = -(1.0 + cos_omega);
let b2 = (1.0 + cos_omega) / 2.0;
let a0 = 1.0 + alpha;
let a1 = -2.0 * cos_omega;
let a2 = 1.0 - alpha;
// Normalize coefficients
self.a0 = b0 / a0;
self.a1 = b1 / a0;
self.a2 = b2 / a0;
self.b1 = a1 / a0;
self.b2 = a2 / a0;
}
pub fn set_cutoff(&mut self, frequency: f32) {
self.cutoff_frequency = frequency.clamp(20.0, SAMPLE_RATE / 2.0);
self.update_coefficients();
}
pub fn set_resonance(&mut self, resonance: f32) {
self.resonance = resonance.clamp(0.1, 10.0);
self.update_coefficients();
}
}
impl AudioEffect for HighPassFilter {
fn process_sample(&mut self, input: f32) -> f32 {
let output = self.a0 * input + self.a1 * self.x1 + self.a2 * self.x2
- self.b1 * self.y1
- self.b2 * self.y2;
self.x2 = self.x1;
self.x1 = input;
self.y2 = self.y1;
self.y1 = output;
output
}
fn reset(&mut self) {
self.x1 = 0.0;
self.x2 = 0.0;
self.y1 = 0.0;
self.y2 = 0.0;
}
fn set_parameter(&mut self, param: &str, value: f32) {
match param {
"cutoff" => self.set_cutoff(value),
"resonance" => self.set_resonance(value),
_ => {}
}
}
}
/// Delay effect with feedback
#[derive(Debug)]
pub struct Delay {
pub delay_time: f32, // Delay time in seconds
pub feedback: f32, // Feedback amount (0.0 to 0.95)
pub mix: f32, // Wet/dry mix (0.0 = dry, 1.0 = wet)
delay_buffer: VecDeque<f32>,
delay_samples: usize,
}
impl Delay {
/// Create a new delay effect
///
/// # Arguments
/// * `delay_time` - Delay time in seconds
/// * `feedback` - Feedback amount (0.0 to 0.95)
/// * `mix` - Wet/dry mix (0.0 to 1.0)
pub fn new(delay_time: f32, feedback: f32, mix: f32) -> Self {
let delay_samples = (delay_time * SAMPLE_RATE) as usize;
let mut delay_buffer = VecDeque::with_capacity(delay_samples);
// Initialize with zeros
for _ in 0..delay_samples {
delay_buffer.push_back(0.0);
}
Self {
delay_time,
feedback: feedback.clamp(0.0, 0.95),
mix: mix.clamp(0.0, 1.0),
delay_buffer,
delay_samples,
}
}
pub fn set_delay_time(&mut self, time: f32) {
self.delay_time = time.max(0.001); // Minimum 1ms delay
let new_samples = (time * SAMPLE_RATE) as usize;
if new_samples != self.delay_samples {
self.delay_samples = new_samples;
self.delay_buffer.clear();
for _ in 0..new_samples {
self.delay_buffer.push_back(0.0);
}
}
}
pub fn set_feedback(&mut self, feedback: f32) {
self.feedback = feedback.clamp(0.0, 0.95);
}
pub fn set_mix(&mut self, mix: f32) {
self.mix = mix.clamp(0.0, 1.0);
}
}
impl AudioEffect for Delay {
fn process_sample(&mut self, input: f32) -> f32 {
let delayed_sample = self.delay_buffer.front().copied().unwrap_or(0.0);
let feedback_sample = input + delayed_sample * self.feedback;
// Add new sample to delay line
self.delay_buffer.pop_front();
self.delay_buffer.push_back(feedback_sample);
// Mix dry and wet signals
lerp(input, delayed_sample, self.mix)
}
fn reset(&mut self) {
for sample in self.delay_buffer.iter_mut() {
*sample = 0.0;
}
}
fn set_parameter(&mut self, param: &str, value: f32) {
match param {
"delay_time" => self.set_delay_time(value),
"feedback" => self.set_feedback(value),
"mix" => self.set_mix(value),
_ => {}
}
}
}
/// Simple reverb effect using multiple delay lines
#[derive(Debug)]
pub struct Reverb {
pub room_size: f32,
pub damping: f32,
pub mix: f32,
delay_lines: Vec<VecDeque<f32>>,
feedback_gains: Vec<f32>,
damping_filters: Vec<f32>, // Simple one-pole filters for damping
}
impl Reverb {
/// Create a new reverb effect
///
/// # Arguments
/// * `room_size` - Room size factor (0.0 to 1.0)
/// * `damping` - High frequency damping (0.0 to 1.0)
/// * `mix` - Wet/dry mix (0.0 to 1.0)
pub fn new(room_size: f32, damping: f32, mix: f32) -> Self {
// Use Schroeder's reverb design with 4 comb filters and 2 allpass filters
let base_delays = vec![1116, 1188, 1277, 1356, 225, 556]; // Prime numbers for better diffusion
let base_gains = vec![0.7, 0.7, 0.7, 0.7, 0.5, 0.5];
let mut delay_lines = Vec::new();
let mut feedback_gains = Vec::new();
let room_factor = 1.0 + room_size * 0.5; // Scale delays based on room size
for (i, &base_delay) in base_delays.iter().enumerate() {
let delay_samples = (base_delay as f32 * room_factor) as usize;
let mut delay_line = VecDeque::with_capacity(delay_samples);
for _ in 0..delay_samples {
delay_line.push_back(0.0);
}
delay_lines.push(delay_line);
feedback_gains.push(base_gains[i] * room_size);
}
Self {
room_size: room_size.clamp(0.0, 1.0),
damping: damping.clamp(0.0, 1.0),
mix: mix.clamp(0.0, 1.0),
delay_lines,
feedback_gains,
damping_filters: vec![0.0; base_delays.len()],
}
}
pub fn set_room_size(&mut self, size: f32) {
self.room_size = size.clamp(0.0, 1.0);
// Update feedback gains
for (i, gain) in self.feedback_gains.iter_mut().enumerate() {
let base_gains = vec![0.7, 0.7, 0.7, 0.7, 0.5, 0.5];
*gain = base_gains[i] * self.room_size;
}
}
pub fn set_damping(&mut self, damping: f32) {
self.damping = damping.clamp(0.0, 1.0);
}
pub fn set_mix(&mut self, mix: f32) {
self.mix = mix.clamp(0.0, 1.0);
}
}
impl AudioEffect for Reverb {
fn process_sample(&mut self, input: f32) -> f32 {
let mut output = 0.0;
// Process comb filters (first 4 delay lines)
for i in 0..4 {
if let Some(delayed) = self.delay_lines[i].front().copied() {
// Apply damping filter
self.damping_filters[i] =
low_pass_filter(delayed, self.damping_filters[i], 1.0 - self.damping);
let feedback_sample = input + self.damping_filters[i] * self.feedback_gains[i];
self.delay_lines[i].pop_front();
self.delay_lines[i].push_back(feedback_sample);
output += delayed;
}
}
// Process allpass filters (last 2 delay lines)
let mut allpass_input = output * 0.25; // Scale down comb filter output
for i in 4..6 {
if let Some(delayed) = self.delay_lines[i].front().copied() {
let feedback_sample = allpass_input + delayed * self.feedback_gains[i];
self.delay_lines[i].pop_front();
self.delay_lines[i].push_back(feedback_sample);
allpass_input = delayed - allpass_input * self.feedback_gains[i];
}
}
// Mix dry and wet signals
lerp(input, allpass_input, self.mix)
}
fn reset(&mut self) {
for delay_line in &mut self.delay_lines {
for sample in delay_line.iter_mut() {
*sample = 0.0;
}
}
for filter in &mut self.damping_filters {
*filter = 0.0;
}
}
fn set_parameter(&mut self, param: &str, value: f32) {
match param {
"room_size" => self.set_room_size(value),
"damping" => self.set_damping(value),
"mix" => self.set_mix(value),
_ => {}
}
}
}
/// Distortion effect
#[derive(Debug, Clone)]
pub struct Distortion {
pub drive: f32, // Distortion amount (1.0 to 10.0)
pub tone: f32, // Tone control (0.0 to 1.0)
pub output_gain: f32, // Output gain compensation
pre_filter: LowPassFilter,
post_filter: LowPassFilter,
}
impl Distortion {
/// Create a new distortion effect
///
/// # Arguments
/// * `drive` - Distortion drive amount (1.0 to 10.0)
/// * `tone` - Tone control (0.0 to 1.0)
pub fn new(drive: f32, tone: f32) -> Self {
Self {
drive: drive.clamp(1.0, 10.0),
tone: tone.clamp(0.0, 1.0),
output_gain: 1.0 / drive.clamp(1.0, 10.0), // Automatic gain compensation
pre_filter: LowPassFilter::new(8000.0, 0.7),
post_filter: LowPassFilter::new(4000.0 + tone * 4000.0, 0.7),
}
}
pub fn set_drive(&mut self, drive: f32) {
self.drive = drive.clamp(1.0, 10.0);
self.output_gain = 1.0 / self.drive;
}
pub fn set_tone(&mut self, tone: f32) {
self.tone = tone.clamp(0.0, 1.0);
self.post_filter.set_cutoff(4000.0 + self.tone * 4000.0);
}
fn waveshaper(&self, input: f32) -> f32 {
let driven = input * self.drive;
// Soft clipping using tanh
if driven.abs() <= 1.0 {
driven * (1.0 - driven.abs() / 3.0)
} else {
driven.signum() * (2.0 / 3.0 + (driven.abs() - 1.0) / (driven.abs() + 1.0) / 3.0)
}
}
}
impl AudioEffect for Distortion {
fn process_sample(&mut self, input: f32) -> f32 {
// Pre-filtering to reduce aliasing
let filtered_input = self.pre_filter.process_sample(input);
// Apply waveshaping distortion
let distorted = self.waveshaper(filtered_input);
// Post-filtering for tone shaping
let shaped = self.post_filter.process_sample(distorted);
// Apply output gain compensation
shaped * self.output_gain
}
fn reset(&mut self) {
self.pre_filter.reset();
self.post_filter.reset();
}
fn set_parameter(&mut self, param: &str, value: f32) {
match param {
"drive" => self.set_drive(value),
"tone" => self.set_tone(value),
"output_gain" => self.output_gain = value.clamp(0.0, 2.0),
_ => {}
}
}
}
/// Chorus effect using multiple delay lines with LFO modulation
#[derive(Debug)]
pub struct Chorus {
pub rate: f32, // LFO rate in Hz
pub depth: f32, // Modulation depth (0.0 to 1.0)
pub mix: f32, // Wet/dry mix
pub layers: usize, // Number of chorus layers
delay_lines: Vec<VecDeque<f32>>,
lfo_phases: Vec<f32>,
base_delay_samples: usize,
}
impl Chorus {
/// Create a new chorus effect
///
/// # Arguments
/// * `rate` - LFO rate in Hz
/// * `depth` - Modulation depth (0.0 to 1.0)
/// * `mix` - Wet/dry mix (0.0 to 1.0)
/// * `layers` - Number of chorus layers (1 to 4)
pub fn new(rate: f32, depth: f32, mix: f32, layers: usize) -> Self {
let layers = layers.clamp(1, 4);
let base_delay_ms = 15.0; // Base delay in milliseconds
let max_delay_ms = base_delay_ms + depth * 10.0; // Max modulation range
let base_delay_samples = (base_delay_ms * SAMPLE_RATE / 1000.0) as usize;
let max_delay_samples = (max_delay_ms * SAMPLE_RATE / 1000.0) as usize;
let mut delay_lines = Vec::new();
let mut lfo_phases = Vec::new();
for i in 0..layers {
let mut delay_line = VecDeque::with_capacity(max_delay_samples);
for _ in 0..max_delay_samples {
delay_line.push_back(0.0);
}
delay_lines.push(delay_line);
// Distribute LFO phases evenly
lfo_phases.push(i as f32 * 2.0 * PI / layers as f32);
}
Self {
rate: rate.clamp(0.1, 10.0),
depth: depth.clamp(0.0, 1.0),
mix: mix.clamp(0.0, 1.0),
layers,
delay_lines,
lfo_phases,
base_delay_samples,
}
}
pub fn set_rate(&mut self, rate: f32) {
self.rate = rate.clamp(0.1, 10.0);
}
pub fn set_depth(&mut self, depth: f32) {
self.depth = depth.clamp(0.0, 1.0);
}
fn get_delayed_sample(&self, delay_line: &VecDeque<f32>, delay_samples: f32) -> f32 {
let delay_int = delay_samples.floor() as usize;
let delay_frac = delay_samples - delay_int as f32;
if delay_int >= delay_line.len() {
return 0.0;
}
let sample1 = delay_line.get(delay_int).copied().unwrap_or(0.0);
let sample2 = delay_line.get(delay_int + 1).copied().unwrap_or(0.0);
// Linear interpolation
lerp(sample1, sample2, delay_frac)
}
}
impl AudioEffect for Chorus {
fn process_sample(&mut self, input: f32) -> f32 {
let mut chorus_output = 0.0;
for i in 0..self.layers {
// Update LFO phase
self.lfo_phases[i] += 2.0 * PI * self.rate / SAMPLE_RATE;
if self.lfo_phases[i] >= 2.0 * PI {
self.lfo_phases[i] -= 2.0 * PI;
}
// Calculate modulated delay
let lfo_value = self.lfo_phases[i].sin();
let delay_offset = self.depth * 10.0 * SAMPLE_RATE / 1000.0; // Convert to samples
let total_delay = self.base_delay_samples as f32 + lfo_value * delay_offset;
// Get delayed sample with interpolation
let delayed_sample = self.get_delayed_sample(&self.delay_lines[i], total_delay);
chorus_output += delayed_sample;
// Add input to delay line
self.delay_lines[i].pop_front();
self.delay_lines[i].push_back(input);
}
// Average the chorus layers
chorus_output /= self.layers as f32;
// Mix with dry signal
lerp(input, chorus_output, self.mix)
}
fn reset(&mut self) {
for delay_line in &mut self.delay_lines {
for sample in delay_line.iter_mut() {
*sample = 0.0;
}
}
for phase in &mut self.lfo_phases {
*phase = 0.0;
}
}
fn set_parameter(&mut self, param: &str, value: f32) {
match param {
"rate" => self.set_rate(value),
"depth" => self.set_depth(value),
"mix" => self.mix = value.clamp(0.0, 1.0),
_ => {}
}
}
}
/// Effects chain that can combine multiple effects
pub struct EffectsChain {
effects: Vec<Box<dyn AudioEffect>>,
}
impl EffectsChain {
/// Create a new empty effects chain
pub fn new() -> Self {
Self {
effects: Vec::new(),
}
}
/// Add an effect to the chain
pub fn add_effect(&mut self, effect: Box<dyn AudioEffect>) {
self.effects.push(effect);
}
/// Remove an effect from the chain
pub fn remove_effect(&mut self, index: usize) -> Option<Box<dyn AudioEffect>> {
if index < self.effects.len() {
Some(self.effects.remove(index))
} else {
None
}
}
/// Clear all effects from the chain
pub fn clear(&mut self) {
self.effects.clear();
}
/// Get the number of effects in the chain
pub fn len(&self) -> usize {
self.effects.len()
}
/// Check if the chain is empty
pub fn is_empty(&self) -> bool {
self.effects.is_empty()
}
}
impl AudioEffect for EffectsChain {
fn process_sample(&mut self, input: f32) -> f32 {
let mut signal = input;
for effect in &mut self.effects {
signal = effect.process_sample(signal);
}
signal
}
fn process_buffer(&mut self, buffer: &mut [f32]) {
for effect in &mut self.effects {
effect.process_buffer(buffer);
}
}
fn reset(&mut self) {
for effect in &mut self.effects {
effect.reset();
}
}
fn set_parameter(&mut self, param: &str, value: f32) {
// This would require a more sophisticated parameter routing system
// For now, we'll just ignore it
let _ = (param, value);
}
}
impl Default for EffectsChain {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_low_pass_filter() {
let mut filter = LowPassFilter::new(1000.0, 1.0);
let output = filter.process_sample(1.0);
assert!(output.is_finite());
}
#[test]
fn test_high_pass_filter() {
let mut filter = HighPassFilter::new(1000.0, 1.0);
let output = filter.process_sample(1.0);
assert!(output.is_finite());
}
#[test]
fn test_delay() {
let mut delay = Delay::new(0.1, 0.5, 0.5);
let output = delay.process_sample(1.0);
assert!(output.is_finite());
// After enough samples, we should get some delayed signal
for _ in 0..(0.1 * SAMPLE_RATE) as usize {
delay.process_sample(0.0);
}
let delayed_output = delay.process_sample(0.0);
assert!(delayed_output.abs() > 0.0);
}
#[test]
fn test_reverb() {
let mut reverb = Reverb::new(0.5, 0.3, 0.4);
let output = reverb.process_sample(1.0);
assert!(output.is_finite());
}
#[test]
fn test_distortion() {
let mut distortion = Distortion::new(3.0, 0.5);
let output = distortion.process_sample(0.5);
assert!(output.is_finite());
assert!(output.abs() <= 1.0); // Should not clip beyond reasonable bounds
}
#[test]
fn test_chorus() {
let mut chorus = Chorus::new(1.0, 0.5, 0.3, 2);
let output = chorus.process_sample(1.0);
assert!(output.is_finite());
}
#[test]
fn test_effects_chain() {
let mut chain = EffectsChain::new();
chain.add_effect(Box::new(LowPassFilter::new(2000.0, 1.0)));
chain.add_effect(Box::new(Delay::new(0.05, 0.3, 0.2)));
assert_eq!(chain.len(), 2);
let output = chain.process_sample(1.0);
assert!(output.is_finite());
}
#[test]
fn test_filter_parameter_setting() {
let mut filter = LowPassFilter::new(1000.0, 1.0);
filter.set_parameter("cutoff", 2000.0);
assert_eq!(filter.cutoff_frequency, 2000.0);
filter.set_parameter("resonance", 2.0);
assert_eq!(filter.resonance, 2.0);
}
}