Skip to content

Commit

Permalink
Add BulkResponse wrapper for improved decoding of HTTP bulk responses
Browse files Browse the repository at this point in the history
CrateDB HTTP bulk responses include `rowcount=` items, either signalling
if a bulk operation succeeded or failed.

- success means `rowcount=1`
- failure means `rowcount=-2`

https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
  • Loading branch information
amotl committed Oct 3, 2024
1 parent 7cb2c68 commit ea27b3e
Show file tree
Hide file tree
Showing 5 changed files with 403 additions and 284 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Unreleased
"Threads may share the module, but not connections."
- Added ``error_trace`` to string representation of an Error to relay
server stacktraces into exception messages.
- Added ``BulkResponse`` wrapper for improved decoding of CrateDB HTTP bulk
responses including ``rowcount=`` items.

.. _Migrate from crate.client to sqlalchemy-cratedb: https://cratedb.com/docs/sqlalchemy-cratedb/migrate-from-crate-client.html
.. _sqlalchemy-cratedb: https://pypi.org/project/sqlalchemy-cratedb/
Expand Down
66 changes: 66 additions & 0 deletions src/crate/client/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import typing as t
from functools import cached_property


class BulkResultItem(t.TypedDict):
"""
Define the shape of a CrateDB bulk request response item.
"""

rowcount: int


class BulkResponse:
"""
Manage CrateDB bulk request responses.
Accepts a list of bulk arguments (parameter list) and a list of bulk response items.
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
"""

def __init__(
self,
records: t.Union[t.Iterable[t.Dict[str, t.Any]], None],
results: t.Union[t.Iterable[BulkResultItem], None]):
self.records = records
self.results = results

@cached_property
def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
"""
Compute list of failed records.
CrateDB signals failed inserts using `rowcount=-2`.
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
"""
if self.records is None or self.results is None:
return []

Check warning on line 38 in src/crate/client/result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/result.py#L38

Added line #L38 was not covered by tests
errors: t.List[t.Dict[str, t.Any]] = []
for record, status in zip(self.records, self.results):
if status["rowcount"] == -2:
errors.append(record)
return errors

@cached_property
def record_count(self) -> int:
"""
Compute bulk size / length of parameter list.
"""
if not self.records:
return 0
return len(self.records)

Check warning on line 52 in src/crate/client/result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/result.py#L50-L52

Added lines #L50 - L52 were not covered by tests

@cached_property
def success_count(self) -> int:
"""
Compute number of succeeding records within a batch.
"""
return self.record_count - self.failed_count

Check warning on line 59 in src/crate/client/result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/result.py#L59

Added line #L59 was not covered by tests

@cached_property
def failed_count(self) -> int:
"""
Compute number of failed records within a batch.
"""
return len(self.failed_records)

Check warning on line 66 in src/crate/client/result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/result.py#L66

Added line #L66 was not covered by tests
46 changes: 46 additions & 0 deletions src/crate/client/test_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import sys
import unittest

from crate import client
from crate.client.test_support import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline
from crate.testing.settings import crate_host


class BulkOperationTest(unittest.TestCase):

def setUp(self):
setUpCrateLayerBaseline(self)

def tearDown(self):
tearDownDropEntitiesBaseline(self)

@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
def test_executemany_with_bulk_response(self):

# Import at runtime is on purpose, to permit skipping.
from crate.client.result import BulkResponse

connection = client.connect(crate_host)
cursor = connection.cursor()

# Run SQL DDL.
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")

# Run a batch insert that only partially succeeds.
invalid_records = [(1, "Hotzenplotz 1"), (1, "Hotzenplotz 2")]
result = cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", invalid_records)

# Verify CrateDB response.
self.assertEqual(result, [{"rowcount": 1}, {"rowcount": -2}])

# Verify decoded response.
bulk_response = BulkResponse(invalid_records, result)
self.assertEqual(bulk_response.failed_records, [(1, "Hotzenplotz 2")])

cursor.execute("REFRESH TABLE foobar;")
cursor.execute("SELECT * FROM foobar;")
result = cursor.fetchall()
self.assertEqual(result, [[1, "Hotzenplotz 1"]])

