Skip to content

Commit

Permalink
Potentially use subset of message types in PITR map
Browse files Browse the repository at this point in the history
  • Loading branch information
Garrett McGrath committed Dec 20, 2023
1 parent 5f1fa72 commit 51790ef
Showing 1 changed file with 32 additions and 19 deletions.
51 changes: 32 additions & 19 deletions per-message-s3-exporter/firehose_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import os
from pathlib import Path
import re
import ssl
import time
import warnings
Expand All @@ -15,6 +16,7 @@

import attr

EVENTS_RE: re.Pattern = re.compile(r"events\s+\"(?P<events>[a-z_ ]+)\"")
FIREHOSE_MESSAGE_TYPES: List[str] = [
"arrival",
"cancellation",
Expand Down Expand Up @@ -62,7 +64,17 @@ def pitr_map_location() -> Path:
return Path(os.getenv("PITR_MAP", "/home/firestarter/pitrs/.pitrs-map"))


def pitr_map_from_file(init_time: str) -> Dict[str, int]:
def pitr_map_message_types(init_args: str) -> List[str]:
"""Return a list of all requested message types or all valid message types
if the initiation command does not specify a subset"""
events_match = EVENTS_RE.match(init_time)
if events_match is None:
return FIREHOSE_MESSAGE_TYPES

return events_match["events"]


def pitr_map_from_file(init_time: str, init_args: str) -> Dict[str, int]:
"""Load PITR map from file, filling in a PITR value based on the init_time
when appropriate
Expand All @@ -78,7 +90,8 @@ def pitr_map_from_file(init_time: str) -> Dict[str, int]:
return {}

start_pitr_from_env = int(init_time.split()[-1])
pitr_map = {message_type: start_pitr_from_env for message_type in FIREHOSE_MESSAGE_TYPES}
message_types = pitr_map_message_types(init_args)
pitr_map = {message_type: start_pitr_from_env for message_type in message_types}

logging.info(f"Fetching start PITRs from {pitr_map_path}")
with pitr_map_path.open(encoding="utf-8") as pitr_map_file:
Expand Down Expand Up @@ -117,13 +130,29 @@ def from_env(cls):
init_time = os.environ.get("INIT_CMD_TIME", "live")
init_time_split = init_time.split()

init_args = os.environ.get("INIT_CMD_ARGS", "")
for command in [
"live",
"pitr",
"range",
"compression",
"keepalive",
"username",
"password",
]:
if command in init_args.split():
raise ValueError(
f'$INIT_CMD_ARGS should not contain the "{command}" command. '
"It belongs in its own variable."
)

if init_time_split[0] not in ("live", "pitr", "range"):
raise ValueError(
'$INIT_CMD_TIME value is invalid, should be "live", '
'"pitr <pitr>" or "range <start> <end>"'
)

pitr_map = pitr_map_from_file(init_time)
pitr_map = pitr_map_from_file(init_time, init_args)
if pitr_map:
min_pitr = min(pitr_map.values())
logging.info(f"Based on PITR map {pitr_map}")
Expand All @@ -135,22 +164,6 @@ def from_env(cls):
init_time_split[1] = f"{min_pitr}"
init_time = " ".join(init_time_split)

init_args = os.environ.get("INIT_CMD_ARGS", "")
for command in [
"live",
"pitr",
"range",
"compression",
"keepalive",
"username",
"password",
]:
if command in init_args.split():
raise ValueError(
f'$INIT_CMD_ARGS should not contain the "{command}" command. '
"It belongs in its own variable."
)

return cls(
username=username,
password=apikey,
Expand Down

0 comments on commit 51790ef

Please sign in to comment.