Skip to content

Commit ef1a991

Browse files
committed
review comments
1 parent 7d43b6a commit ef1a991

File tree

7 files changed

+51
-46
lines changed

7 files changed

+51
-46
lines changed

quixstreams/dataframe/windows/sliding.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def process_window(
6565
# Sliding windows are inclusive on both ends, so values with
6666
# timestamps equal to latest_timestamp - duration - grace
6767
# are still eligible for processing.
68-
state_ts = state.get_highest_id() or 0
68+
state_ts = state.get_latest_timestamp() or 0
6969
latest_timestamp = max(timestamp_ms, state_ts)
7070
max_expired_window_end = latest_timestamp - grace - 1
7171
max_expired_window_start = max_expired_window_end - duration

quixstreams/dataframe/windows/time_based.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def process_window(
6868
step_ms=self._step_ms,
6969
)
7070

71-
state_ts = state.get_highest_id() or 0
71+
state_ts = state.get_latest_timestamp() or 0
7272
latest_timestamp = max(timestamp_ms, state_ts)
7373
max_expired_window_end = latest_timestamp - grace_ms
7474
max_expired_window_start = max_expired_window_end - duration_ms

quixstreams/state/rocksdb/windowed/state.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,16 @@ def delete_from_collection(self, end: int) -> None:
105105
"""
106106
return self._transaction.delete_from_collection(end=end, prefix=self._prefix)
107107

108-
def get_highest_id(self) -> Optional[int]:
108+
def get_latest_timestamp(self) -> Optional[int]:
109109
"""
110-
Get the highest observed message ID for the current message key.
110+
Get the latest observed timestamp for the current message key.
111111
112-
Use this ID to determine if the arriving event is late and should be
112+
Use this timestamp to determine if the arriving event is late and should be
113113
discarded from the processing.
114114
115-
:return: latest observed event ID
115+
:return: latest observed timestamp
116116
"""
117-
return self._transaction.get_highest_id(prefix=self._prefix)
117+
return self._transaction.get_latest_timestamp(prefix=self._prefix)
118118

119119
def expire_windows(
120120
self,

quixstreams/state/rocksdb/windowed/transaction.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ def as_state(self, prefix: Any = DEFAULT_PREFIX) -> WindowedTransactionState: #
9898
),
9999
)
100100

101-
def get_highest_id(self, prefix: bytes) -> Optional[int]:
102-
return self._get_highest_id(
101+
def get_latest_timestamp(self, prefix: bytes) -> Optional[int]:
102+
return self._get_latest_timestamp(
103103
prefix=prefix, cache=self._latest_timestamps, default=0
104104
)
105105

@@ -128,17 +128,17 @@ def update_window(
128128

129129
key = encode_integer_pair(start_ms, end_ms)
130130
self.set(key=key, value=value, prefix=prefix)
131-
latest_timestamp_ms = self.get_highest_id(prefix=prefix)
131+
latest_timestamp_ms = self.get_latest_timestamp(prefix=prefix)
132132
updated_timestamp_ms = (
133133
max(latest_timestamp_ms, timestamp_ms)
134134
if latest_timestamp_ms is not None
135135
else timestamp_ms
136136
)
137137

138-
self._set_highest_id(
138+
self._set_latest_timestamp(
139139
cache=self._latest_timestamps,
140140
prefix=prefix,
141-
id=updated_timestamp_ms,
141+
timestamp_ms=updated_timestamp_ms,
142142
)
143143

144144
def add_to_collection(
@@ -164,7 +164,7 @@ def get_from_collection(self, start: int, end: int, prefix: bytes) -> list[Any]:
164164

165165
def delete_from_collection(self, end: int, prefix: bytes) -> None:
166166
start = (
167-
self._get_highest_id(
167+
self._get_latest_timestamp(
168168
cache=self._last_deleted_value_timestamps, prefix=prefix
169169
)
170170
or -1
@@ -180,10 +180,10 @@ def delete_from_collection(self, end: int, prefix: bytes) -> None:
180180
self.delete(key=key, prefix=prefix, cf_name=VALUES_CF_NAME)
181181

182182
if last_deleted_id is not None:
183-
self._set_highest_id(
183+
self._set_latest_timestamp(
184184
cache=self._last_deleted_value_timestamps,
185185
prefix=prefix,
186-
id=last_deleted_id,
186+
timestamp_ms=last_deleted_id,
187187
)
188188

189189
def delete_window(self, start_ms: int, end_ms: int, prefix: bytes):
@@ -233,7 +233,7 @@ def expire_windows(
233233
start_from = -1
234234

235235
# Find the latest start timestamp of the expired windows for the given key
236-
last_expired = self._get_highest_id(
236+
last_expired = self._get_latest_timestamp(
237237
cache=self._last_expired_timestamps, prefix=prefix
238238
)
239239
if last_expired is not None:
@@ -253,10 +253,10 @@ def expire_windows(
253253
latest_window = expired_windows[-1]
254254
last_expired__gt = latest_window[0][0]
255255

256-
self._set_highest_id(
256+
self._set_latest_timestamp(
257257
cache=self._last_expired_timestamps,
258258
prefix=prefix,
259-
id=last_expired__gt,
259+
timestamp_ms=last_expired__gt,
260260
)
261261

262262
# Collect values into windows
@@ -318,7 +318,7 @@ def delete_windows(
318318
start_from = -1
319319

320320
# Find the latest start timestamp of the deleted windows for the given key
321-
last_deleted = self._get_highest_id(
321+
last_deleted = self._get_latest_timestamp(
322322
cache=self._last_deleted_window_timestamps, prefix=prefix
323323
)
324324
if last_deleted is not None:
@@ -337,10 +337,10 @@ def delete_windows(
337337

338338
# Save the start of the latest deleted window to the deletion index
339339
if last_deleted__gt:
340-
self._set_highest_id(
340+
self._set_latest_timestamp(
341341
cache=self._last_deleted_window_timestamps,
342342
prefix=prefix,
343-
id=last_deleted__gt,
343+
timestamp_ms=last_deleted__gt,
344344
)
345345

346346
if delete_values:
@@ -445,7 +445,7 @@ def _get_items(
445445
# Sort and deserialize items merged from the cache and store
446446
return sorted(merged_items.items(), key=lambda kv: kv[0], reverse=backwards)
447447

448-
def _get_highest_id(
448+
def _get_latest_timestamp(
449449
self, cache: TimestampsCache, prefix: bytes, default: Any = None
450450
) -> Optional[int]:
451451
cached_ts = cache.timestamps.get(prefix)
@@ -464,11 +464,13 @@ def _get_highest_id(
464464
cache.timestamps[prefix] = stored_ts
465465
return stored_ts
466466

467-
def _set_highest_id(self, cache: TimestampsCache, prefix: bytes, id: int):
468-
cache.timestamps[prefix] = id
467+
def _set_latest_timestamp(
468+
self, cache: TimestampsCache, prefix: bytes, timestamp_ms: int
469+
):
470+
cache.timestamps[prefix] = timestamp_ms
469471
self.set(
470472
key=cache.key,
471-
value=id,
473+
value=timestamp_ms,
472474
prefix=prefix,
473475
cf_name=cache.cf_name,
474476
)

quixstreams/state/types.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def delete_from_collection(self, end: int) -> None:
116116
"""
117117
...
118118

119-
def get_highest_id(self) -> Optional[int]:
119+
def get_latest_timestamp(self) -> Optional[int]:
120120
"""
121121
Get the latest observed message ID for the current state prefix
122122
(same as message key).
@@ -300,15 +300,14 @@ def delete_from_collection(self, end: int) -> None:
300300
"""
301301
...
302302

303-
def get_highest_id(self, prefix: bytes) -> int:
303+
def get_latest_timestamp(self, prefix: bytes) -> int:
304304
"""
305-
Get the latest observed message ID for the current state prefix
306-
(same as message key).
305+
Get the latest observed timestamp for the current message key.
307306
308-
Use this ID to determine if the arriving event is late and should be
307+
Use this timestamp to determine if the arriving event is late and should be
309308
discarded from the processing.
310309
311-
:return: latest observed event ID
310+
:return: latest observed timestamp
312311
"""
313312
...
314313

tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_state.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def test_expire_windows(transaction_state, delete):
5959

6060
with transaction_state() as state:
6161
state.update_window(start_ms=20, end_ms=30, value=3, timestamp_ms=20)
62-
max_start_time = state.get_highest_id() - duration_ms
62+
max_start_time = state.get_latest_timestamp() - duration_ms
6363
expired = state.expire_windows(max_start_time=max_start_time, delete=delete)
6464
# "expire_windows" must update the expiration index so that the same
6565
# windows are not expired twice
@@ -96,7 +96,7 @@ def test_expire_windows_with_collect(transaction_state, end_inclusive):
9696

9797
with transaction_state() as state:
9898
state.update_window(start_ms=20, end_ms=30, value=None, timestamp_ms=20)
99-
max_start_time = state.get_highest_id() - duration_ms
99+
max_start_time = state.get_latest_timestamp() - duration_ms
100100
expired = state.expire_windows(
101101
max_start_time=max_start_time,
102102
collect=True,
@@ -122,14 +122,14 @@ def test_same_keys_in_db_and_update_cache(transaction_state):
122122
state.update_window(start_ms=0, end_ms=10, value=3, timestamp_ms=8)
123123

124124
state.update_window(start_ms=10, end_ms=20, value=2, timestamp_ms=10)
125-
max_start_time = state.get_highest_id() - duration_ms
125+
max_start_time = state.get_latest_timestamp() - duration_ms
126126
expired = state.expire_windows(max_start_time=max_start_time)
127127

128128
# Value from the cache takes precedence over the value in the db
129129
assert expired == [((0, 10), 3)]
130130

131131

132-
def test_get_highest_id(windowed_rocksdb_store_factory):
132+
def test_get_latest_timestamp(windowed_rocksdb_store_factory):
133133
store = windowed_rocksdb_store_factory()
134134
partition = store.assign_partition(0)
135135
timestamp = 123
@@ -141,7 +141,7 @@ def test_get_highest_id(windowed_rocksdb_store_factory):
141141

142142
partition = store.assign_partition(0)
143143
with partition.begin() as tx:
144-
assert tx.get_highest_id(prefix=prefix) == timestamp
144+
assert tx.get_latest_timestamp(prefix=prefix) == timestamp
145145

146146

147147
@pytest.mark.parametrize(

tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def test_expire_windows_expired(self, windowed_rocksdb_store_factory, delete):
6262
tx.update_window(
6363
start_ms=20, end_ms=30, value=3, timestamp_ms=20, prefix=prefix
6464
)
65-
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms
65+
max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms
6666
expired = tx.expire_windows(
6767
max_start_time=max_start_time, prefix=prefix, delete=delete
6868
)
@@ -112,7 +112,7 @@ def test_expire_windows_cached(self, windowed_rocksdb_store_factory, delete):
112112
tx.update_window(
113113
start_ms=20, end_ms=30, value=3, timestamp_ms=20, prefix=prefix
114114
)
115-
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms
115+
max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms
116116
expired = tx.expire_windows(
117117
max_start_time=max_start_time, prefix=prefix, delete=delete
118118
)
@@ -156,7 +156,7 @@ def test_expire_windows_empty(self, windowed_rocksdb_store_factory):
156156
tx.update_window(
157157
start_ms=3, end_ms=13, value=1, timestamp_ms=3, prefix=prefix
158158
)
159-
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms
159+
max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms
160160
assert not tx.expire_windows(max_start_time=max_start_time, prefix=prefix)
161161

162162
def test_expire_windows_with_grace_expired(self, windowed_rocksdb_store_factory):
@@ -175,7 +175,9 @@ def test_expire_windows_with_grace_expired(self, windowed_rocksdb_store_factory)
175175
tx.update_window(
176176
start_ms=15, end_ms=25, value=1, timestamp_ms=15, prefix=prefix
177177
)
178-
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms - grace_ms
178+
max_start_time = (
179+
tx.get_latest_timestamp(prefix=prefix) - duration_ms - grace_ms
180+
)
179181
expired = tx.expire_windows(max_start_time=max_start_time, prefix=prefix)
180182

181183
assert len(expired) == 1
@@ -197,7 +199,9 @@ def test_expire_windows_with_grace_empty(self, windowed_rocksdb_store_factory):
197199
tx.update_window(
198200
start_ms=13, end_ms=23, value=1, timestamp_ms=13, prefix=prefix
199201
)
200-
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms - grace_ms
202+
max_start_time = (
203+
tx.get_latest_timestamp(prefix=prefix) - duration_ms - grace_ms
204+
)
201205
expired = tx.expire_windows(max_start_time=max_start_time, prefix=prefix)
202206

203207
assert not expired
@@ -276,7 +280,7 @@ def test_expire_windows_no_expired(self, windowed_rocksdb_store_factory):
276280
)
277281
# "expire_windows" must update the expiration index so that the same
278282
# windows are not expired twice
279-
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms
283+
max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms
280284
assert not tx.expire_windows(max_start_time=max_start_time, prefix=prefix)
281285

282286
def test_expire_windows_multiple_windows(self, windowed_rocksdb_store_factory):
@@ -302,7 +306,7 @@ def test_expire_windows_multiple_windows(self, windowed_rocksdb_store_factory):
302306
)
303307
# "expire_windows" must update the expiration index so that the same
304308
# windows are not expired twice
305-
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms
309+
max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms
306310
expired = tx.expire_windows(max_start_time=max_start_time, prefix=prefix)
307311

308312
assert len(expired) == 3
@@ -319,7 +323,7 @@ def test_get_highest_id_update(self, windowed_rocksdb_store_factory):
319323
tx.update_window(0, 10, value=1, timestamp_ms=timestamp, prefix=prefix)
320324

321325
with partition.begin() as tx:
322-
assert tx.get_highest_id(prefix=prefix) == timestamp
326+
assert tx.get_latest_timestamp(prefix=prefix) == timestamp
323327

324328
def test_get_highest_id_cannot_go_backwards(self, windowed_rocksdb_store_factory):
325329
store = windowed_rocksdb_store_factory()
@@ -329,10 +333,10 @@ def test_get_highest_id_cannot_go_backwards(self, windowed_rocksdb_store_factory
329333
with partition.begin() as tx:
330334
tx.update_window(0, 10, value=1, timestamp_ms=timestamp, prefix=prefix)
331335
tx.update_window(0, 10, value=1, timestamp_ms=timestamp - 1, prefix=prefix)
332-
assert tx.get_highest_id(prefix=prefix) == timestamp
336+
assert tx.get_latest_timestamp(prefix=prefix) == timestamp
333337

334338
with partition.begin() as tx:
335-
assert tx.get_highest_id(prefix=prefix) == timestamp
339+
assert tx.get_latest_timestamp(prefix=prefix) == timestamp
336340

337341
def test_update_window_and_prepare(
338342
self, windowed_rocksdb_partition_factory, changelog_producer_mock

0 commit comments

Comments
 (0)