Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Feature/remove b shift #101

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 41 additions & 23 deletions python/audio_dsp/dsp/biquad.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from audio_dsp.dsp import utils as utils
from audio_dsp.dsp import generic as dspg

BOOST_BSHIFT = 2 # limit boosts to 12 dB gain
BOOST_BSHIFT = 0 # limit boosts to 12 dB gain


class biquad(dspg.dsp_block):
Expand Down Expand Up @@ -65,7 +65,9 @@ def __init__(

# coeffs should be in the form [b0 b1 b2 -a1 -a2], and
# normalized by a0
self.coeffs, self.int_coeffs = _round_and_check(coeffs, self.b_shift)
# vpu_coeffs are in the form:
# [b0/2, b0/2, -a1, b1/2, b1/2, -a2, b2/2, b2/2]
self.coeffs, self.int_coeffs, self.vpu_coeffs = _round_and_check(coeffs, self.b_shift)

self._check_gain()

Expand All @@ -75,6 +77,8 @@ def __init__(
self._x2 = [0.0] * n_chans
self._y1 = [0.0] * n_chans
self._y2 = [0.0] * n_chans
# [x0, x0, y1, x1, x1, y2, x2, x2]
self._vpu_state = [[0]*8] * n_chans

def update_coeffs(self, new_coeffs: list[float]):
"""Update the saved coefficients to the input values.
Expand Down Expand Up @@ -126,9 +130,9 @@ def process_int(self, sample: float, channel: int = 0) -> float:

# process a single sample using direct form 1
y = utils.int64(
(sample_int * self.int_coeffs[0])
+ (self._x1[channel] * self.int_coeffs[1])
+ (self._x2[channel] * self.int_coeffs[2])
(sample_int * 2*self.int_coeffs[0])
+ (self._x1[channel] * 2*self.int_coeffs[1])
+ (self._x2[channel] * 2*self.int_coeffs[2])
+ (int(self._y1[channel] * self.int_coeffs[3]) >> self.b_shift)
+ (int(self._y2[channel] * self.int_coeffs[4]) >> self.b_shift)
)
Expand Down Expand Up @@ -158,24 +162,19 @@ def process_xcore(self, sample: float, channel: int = 0) -> float:
"""
sample_int = utils.int32(round(sample * 2**self.Q_sig))

self._vpu_state[channel][0] = sample_int
self._vpu_state[channel][1] = sample_int

# process a single sample using direct form 1. In the VPU the
# ``>> 30`` comes before accumulation
y = utils.vlmaccr(
[
sample_int,
self._x1[channel],
self._x2[channel],
self._y1[channel],
self._y2[channel],
],
self.int_coeffs,
self._vpu_state[channel],
self.vpu_coeffs,
)

# save states
self._x2[channel] = utils.int32(self._x1[channel])
self._x1[channel] = utils.int32(sample_int)
self._y2[channel] = utils.int32(self._y1[channel])
self._y1[channel] = utils.int32(y)
self._vpu_state[channel][3:] = self._vpu_state[channel][:5]
self._vpu_state[channel][2] = utils.int32(y)

# compensate for coefficients
y = utils.int32(y << self.b_shift)
Expand Down Expand Up @@ -411,23 +410,42 @@ def _round_to_q30(coeffs: list[float]) -> tuple[list[float], list[int]]:

"""
rounded_coeffs = [0.0] * len(coeffs)
int_coeffs = [0] * len(coeffs)
vpu_coeffs = [0] * 8
int_coeffs = [0] * 5

Q = 30
for n in range(len(coeffs)):
# scale to Q30 ints
rounded_coeffs[n] = round(coeffs[n] * 2**Q)

if n < 3:
scaled_rounded_coeff = rounded_coeffs[n] / 2
else:
scaled_rounded_coeff = rounded_coeffs[n]

# check for overflow
if not (-(2**31) <= rounded_coeffs[n] <= 2**31 - 1):
if not (-(2**31) <= scaled_rounded_coeff <= 2**31 - 1):
raise ValueError(
"Filter coefficient will overflow (%.4f, %d), reduce gain" % (coeffs[n], n)
)

int_coeffs[n] = utils.int32(rounded_coeffs[n])
int_coeffs[n] = utils.int32(scaled_rounded_coeff)
vpu_coeffs[n] = utils.int32(scaled_rounded_coeff)

# rescale to floats
rounded_coeffs[n] = rounded_coeffs[n] / 2**Q

return rounded_coeffs, int_coeffs
# shuffle VPU coeffs into [b0/2, b0/2, a1, b1/2, b1/2, a2, b2/2, b2/2]
vpu_coeffs[7] = int_coeffs[2]
vpu_coeffs[6] = int_coeffs[2]
vpu_coeffs[5] = int_coeffs[4]
vpu_coeffs[4] = int_coeffs[1]
vpu_coeffs[3] = int_coeffs[1]
vpu_coeffs[2] = int_coeffs[3]
vpu_coeffs[1] = int_coeffs[0]
vpu_coeffs[0] = int_coeffs[0]

return rounded_coeffs, int_coeffs, vpu_coeffs


def _apply_biquad_gain(coeffs: list[float], gain_db: float) -> list[float]:
Expand Down Expand Up @@ -493,14 +511,14 @@ def _round_and_check(coeffs: list[float], b_shift: int = 0) -> tuple[list[float]
if len(coeffs) != 5:
raise ValueError("coeffs should be in the form [b0 b1 b2 -a1 -a2]")
coeffs = _apply_biquad_bshift(coeffs, b_shift)
coeffs, int_coeffs = _round_to_q30(coeffs)
coeffs, int_coeffs, vpu_coeffs = _round_to_q30(coeffs)

# check filter is stable
poles = np.roots([1, -coeffs[3], -coeffs[4]])
if np.any(np.abs(poles) >= 1):
raise ValueError("Poles lie outside the unit circle, the filter is unstable")

return coeffs, int_coeffs
return coeffs, int_coeffs, vpu_coeffs


def make_biquad_bypass(fs: int) -> list[float]:
Expand Down