Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get Environment Settings from OpenSearch #87

Merged
merged 14 commits into from
Nov 22, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@

import logging

from opensearch_sdk_py.actions.internal.environment_settings_response_handler import EnvironmentSettingsResponseHandler
from opensearch_sdk_py.actions.internal.register_rest_actions_response_handler import RegisterRestActionsResponseHandler
from opensearch_sdk_py.actions.request_handler import RequestHandler
from opensearch_sdk_py.actions.response_handlers import ResponseHandlers
from opensearch_sdk_py.api.action_extension import ActionExtension
from opensearch_sdk_py.transport.extension_transport_request import ExtensionTransportRequest
from opensearch_sdk_py.transport.initialize_extension_request import InitializeExtensionRequest
from opensearch_sdk_py.transport.initialize_extension_response import InitializeExtensionResponse
from opensearch_sdk_py.transport.outbound_message_request import OutboundMessageRequest
from opensearch_sdk_py.transport.outbound_message_response import OutboundMessageResponse
from opensearch_sdk_py.transport.register_rest_actions_request import RegisterRestActionsRequest
from opensearch_sdk_py.transport.request_type import RequestType
from opensearch_sdk_py.transport.stream_input import StreamInput
from opensearch_sdk_py.transport.stream_output import StreamOutput

Expand All @@ -31,8 +34,11 @@ def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamO
initialize_extension_request = InitializeExtensionRequest().read_from(input)
logging.debug(f"< {initialize_extension_request}")

# Create the response message preserving the request id, but don't send it yet.
# This will be sent when response handler calls send()
# We will stack requests/responses, generating them in the reverse order that we send them
# Order of sending: resgister rest actions, then environment settings request, then init response
# Order of generating: init response, environment settings request, register rest actions

# Final Init Response to this init request, preserving the request ID
self.response = OutboundMessageResponse(
request.thread_context_struct,
request.features,
Expand All @@ -43,24 +49,27 @@ def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamO
request.is_compress,
)

# Sometime between tcp and transport handshakes and the eventual response,
# the uniqueId gets added to the thread context.
# request.thread_context_struct.request_headers["extension_unique_id"] = self.extension.name

# Now send our own initialization requests.
# Stack the Environment Settings request/response, chained to the above init response
settings_request = OutboundMessageRequest(
thread_context=request.thread_context_struct,
features=request.features,
message=ExtensionTransportRequest(RequestType.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS),
version=request.version,
action="internal:discovery/enviornmentsettings",
)
settings_response_handler = EnvironmentSettingsResponseHandler(self)
self.response_handlers.register(settings_request.request_id, settings_response_handler)

# Create the request, this gets us an auto-increment request id
# Stack the Register Rest request/response, chained to the above env settings request
register_request = OutboundMessageRequest(
thread_context=request.thread_context_struct,
features=request.features,
message=RegisterRestActionsRequest(self.extension.name, self.extension.named_routes),
version=request.version,
action="internal:discovery/registerrestactions",
is_handshake=False,
is_compress=False,
)
# Register response handler to handle this request ID invoking this class's send()
register_response_handler = RegisterRestActionsResponseHandler(self)
register_response_handler = RegisterRestActionsResponseHandler(settings_response_handler, settings_request)
self.response_handlers.register(register_request.request_id, register_response_handler)
# Now send the request

# Now send the request at top of stack
return register_response_handler.send(register_request)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

import logging

from opensearch_sdk_py.actions.request_response_handler import RequestResponseHandler
from opensearch_sdk_py.actions.response_handler import ResponseHandler
from opensearch_sdk_py.transport.environment_settings_response import EnvironmentSettingsResponse
from opensearch_sdk_py.transport.outbound_message_request import OutboundMessageRequest
from opensearch_sdk_py.transport.stream_input import StreamInput
from opensearch_sdk_py.transport.stream_output import StreamOutput


class EnvironmentSettingsResponseHandler(ResponseHandler):
def __init__(self, next_handler: RequestResponseHandler) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should probably sink next_handler into either the response handler (or a new class), or manage the sequence of handlers outside of the handler itself.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should probably sink next_handler into either the response handler (or a new class), or manage the sequence of handlers outside of the handler itself.

100% agree, primarily with the "outside of the handler itself" part. I'm convinced there's got to be a better, more general way to "wait for this set of responses to complete".

