-
Notifications
You must be signed in to change notification settings - Fork 134
/
putget_test.py
293 lines (241 loc) · 11.6 KB
/
putget_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
import pytest
import time
import logging
from cassandra import ConsistencyLevel
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from dtest_setup_overrides import DTestSetupOverrides
from dtest import Tester, create_ks, create_cf
from tools.data import (create_c1c2_table, insert_c1c2, insert_columns, putget,
query_c1c2, query_columns, range_putget)
from tools.misc import ImmutableMapping, retry_till_success
# Shorthand for the ``since`` pytest marker, used as a decorator on tests below.
since = pytest.mark.since
# Shorthand for the ``ported_to_in_jvm`` pytest marker, used as a class decorator below.
ported_to_in_jvm = pytest.mark.ported_to_in_jvm
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
@ported_to_in_jvm('4.0')
class TestPutGet(Tester):
    """Put/get round-trip tests run against small multi-node clusters."""

    @pytest.fixture(scope='function', autouse=True)
    def fixture_dtest_setup_overrides(self, dtest_config):
        """Return setup overrides that set the ``start_rpc`` cluster option to 'true'."""
        dtest_setup_overrides = DTestSetupOverrides()
        dtest_setup_overrides.cluster_options = ImmutableMapping({'start_rpc': 'true'})
        return dtest_setup_overrides

    def test_putget(self):
        """ Simple put/get on a single row, hitting multiple sstables """
        self._putget()

    def test_putget_snappy(self):
        """ Simple put/get on a single row, but hitting multiple sstables (with snappy compression) """
        self._putget(compression="Snappy")

    def test_putget_deflate(self):
        """ Simple put/get on a single row, but hitting multiple sstables (with deflate compression) """
        self._putget(compression="Deflate")

    # Simple queries, but with flushes in between inserts to make sure we hit
    # sstables (and more than one) on reads
    def _putget(self, compression=None):
        """
        Start a 3-node cluster, create keyspace 'ks' (RF=3) and table 'cf'
        with the requested compression, then run the shared putget scenario.

        - compression: forwarded unchanged to create_cf (None means the
          create_cf default).
        """
        cluster = self.cluster
        cluster.populate(3).start()
        node1, node2, node3 = cluster.nodelist()

        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 3)
        create_cf(session, 'cf', compression=compression)

        putget(cluster, session)

    def test_non_local_read(self):
        """ This test reads from a coordinator we know has no copy of the data """
        cluster = self.cluster
        cluster.populate(3).start()
        node1, node2, node3 = cluster.nodelist()

        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 2)
        create_c1c2_table(self, session)

        # insert and get at CL.QUORUM (since RF=2, node1 won't have all keys locally)
        insert_c1c2(session, n=1000, consistency=ConsistencyLevel.QUORUM)
        for n in range(0, 1000):
            query_c1c2(session, n, ConsistencyLevel.QUORUM)

    def test_rangeputget(self):
        """ Simple put/get on ranges of rows, hitting multiple sstables """
        cluster = self.cluster
        cluster.populate(3).start()
        node1, node2, node3 = cluster.nodelist()

        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 2)
        create_cf(session, 'cf')

        range_putget(cluster, session)

    def test_wide_row(self):
        """ Test wide row slices """
        cluster = self.cluster
        cluster.populate(3).start()
        node1, node2, node3 = cluster.nodelist()

        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 1)
        create_cf(session, 'cf')

        key = 'wide'

        # Build the wide row in 5000 batches of 100 columns each.
        for x in range(1, 5001):
            insert_columns(self, session, key, 100, offset=x - 1)

        # Read the row back in slices of several different widths.
        for size in (10, 100, 1000):
            for x in range(1, (50001 - size) // size):
                query_columns(self, session, key, size, offset=x * size - 1)

    @pytest.mark.no_vnodes
    @since('2.0', max_version='4')
    def test_wide_slice(self):
        """
        Check slicing a wide row.
        See https://issues.apache.org/jira/browse/CASSANDRA-4919

        From Sylvain about duplicating:

        Ok, so now that I think about it, you can't reproduce that with CQL currently.
        You'll have to use the thrift get_paged_slice call as it's the only way to
        trigger this.

        Then, I think you'll be able to reproduce with the following steps:
        1) you'd want to use 2 nodes with RF=1 and with ByteOrderedPartitioner (it's
        possible to reproduce with a random partitioner but a tad more painful)
        2) pick tokens for the nodes so that you know what goes on which node. For
        example you may want that any row key starting with 'a' goes on node1, and
        anything starting with a 'b' goes on node 2.
        3) insert data that spans the two nodes. Say insert 20 rows 'a0' ... 'a9' and
        'b0' ... 'b9' (so 10 rows on each node) with say 10 columns per row.
        4) then do a get_paged_slice for keys 'a5' to 'b4' and for the column filter, a
        slice filter that picks the five last columns.
        5) the get_paged_slice is supposed to return 95 columns (it should return the 5
        last columns of a5 and then all 10 columns for 'a6' to 'b4'), but without
        CASSANDRA-4919 it will return 90 columns only (it will only return the 5 last
        columns of 'b0').
        """
        cluster = self.cluster
        cluster.set_configuration_options(values={'partitioner': 'org.apache.cassandra.dht.ByteOrderedPartitioner'})
        cluster.populate(2)
        node1, node2 = cluster.nodelist()
        # Step 2 of the docstring: node1 owns the 'a' token and node2 the 'b'
        # token. Each token must be set on its own node, otherwise node1's
        # token is simply overwritten and node2 never gets the intended one.
        node1.set_configuration_options(values={'initial_token': 'a'.encode().hex()})
        node2.set_configuration_options(values={'initial_token': 'b'.encode().hex()})
        cluster.start()
        time.sleep(.5)
        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 1)

        query = """
            CREATE TABLE test (
                k text,
                column1 text,
                value text,
                PRIMARY KEY (k, column1)
            ) WITH COMPACT STORAGE;
        """
        session.execute(query)
        time.sleep(.5)

        # 10 rows 'a00'..'a09' and 10 rows 'b00'..'b09', 10 columns each.
        for i in range(10):
            key_num = str(i).zfill(2)
            for j in range(10):
                stmt = "INSERT INTO test (k, column1, value) VALUES ('a%s', 'col%s', '%s')" % (key_num, j, j)
                session.execute(stmt)
                stmt = "INSERT INTO test (k, column1, value) VALUES ('b%s', 'col%s', '%s')" % (key_num, j, j)
                session.execute(stmt)
        session.shutdown()

        tc = ThriftConnection(node1, ks_name='ks', cf_name='test')
        try:
            tc.use_ks()

            # Slice on the keys
            rnge = tc.Cassandra.KeyRange(
                start_key="a%s" % ('5'.zfill(2)),
                end_key="b%s" % ('4'.zfill(2)),
                count=9999,
            )
            rows = tc.client.get_paged_slice(
                column_family='test',
                range=rnge,
                start_column='col5',
                consistency_level=tc.Cassandra.ConsistencyLevel.ONE,
            )
        finally:
            # Release the thrift socket even if the paged slice call fails.
            tc.transport.close()

        # Flatten the column names of every returned row into a single list.
        columns = []
        for row in rows:
            columns.extend(col.column.name for col in row.columns)

        assert len(columns) == 95, "Regression in cassandra-4919. Expected 95 columns == got {}.".format(len(columns))
