@@ -35,7 +35,7 @@ class Charge:
35
35
def __init__ (self , server ):
36
36
self .server = server
37
37
38
- async def on (self , category : str = 'no_category' , stream : str = 'documents' , create : bool = False ):
38
+ async def on (self , category : str = 'no_category' , stream : str = 'documents' , create : bool = False , workgroup : bool = False ):
39
39
"""
40
40
Connects to the NATS server, creates a stream and category if they don't exist, and prints a success message.
41
41
@@ -54,7 +54,7 @@ async def on(self, category: str = 'no_category', stream: str = 'documents', cre
54
54
if self .stream not in [x .config .name for x in streams ] or self .category not in sum ([x .config .subjects for x in streams ], []):
55
55
try :
56
56
if self .stream not in [x .config .name for x in streams ]:
57
- _f ("wait" , f'creating { self .stream } ' ), await self .js .add_stream (name = self .stream , subjects = [self .category ], retention = 'workqueue' , num_replicas = 3 ) \
57
+ _f ("wait" , f'creating { self .stream } ' ), await self .js .add_stream (name = self .stream , subjects = [self .category ], retention = 'workqueue' if workgroup else None , num_replicas = 3 ) \
58
58
if create else _f ("warn" , f"couldn't create { stream } on { self .server } " )
59
59
streams = await self .js .streams_info ()
60
60
if self .category not in sum ([x .config .subjects for x in streams if x .config .name == self .stream ], []):
0 commit comments