# Source: bulk_aq.py (forked from oracle-samples/oracle-db-examples)
#------------------------------------------------------------------------------
# Copyright (c) 2019, 2021, Oracle and/or its affiliates. All rights reserved.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta,
# Canada. All rights reserved.
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# bulk_aq.py
# This script demonstrates how to use bulk enqueuing and dequeuing of
# messages with advanced queuing using cx_Oracle. It makes use of a RAW queue
# created in the sample setup.
#
# This script requires cx_Oracle 8.2 or higher.
#------------------------------------------------------------------------------
import cx_Oracle as oracledb
import sample_env
# name of the queue used by this script (the header above describes it as a
# RAW queue created in the sample setup)
QUEUE_NAME = "DEMO_RAW_QUEUE"

# text payloads that get enqueued and then dequeued again, in this order
PAYLOAD_DATA = [
    "The first message",
    "The second message",
    "The third message",
    "The fourth message",
    "The fifth message",
    "The sixth message",
    "The seventh message",
    "The eighth message",
    "The ninth message",
    "The tenth message",
    "The eleventh message",
    "The twelfth and final message",
]
# connect to database using the connect string supplied by the sample
# environment module
connect_string = sample_env.get_main_connect_string()
connection = oracledb.connect(connect_string)
cursor = connection.cursor()

# create queue and set its dequeue options: navigation is DEQ_FIRST_MSG and
# wait is DEQ_NO_WAIT (the drain loop that follows depends on dequeue calls
# coming back empty rather than blocking)
queue = connection.queue(QUEUE_NAME)
deq_options = queue.deqoptions
deq_options.navigation = oracledb.DEQ_FIRST_MSG
deq_options.wait = oracledb.DEQ_NO_WAIT
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent; the loop stops as soon as deqone() returns a
# falsy value, i.e. when there is nothing left to dequeue
while queue.deqone():
    pass
# enqueue a few messages, batch_size payloads per enqmany() call; stepping
# through the list by index avoids re-slicing the remaining data on each pass
print("Enqueuing messages...")
batch_size = 6
for start in range(0, len(PAYLOAD_DATA), batch_size):
    batch_data = PAYLOAD_DATA[start:start + batch_size]
    # wrap each payload in a message properties object for the bulk enqueue
    messages = [connection.msgproperties(payload=d) for d in batch_data]
    for data in batch_data:
        print(data)
    queue.enqmany(messages)

# commit once, after every batch has been enqueued
connection.commit()
# dequeue the messages, asking for up to batch_size at a time, until a call
# returns no messages
print("\nDequeuing messages...")
batch_size = 8
while True:
    messages = queue.deqmany(batch_size)
    if not messages:
        break
    for props in messages:
        # the payload is decoded back into text for display
        print(props.payload.decode())

# commit once, after the queue has been emptied
connection.commit()
print("\nDone.")