Skip to content

Commit b9a12a7

Browse files
committed
Read pending events from database on startup for immediate flushing
Closes #85.
1 parent aea33fb commit b9a12a7

File tree

2 files changed

+11
-0
lines changed

2 files changed

+11
-0
lines changed

logstash_async/database.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,3 +158,12 @@ def vacuum(self):
158158
with self._connect() as connection:
159159
cursor = connection.cursor()
160160
cursor.execute("VACUUM;")
161+
162+
# ----------------------------------------------------------------------
163+
def get_non_flushed_event_count(self):
164+
query_fetch = '''SELECT count(*) FROM `event` WHERE `pending_delete` = 0;'''
165+
with self._connect() as connection:
166+
cursor = connection.cursor()
167+
cursor.execute(query_fetch)
168+
count = cursor.fetchone()[0]
169+
return count

logstash_async/worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ def _setup_database(self):
129129
else:
130130
self._database = MemoryCache(cache=self._memory_cache, event_ttl=self._event_ttl)
131131

132+
self._non_flushed_event_count = self._database.get_non_flushed_event_count()
133+
132134
# ----------------------------------------------------------------------
133135
def _fetch_events(self):
134136
while True:

0 commit comments

Comments
 (0)