Skip to content

Commit

Permalink
Split TerminatedConvolutionalCode into strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
rwnobrega committed Nov 30, 2024
1 parent 09e853a commit e7f74a1
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 168 deletions.
138 changes: 45 additions & 93 deletions src/komm/_error_control_convolutional/TerminatedConvolutionalCode.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
from functools import cached_property
from typing import Literal

import attrs
import numpy as np
import numpy.typing as npt
from attrs import frozen
from numpy.linalg import matrix_power
from attrs import field, frozen
from typing_extensions import override

from .._error_control_block.BlockCode import BlockCode
from .._types import ArrayIntLike
from .._util.bit_operations import binlist2int, int2binlist, pack, unpack
from .._util.matrices import pseudo_inverse
from .._util.bit_operations import int2binlist, unpack
from .ConvolutionalCode import ConvolutionalCode
from .terminations import (
DirectTruncation,
TailBiting,
TerminationStrategy,
ZeroTermination,
)

TerminationMode = Literal["direct-truncation", "zero-termination", "tail-biting"]


@frozen
Expand All @@ -36,16 +44,6 @@ class TerminatedConvolutionalCode(BlockCode):
Examples:
>>> convolutional_code = komm.ConvolutionalCode([[0b1, 0b11]])
>>> code = komm.TerminatedConvolutionalCode(convolutional_code, num_blocks=3, mode='zero-termination')
>>> (code.length, code.dimension, code.redundancy)
(8, 3, 5)
>>> code.generator_matrix
array([[1, 1, 0, 1, 0, 0, 0, 0],
[0, 0, 1, 1, 0, 1, 0, 0],
[0, 0, 0, 0, 1, 1, 0, 1]])
>>> code.minimum_distance()
3
>>> code = komm.TerminatedConvolutionalCode(convolutional_code, num_blocks=3, mode='direct-truncation')
>>> (code.length, code.dimension, code.redundancy)
(6, 3, 3)
Expand All @@ -56,6 +54,16 @@ class TerminatedConvolutionalCode(BlockCode):
>>> code.minimum_distance()
2
>>> code = komm.TerminatedConvolutionalCode(convolutional_code, num_blocks=3, mode='zero-termination')
>>> (code.length, code.dimension, code.redundancy)
(8, 3, 5)
>>> code.generator_matrix
array([[1, 1, 0, 1, 0, 0, 0, 0],
[0, 0, 1, 1, 0, 1, 0, 0],
[0, 0, 0, 0, 1, 1, 0, 1]])
>>> code.minimum_distance()
3
>>> code = komm.TerminatedConvolutionalCode(convolutional_code, num_blocks=3, mode='tail-biting')
>>> (code.length, code.dimension, code.redundancy)
(6, 3, 3)
Expand All @@ -69,110 +77,54 @@ class TerminatedConvolutionalCode(BlockCode):

