Skip to content

Commit

Permalink
feat: Add more settings and tunability to snapshot table and dictiona…
Browse files Browse the repository at this point in the history
…ry (#28062)
  • Loading branch information
tkaemming authored Jan 30, 2025
1 parent 61c51df commit fdee4c8
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions dags/person_overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ def populate(self, client: Client, timestamp: str, limit: int | None = None) ->
{limit_clause}
""",
{"timestamp": timestamp},
settings={
"optimize_aggregation_in_order": 1, # slows down the query, but reduces memory consumption dramatically
},
)

def sync(self, client: Client) -> None:
Expand All @@ -110,7 +113,7 @@ def name(self) -> str:
def qualified_name(self):
return f"{settings.CLICKHOUSE_DATABASE}.{self.name}"

def create(self, client: Client, shards: int, max_execution_time: int) -> None:
def create(self, client: Client, shards: int, max_execution_time: int, max_memory_usage: int) -> None:
client.execute(
f"""
CREATE DICTIONARY IF NOT EXISTS {self.qualified_name} (
Expand All @@ -123,7 +126,7 @@ def create(self, client: Client, shards: int, max_execution_time: int) -> None:
SOURCE(CLICKHOUSE(DB %(database)s TABLE %(table)s PASSWORD %(password)s))
LAYOUT(COMPLEX_KEY_HASHED(SHARDS {shards}))
LIFETIME(0)
SETTINGS(max_execution_time={max_execution_time})
SETTINGS(max_execution_time={max_execution_time}, max_memory_usage={max_memory_usage})
""",
{
"database": settings.CLICKHOUSE_DATABASE,
Expand Down Expand Up @@ -269,6 +272,10 @@ class SnapshotDictionaryConfig(dagster.Config):
description="The maximum amount of time to wait for the dictionary to be loaded before considering the operation "
"a failure, or 0 to wait an unlimited amount of time.",
)
max_memory_usage: int = pydantic.Field(
default=0,
description="The maximum amount of memory to use for the dictionary, or 0 to use an unlimited amount.",
)


@dagster.op
Expand All @@ -284,6 +291,7 @@ def create_snapshot_dictionary(
dictionary.create,
shards=config.shards,
max_execution_time=config.max_execution_time,
max_memory_usage=config.max_memory_usage,
)
).result()
return dictionary
Expand Down

0 comments on commit fdee4c8

Please sign in to comment.