Skip to content

Commit

Permalink
Refactor tbm header validation
Browse files Browse the repository at this point in the history
  • Loading branch information
mraspaud committed Aug 28, 2024
1 parent 6040ac4 commit 6e8a726
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 27 deletions.
34 changes: 18 additions & 16 deletions pygac/pod_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@

from pygac.clock_offsets_converter import get_offsets
from pygac.correct_tsm_issue import TSM_AFFECTED_INTERVALS_POD, get_tsm_idx
from pygac.reader import NoTLEData, Reader, ReaderError
from pygac.reader import DecodingError, NoTLEData, Reader, ReaderError
from pygac.slerp import slerp
from pygac.utils import file_opener

Expand Down Expand Up @@ -321,24 +321,14 @@ def read_header(cls, filename, fileobj=None, header_date="auto"):
_tbm_head, = np.frombuffer(
fd_.read(tbm_header.itemsize),
dtype=tbm_header, count=1)
for encoding in ("utf-8", "cp500"):
try:
data_set_name = _tbm_head['data_set_name'].decode(encoding)
except ValueError:
continue
else:
break
else:
data_set_name = '---'
allowed_empty = (42*b'\x00' + b' ')
if (cls.data_set_pattern.match(data_set_name)
or (_tbm_head['data_set_name'] == allowed_empty)):
tbm_head = _tbm_head.copy()
try:
tbm_head = cls._validate_tbm_header(_tbm_head)
tbm_offset = tbm_header.itemsize
else:
fd_.seek(0)
except DecodingError:
tbm_head = None
tbm_offset = 0

fd_.seek(tbm_offset, 0)
header = cls.choose_header_based_on_timestamp(header_date, fd_)
fd_.seek(tbm_offset, 0)
# need to copy frombuffer to have write access on head
Expand All @@ -349,6 +339,18 @@ def read_header(cls, filename, fileobj=None, header_date="auto"):
cls._validate_header(head)
return tbm_head, head

@classmethod
def _validate_tbm_header(cls, potential_tbm_header):
    """Return a copy of the candidate TBM header if its data set name is acceptable.

    The ``data_set_name`` field is accepted as-is when it equals the blank
    placeholder (42 null bytes followed by a space). Any other value is
    passed to ``cls._decode_data_set_name``, which is called only for its
    validation side effect.

    Args:
        potential_tbm_header: record read from the start of the file that
            may or may not be a TBM header; must expose a
            ``data_set_name`` field and a ``copy()`` method.

    Returns:
        A copy of ``potential_tbm_header``.

    Raises:
        DecodingError: propagated from ``cls._decode_data_set_name`` when
            the data set name is not valid.
    """
    name_field = potential_tbm_header['data_set_name']
    blank_name = 42*b'\x00' + b' '
    if name_field != blank_name:
        # The decoded value is discarded; only the DecodingError raised
        # for an invalid name matters here.
        cls._decode_data_set_name(name_field)
    return potential_tbm_header.copy()


@classmethod
def choose_header_based_on_timestamp(cls, header_date, fd_):
"""Choose the header dtype based on the timestamp."""
Expand Down
28 changes: 17 additions & 11 deletions pygac/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,10 @@ def _correct_data_set_name(cls, header, filename):
filename (str): path to file
"""
filename = str(filename)
for encoding in "utf-8", "cp500":
data_set_name = header['data_set_name']
try:
data_set_name = cls._decode_data_set_name(data_set_name, encoding)
except DecodingError as err:
LOG.debug(str(err))
else:
header["data_set_name"] = data_set_name
break
else:
data_set_name = header['data_set_name']
try:
header["data_set_name"] = cls._decode_data_set_name(data_set_name)
except DecodingError:
LOG.debug(f'The data_set_name in header {header["data_set_name"]} does not match.'
' Use filename instead.')
match = cls.data_set_pattern.search(filename)
Expand All @@ -232,7 +226,19 @@ def _correct_data_set_name(cls, header, filename):
return header

@classmethod
def _decode_data_set_name(cls, data_set_name, encoding):
def _decode_data_set_name(cls, data_set_name):
    """Decode the raw data set name, trying each candidate encoding in turn.

    The encodings "utf-8" and "cp500" are tried in that order via
    ``cls._decode_data_set_name_for_encoding``; the first result that does
    not raise is returned. Each failed attempt is logged at debug level.

    Args:
        data_set_name: the undecoded data set name taken from a header.

    Returns:
        The decoded data set name from the first encoding that succeeds.

    Raises:
        DecodingError: if none of the encodings yields a valid name.
    """
    for codec in ("utf-8", "cp500"):
        try:
            return cls._decode_data_set_name_for_encoding(data_set_name, codec)
        except DecodingError as err:
            LOG.debug(str(err))
    # Reached only when every encoding attempt failed.
    raise DecodingError("Could not reliably decode the dataset name.")

@classmethod
def _decode_data_set_name_for_encoding(cls, data_set_name, encoding):
data_set_name = data_set_name.decode(encoding, errors='ignore')
if not cls.data_set_pattern.match(data_set_name):
raise DecodingError(f'The data_set_name in header {data_set_name} '
Expand Down

0 comments on commit 6e8a726

Please sign in to comment.