Skip to content

Commit

Permalink
Update DOCS.md
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbelgium authored Oct 28, 2024
1 parent 912f0c3 commit 6ff3c5b
Showing 1 changed file with 260 additions and 21 deletions.
281 changes: 260 additions & 21 deletions birdnet-pi/DOCS.md
Original file line number Diff line number Diff line change
Expand Up @@ -429,16 +429,251 @@ echo "Mono optimization applied. Only using primary input and balanced outputs."
Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py"

```python
# Calculate RMS
rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2))
#!/usr/bin/env python3
"""
Microphone Gain Adjustment Script with THD and Overload Detection
# Calculate THD over the full range
thd_percentage = thd_calculation(filtered_audio, sampling_rate)
This script captures audio from an RTSP stream, processes it to calculate the RMS
within the 2000-8000 Hz frequency band, detects clipping, calculates Total Harmonic
Distortion (THD) over the full frequency range, and adjusts the microphone gain based
on predefined noise thresholds, trends, and distortion metrics.
Dependencies:
- numpy
- scipy
- ffmpeg (installed and accessible in PATH)
- amixer (for microphone gain control)
Author: OpenAI ChatGPT
Date: 2024-10-28 (Updated)
Changelog:
- 2024-10-27: Increased sampling rate to 48,000 Hz.
- 2024-10-27: Extended THD calculation over the full frequency range.
- 2024-10-27: Added gain stabilization delay to reduce frequent adjustments.
- 2024-10-27: Improved RTSP stream resilience with retry logic.
- 2024-10-27: Enhanced debug output with logging levels.
- 2024-10-28: Added summary log mode for simplified output.
- 2024-10-28: Removed gain stabilization delay for immediate gain adjustments.
"""

import subprocess
import numpy as np
from scipy.signal import butter, sosfilt, find_peaks
import time
import re

# ---------------------------- Configuration ----------------------------

# Microphone Settings
MICROPHONE_NAME = "Line In 1 Gain"
MIN_GAIN_DB = 20
MAX_GAIN_DB = 45
DECREASE_GAIN_STEP_DB = 1
INCREASE_GAIN_STEP_DB = 5
CLIPPING_REDUCTION_DB = 3

# Noise Thresholds
NOISE_THRESHOLD_HIGH = 0.001
NOISE_THRESHOLD_LOW = 0.00035

# Trend Detection
TREND_COUNT_THRESHOLD = 3

# Sampling Rate
SAMPLING_RATE = 48000 # Updated from 32000 to 48000 Hz

# RTSP Stream URL
RTSP_URL = "rtsp://192.168.178.124:8554/birdmic"

# Debug and Summary Modes
DEBUG = 1 # Debug Mode (1 for enabled, 0 for disabled)
SUMMARY_MODE = True # Summary Mode (True for summary output only)

# Microphone Characteristics
MIC_SENSITIVITY_DB = -28
MIC_CLIPPING_SPL = 120

# Calibration Constants
REFERENCE_PRESSURE = 20e-6

# THD Settings
THD_FUNDAMENTAL_THRESHOLD_DB = 60
MAX_THD_PERCENTAGE = 5.0

# -----------------------------------------------------------------------


def debug_print(msg, level="info"):
"""
Prints debug messages with logging levels if DEBUG mode is enabled.
:param msg: The debug message to print.
:param level: Logging level - "info", "warning", "error".
"""
if DEBUG and not SUMMARY_MODE:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(f"[{current_time}] [{level.upper()}] {msg}")


def summary_log(current_gain, clipping, rms_amplitude, thd_percentage):
"""
Outputs a summary log with date, time, current gain, clipping status, background noise, and THD.
:param current_gain: Current microphone gain in dB.
:param clipping: Clipping status (yes/no).
:param rms_amplitude: Background noise RMS amplitude.
:param thd_percentage: THD in percentage.
"""
if SUMMARY_MODE:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
clipping_status = "Yes" if clipping else "No"
print(f"{current_time} | Gain: {current_gain:.1f} dB | Clipping: {clipping_status} | "
f"Noise: {rms_amplitude:.5f} | THD: {thd_percentage:.2f}%")


def get_gain_db(mic_name):
"""
Retrieves the current gain setting of the specified microphone using amixer.
"""
cmd = ['amixer', 'sget', mic_name]
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode()
match = re.search(r'\[(-?\d+(\.\d+)?)dB\]', output)
if match:
gain_db = float(match.group(1))
debug_print(f"Retrieved gain: {gain_db} dB", "info")
return gain_db
else:
debug_print("No gain information found in amixer output.", "warning")
return None
except subprocess.CalledProcessError as e:
debug_print(f"amixer sget failed: {e}", "error")
return None


def set_gain_db(mic_name, gain_db):
"""
Sets the gain of the specified microphone using amixer.
"""
cmd = ['amixer', 'sset', mic_name, f'{gain_db}dB']
try:
subprocess.check_call(cmd, stderr=subprocess.STDOUT)
debug_print(f"Set gain to: {gain_db} dB", "info")
return True
except subprocess.CalledProcessError as e:
debug_print(f"amixer sset failed: {e}", "error")
return False


def find_fundamental_frequency(fft_freqs, fft_magnitude, min_freq=100, max_freq=5000):
"""
Dynamically finds the fundamental frequency within a specified range.
"""
idx_min = np.searchsorted(fft_freqs, min_freq)
idx_max = np.searchsorted(fft_freqs, max_freq)
if idx_max <= idx_min:
return None, 0

search_magnitude = fft_magnitude[idx_min:idx_max]
search_freqs = fft_freqs[idx_min:idx_max]
peaks, properties = find_peaks(search_magnitude, height=np.max(search_magnitude) * 0.1)
if len(peaks) == 0:
return None, 0

max_peak_idx = np.argmax(properties['peak_heights'])
fundamental_freq = search_freqs[peaks[max_peak_idx]]
fundamental_amplitude = search_magnitude[peaks[max_peak_idx]]

debug_print(f"Detected fundamental frequency: {fundamental_freq:.2f} Hz with amplitude {fundamental_amplitude:.4f}", "info")
return fundamental_freq, fundamental_amplitude


def thd_calculation(audio, sampling_rate, num_harmonics=5):
"""
Calculates Total Harmonic Distortion (THD) for the audio signal.
"""
fft_vals = np.fft.rfft(audio)
fft_freqs = np.fft.rfftfreq(len(audio), 1 / sampling_rate)
fft_magnitude = np.abs(fft_vals)
fundamental_freq, fundamental_amplitude = find_fundamental_frequency(fft_freqs, fft_magnitude)

if fundamental_freq is None or fundamental_amplitude < 1e-6:
debug_print("Fundamental frequency not detected or amplitude too low. Skipping THD calculation.", "warning")
return 0.0

harmonic_amplitudes = []
for n in range(2, num_harmonics + 1):
harmonic_freq = n * fundamental_freq
if harmonic_freq > sampling_rate / 2:
break
harmonic_idx = np.argmin(np.abs(fft_freqs - harmonic_freq))
harmonic_amp = fft_magnitude[harmonic_idx]
harmonic_amplitudes.append(harmonic_amp)
debug_print(f"Harmonic {n} frequency: {harmonic_freq:.2f} Hz, amplitude: {harmonic_amp:.4f}", "info")

harmonic_sum = np.sqrt(np.sum(np.square(harmonic_amplitudes)))
thd = (harmonic_sum / fundamental_amplitude) * 100 if fundamental_amplitude > 0 else 0.0
debug_print(f"THD Calculation: {thd:.2f}%", "info")
return thd


def calculate_spl(audio, mic_sensitivity_db):
"""
Calculates the Sound Pressure Level (SPL) from the audio signal.
"""
rms_amplitude = np.sqrt(np.mean(audio ** 2))
if rms_amplitude == 0:
debug_print("RMS amplitude is zero. SPL cannot be calculated.", "warning")
return -np.inf

mic_sensitivity_linear = 10 ** (mic_sensitivity_db / 20)
pressure = rms_amplitude / mic_sensitivity_linear
spl = 20 * np.log10(pressure / REFERENCE_PRESSURE)
debug_print(f"Calculated SPL: {spl:.2f} dB", "info")
return spl

# Calculate SPL
spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB)

# Detect microphone overload
def detect_microphone_overload(spl, mic_clipping_spl):
"""
Detects if the calculated SPL is approaching the microphone's clipping SPL.
"""
if spl >= mic_clipping_spl - 3:
debug_print("Microphone overload detected.", "warning")
return True
return False


def calculate_noise_rms_and_thd(rtsp_url, bandpass_sos, sampling_rate, num_bins=5):
"""
Captures audio from an RTSP stream, calculates RMS, THD, and SPL, and detects microphone overload.
"""
cmd = [
'ffmpeg', '-loglevel', 'error', '-rtsp_transport', 'tcp', '-i', rtsp_url,
'-vn', '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(sampling_rate), '-ac', '1', '-t', '5', '-'
]

retries = 3
for attempt in range(retries):
try:
debug_print(f"Attempt {attempt + 1} to capture audio from {rtsp_url}", "info")
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()

if process.returncode != 0:
debug_print(f"ffmpeg failed with error: {stderr.decode()}", "error")
time.sleep(5)
continue

audio = np.frombuffer(stdout, dtype=np.int16).astype(np.float32) / 32768.0
debug_print(f"Captured {len(audio)} samples from audio stream.", "info")
if len(audio) == 0:
debug_print("No audio data captured.", "warning")
time.sleep(5)
continue

filtered_audio = sosfilt(bandpass_sos, audio)
rms_amplitude = np.sqrt(np.mean(filtered_audio ** 2))
thd_percentage = thd_calculation(filtered_audio, sampling_rate)
spl = calculate_spl(filtered_audio, MIC_SENSITIVITY_DB)
overload = detect_microphone_overload(spl, MIC_CLIPPING_SPL)

return rms_amplitude, thd_percentage, spl, overload
Expand All @@ -453,11 +688,10 @@ Add this content in "$HOME/autogain.py" && chmod +x "$HOME/autogain.py"
def main():
"""
Main loop that continuously monitors background noise, detects clipping, calculates THD,
and adjusts microphone gain with stabilization delay and retry logic for RTSP stream resilience.
and adjusts microphone gain with retry logic for RTSP stream resilience.
"""
TREND_COUNT = 0
PREVIOUS_TREND = 0
last_gain_adjustment_time = time.time() - GAIN_STABILIZATION_DELAY # Initialize

# Precompute bandpass filter coefficients with updated SAMPLING_RATE
LOWCUT = 2000
Expand All @@ -481,17 +715,19 @@ def main():
time.sleep(60)
continue

current_time = time.time()

# Adjust gain if overload detected and sufficient time has passed
if overload and current_time - last_gain_adjustment_time >= GAIN_STABILIZATION_DELAY:
# Adjust gain if overload detected
if overload:
current_gain_db = get_gain_db(MICROPHONE_NAME)
if current_gain_db is not None:
NEW_GAIN_DB = max(current_gain_db - CLIPPING_REDUCTION_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Clipping detected. Reduced gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain reduced to {NEW_GAIN_DB} dB due to clipping.", "warning")
last_gain_adjustment_time = current_time
# No stabilization delay; continue to next iteration
# Skip trend adjustment in case of clipping
summary_log(current_gain_db if current_gain_db else MIN_GAIN_DB, True, rms, thd)
time.sleep(60)
continue

# Handle THD if SPL is above threshold
if spl >= THD_FUNDAMENTAL_THRESHOLD_DB:
Expand All @@ -503,10 +739,11 @@ def main():
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"High THD detected. Decreased gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high THD.", "info")
last_gain_adjustment_time = current_time
else:
debug_print("THD within acceptable limits.", "info")

else:
debug_print("SPL below THD calculation threshold. Skipping THD check.", "info")

# Determine the noise trend
if rms > NOISE_THRESHOLD_HIGH:
CURRENT_TREND = 1
Expand Down Expand Up @@ -537,23 +774,25 @@ def main():

debug_print(f"Current gain: {current_gain_db} dB", "info")

# Adjust gain based on noise trend if threshold count is reached and stabilization delay has passed
if TREND_COUNT >= TREND_COUNT_THRESHOLD and current_time - last_gain_adjustment_time >= GAIN_STABILIZATION_DELAY:
# Output summary log for the current state
summary_log(current_gain_db, overload, rms, thd)

# Adjust gain based on noise trend if threshold count is reached
if TREND_COUNT >= TREND_COUNT_THRESHOLD:
if CURRENT_TREND == 1:
# Decrease gain by DECREASE_GAIN_STEP_DB dB
NEW_GAIN_DB = max(current_gain_db - DECREASE_GAIN_STEP_DB, MIN_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Background noise high. Decreased gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain decreased to {NEW_GAIN_DB} dB due to high noise.", "info")
last_gain_adjustment_time = current_time
TREND_COUNT = 0
elif CURRENT_TREND == -1:
# Increase gain by INCREASE_GAIN_STEP_DB dB
NEW_GAIN_DB = min(current_gain_db + INCREASE_GAIN_STEP_DB, MAX_GAIN_DB)
if set_gain_db(MICROPHONE_NAME, NEW_GAIN_DB):
print(f"Background noise low. Increased gain to {NEW_GAIN_DB} dB")
debug_print(f"Gain increased to {NEW_GAIN_DB} dB due to low noise.", "info")
last_gain_adjustment_time = current_time
TREND_COUNT = 0
TREND_COUNT = 0
else:
debug_print("No gain adjustment needed based on noise trend.", "info")

Expand Down

0 comments on commit 6ff3c5b

Please sign in to comment.