Related, RequestHandler and ResponseHandler have differing arguments for send() but they ought to be the same.

self.next_handler = next_handler

def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamOutput:
env_response = EnvironmentSettingsResponse().read_from(input)
logging.debug(f"< {env_response}")
# TODO save the settings somewhere
return self.next_handler.send()
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@


class RegisterRestActionsResponseHandler(ResponseHandler):
def __init__(self, next_handler: RequestResponseHandler) -> None:
def __init__(self, next_handler: RequestResponseHandler, request: OutboundMessageRequest) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduce OutboundResponseHandler?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduce OutboundResponseHandler?

Not sure what you mean here. Then again, this whole set of code is somewhat spaghetti so I'm not sure what I mean either. I cobbled enough together to get it working and will try to figure out a better way to do all this.

self.next_handler = next_handler
self.request = request

def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamOutput:
ack_response = AcknowledgedResponse().read_from(input)
logging.debug(f"< {ack_response}")
if ack_response.status:
return self.next_handler.send()
return self.next_handler.send(self.request)
else:
# TODO error handling
return None
18 changes: 9 additions & 9 deletions src/opensearch_sdk_py/protobuf/ExtensionIdentityProto_pb2.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,27 @@

# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: ExtensionIdentityProto.proto
# Protobuf Python Version: 4.25.0
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder

# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x1c\x45xtensionIdentityProto.proto\x12\x1forg.opensearch.extensions.proto"%\n\x11\x45xtensionIdentity\x12\x10\n\x08uniqueId\x18\x01 \x01(\tB\x18\x42\x16\x45xtensionIdentityProtob\x06proto3'
)


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x45xtensionIdentityProto.proto\x12\x1forg.opensearch.extensions.proto\"%\n\x11\x45xtensionIdentity\x12\x10\n\x08uniqueId\x18\x01 \x01(\tB\x18\x42\x16\x45xtensionIdentityProtob\x06proto3')

_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "ExtensionIdentityProto_pb2", _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'ExtensionIdentityProto_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False: # pragma: no cover
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b"B\026ExtensionIdentityProto"
_globals["_EXTENSIONIDENTITY"]._serialized_start = 65
_globals["_EXTENSIONIDENTITY"]._serialized_end = 102
_globals['DESCRIPTOR']._options = None
_globals['DESCRIPTOR']._serialized_options = b'B\026ExtensionIdentityProto'
_globals['_EXTENSIONIDENTITY']._serialized_start=65
_globals['_EXTENSIONIDENTITY']._serialized_end=102
# @@protoc_insertion_point(module_scope)
33 changes: 33 additions & 0 deletions src/opensearch_sdk_py/protobuf/ExtensionRequestProto.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

syntax = "proto3";
package org.opensearch.extensions.proto;

import "ExtensionIdentityProto.proto";
option java_outer_classname = "ExtensionRequestProto";

enum RequestType {
REQUEST_EXTENSION_CLUSTER_STATE = 0;
REQUEST_EXTENSION_CLUSTER_SETTINGS = 1;
REQUEST_EXTENSION_REGISTER_REST_ACTIONS = 2;
REQUEST_EXTENSION_REGISTER_SETTINGS = 3;
REQUEST_EXTENSION_ENVIRONMENT_SETTINGS = 4;
REQUEST_EXTENSION_DEPENDENCY_INFORMATION = 5;
CREATE_COMPONENT = 6;
ON_INDEX_MODULE = 7;
GET_SETTINGS = 8;
}

message ExtensionRequest {
ExtensionIdentity identity = 1;
RequestType requestType = 2;
}
39 changes: 39 additions & 0 deletions src/opensearch_sdk_py/protobuf/ExtensionRequestProto_pb2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: ExtensionRequestProto.proto
# Protobuf Python Version: 4.25.0
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()


