From 80f8d1033131e2a9a1333e6ec9f1d1a23949b041 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Sat, 18 Nov 2023 14:32:49 -0800 Subject: [PATCH 01/14] Settings class Signed-off-by: Daniel Widdis --- src/opensearch_sdk_py/settings/settings.py | 60 ++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 src/opensearch_sdk_py/settings/settings.py diff --git a/src/opensearch_sdk_py/settings/settings.py b/src/opensearch_sdk_py/settings/settings.py new file mode 100644 index 0000000..7ce1fdf --- /dev/null +++ b/src/opensearch_sdk_py/settings/settings.py @@ -0,0 +1,60 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + + +from typing import Dict, Optional, Union + + +class Settings: + def __init__( + self, + settings: dict[str, Union[str, Dict]] = {}, + # TODO: Secure Settings + ) -> None: + self.settings = settings + + def get(self, setting: str, default: Optional[str] = None) -> str: + s_value = self.settings.get(setting) + if s_value is None: + return default if default else str(None) + return str(s_value) + + def get_as_int(self, setting: str, default: int) -> int: + s_value = self.settings.get(setting) + if s_value is None: + return default + if isinstance(s_value, str): + return int(s_value) + raise Exception("value is not a string") + + def get_as_float(self, setting: str, default: float) -> float: + s_value = self.settings.get(setting) + if s_value is None: + return default + if isinstance(s_value, str): + return float(s_value) + raise Exception("value is not a string") + + def get_as_boolean(self, setting: str, default: bool) -> bool: + s_value = self.settings.get(setting) + if s_value is None: + return default + if isinstance(s_value, str): + return bool(s_value) + raise Exception("value is not a string") + + def get_as_list(self, setting: str, default: list[str] = []) -> list[str]: + s_value = self.settings.get(setting) + if s_value is None: + return default + if isinstance(s_value, str): + return s_value.split(",") + raise Exception("value is not a string") + + # TODO: get_as_time, get_as_bytes_size From 52dad6fd916c2bd9b3f932600b1b43b483ae6494 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Sat, 18 Nov 2023 17:17:52 -0800 Subject: [PATCH 02/14] Settings stream methods Signed-off-by: Daniel Widdis --- src/opensearch_sdk_py/settings/settings.py | 23 ++++++++++- .../transport/stream_input.py | 40 ++++++++++++++++++- .../transport/stream_output.py | 12 +++--- 3 files changed, 67 insertions(+), 8 deletions(-) diff --git a/src/opensearch_sdk_py/settings/settings.py b/src/opensearch_sdk_py/settings/settings.py index 7ce1fdf..e12a99e 100644 --- a/src/opensearch_sdk_py/settings/settings.py +++ b/src/opensearch_sdk_py/settings/settings.py @@ -8,7 +8,10 @@ # -from typing import Dict, Optional, Union +from typing import Any, Dict, Optional, Union + +from opensearch_sdk_py.transport.stream_input import StreamInput +from opensearch_sdk_py.transport.stream_output import StreamOutput class Settings: @@ -58,3 +61,21 @@ def get_as_list(self, setting: str, default: list[str] = []) -> list[str]: raise Exception("value is not a string") # TODO: get_as_time, get_as_bytes_size + + @staticmethod + def read_settings_from_stream(input: StreamInput) -> "Settings": + settings: dict[str, Union[str, Dict]] = {} + num_settings: int = input.read_v_int + for i in range(num_settings): + key: str = input.read_string + value: Any = input.read_generic_value + settings[key] = value + return Settings(settings) + + @staticmethod + def write_settings_to_stream(settings: "Settings", out: StreamOutput) -> None: + out.write_v_int(len(settings.settings)) + for key, value in settings.settings.items(): + out.write_string(key) + out.write_generic_value(value) + return diff --git a/src/opensearch_sdk_py/transport/stream_input.py b/src/opensearch_sdk_py/transport/stream_input.py index 878ca45..677e93d 100644 --- a/src/opensearch_sdk_py/transport/stream_input.py +++ b/src/opensearch_sdk_py/transport/stream_input.py @@ -9,7 +9,7 @@ import io from enum import Enum -from typing import Any, Optional, Union +from typing import Any, Callable, Optional, Union from opensearch_sdk_py.transport.version import Version @@ -230,5 +230,43 @@ def read_string_to_string_set_dict(self) -> dict[str, set[str]]: return result + def read_generic_value(self) -> Any: + type: int = self.read_byte() + if type == -1: + return None + reader: dict[int, Callable] = { + 0: self.read_string, + 1: self.read_int, + 2: self.read_long, + # 3: self.read_float, + # 4: self.read_double, + 5: self.read_boolean, + 6: self.read_bytes, + # 7: self.read_array_list, + # 8: self.read_array, + # 9: self.read_linked_hash_map, + # 10: self.read_hash_map, + 11: self.read_byte, + # 12: self.read_date, + # 13: self.read_zoned_date_time, + # 14: self.read_bytes_reference, + # 15: self.read_text, + 16: self.read_short, + # 17: self.read_int_array, + # 18: self.read_long_array, + # 19: self.read_float_array, + # 20: self.read_double_array, + # 21: self.read_bytes_ref, + # no 22 + # 23: self.read_zoned_date_time, + # 24: self.read_collection, + # 25: self.read_collection, + # 26: self.read_big_integer, + } + try: + return reader[type]() + except KeyError: + raise Exception(f"Type {type} is not implemented") + def read_enum(self, enum: Enum) -> Any: return enum(self.read_v_int()) # type:ignore diff --git a/src/opensearch_sdk_py/transport/stream_output.py b/src/opensearch_sdk_py/transport/stream_output.py index 9c17cc2..56adccf 100644 --- a/src/opensearch_sdk_py/transport/stream_output.py +++ b/src/opensearch_sdk_py/transport/stream_output.py @@ -9,7 +9,7 @@ from enum import Enum from io import BytesIO -from typing import Union +from typing import Any, Union from opensearch_sdk_py.transport.version import Version @@ -658,11 +658,11 @@ def string_to_string_collection_dict_size(self, d: dict[str, Union[list[str], se # If want to keep stream out map and stream in map have the same stream order when stream, # can use {@code writeMapWithConsistentOrder} # - # def writeGenericValue(@Nullable Object value) throws IOException { - # if (value == null) { - # write_byte((byte) -1); - # return; - # } + def write_generic_value(self, value: Any) -> None: + if value is None: + self.write_byte(-1) + + # TODO: Continue porting # final Class type = getGenericType(value); # Writer writer = WriteableRegistry.getWriter(type); # if (writer == null) { From 49f2c34d0e2847ef3f6c4704bdcddd729c211d90 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Sun, 19 Nov 2023 16:44:44 -0800 Subject: [PATCH 03/14] Settings Transport Request/Response Signed-off-by: Daniel Widdis --- .../discovery_extensions_request_handler.py | 35 +++++++++------ .../environment_settings_response_handler.py | 31 +++++++++++++ .../protobuf/ExtensionIdentityProto_pb2.py | 18 ++++---- .../protobuf/ExtensionRequestProto.proto | 33 ++++++++++++++ .../protobuf/ExtensionRequestProto_pb2.py | 39 ++++++++++++++++ .../protobuf/RegisterRestActionsProto_pb2.py | 16 +++---- .../environment_settings_response.py | 31 +++++++++++++ .../transport/extension_transport_request.py | 45 +++++++++++++++++++ .../transport/request_type.py | 22 +++++++++ 9 files changed, 239 insertions(+), 31 deletions(-) create mode 100644 src/opensearch_sdk_py/actions/internal/environment_settings_response_handler.py create mode 100644 src/opensearch_sdk_py/protobuf/ExtensionRequestProto.proto create mode 100644 src/opensearch_sdk_py/protobuf/ExtensionRequestProto_pb2.py create mode 100644 src/opensearch_sdk_py/transport/environment_settings_response.py create mode 100644 src/opensearch_sdk_py/transport/extension_transport_request.py create mode 100644 src/opensearch_sdk_py/transport/request_type.py diff --git a/src/opensearch_sdk_py/actions/internal/discovery_extensions_request_handler.py b/src/opensearch_sdk_py/actions/internal/discovery_extensions_request_handler.py index 0fd8a11..e1cfbda 100644 --- a/src/opensearch_sdk_py/actions/internal/discovery_extensions_request_handler.py +++ b/src/opensearch_sdk_py/actions/internal/discovery_extensions_request_handler.py @@ -9,15 +9,18 @@ import logging +from opensearch_sdk_py.actions.internal.environment_settings_response_handler import EnvironmentSettingsResponseHandler from opensearch_sdk_py.actions.internal.register_rest_actions_response_handler import RegisterRestActionsResponseHandler from opensearch_sdk_py.actions.request_handler import RequestHandler from opensearch_sdk_py.actions.response_handlers import ResponseHandlers from opensearch_sdk_py.api.action_extension import ActionExtension +from opensearch_sdk_py.transport.extension_transport_request import ExtensionTransportRequest from opensearch_sdk_py.transport.initialize_extension_request import InitializeExtensionRequest from opensearch_sdk_py.transport.initialize_extension_response import InitializeExtensionResponse from opensearch_sdk_py.transport.outbound_message_request import OutboundMessageRequest from opensearch_sdk_py.transport.outbound_message_response import OutboundMessageResponse from opensearch_sdk_py.transport.register_rest_actions_request import RegisterRestActionsRequest +from opensearch_sdk_py.transport.request_type import RequestType from opensearch_sdk_py.transport.stream_input import StreamInput from opensearch_sdk_py.transport.stream_output import StreamOutput @@ -31,8 +34,11 @@ def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamO initialize_extension_request = InitializeExtensionRequest().read_from(input) logging.debug(f"< {initialize_extension_request}") - # Create the response message preserving the request id, but don't send it yet. - # This will be sent when response handler calls send() + # We will stack requests/responses, generating them in the reverse order that we send them + # Order of sending: resgister rest actions, then environment settings request, then init response + # Order of generating: init response, environment settings request, register rest actions + + # Final Init Response to this init request, preserving the request ID self.response = OutboundMessageResponse( request.thread_context_struct, request.features, @@ -43,24 +49,27 @@ def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamO request.is_compress, ) - # Sometime between tcp and transport handshakes and the eventual response, - # the uniqueId gets added to the thread context. - # request.thread_context_struct.request_headers["extension_unique_id"] = self.extension.name - - # Now send our own initialization requests. + # Stack the Environment Settings request/response, chained to the above init response + settings_request = OutboundMessageRequest( + thread_context=request.thread_context_struct, + features=request.features, + message=ExtensionTransportRequest(RequestType.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS), + version=request.version, + action="internal:discovery/enviornmentsettings", + ) + settings_response_handler = EnvironmentSettingsResponseHandler(self) + self.response_handlers.register(settings_request.request_id, settings_response_handler) - # Create the request, this gets us an auto-increment request id + # Stack the Register Rest request/response, chained to the above env settings request register_request = OutboundMessageRequest( thread_context=request.thread_context_struct, features=request.features, message=RegisterRestActionsRequest(self.extension.name, self.extension.named_routes), version=request.version, action="internal:discovery/registerrestactions", - is_handshake=False, - is_compress=False, ) - # Register response handler to handle this request ID invoking this class's send() - register_response_handler = RegisterRestActionsResponseHandler(self) + register_response_handler = RegisterRestActionsResponseHandler(self) # TODO: change this self to settings handler self.response_handlers.register(register_request.request_id, register_response_handler) - # Now send the request + + # Now send the request at top of stack return register_response_handler.send(register_request) diff --git a/src/opensearch_sdk_py/actions/internal/environment_settings_response_handler.py b/src/opensearch_sdk_py/actions/internal/environment_settings_response_handler.py new file mode 100644 index 0000000..7b5a141 --- /dev/null +++ b/src/opensearch_sdk_py/actions/internal/environment_settings_response_handler.py @@ -0,0 +1,31 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +import logging + +from opensearch_sdk_py.actions.request_response_handler import RequestResponseHandler +from opensearch_sdk_py.actions.response_handler import ResponseHandler +from opensearch_sdk_py.transport.environment_settings_response import EnvironmentSettingsResponse +from opensearch_sdk_py.transport.outbound_message_request import OutboundMessageRequest +from opensearch_sdk_py.transport.stream_input import StreamInput +from opensearch_sdk_py.transport.stream_output import StreamOutput + + +class EnvironmentSettingsResponseHandler(ResponseHandler): + def __init__(self, next_handler: RequestResponseHandler) -> None: + self.next_handler = next_handler + + def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamOutput: + env_response = EnvironmentSettingsResponse().read_from(input) + logging.debug(f"< {env_response}") + if env_response.status: + return self.next_handler.send() + else: + # TODO error handling + return None diff --git a/src/opensearch_sdk_py/protobuf/ExtensionIdentityProto_pb2.py b/src/opensearch_sdk_py/protobuf/ExtensionIdentityProto_pb2.py index 571ffb3..5f21b86 100644 --- a/src/opensearch_sdk_py/protobuf/ExtensionIdentityProto_pb2.py +++ b/src/opensearch_sdk_py/protobuf/ExtensionIdentityProto_pb2.py @@ -10,27 +10,27 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: ExtensionIdentityProto.proto +# Protobuf Python Version: 4.25.0 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder - # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1c\x45xtensionIdentityProto.proto\x12\x1forg.opensearch.extensions.proto"%\n\x11\x45xtensionIdentity\x12\x10\n\x08uniqueId\x18\x01 \x01(\tB\x18\x42\x16\x45xtensionIdentityProtob\x06proto3' -) + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x45xtensionIdentityProto.proto\x12\x1forg.opensearch.extensions.proto\"%\n\x11\x45xtensionIdentity\x12\x10\n\x08uniqueId\x18\x01 \x01(\tB\x18\x42\x16\x45xtensionIdentityProtob\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "ExtensionIdentityProto_pb2", _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'ExtensionIdentityProto_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: # pragma: no cover - DESCRIPTOR._options = None - DESCRIPTOR._serialized_options = b"B\026ExtensionIdentityProto" - _globals["_EXTENSIONIDENTITY"]._serialized_start = 65 - _globals["_EXTENSIONIDENTITY"]._serialized_end = 102 + _globals['DESCRIPTOR']._options = None + _globals['DESCRIPTOR']._serialized_options = b'B\026ExtensionIdentityProto' + _globals['_EXTENSIONIDENTITY']._serialized_start=65 + _globals['_EXTENSIONIDENTITY']._serialized_end=102 # @@protoc_insertion_point(module_scope) diff --git a/src/opensearch_sdk_py/protobuf/ExtensionRequestProto.proto b/src/opensearch_sdk_py/protobuf/ExtensionRequestProto.proto new file mode 100644 index 0000000..a821a02 --- /dev/null +++ b/src/opensearch_sdk_py/protobuf/ExtensionRequestProto.proto @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +syntax = "proto3"; +package org.opensearch.extensions.proto; + +import "ExtensionIdentityProto.proto"; +option java_outer_classname = "ExtensionRequestProto"; + +enum RequestType { + REQUEST_EXTENSION_CLUSTER_STATE = 0; + REQUEST_EXTENSION_CLUSTER_SETTINGS = 1; + REQUEST_EXTENSION_REGISTER_REST_ACTIONS = 2; + REQUEST_EXTENSION_REGISTER_SETTINGS = 3; + REQUEST_EXTENSION_ENVIRONMENT_SETTINGS = 4; + REQUEST_EXTENSION_DEPENDENCY_INFORMATION = 5; + CREATE_COMPONENT = 6; + ON_INDEX_MODULE = 7; + GET_SETTINGS = 8; +} + +message ExtensionRequest { + ExtensionIdentity identity = 1; + RequestType requestType = 2; +} \ No newline at end of file diff --git a/src/opensearch_sdk_py/protobuf/ExtensionRequestProto_pb2.py b/src/opensearch_sdk_py/protobuf/ExtensionRequestProto_pb2.py new file mode 100644 index 0000000..2209f60 --- /dev/null +++ b/src/opensearch_sdk_py/protobuf/ExtensionRequestProto_pb2.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: ExtensionRequestProto.proto +# Protobuf Python Version: 4.25.0 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from . import ExtensionIdentityProto_pb2 as ExtensionIdentityProto__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x45xtensionRequestProto.proto\x12\x1forg.opensearch.extensions.proto\x1a\x1c\x45xtensionIdentityProto.proto\"\x9b\x01\n\x10\x45xtensionRequest\x12\x44\n\x08identity\x18\x01 \x01(\x0b\x32\x32.org.opensearch.extensions.proto.ExtensionIdentity\x12\x41\n\x0brequestType\x18\x02 \x01(\x0e\x32,.org.opensearch.extensions.proto.RequestType*\xc7\x02\n\x0bRequestType\x12#\n\x1fREQUEST_EXTENSION_CLUSTER_STATE\x10\x00\x12&\n\"REQUEST_EXTENSION_CLUSTER_SETTINGS\x10\x01\x12+\n\'REQUEST_EXTENSION_REGISTER_REST_ACTIONS\x10\x02\x12\'\n#REQUEST_EXTENSION_REGISTER_SETTINGS\x10\x03\x12*\n&REQUEST_EXTENSION_ENVIRONMENT_SETTINGS\x10\x04\x12,\n(REQUEST_EXTENSION_DEPENDENCY_INFORMATION\x10\x05\x12\x14\n\x10\x43REATE_COMPONENT\x10\x06\x12\x13\n\x0fON_INDEX_MODULE\x10\x07\x12\x10\n\x0cGET_SETTINGS\x10\x08\x42\x17\x42\x15\x45xtensionRequestProtob\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'ExtensionRequestProto_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: # pragma: no cover + _globals['DESCRIPTOR']._options = None + _globals['DESCRIPTOR']._serialized_options = b'B\025ExtensionRequestProto' + _globals['_REQUESTTYPE']._serialized_start=253 + _globals['_REQUESTTYPE']._serialized_end=580 + _globals['_EXTENSIONREQUEST']._serialized_start=95 + _globals['_EXTENSIONREQUEST']._serialized_end=250 +# @@protoc_insertion_point(module_scope) diff --git a/src/opensearch_sdk_py/protobuf/RegisterRestActionsProto_pb2.py b/src/opensearch_sdk_py/protobuf/RegisterRestActionsProto_pb2.py index 24c7ee9..17e275c 100644 --- a/src/opensearch_sdk_py/protobuf/RegisterRestActionsProto_pb2.py +++ b/src/opensearch_sdk_py/protobuf/RegisterRestActionsProto_pb2.py @@ -10,12 +10,12 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: RegisterRestActionsProto.proto +# Protobuf Python Version: 4.25.0 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder - # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -24,16 +24,14 @@ from . import ExtensionIdentityProto_pb2 as ExtensionIdentityProto__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1eRegisterRestActionsProto.proto\x12\x1forg.opensearch.extensions.proto\x1a\x1c\x45xtensionIdentityProto.proto"\x8f\x01\n\x13RegisterRestActions\x12\x44\n\x08identity\x18\x01 \x01(\x0b\x32\x32.org.opensearch.extensions.proto.ExtensionIdentity\x12\x13\n\x0brestActions\x18\x02 \x03(\t\x12\x1d\n\x15\x64\x65precatedRestActions\x18\x03 \x03(\tB\x1a\x42\x18RegisterRestActionsProtob\x06proto3' -) +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1eRegisterRestActionsProto.proto\x12\x1forg.opensearch.extensions.proto\x1a\x1c\x45xtensionIdentityProto.proto\"\x8f\x01\n\x13RegisterRestActions\x12\x44\n\x08identity\x18\x01 \x01(\x0b\x32\x32.org.opensearch.extensions.proto.ExtensionIdentity\x12\x13\n\x0brestActions\x18\x02 \x03(\t\x12\x1d\n\x15\x64\x65precatedRestActions\x18\x03 \x03(\tB\x1a\x42\x18RegisterRestActionsProtob\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "RegisterRestActionsProto_pb2", _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'RegisterRestActionsProto_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: # pragma: no cover - DESCRIPTOR._options = None - DESCRIPTOR._serialized_options = b"B\030RegisterRestActionsProto" - _globals["_REGISTERRESTACTIONS"]._serialized_start = 98 - _globals["_REGISTERRESTACTIONS"]._serialized_end = 241 + _globals['DESCRIPTOR']._options = None + _globals['DESCRIPTOR']._serialized_options = b'B\030RegisterRestActionsProto' + _globals['_REGISTERRESTACTIONS']._serialized_start=98 + _globals['_REGISTERRESTACTIONS']._serialized_end=241 # @@protoc_insertion_point(module_scope) diff --git a/src/opensearch_sdk_py/transport/environment_settings_response.py b/src/opensearch_sdk_py/transport/environment_settings_response.py new file mode 100644 index 0000000..7d28a50 --- /dev/null +++ b/src/opensearch_sdk_py/transport/environment_settings_response.py @@ -0,0 +1,31 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +# https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/env/EnvironmentSettingsResponse.java + +from opensearch_sdk_py.settings.settings import Settings +from opensearch_sdk_py.transport.stream_input import StreamInput +from opensearch_sdk_py.transport.stream_output import StreamOutput +from opensearch_sdk_py.transport.transport_response import TransportResponse + + +class EnvironmentSettingsResponse(TransportResponse): + def __init__(self, environmentSettings: Settings = None): + super().__init__() + self.environmentSettings = environmentSettings + + def read_from(self, input: StreamInput) -> "EnvironmentSettingsResponse": + super().read_from(input) + self.environmentSettings = Settings.read_settings_from_stream(input) + return self + + def write_to(self, output: StreamOutput) -> "EnvironmentSettingsResponse": + super().write_to(output) + Settings.write_settings_to_stream(self.environmentSettings, output) + return self diff --git a/src/opensearch_sdk_py/transport/extension_transport_request.py b/src/opensearch_sdk_py/transport/extension_transport_request.py new file mode 100644 index 0000000..fc01e82 --- /dev/null +++ b/src/opensearch_sdk_py/transport/extension_transport_request.py @@ -0,0 +1,45 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +# https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/extensions/ExtensionRequest.java + +from typing import Optional + +from opensearch_sdk_py.protobuf.ExtensionRequestProto_pb2 import ExtensionRequest +from opensearch_sdk_py.transport.request_type import RequestType +from opensearch_sdk_py.transport.stream_input import StreamInput +from opensearch_sdk_py.transport.stream_output import StreamOutput +from opensearch_sdk_py.transport.transport_request import TransportRequest + + +class ExtensionTransportRequest(TransportRequest): + def __init__( + self, + request_type: "RequestType", + unique_id: Optional[str] = None, + ) -> None: + super().__init__() + self.er = ExtensionRequest() + self.er.requestType = request_type.value + if unique_id is not None: + self.er.identity.uniqueId = unique_id + + def read_from(self, input: StreamInput) -> "ExtensionTransportRequest": + super().read_from(input) + er_bytes = input.read_bytes(input.read_v_int()) + self.er = ExtensionRequest() + self.er.ParseFromString(er_bytes) + return self + + def write_to(self, output: StreamOutput) -> "ExtensionTransportRequest": + super().write_to(output) + er_bytes = self.er.SerializeToString() + output.write_v_int(len(er_bytes)) + output.write(er_bytes) + return self diff --git a/src/opensearch_sdk_py/transport/request_type.py b/src/opensearch_sdk_py/transport/request_type.py new file mode 100644 index 0000000..4e31a5b --- /dev/null +++ b/src/opensearch_sdk_py/transport/request_type.py @@ -0,0 +1,22 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +from enum import Enum + + +class RequestType(Enum): + REQUEST_EXTENSION_CLUSTER_STATE = 0 + REQUEST_EXTENSION_CLUSTER_SETTINGS = 1 + REQUEST_EXTENSION_REGISTER_REST_ACTIONS = 2 + REQUEST_EXTENSION_REGISTER_SETTINGS = 3 + REQUEST_EXTENSION_ENVIRONMENT_SETTINGS = 4 + REQUEST_EXTENSION_DEPENDENCY_INFORMATION = 5 + CREATE_COMPONENT = 6 + ON_INDEX_MODULE = 7 + GET_SETTINGS = 8 From b9886918fe126158bd6670935aaa005400c972ec Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Sun, 19 Nov 2023 18:57:57 -0800 Subject: [PATCH 04/14] Finishing up TODOs Signed-off-by: Daniel Widdis --- .../discovery_extensions_request_handler.py | 2 +- .../register_rest_actions_response_handler.py | 5 +++-- src/opensearch_sdk_py/settings/settings.py | 10 +++++++--- ..._register_rest_actions_response_handler.py | 7 ++++--- tests/actions/test_response_handlers.py | 20 +++++++++---------- tests/server/test_async_extension_host.py | 7 ++++--- 6 files changed, 28 insertions(+), 23 deletions(-) diff --git a/src/opensearch_sdk_py/actions/internal/discovery_extensions_request_handler.py b/src/opensearch_sdk_py/actions/internal/discovery_extensions_request_handler.py index e1cfbda..81f83fc 100644 --- a/src/opensearch_sdk_py/actions/internal/discovery_extensions_request_handler.py +++ b/src/opensearch_sdk_py/actions/internal/discovery_extensions_request_handler.py @@ -68,7 +68,7 @@ def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamO version=request.version, action="internal:discovery/registerrestactions", ) - register_response_handler = RegisterRestActionsResponseHandler(self) # TODO: change this self to settings handler + register_response_handler = RegisterRestActionsResponseHandler(settings_response_handler, settings_request) self.response_handlers.register(register_request.request_id, register_response_handler) # Now send the request at top of stack diff --git a/src/opensearch_sdk_py/actions/internal/register_rest_actions_response_handler.py b/src/opensearch_sdk_py/actions/internal/register_rest_actions_response_handler.py index bd17701..ab7b881 100644 --- a/src/opensearch_sdk_py/actions/internal/register_rest_actions_response_handler.py +++ b/src/opensearch_sdk_py/actions/internal/register_rest_actions_response_handler.py @@ -18,14 +18,15 @@ class RegisterRestActionsResponseHandler(ResponseHandler): - def __init__(self, next_handler: RequestResponseHandler) -> None: + def __init__(self, next_handler: RequestResponseHandler, request: OutboundMessageRequest) -> None: self.next_handler = next_handler + self.request = request def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamOutput: ack_response = AcknowledgedResponse().read_from(input) logging.debug(f"< {ack_response}") if ack_response.status: - return self.next_handler.send() + return self.next_handler.send(self.request) else: # TODO error handling return None diff --git a/src/opensearch_sdk_py/settings/settings.py b/src/opensearch_sdk_py/settings/settings.py index e12a99e..1d16097 100644 --- a/src/opensearch_sdk_py/settings/settings.py +++ b/src/opensearch_sdk_py/settings/settings.py @@ -8,6 +8,7 @@ # +import logging from typing import Any, Dict, Optional, Union from opensearch_sdk_py.transport.stream_input import StreamInput @@ -65,10 +66,13 @@ def get_as_list(self, setting: str, default: list[str] = []) -> list[str]: @staticmethod def read_settings_from_stream(input: StreamInput) -> "Settings": settings: dict[str, Union[str, Dict]] = {} - num_settings: int = input.read_v_int + num_settings: int = input.read_v_int() + logging.info(f">>>>> Reading {num_settings} settings") for i in range(num_settings): - key: str = input.read_string - value: Any = input.read_generic_value + key: str = input.read_string() + logging.info(f">>>>> Setting {i}: Reading key {key}") + value: Any = input.read_generic_value() + logging.info(f">>>>> Setting {i}: Value is {value}") settings[key] = value return Settings(settings) diff --git a/tests/actions/internal/test_register_rest_actions_response_handler.py b/tests/actions/internal/test_register_rest_actions_response_handler.py index e019878..48fea29 100644 --- a/tests/actions/internal/test_register_rest_actions_response_handler.py +++ b/tests/actions/internal/test_register_rest_actions_response_handler.py @@ -23,13 +23,14 @@ class TestRegisterRestActionsResponseHandler(unittest.TestCase): def test_register_rest_actions_response_handler(self) -> None: input = StreamInput(bytes(OutboundMessageRequest(version=Version(2100099), message=AcknowledgedResponse(status=True)))) omr = OutboundMessageRequest().read_from(input) + request = OutboundMessageRequest() next_handler = FakeResponseHandler() - output = RegisterRestActionsResponseHandler(next_handler).handle(omr, input) + output = RegisterRestActionsResponseHandler(next_handler, request).handle(omr, input) self.assertEqual(output, b"test") input = StreamInput(bytes(OutboundMessageRequest(version=Version(2100099), message=AcknowledgedResponse(status=False)))) omr = OutboundMessageRequest().read_from(input) - output = RegisterRestActionsResponseHandler(next_handler).handle(omr, input) + output = RegisterRestActionsResponseHandler(next_handler, request).handle(omr, input) self.assertIsNone(output) @@ -37,5 +38,5 @@ class FakeResponseHandler(ResponseHandler): def handle(self, request: OutboundMessageRequest, input: StreamInput = None) -> Optional[bytes]: pass - def send(self) -> StreamOutput: + def send(self, request: OutboundMessageRequest) -> StreamOutput: return b"test" diff --git a/tests/actions/test_response_handlers.py b/tests/actions/test_response_handlers.py index 4c5033f..1e93498 100644 --- a/tests/actions/test_response_handlers.py +++ b/tests/actions/test_response_handlers.py @@ -11,7 +11,7 @@ from typing import Optional from opensearch_sdk_py.actions.internal.register_rest_actions_response_handler import RegisterRestActionsResponseHandler -from opensearch_sdk_py.actions.request_handler import RequestHandler +from opensearch_sdk_py.actions.response_handler import ResponseHandler from opensearch_sdk_py.actions.response_handlers import ResponseHandlers from opensearch_sdk_py.extension import Extension from opensearch_sdk_py.transport.acknowledged_response import AcknowledgedResponse @@ -30,8 +30,9 @@ def __init__(self) -> None: def setUp(self) -> None: self.extension = TestResponseHandlers.MyExtension() self.response_handlers = ResponseHandlers(self.extension) - next_handler = FakeRequestHandler(self.extension) - self.response_handlers.register(123, RegisterRestActionsResponseHandler(next_handler)) + self.next_handler = FakeResponseHandler() + request = OutboundMessageRequest() + self.response_handlers.register(123, RegisterRestActionsResponseHandler(self.next_handler, request)) def test_register_handlers(self) -> None: self.assertEqual(len(self.response_handlers), 1) @@ -42,8 +43,8 @@ def test_handle(self) -> None: input = StreamInput(bytes(AcknowledgedResponse(status=True))) output = self.response_handlers.handle(response, input) self.assertEqual(len(self.response_handlers), 0) - self.assertEqual(output, None) - self.assertEqual(self.extension.test, "modified") + self.assertIsNone(output) + self.assertEqual(self.next_handler.test, "modified") def test_handle_unregistered(self) -> None: response = OutboundMessageResponse(request_id=1234) @@ -52,13 +53,10 @@ def test_handle_unregistered(self) -> None: self.assertIsNone(output) -class FakeRequestHandler(RequestHandler): - def __init__(self, extension: Extension) -> None: - super().__init__("test-extension", extension) - +class FakeResponseHandler(ResponseHandler): def handle(self, request: OutboundMessageRequest, input: StreamInput = None) -> Optional[bytes]: return None - def send(self) -> StreamOutput: - self.extension.test = "modified" + def send(self, request: OutboundMessageRequest) -> StreamOutput: + self.test = "modified" return None diff --git a/tests/server/test_async_extension_host.py b/tests/server/test_async_extension_host.py index c51ea46..b350815 100644 --- a/tests/server/test_async_extension_host.py +++ b/tests/server/test_async_extension_host.py @@ -21,10 +21,11 @@ from opensearch_sdk_py.rest.rest_status import RestStatus from opensearch_sdk_py.server.async_extension_host import AsyncExtensionHost from opensearch_sdk_py.transport.acknowledged_response import AcknowledgedResponse -from opensearch_sdk_py.transport.initialize_extension_response import InitializeExtensionResponse +from opensearch_sdk_py.transport.extension_transport_request import ExtensionTransportRequest from opensearch_sdk_py.transport.outbound_message_request import OutboundMessageRequest from opensearch_sdk_py.transport.outbound_message_response import OutboundMessageResponse from opensearch_sdk_py.transport.register_rest_actions_request import RegisterRestActionsRequest +from opensearch_sdk_py.transport.request_type import RequestType from opensearch_sdk_py.transport.stream_input import StreamInput from opensearch_sdk_py.transport.tcp_header import TcpHeader from opensearch_sdk_py.transport.version import Version @@ -140,8 +141,8 @@ def test_acknowledged_response(self) -> None: assert responses[1] is not None reply: TestAsyncExtensionHost.Response = responses[1] self.assertEqual(reply.response.thread_context_struct.request_headers, {"_system_index_access_allowed": "false"}) - init_response = InitializeExtensionResponse().read_from(reply.remaining_input) - self.assertEqual(init_response.name, "hello-world") + extension_request = ExtensionTransportRequest(RequestType.GET_SETTINGS).read_from(reply.remaining_input) + self.assertEqual(extension_request.er.requestType, RequestType.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS.value) def test_error_response(self) -> None: request1 = NettyTraceData.load("tests/transport/data/transport_service_handshake_request.txt").data From f87abad1ec16b1e32709fd5f594ace065e1403d5 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Sun, 19 Nov 2023 23:23:19 -0800 Subject: [PATCH 05/14] Working initialization sequence Signed-off-by: Daniel Widdis --- .../internal/environment_settings_response_handler.py | 7 ++----- src/opensearch_sdk_py/transport/stream_input.py | 8 +++++++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/opensearch_sdk_py/actions/internal/environment_settings_response_handler.py b/src/opensearch_sdk_py/actions/internal/environment_settings_response_handler.py index 7b5a141..f865de0 100644 --- a/src/opensearch_sdk_py/actions/internal/environment_settings_response_handler.py +++ b/src/opensearch_sdk_py/actions/internal/environment_settings_response_handler.py @@ -24,8 +24,5 @@ def __init__(self, next_handler: RequestResponseHandler) -> None: def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamOutput: env_response = EnvironmentSettingsResponse().read_from(input) logging.debug(f"< {env_response}") - if env_response.status: - return self.next_handler.send() - else: - # TODO error handling - return None + # TODO save the settings somewhere + return self.next_handler.send() diff --git a/src/opensearch_sdk_py/transport/stream_input.py b/src/opensearch_sdk_py/transport/stream_input.py index 677e93d..f6a940f 100644 --- a/src/opensearch_sdk_py/transport/stream_input.py +++ b/src/opensearch_sdk_py/transport/stream_input.py @@ -242,7 +242,7 @@ def read_generic_value(self) -> Any: # 4: self.read_double, 5: self.read_boolean, 6: self.read_bytes, - # 7: self.read_array_list, + 7: self.read_array_list, # 8: self.read_array, # 9: self.read_linked_hash_map, # 10: self.read_hash_map, @@ -268,5 +268,11 @@ def read_generic_value(self) -> Any: except KeyError: raise Exception(f"Type {type} is not implemented") + def read_array_list(self) -> list[Any]: + result: list[Any] = list() + for i in range(self.read_v_int()): + result.append(self.read_generic_value()) + return result + def read_enum(self, enum: Enum) -> Any: return enum(self.read_v_int()) # type:ignore From f9b4900592488f8325c2722ddaf8831d7b6b0665 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Sun, 19 Nov 2023 23:37:57 -0800 Subject: [PATCH 06/14] Remove debug output Signed-off-by: Daniel Widdis --- src/opensearch_sdk_py/settings/settings.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/opensearch_sdk_py/settings/settings.py b/src/opensearch_sdk_py/settings/settings.py index 1d16097..df3f83f 100644 --- a/src/opensearch_sdk_py/settings/settings.py +++ b/src/opensearch_sdk_py/settings/settings.py @@ -8,7 +8,6 @@ # -import logging from typing import Any, Dict, Optional, Union from opensearch_sdk_py.transport.stream_input import StreamInput @@ -67,12 +66,9 @@ def get_as_list(self, setting: str, default: list[str] = []) -> list[str]: def read_settings_from_stream(input: StreamInput) -> "Settings": settings: dict[str, Union[str, Dict]] = {} num_settings: int = input.read_v_int() - logging.info(f">>>>> Reading {num_settings} settings") for i in range(num_settings): key: str = input.read_string() - logging.info(f">>>>> Setting {i}: Reading key {key}") value: Any = input.read_generic_value() - logging.info(f">>>>> Setting {i}: Value is {value}") settings[key] = value return Settings(settings) From 92fb6d107088e3d69dbb7825ff7c4dfb26c0cdd5 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 20 Nov 2023 08:42:49 -0800 Subject: [PATCH 07/14] Add some unit tests Signed-off-by: Daniel Widdis --- .../transport/extension_transport_request.py | 6 +-- .../protobuf/test_extension_request_proto.py | 36 +++++++++++++++ .../test_extension_transport_request.py | 45 +++++++++++++++++++ 3 files changed, 83 insertions(+), 4 deletions(-) create mode 100644 tests/protobuf/test_extension_request_proto.py create mode 100644 tests/transport/test_extension_transport_request.py diff --git a/src/opensearch_sdk_py/transport/extension_transport_request.py b/src/opensearch_sdk_py/transport/extension_transport_request.py index fc01e82..6f0ae03 100644 --- a/src/opensearch_sdk_py/transport/extension_transport_request.py +++ b/src/opensearch_sdk_py/transport/extension_transport_request.py @@ -9,7 +9,6 @@ # https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/extensions/ExtensionRequest.java -from typing import Optional from opensearch_sdk_py.protobuf.ExtensionRequestProto_pb2 import ExtensionRequest from opensearch_sdk_py.transport.request_type import RequestType @@ -22,13 +21,12 @@ class ExtensionTransportRequest(TransportRequest): def __init__( self, request_type: "RequestType", - unique_id: Optional[str] = None, + unique_id: str = "", ) -> None: super().__init__() self.er = ExtensionRequest() self.er.requestType = request_type.value - if unique_id is not None: - self.er.identity.uniqueId = unique_id + self.er.identity.uniqueId = unique_id def read_from(self, input: StreamInput) -> "ExtensionTransportRequest": super().read_from(input) diff --git a/tests/protobuf/test_extension_request_proto.py b/tests/protobuf/test_extension_request_proto.py new file mode 100644 index 0000000..c51e4d8 --- /dev/null +++ b/tests/protobuf/test_extension_request_proto.py @@ -0,0 +1,36 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +import unittest + +from opensearch_sdk_py.protobuf import ExtensionRequestProto_pb2 +from opensearch_sdk_py.transport.request_type import RequestType + + +class TestExtensionRequestProto_pb2(unittest.TestCase): + def test_extension_request(self) -> None: + request = ExtensionRequestProto_pb2.ExtensionRequest() + request.requestType = RequestType.GET_SETTINGS.value + request.identity.uniqueId = "test" + serialized_str = request.SerializeToString() + + parsed_request = ExtensionRequestProto_pb2.ExtensionRequest() + parsed_request.ParseFromString(serialized_str) + self.assertEqual(parsed_request.requestType, RequestType.GET_SETTINGS.value) + self.assertEqual(parsed_request.identity.uniqueId, "test") + + def test_extension_request_no_id(self) -> None: + request = ExtensionRequestProto_pb2.ExtensionRequest() + request.requestType = RequestType.GET_SETTINGS.value + serialized_str = request.SerializeToString() + + parsed_request = ExtensionRequestProto_pb2.ExtensionRequest() + parsed_request.ParseFromString(serialized_str) + self.assertEqual(parsed_request.requestType, RequestType.GET_SETTINGS.value) + self.assertEqual(parsed_request.identity.uniqueId, "") diff --git a/tests/transport/test_extension_transport_request.py b/tests/transport/test_extension_transport_request.py new file mode 100644 index 0000000..3fcd15d --- /dev/null +++ b/tests/transport/test_extension_transport_request.py @@ -0,0 +1,45 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +import unittest + +from opensearch_sdk_py.transport.extension_transport_request import ExtensionTransportRequest +from opensearch_sdk_py.transport.request_type import RequestType +from opensearch_sdk_py.transport.stream_input import StreamInput +from opensearch_sdk_py.transport.stream_output import StreamOutput + + +class TestExtensionTransportRequest(unittest.TestCase): + def test_extension_transport_request(self) -> None: + etr = ExtensionTransportRequest(RequestType.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS, "test") + self.assertEqual(etr.er.requestType, RequestType.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS.value) + self.assertEqual(etr.er.identity.uniqueId, "test") + + out = StreamOutput() + etr.write_to(out) + etr = ExtensionTransportRequest(RequestType.GET_SETTINGS) + self.assertEqual(etr.er.requestType, RequestType.GET_SETTINGS.value) + self.assertEqual(etr.er.identity.uniqueId, "") + etr.read_from(input=StreamInput(out.getvalue())) + self.assertEqual(etr.er.requestType, RequestType.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS.value) + self.assertEqual(etr.er.identity.uniqueId, "test") + + def test_extension_transport_request_no_id(self) -> None: + etr = ExtensionTransportRequest(RequestType.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS) + self.assertEqual(etr.er.requestType, RequestType.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS.value) + self.assertEqual(etr.er.identity.uniqueId, "") + + out = StreamOutput() + etr.write_to(out) + etr = ExtensionTransportRequest(RequestType.GET_SETTINGS) + self.assertEqual(etr.er.requestType, RequestType.GET_SETTINGS.value) + self.assertEqual(etr.er.identity.uniqueId, "") + etr.read_from(input=StreamInput(out.getvalue())) + self.assertEqual(etr.er.requestType, RequestType.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS.value) + self.assertEqual(etr.er.identity.uniqueId, "") From 223deb844d109547a873efe382541c7e3208ff27 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 20 Nov 2023 14:48:25 -0800 Subject: [PATCH 08/14] Add tests for stream_input implementations Signed-off-by: Daniel Widdis --- .../transport/stream_input.py | 12 +++++-- tests/transport/test_stream_input.py | 36 +++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/src/opensearch_sdk_py/transport/stream_input.py b/src/opensearch_sdk_py/transport/stream_input.py index f6a940f..5c28dc6 100644 --- a/src/opensearch_sdk_py/transport/stream_input.py +++ b/src/opensearch_sdk_py/transport/stream_input.py @@ -232,7 +232,9 @@ def read_string_to_string_set_dict(self) -> dict[str, set[str]]: def read_generic_value(self) -> Any: type: int = self.read_byte() - if type == -1: + # TODO: Handle negatives and make this -1 + # https://github.com/opensearch-project/opensearch-sdk-py/issues/88 + if type & 0xff == 0xff: return None reader: dict[int, Callable] = { 0: self.read_string, @@ -241,7 +243,7 @@ def read_generic_value(self) -> Any: # 3: self.read_float, # 4: self.read_double, 5: self.read_boolean, - 6: self.read_bytes, + 6: self.read_byte_array, 7: self.read_array_list, # 8: self.read_array, # 9: self.read_linked_hash_map, @@ -274,5 +276,11 @@ def read_array_list(self) -> list[Any]: result.append(self.read_generic_value()) return result + def read_byte_array(self) -> bytes: + size: int = self.read_v_int() + if size == 0: + return b"" + return self.read_bytes(size) + def read_enum(self, enum: Enum) -> Any: return enum(self.read_v_int()) # type:ignore diff --git a/tests/transport/test_stream_input.py b/tests/transport/test_stream_input.py index a4841fe..45869de 100644 --- a/tests/transport/test_stream_input.py +++ b/tests/transport/test_stream_input.py @@ -167,6 +167,42 @@ def test_read_string_to_string_set_dict(self) -> None: self.assertSetEqual(dict["foo"], {"bar", "baz"}) self.assertSetEqual(dict["qux"], set()) + def test_read_generic_value(self) -> None: + # -1 + input = StreamInput(b"\xff") + self.assertIsNone(input.read_generic_value()) + # 0 + input = StreamInput(b"\x00\x04test") + self.assertEqual(input.read_generic_value(), "test") + # 1 + input = StreamInput(b"\x01\x00\x00\x00\x2a") + self.assertEqual(input.read_generic_value(), 42) + # 2 + input = StreamInput(b"\x02\x00\x00\x00\x01\x02\x03\x04\x05") + self.assertEqual(input.read_generic_value(), 4328719365) + # 5 + input = StreamInput(b"\x05\x01") + self.assertEqual(input.read_generic_value(), True) + # 6 + input = StreamInput(b"\x06\x00") + self.assertEqual(input.read_generic_value(), b"") + input = StreamInput(b"\x06\x03\x27\x10\x42") + self.assertEqual(input.read_generic_value(), b"\x27\x10\x42") + # 7 + input = StreamInput(b"\x07\x00") + self.assertEqual(input.read_generic_value(), []) + input = StreamInput(b"\x07\x02\x00\x03foo\x00\x03bar") + self.assertEqual(input.read_generic_value(), ["foo", "bar"]) + # 11 + input = StreamInput(b"\x0b\x2a") + self.assertEqual(input.read_generic_value(), 42) + # 16 + input = StreamInput(b"\x10\x00\x2a") + self.assertEqual(input.read_generic_value(), 42) + # not implemented + input = StreamInput(b"\xfe") + self.assertRaises(Exception, input.read_generic_value) + def test_read_enum(self) -> None: TestEnum = Enum("TestEnum", ["FOO", "BAR", "BAZ"], start=0) input = StreamInput(b"\x01") From 0a6fda16fbc6b4f2194ce4b7bde46ef8cbb61342 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Tue, 21 Nov 2023 09:30:25 -0800 Subject: [PATCH 09/14] Add stream_output methods, address review comments Signed-off-by: Daniel Widdis --- src/opensearch_sdk_py/settings/settings.py | 42 ++------------- .../environment_settings_response.py | 8 +-- .../transport/stream_input.py | 8 ++- .../transport/stream_output.py | 53 ++++++++++--------- 4 files changed, 38 insertions(+), 73 deletions(-) diff --git a/src/opensearch_sdk_py/settings/settings.py b/src/opensearch_sdk_py/settings/settings.py index df3f83f..24b91b8 100644 --- a/src/opensearch_sdk_py/settings/settings.py +++ b/src/opensearch_sdk_py/settings/settings.py @@ -22,46 +22,11 @@ def __init__( ) -> None: self.settings = settings - def get(self, setting: str, default: Optional[str] = None) -> str: + def get(self, setting: str, default: Optional[str] = None) -> Optional[str]: s_value = self.settings.get(setting) - if s_value is None: - return default if default else str(None) - return str(s_value) - - def get_as_int(self, setting: str, default: int) -> int: - s_value = self.settings.get(setting) - if s_value is None: - return default - if isinstance(s_value, str): - return int(s_value) - raise Exception("value is not a string") - - def get_as_float(self, setting: str, default: float) -> float: - s_value = self.settings.get(setting) - if s_value is None: - return default - if isinstance(s_value, str): - return float(s_value) - raise Exception("value is not a string") - - def get_as_boolean(self, setting: str, default: bool) -> bool: - s_value = self.settings.get(setting) - if s_value is None: - return default - if isinstance(s_value, str): - return bool(s_value) - raise Exception("value is not a string") - - def get_as_list(self, setting: str, default: list[str] = []) -> list[str]: - s_value = self.settings.get(setting) - if s_value is None: - return default - if isinstance(s_value, str): - return s_value.split(",") - raise Exception("value is not a string") - - # TODO: get_as_time, get_as_bytes_size + return str(s_value) if s_value else default + # TODO change to read_from @staticmethod def read_settings_from_stream(input: StreamInput) -> "Settings": settings: dict[str, Union[str, Dict]] = {} @@ -72,6 +37,7 @@ def read_settings_from_stream(input: StreamInput) -> "Settings": settings[key] = value return Settings(settings) + # TODO change to write_to @staticmethod def write_settings_to_stream(settings: "Settings", out: StreamOutput) -> None: out.write_v_int(len(settings.settings)) diff --git a/src/opensearch_sdk_py/transport/environment_settings_response.py b/src/opensearch_sdk_py/transport/environment_settings_response.py index 7d28a50..03147f3 100644 --- a/src/opensearch_sdk_py/transport/environment_settings_response.py +++ b/src/opensearch_sdk_py/transport/environment_settings_response.py @@ -16,16 +16,16 @@ class EnvironmentSettingsResponse(TransportResponse): - def __init__(self, environmentSettings: Settings = None): + def __init__(self, environment_settings: Settings = None): super().__init__() - self.environmentSettings = environmentSettings + self.environment_settings = environment_settings def read_from(self, input: StreamInput) -> "EnvironmentSettingsResponse": super().read_from(input) - self.environmentSettings = Settings.read_settings_from_stream(input) + self.environment_settings = Settings.read_settings_from_stream(input) return self def write_to(self, output: StreamOutput) -> "EnvironmentSettingsResponse": super().write_to(output) - Settings.write_settings_to_stream(self.environmentSettings, output) + Settings.write_settings_to_stream(self.environment_settings, output) return self diff --git a/src/opensearch_sdk_py/transport/stream_input.py b/src/opensearch_sdk_py/transport/stream_input.py index 5c28dc6..f60bcfc 100644 --- a/src/opensearch_sdk_py/transport/stream_input.py +++ b/src/opensearch_sdk_py/transport/stream_input.py @@ -234,7 +234,7 @@ def read_generic_value(self) -> Any: type: int = self.read_byte() # TODO: Handle negatives and make this -1 # https://github.com/opensearch-project/opensearch-sdk-py/issues/88 - if type & 0xff == 0xff: + if type == 0xFF: return None reader: dict[int, Callable] = { 0: self.read_string, @@ -277,10 +277,8 @@ def read_array_list(self) -> list[Any]: return result def read_byte_array(self) -> bytes: - size: int = self.read_v_int() - if size == 0: - return b"" - return self.read_bytes(size) + size: int = self.read_array_size() + return self.read_bytes(size) if size > 0 else b"" def read_enum(self, enum: Enum) -> Any: return enum(self.read_v_int()) # type:ignore diff --git a/src/opensearch_sdk_py/transport/stream_output.py b/src/opensearch_sdk_py/transport/stream_output.py index 56adccf..16bf605 100644 --- a/src/opensearch_sdk_py/transport/stream_output.py +++ b/src/opensearch_sdk_py/transport/stream_output.py @@ -61,15 +61,9 @@ def write_version(self, version: Version) -> int: def write_long(self, i: int) -> int: return self.write(i.to_bytes(8, byteorder="big")) - # } - # /** - # Writes an array of bytes. - # # @param b the bytes to write - # - # def write_byteArray(byte[] b) throws IOException { - # writeVInt(b.length); - # write_bytes(b, 0, b.length); - # } + def write_byte_array(self, b: bytes) -> int: + self.write_v_int(len(b)) + return self.write(b) # /** # Writes the bytes reference, including a length header. @@ -540,15 +534,12 @@ def string_to_string_collection_dict_size(self, d: dict[str, Union[list[str], se # o.writeVInt(bytes.length); # o.write_bytes(bytes); # }); - # writers.put(List.class, (o, v) -> { - # o.write_byte((byte) 7); - # @SuppressWarnings("rawtypes") - # final List list = (List) v; - # o.writeVInt(list.size()); - # for (Object item : list) { - # o.writeGenericValue(item); - # } - # }); + + def write_array_list(self, li: list[Any]) -> None: + self.write_v_int(len(li)) + for i in li: + self.write_generic_value(i) + # writers.put(Object[].class, (o, v) -> { # o.write_byte((byte) 8); # final Object[] list = (Object[]) v; @@ -652,17 +643,27 @@ def string_to_string_collection_dict_size(self, d: dict[str, Union[list[str], se # } # } - # /** - # Notice: when serialization a map, the stream out map with the stream in map maybe have the - # different key-value orders, they will maybe have different stream order. - # If want to keep stream out map and stream in map have the same stream order when stream, - # can use {@code writeMapWithConsistentOrder} - # def write_generic_value(self, value: Any) -> None: if value is None: - self.write_byte(-1) + # TODO: Handle negatives and make this -1 + # https://github.com/opensearch-project/opensearch-sdk-py/issues/88 + self.write_byte(0xFF) + elif isinstance(value, str): + self.write_byte(0) + self.write_string(value) + elif isinstance(value, int): + self.write_byte(2) + self.write(value.to_bytes(8, "big", signed=True)) + elif isinstance(value, bool): + self.write_byte(5) + self.write_boolean(value) + elif isinstance(value, bytes): + self.write_byte(6) + self.write_byte_array(value) + elif isinstance(value, list): + self.write_byte(7) + self.write_array_list(value) - # TODO: Continue porting # final Class type = getGenericType(value); # Writer writer = WriteableRegistry.getWriter(type); # if (writer == null) { From af07dcac0ece49bcdcb6440d6491cc029cfcf8f9 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Tue, 21 Nov 2023 11:39:04 -0800 Subject: [PATCH 10/14] Handle signed integers in streaminput/output Signed-off-by: Daniel Widdis --- .../transport/stream_input.py | 38 +++++-------------- .../transport/stream_output.py | 12 +++--- tests/transport/test_stream_input.py | 26 ++++++------- 3 files changed, 28 insertions(+), 48 deletions(-) diff --git a/src/opensearch_sdk_py/transport/stream_input.py b/src/opensearch_sdk_py/transport/stream_input.py index f60bcfc..1cb9237 100644 --- a/src/opensearch_sdk_py/transport/stream_input.py +++ b/src/opensearch_sdk_py/transport/stream_input.py @@ -20,16 +20,16 @@ def __init__(self, input: Union[bytearray, bytes]) -> None: self.data = io.BytesIO(input) def read_byte(self) -> int: - return self.data.read(1)[0] + return int.from_bytes(self.data.read(1), byteorder="big", signed=True) def read_bytes(self, len: int) -> bytes: return self.data.read(len) def read_int(self) -> int: - return ((self.read_byte() & 0xFF) << 24) | ((self.read_byte() & 0xFF) << 16) | ((self.read_byte() & 0xFF) << 8) | (self.read_byte() & 0xFF) + return int.from_bytes(self.data.read(4), byteorder="big", signed=True) def read_short(self) -> int: - return ((self.read_byte() & 0xFF) << 8) | (self.read_byte() & 0xFF) + return int.from_bytes(self.data.read(2), byteorder="big", signed=True) def read_boolean(self) -> bool: value = self.read_byte() @@ -88,9 +88,8 @@ def read_optional_int(self) -> Optional[int]: else: return None - # reads eight bytes and returns a long def read_long(self) -> int: - return self.read_int() << 32 | self.read_int() & 0xFFFFFFFF + return int.from_bytes(self.data.read(8), byteorder="big", signed=True) # reads a long stored in variable-length format def read_v_long(self) -> int: @@ -138,39 +137,22 @@ def read_v_long(self) -> int: return i def read_optional_v_long(self) -> Optional[int]: - if self.read_boolean(): - return self.read_v_long() - else: - return None + return self.read_v_long() if self.read_boolean() else None def read_optional_long(self) -> Optional[int]: - if self.read_boolean(): - return self.read_long() - else: - return None + return self.read_long() if self.read_boolean() else None def read_optional_string(self) -> Optional[str]: - if self.read_boolean(): - return self.read_string() - else: - return None + return self.read_string() if self.read_boolean() else None def read_array_size(self) -> int: array_size = self.read_v_int() - if array_size > 2**31: raise Exception(f"array length must be <= to {2**31} but was: {array_size}") - - # if array_size < 0: - # raise Exception(f"array size must be positive but was: {array_size}") - - # ensureCanReadBytes(arraySize); - return array_size def read_string(self) -> str: - char_count = self.read_array_size() - return str(self.read_bytes(char_count), "utf-8") + return str(self.read_bytes(self.read_array_size()), "utf-8") def read_string_array(self) -> list[str]: size = self.read_array_size() @@ -232,9 +214,7 @@ def read_string_to_string_set_dict(self) -> dict[str, set[str]]: def read_generic_value(self) -> Any: type: int = self.read_byte() - # TODO: Handle negatives and make this -1 - # https://github.com/opensearch-project/opensearch-sdk-py/issues/88 - if type == 0xFF: + if type == -1: return None reader: dict[int, Callable] = { 0: self.read_string, diff --git a/src/opensearch_sdk_py/transport/stream_output.py b/src/opensearch_sdk_py/transport/stream_output.py index 16bf605..7dbd860 100644 --- a/src/opensearch_sdk_py/transport/stream_output.py +++ b/src/opensearch_sdk_py/transport/stream_output.py @@ -15,12 +15,13 @@ class StreamOutput(BytesIO): + # writes a signed byte def write_byte(self, b: int) -> int: - return self.write(b.to_bytes(1, byteorder="big")) + return self.write(b.to_bytes(1, byteorder="big", signed=True)) - # writes an int as four bytes. + # writes a signed int as big-endian four bytes. def write_int(self, i: int) -> int: - return self.write(i.to_bytes(4, byteorder="big")) + return self.write(i.to_bytes(4, byteorder="big", signed=True)) # writes an int in a variable-length format def write_v_int(self, i: int) -> int: @@ -58,8 +59,9 @@ def version_size(self, version: Version) -> int: def write_version(self, version: Version) -> int: return self.write_v_int(version.id) + # writes a signed long as big-endian eight bytes. def write_long(self, i: int) -> int: - return self.write(i.to_bytes(8, byteorder="big")) + return self.write(i.to_bytes(8, byteorder="big", signed=True)) def write_byte_array(self, b: bytes) -> int: self.write_v_int(len(b)) @@ -653,7 +655,7 @@ def write_generic_value(self, value: Any) -> None: self.write_string(value) elif isinstance(value, int): self.write_byte(2) - self.write(value.to_bytes(8, "big", signed=True)) + self.write_int(value) elif isinstance(value, bool): self.write_byte(5) self.write_boolean(value) diff --git a/tests/transport/test_stream_input.py b/tests/transport/test_stream_input.py index 45869de..04419b2 100644 --- a/tests/transport/test_stream_input.py +++ b/tests/transport/test_stream_input.py @@ -15,24 +15,23 @@ class TestStreamInput(unittest.TestCase): def test_read_byte(self) -> None: - input = StreamInput(b"\x2a") + input = StreamInput(b"\x2a\xff") self.assertEqual(input.read_byte(), 42) + self.assertEqual(input.read_byte(), -1) def test_read_bytes(self) -> None: input = StreamInput(b"\x27\x10\x42") self.assertEqual(input.read_bytes(3), b"\x27\x10\x42") def test_read_int(self) -> None: - input = StreamInput(b"\x00\x00\x00\x2a\x00\x00\x00\x2a\x00") - self.assertEqual(input.read_int(), 42) + input = StreamInput(b"\x00\x00\x00\x2a\xff\xff\xff\xff") self.assertEqual(input.read_int(), 42) - self.assertRaises(IndexError, input.read_int) + self.assertEqual(input.read_int(), -1) def test_read_short(self) -> None: - input = StreamInput(b"\x12\x34\x56\x78\x90") + input = StreamInput(b"\x12\x34\xff\xff") self.assertEqual(input.read_short(), 4660) - self.assertEqual(input.read_short(), 22136) - self.assertRaises(IndexError, input.read_short) + self.assertEqual(input.read_short(), -1) def test_read_boolean(self) -> None: input = StreamInput(b"\x00\x01\x02") @@ -68,16 +67,15 @@ def test_read_version(self) -> None: self.assertEqual(str(v), "2.10.0.99") def test_read_optional_int(self) -> None: - input = StreamInput(b"\x01\x00\x00\x00\x2a\x00\x01") + input = StreamInput(b"\x01\x00\x00\x00\x2a\x01\xff\xff\xff\xff\x00") self.assertEqual(input.read_optional_int(), 42) + self.assertEqual(input.read_optional_int(), -1) self.assertEqual(input.read_optional_int(), None) - self.assertRaises(IndexError, input.read_optional_int) def test_read_long(self) -> None: - input = StreamInput(b"\x00\x00\x00\x01\x02\x03\x04\x05\x00\x00\x00\x01\x02\x03\x04\x05\x00") - self.assertEqual(input.read_long(), 4328719365) + input = StreamInput(b"\x00\x00\x00\x01\x02\x03\x04\x05\xff\xff\xff\xff\xff\xff\xff\xff") self.assertEqual(input.read_long(), 4328719365) - self.assertRaises(IndexError, input.read_long) + self.assertEqual(input.read_long(), -1) def test_read_v_long(self) -> None: input = StreamInput(b"\x2a") @@ -112,10 +110,10 @@ def test_read_optional_v_long(self) -> None: self.assertRaises(Exception, input.read_optional_v_long) def test_read_optional_long(self) -> None: - input = StreamInput(b"\x01\x00\x00\x00\x00\x00\x00\x00\x2a\x00\x01") + input = StreamInput(b"\x01\x00\x00\x00\x00\x00\x00\x00\x2a\x01\xff\xff\xff\xff\xff\xff\xff\xff\x00") self.assertEqual(input.read_optional_long(), 42) + self.assertEqual(input.read_optional_long(), -1) self.assertEqual(input.read_optional_long(), None) - self.assertRaises(IndexError, input.read_optional_long) def test_read_optional_string(self) -> None: input = StreamInput(b"\x01\x04test") From 127d0993b48f703752bc2e62afcaee1fe59ffdd9 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Tue, 21 Nov 2023 12:58:25 -0800 Subject: [PATCH 11/14] Streamoutput tests and handle bool case before int bugfix Signed-off-by: Daniel Widdis --- .../transport/stream_output.py | 17 +++++++------- tests/transport/test_stream_output.py | 22 +++++++++++++++++++ 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/src/opensearch_sdk_py/transport/stream_output.py b/src/opensearch_sdk_py/transport/stream_output.py index 7dbd860..d038573 100644 --- a/src/opensearch_sdk_py/transport/stream_output.py +++ b/src/opensearch_sdk_py/transport/stream_output.py @@ -63,9 +63,9 @@ def write_version(self, version: Version) -> int: def write_long(self, i: int) -> int: return self.write(i.to_bytes(8, byteorder="big", signed=True)) - def write_byte_array(self, b: bytes) -> int: + def write_byte_array(self, b: bytes) -> None: self.write_v_int(len(b)) - return self.write(b) + self.write(b) # /** # Writes the bytes reference, including a length header. @@ -647,18 +647,17 @@ def write_array_list(self, li: list[Any]) -> None: def write_generic_value(self, value: Any) -> None: if value is None: - # TODO: Handle negatives and make this -1 - # https://github.com/opensearch-project/opensearch-sdk-py/issues/88 - self.write_byte(0xFF) + self.write_byte(-1) + # bool is int subclass so must handle before other int types + elif isinstance(value, bool): + self.write_byte(5) + self.write_boolean(value) elif isinstance(value, str): self.write_byte(0) self.write_string(value) elif isinstance(value, int): self.write_byte(2) - self.write_int(value) - elif isinstance(value, bool): - self.write_byte(5) - self.write_boolean(value) + self.write_long(value) elif isinstance(value, bytes): self.write_byte(6) self.write_byte_array(value) diff --git a/tests/transport/test_stream_output.py b/tests/transport/test_stream_output.py index 94df965..93ffa19 100644 --- a/tests/transport/test_stream_output.py +++ b/tests/transport/test_stream_output.py @@ -100,6 +100,11 @@ def test_write_long(self) -> None: out.write_long(5409454583320448) self.assertEqual(out.getvalue(), b"\x00\x13\x37\xde\xca\xde\x0f\x80") + def test_write_byte_array(self) -> None: + out = StreamOutput() + out.write_byte_array(b"test") + self.assertEqual(out.getvalue(), b"\x04test") + def test_write_string(self) -> None: out = StreamOutput() out.write_string("test") @@ -149,6 +154,23 @@ def test_write_string_to_string_set_dict(self) -> None: ) self.assertEqual(StreamOutput.string_to_string_collection_dict_size(d), len(out.getvalue())) + def test_write_array_list(self) -> None: + out = StreamOutput() + out.write_array_list(["foo", "bar"]) + self.assertEqual(out.getvalue(), b"\x02\x00\x03foo\x00\x03bar") + + def test_write_generic_value(self) -> None: + out = StreamOutput() + out.write_generic_value(None) + out.write_generic_value("test") + out.write_generic_value(42) + out.write_generic_value(True) + self.assertEqual(out.getvalue(), b"\xff\x00\x04test\x02\x00\x00\x00\x00\x00\x00\x00\x2a\05\01") + out = StreamOutput() + out.write_generic_value(b"test") + out.write_generic_value(["foo", "bar"]) + self.assertEqual(out.getvalue(), b"\x06\x04test\x07\x02\x00\x03foo\x00\x03bar") + def test_write_enum(self) -> None: TestEnum = Enum("TestEnum", ["FOO", "BAR", "BAZ"], start=0) out = StreamOutput() From 214ce8901d6719cba74d59573e95db333184fd40 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Tue, 21 Nov 2023 14:36:50 -0800 Subject: [PATCH 12/14] Convert Settings reader and writer, add tests Signed-off-by: Daniel Widdis --- src/opensearch_sdk_py/settings/settings.py | 29 ++++++-------- tests/settings/test_settings.py | 46 ++++++++++++++++++++++ 2 files changed, 58 insertions(+), 17 deletions(-) create mode 100644 tests/settings/test_settings.py diff --git a/src/opensearch_sdk_py/settings/settings.py b/src/opensearch_sdk_py/settings/settings.py index 24b91b8..9021f17 100644 --- a/src/opensearch_sdk_py/settings/settings.py +++ b/src/opensearch_sdk_py/settings/settings.py @@ -22,26 +22,21 @@ def __init__( ) -> None: self.settings = settings - def get(self, setting: str, default: Optional[str] = None) -> Optional[str]: - s_value = self.settings.get(setting) - return str(s_value) if s_value else default - - # TODO change to read_from - @staticmethod - def read_settings_from_stream(input: StreamInput) -> "Settings": - settings: dict[str, Union[str, Dict]] = {} + def read_from(self, input: StreamInput) -> "Settings": num_settings: int = input.read_v_int() for i in range(num_settings): key: str = input.read_string() value: Any = input.read_generic_value() - settings[key] = value - return Settings(settings) - - # TODO change to write_to - @staticmethod - def write_settings_to_stream(settings: "Settings", out: StreamOutput) -> None: - out.write_v_int(len(settings.settings)) - for key, value in settings.settings.items(): + self.settings[key] = value + return self + + def write_to(self, out: StreamOutput) -> "Settings": + out.write_v_int(len(self.settings)) + for key, value in self.settings.items(): out.write_string(key) out.write_generic_value(value) - return + return self + + def get(self, setting: str, default: Optional[str] = None) -> Optional[str]: + s_value = self.settings.get(setting) + return str(s_value) if s_value else default diff --git a/tests/settings/test_settings.py b/tests/settings/test_settings.py new file mode 100644 index 0000000..d179a8c --- /dev/null +++ b/tests/settings/test_settings.py @@ -0,0 +1,46 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +import unittest + +from opensearch_sdk_py.settings.settings import Settings +from opensearch_sdk_py.transport.stream_input import StreamInput +from opensearch_sdk_py.transport.stream_output import StreamOutput + + +class TestSettings(unittest.TestCase): + def test_settings(self) -> None: + settings = Settings() + self.assertIsNone(settings.get("bar")) + self.assertEqual(settings.get("bar", "baz"), "baz") + + def test_settings_read_write(self) -> None: + settings = Settings({"foo": "bar", "baz": 42, "qux": True, "bytes": b"test", "list": ["a", "b", "c"]}) + self.assertIsNone(settings.get("bar")) + self.assertEqual(settings.get("bar", "baz"), "baz") + self.assertEqual(settings.get("foo"), "bar") + self.assertEqual(settings.get("foo", "baz"), "bar") + self.assertEqual(settings.get("baz"), "42") + self.assertEqual(settings.get("qux"), "True") + self.assertEqual(settings.get("bytes"), "b'test'") + self.assertEqual(settings.get("list"), "['a', 'b', 'c']") + + output = StreamOutput() + settings.write_to(output) + input = StreamInput(output.getvalue()) + settings = Settings().read_from(input) + + self.assertIsNone(settings.get("bar")) + self.assertEqual(settings.get("bar", "baz"), "baz") + self.assertEqual(settings.get("foo"), "bar") + self.assertEqual(settings.get("foo", "baz"), "bar") + self.assertEqual(settings.get("baz"), "42") + self.assertEqual(settings.get("qux"), "True") + self.assertEqual(settings.get("bytes"), "b'test'") + self.assertEqual(settings.get("list"), "['a', 'b', 'c']") From d4b0303bb4cb1e5153013b564a4a367afdbf0027 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Tue, 21 Nov 2023 16:56:08 -0800 Subject: [PATCH 13/14] Fix env setting response class and add tests Signed-off-by: Daniel Widdis --- .../environment_settings_response.py | 4 +-- .../test_environment_settings_response.py | 32 +++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 tests/transport/test_environment_settings_response.py diff --git a/src/opensearch_sdk_py/transport/environment_settings_response.py b/src/opensearch_sdk_py/transport/environment_settings_response.py index 03147f3..67136c7 100644 --- a/src/opensearch_sdk_py/transport/environment_settings_response.py +++ b/src/opensearch_sdk_py/transport/environment_settings_response.py @@ -22,10 +22,10 @@ def __init__(self, environment_settings: Settings = None): def read_from(self, input: StreamInput) -> "EnvironmentSettingsResponse": super().read_from(input) - self.environment_settings = Settings.read_settings_from_stream(input) + self.environment_settings = Settings().read_from(input) return self def write_to(self, output: StreamOutput) -> "EnvironmentSettingsResponse": super().write_to(output) - Settings.write_settings_to_stream(self.environment_settings, output) + self.environment_settings.write_to(output) return self diff --git a/tests/transport/test_environment_settings_response.py b/tests/transport/test_environment_settings_response.py new file mode 100644 index 0000000..94dc9f0 --- /dev/null +++ b/tests/transport/test_environment_settings_response.py @@ -0,0 +1,32 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +import unittest + +from opensearch_sdk_py.settings.settings import Settings +from opensearch_sdk_py.transport.environment_settings_response import EnvironmentSettingsResponse +from opensearch_sdk_py.transport.stream_input import StreamInput +from opensearch_sdk_py.transport.stream_output import StreamOutput + + +class TestEnvironmentSettingsResponse(unittest.TestCase): + def test_environment_settings_response(self) -> None: + esr = EnvironmentSettingsResponse() + self.assertIsNone(esr.environment_settings) + + settings = Settings({"foo": "bar"}) + esr = EnvironmentSettingsResponse(settings) + self.assertEqual(esr.environment_settings.get("foo"), "bar") + + out = StreamOutput() + esr.write_to(out) + input = StreamInput(out.getvalue()) + esr = EnvironmentSettingsResponse().read_from(input) + + self.assertEqual(esr.environment_settings.get("foo"), "bar") From d213f8b65591eeaf6b68a9d046bd8849d0046341 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Tue, 21 Nov 2023 18:26:49 -0800 Subject: [PATCH 14/14] Response handler tests Signed-off-by: Daniel Widdis --- ...t_environment_settings_response_handler.py | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 tests/actions/internal/test_environment_settings_response_handler.py diff --git a/tests/actions/internal/test_environment_settings_response_handler.py b/tests/actions/internal/test_environment_settings_response_handler.py new file mode 100644 index 0000000..c21f8fc --- /dev/null +++ b/tests/actions/internal/test_environment_settings_response_handler.py @@ -0,0 +1,38 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +import unittest +from typing import Optional + +from opensearch_sdk_py.actions.internal.environment_settings_response_handler import EnvironmentSettingsResponseHandler +from opensearch_sdk_py.actions.response_handler import ResponseHandler +from opensearch_sdk_py.settings.settings import Settings +from opensearch_sdk_py.transport.environment_settings_response import EnvironmentSettingsResponse +from opensearch_sdk_py.transport.outbound_message_request import OutboundMessageRequest +from opensearch_sdk_py.transport.stream_input import StreamInput +from opensearch_sdk_py.transport.stream_output import StreamOutput +from opensearch_sdk_py.transport.version import Version + + +class TestEnvironmentSettingsResponseHandler(unittest.TestCase): + def test_environment_settings_response_handler(self) -> None: + settings = Settings({"foo": "bar"}) + input = StreamInput(bytes(OutboundMessageRequest(version=Version(2100099), message=EnvironmentSettingsResponse(settings)))) + omr = OutboundMessageRequest().read_from(input) + next_handler = FakeResponseHandler() + output = EnvironmentSettingsResponseHandler(next_handler).handle(omr, input) + self.assertEqual(output, b"test") + + +class FakeResponseHandler(ResponseHandler): + def handle(self, input: StreamInput = None) -> Optional[bytes]: + pass + + def send(self) -> StreamOutput: + return b"test"