cursor.close()
connection.close()
273 changes: 273 additions & 0 deletions src/crate/client/test_support.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
# -*- coding: utf-8; -*-
#
# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
# license agreements. See the NOTICE file distributed with this work for
# additional information regarding copyright ownership. Crate licenses
# this file to you under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# However, if you have executed another commercial license agreement
# with Crate these terms will supersede the license and you may use the
# software solely pursuant to the terms of the relevant commercial agreement.

from __future__ import absolute_import

import json
import os
import socket
import unittest
from pprint import pprint
from http.server import HTTPServer, BaseHTTPRequestHandler
import ssl
import time
import threading
import logging

import stopit

from crate.testing.layer import CrateLayer
from crate.testing.settings import \
crate_host, crate_path, crate_port, \
crate_transport_port, docs_path, localhost
from crate.client import connect


makeSuite = unittest.TestLoader().loadTestsFromTestCase

log = logging.getLogger('crate.testing.layer')
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
log.addHandler(ch)


def cprint(s):
if isinstance(s, bytes):
s = s.decode('utf-8')
print(s)


settings = {
'udc.enabled': 'false',
'lang.js.enabled': 'true',
'auth.host_based.enabled': 'true',
'auth.host_based.config.0.user': 'crate',
'auth.host_based.config.0.method': 'trust',
'auth.host_based.config.98.user': 'trusted_me',
'auth.host_based.config.98.method': 'trust',
'auth.host_based.config.99.user': 'me',
'auth.host_based.config.99.method': 'password',
}
crate_layer = None


def ensure_cratedb_layer():
"""
In order to skip individual tests by manually disabling them within
`def test_suite()`, it is crucial make the test layer not run on each
and every occasion. So, things like this will be possible::
./bin/test -vvvv --ignore_dir=testing
TODO: Through a subsequent patch, the possibility to individually
unselect specific tests might be added to `def test_suite()`
on behalf of environment variables.
A blueprint for this kind of logic can be found at
https://github.com/crate/crate/commit/414cd833.
"""
global crate_layer

if crate_layer is None:
crate_layer = CrateLayer('crate',
crate_home=crate_path(),
port=crate_port,
host=localhost,
transport_port=crate_transport_port,
settings=settings)
return crate_layer


def setUpCrateLayerBaseline(test):
if hasattr(test, "globs"):
test.globs['crate_host'] = crate_host
test.globs['pprint'] = pprint
test.globs['print'] = cprint

with connect(crate_host) as conn:
cursor = conn.cursor()

with open(docs_path('testing/testdata/mappings/locations.sql')) as s:
stmt = s.read()
cursor.execute(stmt)
stmt = ("select count(*) from information_schema.tables "
"where table_name = 'locations'")
cursor.execute(stmt)
assert cursor.fetchall()[0][0] == 1

data_path = docs_path('testing/testdata/data/test_a.json')
# load testing data into crate
cursor.execute("copy locations from ?", (data_path,))
# refresh location table so imported data is visible immediately
cursor.execute("refresh table locations")
# create blob table
cursor.execute("create blob table myfiles clustered into 1 shards " +
"with (number_of_replicas=0)")

# create users
cursor.execute("CREATE USER me WITH (password = 'my_secret_pw')")
cursor.execute("CREATE USER trusted_me")

cursor.close()


def tearDownDropEntitiesBaseline(test):
"""
Drop all tables, views, and users created by `setUpWithCrateLayer*`.
"""
ddl_statements = [
"DROP TABLE foobar",
"DROP TABLE locations",
"DROP BLOB TABLE myfiles",
"DROP USER me",
"DROP USER trusted_me",
]
_execute_statements(ddl_statements)


class HttpsTestServerLayer:
PORT = 65534
HOST = "localhost"
CERT_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__),
"pki/server_valid.pem"))
CACERT_FILE = os.path.abspath(os.path.join(os.path.dirname(__file__),
"pki/cacert_valid.pem"))

__name__ = "httpsserver"
__bases__ = tuple()