from . import ExtensionIdentityProto_pb2 as ExtensionIdentityProto__pb2


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x45xtensionRequestProto.proto\x12\x1forg.opensearch.extensions.proto\x1a\x1c\x45xtensionIdentityProto.proto\"\x9b\x01\n\x10\x45xtensionRequest\x12\x44\n\x08identity\x18\x01 \x01(\x0b\x32\x32.org.opensearch.extensions.proto.ExtensionIdentity\x12\x41\n\x0brequestType\x18\x02 \x01(\x0e\x32,.org.opensearch.extensions.proto.RequestType*\xc7\x02\n\x0bRequestType\x12#\n\x1fREQUEST_EXTENSION_CLUSTER_STATE\x10\x00\x12&\n\"REQUEST_EXTENSION_CLUSTER_SETTINGS\x10\x01\x12+\n\'REQUEST_EXTENSION_REGISTER_REST_ACTIONS\x10\x02\x12\'\n#REQUEST_EXTENSION_REGISTER_SETTINGS\x10\x03\x12*\n&REQUEST_EXTENSION_ENVIRONMENT_SETTINGS\x10\x04\x12,\n(REQUEST_EXTENSION_DEPENDENCY_INFORMATION\x10\x05\x12\x14\n\x10\x43REATE_COMPONENT\x10\x06\x12\x13\n\x0fON_INDEX_MODULE\x10\x07\x12\x10\n\x0cGET_SETTINGS\x10\x08\x42\x17\x42\x15\x45xtensionRequestProtob\x06proto3')

_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'ExtensionRequestProto_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False: # pragma: no cover
_globals['DESCRIPTOR']._options = None
_globals['DESCRIPTOR']._serialized_options = b'B\025ExtensionRequestProto'
_globals['_REQUESTTYPE']._serialized_start=253
_globals['_REQUESTTYPE']._serialized_end=580
_globals['_EXTENSIONREQUEST']._serialized_start=95
_globals['_EXTENSIONREQUEST']._serialized_end=250
# @@protoc_insertion_point(module_scope)
16 changes: 7 additions & 9 deletions src/opensearch_sdk_py/protobuf/RegisterRestActionsProto_pb2.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@

# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: RegisterRestActionsProto.proto
# Protobuf Python Version: 4.25.0
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder

# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()
Expand All @@ -24,16 +24,14 @@
from . import ExtensionIdentityProto_pb2 as ExtensionIdentityProto__pb2


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x1eRegisterRestActionsProto.proto\x12\x1forg.opensearch.extensions.proto\x1a\x1c\x45xtensionIdentityProto.proto"\x8f\x01\n\x13RegisterRestActions\x12\x44\n\x08identity\x18\x01 \x01(\x0b\x32\x32.org.opensearch.extensions.proto.ExtensionIdentity\x12\x13\n\x0brestActions\x18\x02 \x03(\t\x12\x1d\n\x15\x64\x65precatedRestActions\x18\x03 \x03(\tB\x1a\x42\x18RegisterRestActionsProtob\x06proto3'
)
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1eRegisterRestActionsProto.proto\x12\x1forg.opensearch.extensions.proto\x1a\x1c\x45xtensionIdentityProto.proto\"\x8f\x01\n\x13RegisterRestActions\x12\x44\n\x08identity\x18\x01 \x01(\x0b\x32\x32.org.opensearch.extensions.proto.ExtensionIdentity\x12\x13\n\x0brestActions\x18\x02 \x03(\t\x12\x1d\n\x15\x64\x65precatedRestActions\x18\x03 \x03(\tB\x1a\x42\x18RegisterRestActionsProtob\x06proto3')

_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "RegisterRestActionsProto_pb2", _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'RegisterRestActionsProto_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False: # pragma: no cover
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b"B\030RegisterRestActionsProto"
_globals["_REGISTERRESTACTIONS"]._serialized_start = 98
_globals["_REGISTERRESTACTIONS"]._serialized_end = 241
_globals['DESCRIPTOR']._options = None
_globals['DESCRIPTOR']._serialized_options = b'B\030RegisterRestActionsProto'
_globals['_REGISTERRESTACTIONS']._serialized_start=98
_globals['_REGISTERRESTACTIONS']._serialized_end=241
# @@protoc_insertion_point(module_scope)
42 changes: 42 additions & 0 deletions src/opensearch_sdk_py/settings/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#


from typing import Any, Dict, Optional, Union

from opensearch_sdk_py.transport.stream_input import StreamInput
from opensearch_sdk_py.transport.stream_output import StreamOutput


class Settings:
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would inherit from Dict.

I considered that but

  • there are actually two internal map-like structures. There's a secure_settings (not yet implemented) which acts similarly to a string-to-string map (including a get() method) and it's reasonable to keep them on the same level for consistency
  • we use a get() method which would conflict with the Dict get() method (or at least return unexpected results)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the first one I wonder whether "secure setting" should be expressed as "a kind of setting".

