Skip to content

Commit

Permalink
Merge pull request #34 from dotX12/dev
Browse files Browse the repository at this point in the history
Dev to master
  • Loading branch information
dotX12 authored Sep 4, 2022
2 parents 5c1ec65 + ad14d2a commit c98044f
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 33 deletions.
67 changes: 34 additions & 33 deletions shazamio/algorithm.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from copy import copy
from typing import List, Optional, Any

from numpy import hanning, log, maximum, fft, array
import numpy as np

from .enums import FrequencyBand
from .signature import DecodedMessage, FrequencyPeak

HANNING_MATRIX = hanning(2050)[1:-1] # Wipe trailing and leading zeroes
HANNING_MATRIX = np.hanning(2050)[1:-1] # Wipe trailing and leading zeroes


class RingBuffer(list):
Expand Down Expand Up @@ -154,10 +153,10 @@ def do_fft(self, batch_of_128_s16le_mono_samples):
# Pre-multiplying the array applies a windowing function before the DFT
# (a slightly rounded Hanning window without zeros at the edges)

fft_results: array = fft.rfft(HANNING_MATRIX * excerpt_from_ring_buffer)
fft_results: np.array = np.fft.rfft(HANNING_MATRIX * excerpt_from_ring_buffer)

fft_results = (fft_results.real**2 + fft_results.imag**2) / (1 << 17)
fft_results = maximum(fft_results, 0.0000000001)
fft_results = np.maximum(fft_results, 0.0000000001)

self.fft_outputs.append(fft_results)

Expand All @@ -168,39 +167,40 @@ def do_peak_spreading_and_recognition(self):
self.do_peak_recognition()

def do_peak_spreading(self):

origin_last_fft: List[float] = self.fft_outputs[self.fft_outputs.position - 1]

spread_last_fft: List[float] = list(origin_last_fft)

for position in range(1025):

# Perform frequency-domain spreading of peak values

if position < 1023:
spread_last_fft[position] = max(
spread_last_fft[position : position + 3]
)

# Perform time-domain spreading of peak values
temporary_array_1 = np.tile(origin_last_fft, 3).reshape((3, -1))
temporary_array_1[1] = np.roll(temporary_array_1[1], -1)
temporary_array_1[2] = np.roll(temporary_array_1[2], -2)

max_value = spread_last_fft[position]
origin_last_fft_np = np.hstack(
[temporary_array_1.max(axis=0)[:-3], origin_last_fft[-3:]]
)

for former_fft_num in [-1, -3, -6]:
former_fft_output = self.spread_fft_output[
(self.spread_fft_output.position + former_fft_num)
% self.spread_fft_output.buffer_size
]
i1, i2, i3 = [
(self.spread_fft_output.position + former_fft_num)
% self.spread_fft_output.buffer_size
for former_fft_num in [-1, -3, -6]
]

former_fft_output[position] = max_value = max(
former_fft_output[position], max_value
)
temporary_array_2 = np.vstack(
[
origin_last_fft_np,
self.spread_fft_output[i1],
self.spread_fft_output[i2],
self.spread_fft_output[i3],
]
)

# Save output locally
temporary_array_2[1] = np.max(temporary_array_2[:2, :], axis=0)
temporary_array_2[2] = np.max(temporary_array_2[:3, :], axis=0)
temporary_array_2[3] = np.max(temporary_array_2[:4, :], axis=0)

self.spread_fft_output.append(spread_last_fft)
self.spread_fft_output[i1] = temporary_array_2[1].tolist()
self.spread_fft_output[i2] = temporary_array_2[2].tolist()
self.spread_fft_output[i3] = temporary_array_2[3].tolist()

pass
self.spread_fft_output.append(list(origin_last_fft_np))

def do_peak_recognition(self):

Expand Down Expand Up @@ -256,14 +256,15 @@ def do_peak_recognition(self):
fft_number = self.spread_fft_output.num_written - 46

peak_magnitude = (
log(max(1 / 64, fft_minus_46[bin_position])) * 1477.3 + 6144
np.log(max(1 / 64, fft_minus_46[bin_position])) * 1477.3
+ 6144
)
peak_magnitude_before = (
log(max(1 / 64, fft_minus_46[bin_position - 1])) * 1477.3
np.log(max(1 / 64, fft_minus_46[bin_position - 1])) * 1477.3
+ 6144
)
peak_magnitude_after = (
log(max(1 / 64, fft_minus_46[bin_position + 1])) * 1477.3
np.log(max(1 / 64, fft_minus_46[bin_position + 1])) * 1477.3
+ 6144
)

Expand Down
68 changes: 68 additions & 0 deletions tests/test_peak_spreading_numpy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from pydub import AudioSegment
from typing import List

from shazamio.algorithm import SignatureGenerator
from types import MethodType


def do_peak_spreading_non_numpy(self):
    """Pure-Python peak spreading, kept as a reference implementation.

    Reads the FFT output stored just before ``self.fft_outputs.position``,
    spreads it along the frequency axis, folds the spread values into three
    earlier entries of ``self.spread_fft_output`` (mutating them in place),
    and finally appends the frequency-spread list to
    ``self.spread_fft_output``.
    """
    latest_fft: List[float] = self.fft_outputs[self.fft_outputs.position - 1]

    # Work on a copy so the stored FFT output itself is never modified.
    spread: List[float] = list(latest_fft)

    # Frequency-domain spreading: each of the first 1023 bins becomes the
    # maximum of itself and the two following bins. The slice only reads
    # entries that have not been overwritten yet, so a single forward pass
    # over the copy is enough.
    for index in range(1023):
        spread[index] = max(spread[index : index + 3])

    # Time-domain spreading: resolve the three earlier outputs once (the
    # ring position does not move until the final append).
    ring = self.spread_fft_output
    earlier_outputs = [
        ring[(ring.position + offset) % ring.buffer_size]
        for offset in (-1, -3, -6)
    ]

    for index in range(1025):
        running_max = spread[index]
        for earlier in earlier_outputs:
            # The running maximum is carried from one earlier output to the
            # next, so each receives the largest value seen so far.
            running_max = max(earlier[index], running_max)
            earlier[index] = running_max

    # Store the frequency-spread values (not the running maxima).
    ring.append(spread)


async def test_do_peak_spreading_numpy():
    """Check that the default peak spreading matches the pure-Python reference.

    Two generators are fed the same audio; one has ``do_peak_spreading``
    replaced by ``do_peak_spreading_non_numpy``. Their first signatures must
    encode to identical binary data.
    """
    # Load the sample and convert it with pydub: sample width 2,
    # frame rate 16000, one channel.
    segment = (
        AudioSegment.from_file(file="examples/data/dora.ogg")
        .set_sample_width(2)
        .set_frame_rate(16000)
        .set_channels(1)
    )

    def first_signature(generator):
        # Feed the whole clip, then poll until a truthy signature comes back.
        generator.feed_input(segment.get_array_of_samples())
        generator.MAX_TIME_SECONDS = 12

        result = generator.get_next_signature()
        while not result:
            result = generator.get_next_signature()
        return result

    # Reference run: bind the pure-Python implementation onto this instance.
    reference_generator = SignatureGenerator()
    reference_generator.do_peak_spreading = MethodType(
        do_peak_spreading_non_numpy, reference_generator
    )
    reference_signature = first_signature(reference_generator)

    # Run under test: the generator's own do_peak_spreading.
    numpy_signature = first_signature(SignatureGenerator())

    assert numpy_signature.encode_to_binary() == reference_signature.encode_to_binary()

0 comments on commit c98044f

Please sign in to comment.