class ThriftConnection(object):
    """
    A thrift connection. For when CQL doesn't do what we need.

    The constructor opens the socket immediately; call close() when done to
    release it. Most mutating helpers return ``self`` so calls can be chained.
    """

    def __init__(self, node=None, host=None, port=None, ks_name='ks', cf_name='cf',
                 cassandra_interface='11'):
        """
        initializes the connection.
        - node: a ccm node. If supplied, the host and port will be pulled
          from the node's 'thrift' network interface.
        - host, port: overwritten if node is supplied
        - ks_name, cf_name: all operations are done on the supplied ks and cf
        - cassandra_interface: '07' and '11' are currently supported. This is the
          thrift interface to cassandra. '11' suffices for now except when creating
          keyspaces against cassandra0.7, in which case 07 must be used.
        """
        if node:
            host, port = node.network_interfaces['thrift']
        self.node = node
        self.host = host
        self.port = port
        self.cassandra_interface = cassandra_interface

        # import the correct version of the cassandra thrift interface
        # and set self.Cassandra as the imported module
        module_name = 'cassandra-thrift.v%s' % cassandra_interface
        imp = __import__(module_name, globals(), locals(), ['Cassandra'])
        self.Cassandra = imp.Cassandra

        socket = TSocket.TSocket(host, port)
        self.transport = TTransport.TFramedTransport(socket)
        protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = self.Cassandra.Client(protocol)

        socket.open()
        self.open_socket = True

        self.ks_name = ks_name
        self.cf_name = cf_name

    def close(self):
        """
        Close the underlying transport if it is still open.

        Safe to call more than once: later calls are no-ops.
        """
        if self.open_socket:
            self.transport.close()
            self.open_socket = False

    def create_ks(self, replication_factor=1):
        """
        Create the keyspace ``self.ks_name`` with SimpleStrategy, wait for
        schema agreement, then switch the connection to it. Returns self.
        """
        # The '07' interface takes replication_factor as a direct field; the
        # other branch passes it through strategy_options as a string.
        if self.cassandra_interface == '07':
            ks_def = self.Cassandra.KsDef(name=self.ks_name,
                                          strategy_class='org.apache.cassandra.locator.SimpleStrategy',
                                          replication_factor=int(replication_factor),
                                          cf_defs=[])
        else:
            ks_def = self.Cassandra.KsDef(name=self.ks_name,
                                          strategy_class='org.apache.cassandra.locator.SimpleStrategy',
                                          strategy_options={'replication_factor': str(replication_factor)},
                                          cf_defs=[])
        retry_till_success(self.client.system_add_keyspace, ks_def, timeout=30)
        time.sleep(0.5)
        retry_till_success(self.wait_for_agreement, timeout=10)
        time.sleep(0.5)
        self.use_ks()
        return self

    def use_ks(self):
        """Set ``self.ks_name`` as the active keyspace on the client. Returns self."""
        retry_till_success(self.client.set_keyspace, self.ks_name, timeout=30)
        return self

    def create_cf(self):
        """
        Create the column family ``self.cf_name`` in ``self.ks_name`` and
        wait for schema agreement. Returns self.
        """
        cf_def = self.Cassandra.CfDef(name=self.cf_name, keyspace=self.ks_name)
        retry_till_success(self.client.system_add_column_family, cf_def, timeout=30)
        time.sleep(0.5)
        retry_till_success(self.wait_for_agreement, timeout=10)
        time.sleep(0.5)
        return self

    def wait_for_agreement(self):
        """
        Raise if the cluster reports more than one schema version.

        The 'UNREACHABLE' entry is ignored. Intended to be called through
        retry_till_success, which is why disagreement is signalled by raising.
        """
        schemas = self.client.describe_schema_versions()
        reachable_versions = sum(1 for version in schemas if version != 'UNREACHABLE')
        if reachable_versions > 1:
            raise Exception("schema agreement not reached")

    def _translate_cl(self, cl):
        """Map a consistency level name (e.g. 'QUORUM') to its thrift enum value."""
        return self.Cassandra.ConsistencyLevel._NAMES_TO_VALUES[cl]

    def insert_columns(self, num_rows=10, consistency_level='QUORUM'):
        """ Insert some basic values """
        cf_parent = self.Cassandra.ColumnParent(column_family=self.cf_name)

        for row_key in ('row_%d' % i for i in range(num_rows)):
            col = self.Cassandra.Column(name='col_0', value='val_0',
                                        timestamp=int(time.time() * 1000))
            retry_till_success(self.client.insert,
                               key=row_key, column_parent=cf_parent, column=col,
                               consistency_level=self._translate_cl(consistency_level),
                               timeout=30)
        return self

    def query_columns(self, num_rows=10, consistency_level='QUORUM'):
        """ Check that the values inserted in insert_columns() are present """
        for row_key in ('row_%d' % i for i in range(num_rows)):
            cpath = self.Cassandra.ColumnPath(column_family=self.cf_name,
                                              column='col_0')
            cosc = retry_till_success(self.client.get, key=row_key, column_path=cpath,
                                      consistency_level=self._translate_cl(consistency_level),
                                      timeout=30)
            col = cosc.column
            value = col.value
            assert value == 'val_0'
        return self