In what way are we reimplementing get? I would expect that populating settings changes, but retrieval should be a direct lookup, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the first one I wonder whether "secure setting" should be expressed as "a kind of setting".

It's actually an interface with multiple implementations. Here's one: https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/common/settings/KeyStoreWrapper.java

I'd not want to stray too far from the existing implementation.

In what way are we reimplementing get?

Our version returns a String (str) not an object. So an integer or boolean returns its string representation; an array/list object returns a string with square brackets; a map returns a key-value map with curly braces (basically JSON), a time value will include time units, byte sizes will include byte units, etc.

self.assertIsNone(settings.get("bar"))
self.assertEqual(settings.get("bar", "baz"), "baz")
self.assertEqual(settings.get("foo"), "bar")
self.assertEqual(settings.get("foo", "baz"), "bar")
self.assertEqual(settings.get("baz"), "42")
self.assertEqual(settings.get("qux"), "True")
self.assertEqual(settings.get("bytes"), "b'test'")
self.assertEqual(settings.get("list"), "['a', 'b', 'c']")

These Strings will be parsed by our future implementation of the Setting<T> class, each type having its own parser for the str.

def __init__(
self,
settings: dict[str, Union[str, Dict]] = {},
# TODO: Secure Settings
) -> None:
self.settings = settings
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When initialized with a Dict I think you don't get a copy, does it behave like so?

initial = { 'x': 'y' }
settings = Settings(initial)
settings.read_from(...)
print(initial['Z']) # yields results

If it does, we should copy the dictionary or remove initializing with an external argument altogether.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial static method implementation didn't have that problem. :)

We could always just empty the map on read_from.


def read_from(self, input: StreamInput) -> "Settings":
num_settings: int = input.read_v_int()
for i in range(num_settings):
key: str = input.read_string()
value: Any = input.read_generic_value()
self.settings[key] = value
return self

def write_to(self, out: StreamOutput) -> "Settings":
out.write_v_int(len(self.settings))
for key, value in self.settings.items():
out.write_string(key)
out.write_generic_value(value)
return self

def get(self, setting: str, default: Optional[str] = None) -> Optional[str]:
s_value = self.settings.get(setting)
return str(s_value) if s_value else default
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

# https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/env/EnvironmentSettingsResponse.java

from opensearch_sdk_py.settings.settings import Settings
from opensearch_sdk_py.transport.stream_input import StreamInput
from opensearch_sdk_py.transport.stream_output import StreamOutput
from opensearch_sdk_py.transport.transport_response import TransportResponse


class EnvironmentSettingsResponse(TransportResponse):
def __init__(self, environment_settings: Settings = None):
super().__init__()
self.environment_settings = environment_settings

def read_from(self, input: StreamInput) -> "EnvironmentSettingsResponse":
super().read_from(input)
self.environment_settings = Settings().read_from(input)
return self

def write_to(self, output: StreamOutput) -> "EnvironmentSettingsResponse":
super().write_to(output)
self.environment_settings.write_to(output)
return self
43 changes: 43 additions & 0 deletions src/opensearch_sdk_py/transport/extension_transport_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

# https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/extensions/ExtensionRequest.java


from opensearch_sdk_py.protobuf.ExtensionRequestProto_pb2 import ExtensionRequest
from opensearch_sdk_py.transport.request_type import RequestType
from opensearch_sdk_py.transport.stream_input import StreamInput
from opensearch_sdk_py.transport.stream_output import StreamOutput
from opensearch_sdk_py.transport.transport_request import TransportRequest


class ExtensionTransportRequest(TransportRequest):
def __init__(
self,
request_type: "RequestType",
unique_id: str = "",
) -> None:
super().__init__()
self.er = ExtensionRequest()
self.er.requestType = request_type.value
self.er.identity.uniqueId = unique_id

def read_from(self, input: StreamInput) -> "ExtensionTransportRequest":
super().read_from(input)
er_bytes = input.read_bytes(input.read_v_int())
self.er = ExtensionRequest()
self.er.ParseFromString(er_bytes)
return self

def write_to(self, output: StreamOutput) -> "ExtensionTransportRequest":
super().write_to(output)
er_bytes = self.er.SerializeToString()
output.write_v_int(len(er_bytes))
output.write(er_bytes)
return self
Loading