convolutional_code: ConvolutionalCode
num_blocks: int
mode: Literal["direct-truncation", "zero-termination", "tail-biting"] = (
"zero-termination"
mode: TerminationMode = field(
default="zero-termination",
validator=attrs.validators.in_(TerminationMode.__args__),
)

def __attrs_post_init__(self) -> None:
if self.mode == "tail-biting":
try:
self.zs_multiplier
except:
raise ValueError(
"this convolutional code does not support tail-biting for this"
" number of blocks"
)
@cached_property
def _strategy(self) -> TerminationStrategy:
return {
"direct-truncation": DirectTruncation,
"zero-termination": ZeroTermination,
"tail-biting": TailBiting,
}[self.mode](self.convolutional_code, self.num_blocks)

@property
@override
def length(self) -> int:
total_num_blocks = self.num_blocks
if self.mode == "zero-termination":
total_num_blocks += self.convolutional_code.memory_order
return total_num_blocks * self.convolutional_code.num_output_bits
return self._strategy.codeword_length()

@property
@override
def dimension(self) -> int:
return self.num_blocks * self.convolutional_code.num_input_bits

@cached_property
@override
def generator_matrix(self) -> npt.NDArray[np.int_]:
convolutional_code = self.convolutional_code
k0, n0 = convolutional_code.num_input_bits, convolutional_code.num_output_bits
k, n = self.dimension, self.length
generator_matrix = np.zeros((k, n), dtype=int)
top_rows = np.apply_along_axis(self.enc_mapping, 1, np.eye(k0, k, dtype=int))
for t in range(self.num_blocks):
generator_matrix[k0 * t : k0 * (t + 1), :] = np.roll(
top_rows, shift=n0 * t, axis=1
)
if self.mode == "direct-truncation":
generator_matrix[k0 * t : k0 * (t + 1), : n0 * t] = 0
return generator_matrix
return self._strategy.generator_matrix(self)

@override
def enc_mapping(self, u: ArrayIntLike) -> npt.NDArray[np.int_]:
convolutional_code = self.convolutional_code
k0, n0, nu, fsm = (
convolutional_code.num_input_bits,
convolutional_code.num_output_bits,
convolutional_code.overall_constraint_length,
convolutional_code.finite_state_machine,
)
if self.mode == "direct-truncation":
input_sequence = pack(u, width=k0)
initial_state = 0
elif self.mode == "zero-termination":
tail = np.dot(u, self.tail_projector) % 2
input_sequence = pack(np.concatenate([u, tail]), width=k0)
initial_state = 0
else: # self.mode == "tail-biting"
# See [WBR01, Sec III.B].
input_sequence = pack(u, width=k0)
_, zs_response = fsm.process(input_sequence, initial_state=0)
initial_state = binlist2int(
np.dot(int2binlist(zs_response, width=nu), self.zs_multiplier) % 2
)

output_sequence, _ = fsm.process(input_sequence, initial_state)
n0 = self.convolutional_code.num_output_bits
fsm = self.convolutional_code.finite_state_machine
input_bits = self._strategy.pre_process_input(u)
initial_state = self._strategy.initial_state(input_bits)
output_sequence, _ = fsm.process(input_bits, initial_state)
v = unpack(output_sequence, width=n0)
return v

@property
@override
def default_decoder(self) -> str:
return "viterbi-hard"

@classmethod
@override
def supported_decoders(cls) -> list[str]:
return cls.__base__.supported_decoders() + ["viterbi-hard", "viterbi-soft", "bcjr"] # type: ignore

@cached_property
def tail_projector(self) -> npt.NDArray[np.int_]:
if self.mode != "zero-termination":
raise ValueError(
"this property is only defined for mode='zero-termination'"
)
h = self.num_blocks
mu = self.convolutional_code.memory_order
A_mat = self.convolutional_code.state_matrix
B_mat = self.convolutional_code.control_matrix
AnB_message = np.vstack([
np.dot(B_mat, matrix_power(A_mat, j)) % 2
for j in range(mu + h - 1, mu - 1, -1)
])
AnB_tail = np.vstack(
[np.dot(B_mat, matrix_power(A_mat, j)) % 2 for j in range(mu - 1, -1, -1)]
)
return np.dot(AnB_message, pseudo_inverse(AnB_tail)) % 2

@cached_property
def zs_multiplier(self) -> npt.NDArray[np.int_]:
# See [WBR01, eq. (4)].
if self.mode != "tail-biting":
raise ValueError("this property is only defined for mode='tail-biting'")
h = self.num_blocks
nu = self.convolutional_code.overall_constraint_length
A_mat = self.convolutional_code.state_matrix
return pseudo_inverse((matrix_power(A_mat, h) + np.eye(nu, dtype=int)) % 2)

@cached_property
def cache_bit(self) -> npt.NDArray[np.int_]:
n0 = self.convolutional_code.num_output_bits
Expand Down
128 changes: 128 additions & 0 deletions src/komm/_error_control_convolutional/terminations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from abc import ABC, abstractmethod

import numpy as np
import numpy.typing as npt
from attrs import frozen
from numpy.linalg import matrix_power

from .._error_control_block import BlockCode
from .._types import ArrayIntLike
from .._util.bit_operations import binlist2int, int2binlist, pack
from .._util.matrices import pseudo_inverse
from .ConvolutionalCode import ConvolutionalCode


@frozen
class TerminationStrategy(ABC):
convolutional_code: "ConvolutionalCode"
num_blocks: int

@abstractmethod
def initial_state(self, input_bits: ArrayIntLike) -> int: ...

@abstractmethod
def pre_process_input(self, input_bits: ArrayIntLike) -> npt.NDArray[np.int_]: ...

@abstractmethod
def codeword_length(self) -> int: ...

@abstractmethod
def generator_matrix(self, code: BlockCode) -> npt.NDArray[np.int_]: ...


def _base_generator_matrix(
code: BlockCode, convolutional_code: ConvolutionalCode, num_blocks: int
) -> npt.NDArray[np.int_]:
k0 = convolutional_code.num_input_bits
n0 = convolutional_code.num_output_bits
k, n = code.dimension, code.length
generator_matrix = np.zeros((k, n), dtype=int)
top_rows = np.apply_along_axis(code.enc_mapping, 1, np.eye(k0, k, dtype=int))
for t in range(num_blocks):
generator_matrix[k0 * t : k0 * (t + 1), :] = np.roll(top_rows, n0 * t, 1)
return generator_matrix


@frozen
class DirectTruncation(TerminationStrategy):
def initial_state(self, input_bits: ArrayIntLike) -> int:
return 0

def pre_process_input(self, input_bits: ArrayIntLike) -> npt.NDArray[np.int_]:
n = self.convolutional_code.num_input_bits
return pack(input_bits, width=n)

def codeword_length(self) -> int:
h = self.num_blocks
n = self.convolutional_code.num_output_bits
return h * n

def generator_matrix(self, code: BlockCode) -> npt.NDArray[np.int_]:
h = self.num_blocks
k0 = self.convolutional_code.num_input_bits
n0 = self.convolutional_code.num_output_bits
generator_matrix = _base_generator_matrix(code, self.convolutional_code, h)
for t in range(1, h):
generator_matrix[k0 * t : k0 * (t + 1), : n0 * t] = 0
return generator_matrix


@frozen
class ZeroTermination(TerminationStrategy):
def initial_state(self, input_bits: npt.ArrayLike) -> int:
return 0

def pre_process_input(self, input_bits: ArrayIntLike) -> npt.NDArray[np.int_]:
n = self.convolutional_code.num_input_bits
tail = input_bits @ self._tail_projector() % 2
return pack(np.concatenate([input_bits, tail]), width=n)

def codeword_length(self) -> int:
h = self.num_blocks
n = self.convolutional_code.num_output_bits
m = self.convolutional_code.memory_order
return (h + m) * n

def generator_matrix(self, code: BlockCode) -> npt.NDArray[np.int_]:
return _base_generator_matrix(code, self.convolutional_code, self.num_blocks)

def _tail_projector(self) -> npt.NDArray[np.int_]:
h = self.num_blocks
mu = self.convolutional_code.memory_order
A_mat = self.convolutional_code.state_matrix
B_mat = self.convolutional_code.control_matrix
AnB_message = np.vstack(
[B_mat @ matrix_power(A_mat, j) % 2 for j in range(mu + h - 1, mu - 1, -1)]
)
AnB_tail = np.vstack(
[B_mat @ matrix_power(A_mat, j) % 2 for j in range(mu - 1, -1, -1)]
)
return AnB_message @ pseudo_inverse(AnB_tail) % 2


@frozen
class TailBiting(TerminationStrategy):
def initial_state(self, input_bits: ArrayIntLike) -> int:
fsm = self.convolutional_code.finite_state_machine
nu = self.convolutional_code.overall_constraint_length
_, zs_response = fsm.process(input_bits, initial_state=0)
zs_response = int2binlist(zs_response, width=nu)
return binlist2int(zs_response @ self._zs_multiplier() % 2)

def pre_process_input(self, input_bits: ArrayIntLike) -> npt.NDArray[np.int_]:
n = self.convolutional_code.num_input_bits
return pack(input_bits, width=n)

def codeword_length(self) -> int:
h = self.num_blocks
n = self.convolutional_code.num_output_bits
return h * n

def generator_matrix(self, code: BlockCode) -> npt.NDArray[np.int_]:
return _base_generator_matrix(code, self.convolutional_code, self.num_blocks)

def _zs_multiplier(self) -> npt.NDArray[np.int_]:
h = self.num_blocks
nu = self.convolutional_code.overall_constraint_length
A_mat = self.convolutional_code.state_matrix
return pseudo_inverse(matrix_power(A_mat, h) + np.eye(nu, dtype=int) % 2)
Loading

0 comments on commit e7f74a1

Please sign in to comment.