class HttpsServer(HTTPServer):
def get_request(self):

# Prepare SSL context.
context = ssl._create_unverified_context(
protocol=ssl.PROTOCOL_TLS_SERVER,
cert_reqs=ssl.CERT_OPTIONAL,
check_hostname=False,
purpose=ssl.Purpose.CLIENT_AUTH,
certfile=HttpsTestServerLayer.CERT_FILE,
keyfile=HttpsTestServerLayer.CERT_FILE,
cafile=HttpsTestServerLayer.CACERT_FILE)

# Set minimum protocol version, TLSv1 and TLSv1.1 are unsafe.
context.minimum_version = ssl.TLSVersion.TLSv1_2

# Wrap TLS encryption around socket.
socket, client_address = HTTPServer.get_request(self)
socket = context.wrap_socket(socket, server_side=True)

return socket, client_address

class HttpsHandler(BaseHTTPRequestHandler):

payload = json.dumps({"name": "test", "status": 200, })

def do_GET(self):
self.send_response(200)
payload = self.payload.encode('UTF-8')
self.send_header("Content-Length", len(payload))
self.send_header("Content-Type", "application/json; charset=UTF-8")
self.end_headers()
self.wfile.write(payload)

def setUp(self):
self.server = self.HttpsServer(
(self.HOST, self.PORT),
self.HttpsHandler
)
thread = threading.Thread(target=self.serve_forever)
thread.daemon = True # quit interpreter when only thread exists
thread.start()
self.waitForServer()

def serve_forever(self):
print("listening on", self.HOST, self.PORT)
self.server.serve_forever()
print("server stopped.")

def tearDown(self):
self.server.shutdown()
self.server.server_close()

def isUp(self):
"""
Test if a host is up.
"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ex = s.connect_ex((self.HOST, self.PORT))
s.close()
return ex == 0

def waitForServer(self, timeout=5):
"""
Wait for the host to be available.
"""
with stopit.ThreadingTimeout(timeout) as to_ctx_mgr:
while True:
if self.isUp():
break
time.sleep(0.001)

Check warning on line 226 in src/crate/client/test_support.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_support.py#L226

Added line #L226 was not covered by tests

if not to_ctx_mgr:
raise TimeoutError("Could not properly start embedded webserver "

Check warning on line 229 in src/crate/client/test_support.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_support.py#L229

Added line #L229 was not covered by tests
"within {} seconds".format(timeout))


def setUpWithHttps(test):
test.globs['crate_host'] = "https://{0}:{1}".format(
HttpsTestServerLayer.HOST, HttpsTestServerLayer.PORT
)
test.globs['pprint'] = pprint
test.globs['print'] = cprint

test.globs['cacert_valid'] = os.path.abspath(
os.path.join(os.path.dirname(__file__), "pki/cacert_valid.pem")
)
test.globs['cacert_invalid'] = os.path.abspath(
os.path.join(os.path.dirname(__file__), "pki/cacert_invalid.pem")
)
test.globs['clientcert_valid'] = os.path.abspath(
os.path.join(os.path.dirname(__file__), "pki/client_valid.pem")
)
test.globs['clientcert_invalid'] = os.path.abspath(
os.path.join(os.path.dirname(__file__), "pki/client_invalid.pem")
)


def _execute_statements(statements, on_error="ignore"):
with connect(crate_host) as conn:
cursor = conn.cursor()
for stmt in statements:
_execute_statement(cursor, stmt, on_error=on_error)
cursor.close()


def _execute_statement(cursor, stmt, on_error="ignore"):
try:
cursor.execute(stmt)
except Exception: # pragma: no cover
# FIXME: Why does this croak on statements like ``DROP TABLE cities``?
# Note: When needing to debug the test environment, you may want to
# enable this logger statement.
# log.exception("Executing SQL statement failed")
if on_error == "ignore":
pass
elif on_error == "raise":
raise

Check warning on line 273 in src/crate/client/test_support.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_support.py#L272-L273

Added lines #L272 - L273 were not covered by tests
Loading

0 comments on commit ea27b3e

Please sign in to comment.