From f6bbb15acc7038cc47a3ee2cecd1a275e051b45b Mon Sep 17 00:00:00 2001 From: Ali Ince <ali.ince@neotechnology.com> Date: Tue, 5 Mar 2019 19:39:52 +0000 Subject: [PATCH 1/7] Hide some low level connection related APIs --- docs/source/api.rst | 14 ----- src/seabolt/src/bolt/address-private.h | 31 ++++++++++ src/seabolt/src/bolt/address.h | 33 ----------- src/seabolt/src/bolt/connection-private.h | 72 ++++++++++++++++++++++- src/seabolt/src/bolt/connection.c | 4 ++ src/seabolt/src/bolt/connection.h | 62 ------------------- 6 files changed, 106 insertions(+), 110 deletions(-) diff --git a/docs/source/api.rst b/docs/source/api.rst index d70013dc..ee9f4d39 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -32,10 +32,6 @@ BoltAddress .. doxygenfunction:: BoltAddress_port -.. doxygenfunction:: BoltAddress_resolve - -.. doxygenfunction:: BoltAddress_copy_resolved_host - BoltAddressSet -------------- @@ -128,16 +124,6 @@ BoltConnection .. doxygentypedef:: BoltConnection -.. doxygenfunction:: BoltConnection_create - -.. doxygenfunction:: BoltConnection_destroy - -.. doxygenfunction:: BoltConnection_open - -.. doxygenfunction:: BoltConnection_close - -.. doxygenfunction:: BoltConnection_init - .. doxygenfunction:: BoltConnection_send .. doxygenfunction:: BoltConnection_fetch diff --git a/src/seabolt/src/bolt/address-private.h b/src/seabolt/src/bolt/address-private.h index e1323785..92607a6e 100644 --- a/src/seabolt/src/bolt/address-private.h +++ b/src/seabolt/src/bolt/address-private.h @@ -48,6 +48,37 @@ BoltAddress* BoltAddress_create_with_lock(const char* host, const char* port); BoltAddress* BoltAddress_create_from_string(const char* endpoint_str, uint64_t endpoint_len); +/** + * Resolves the original host and port into one or more IP addresses and + * a port number. + * + * This can be carried out more than once on the same + * address. Any newly-resolved addresses will replace any previously stored. + * + * The name resolution is a synchronized operation, i.e. concurrent resolution requests on the same + * instance are protected by a mutex. + * + * @param address the instance to be resolved. + * @param n_resolved number of resolved addresses that will be set upon successful resolution. + * @param log an optional \ref BoltLog instance to be used for logging purposes. + * @returns 0 for success, and non-zero error codes returned from getaddrinfo call on error. + */ +int32_t BoltAddress_resolve(BoltAddress* address, int32_t* n_resolved, BoltLog* log); +/** + * Copies the textual representation of the resolved IP address at the specified index into an already + * allocated buffer. + * + * If successful, AF_INET or AF_INET6 is returned depending on the address family. If unsuccessful, -1 is returned. + * Failure may be a result of a system problem or because the supplied buffer is too small for the address. + * + * @param address the instance to be queried. + * @param index index of the resolved IP address + * @param buffer destination buffer to write the IP address's string representation + * @param buffer_size size of the buffer + * @return address family (AF_INET or AF_INET6) or -1 on error + */ +int32_t +BoltAddress_copy_resolved_host(BoltAddress* address, int32_t index, char* buffer, int32_t buffer_size); #endif //SEABOLT_ADDRESS_PRIVATE_H diff --git a/src/seabolt/src/bolt/address.h b/src/seabolt/src/bolt/address.h index eb677976..fc95371e 100644 --- a/src/seabolt/src/bolt/address.h +++ b/src/seabolt/src/bolt/address.h @@ -59,39 +59,6 @@ SEABOLT_EXPORT const char* BoltAddress_host(BoltAddress* address); */ SEABOLT_EXPORT const char* BoltAddress_port(BoltAddress* address); -/** - * Resolves the original host and port into one or more IP addresses and - * a port number. - * - * This can be carried out more than once on the same - * address. Any newly-resolved addresses will replace any previously stored. - * - * The name resolution is a synchronized operation, i.e. concurrent resolution requests on the same - * instance are protected by a mutex. - * - * @param address the instance to be resolved. - * @param n_resolved number of resolved addresses that will be set upon successful resolution. - * @param log an optional \ref BoltLog instance to be used for logging purposes. - * @returns 0 for success, and non-zero error codes returned from getaddrinfo call on error. - */ -SEABOLT_EXPORT int32_t BoltAddress_resolve(BoltAddress* address, int32_t* n_resolved, BoltLog* log); - -/** - * Copies the textual representation of the resolved IP address at the specified index into an already - * allocated buffer. - * - * If successful, AF_INET or AF_INET6 is returned depending on the address family. If unsuccessful, -1 is returned. - * Failure may be a result of a system problem or because the supplied buffer is too small for the address. - * - * @param address the instance to be queried. - * @param index index of the resolved IP address - * @param buffer destination buffer to write the IP address's string representation - * @param buffer_size size of the buffer - * @return address family (AF_INET or AF_INET6) or -1 on error - */ -SEABOLT_EXPORT int32_t -BoltAddress_copy_resolved_host(BoltAddress* address, int32_t index, char* buffer, int32_t buffer_size); - /** * Destroys the passed \ref BoltAddress instance. * diff --git a/src/seabolt/src/bolt/connection-private.h b/src/seabolt/src/bolt/connection-private.h index f5b69795..9c541307 100644 --- a/src/seabolt/src/bolt/connection-private.h +++ b/src/seabolt/src/bolt/connection-private.h @@ -20,14 +20,21 @@ #define SEABOLT_CONNECTION_PRIVATE_H #include "communication.h" -#include "communication-secure.h" #include "connection.h" #include "status-private.h" +#include "bolt.h" + +/** + * Use BOLT_TRANSPORT_MOCK to establish a mock connection + */ +#define BOLT_TRANSPORT_MOCKED -1 typedef void (* error_action_func)(struct BoltConnection*, void*); typedef struct BoltConnectionMetrics BoltConnectionMetrics; +typedef struct BoltSecurityContext BoltSecurityContext; + /** * Record of connection usage statistics. */ @@ -79,6 +86,69 @@ struct BoltConnection { void* on_error_cb_state; }; +/** + * Creates a new instance of \ref BoltConnection. + * + * @return the pointer to the newly allocated \ref BoltConnection instance. + */ +BoltConnection* BoltConnection_create(); + +/** + * Destroys the passed \ref BoltConnection instance. + * + * @param connection the instance to be destroyed. + */ +void BoltConnection_destroy(BoltConnection* connection); + +/** + * Opens a connection to a Bolt server. + * + * This function attempts to connect a BoltConnection to _address_ over + * _transport_. The `address` should be a pointer to a `BoltAddress` struct + * that has been successfully resolved. + * + * This function blocks until the connection attempt succeeds or fails. + * On returning, the connection status will be set to either \ref BOLT_CONNECTION_STATE_CONNECTED + * (if successful) or \ref BOLT_CONNECTION_STATE_DEFUNCT (if not). If defunct, the actual error code will + * be returned. + * + * In case an error code is returned, more information can be gathered through \ref BoltStatus for which you can + * get a reference by calling \ref BoltConnection_status. + * + * @param connection the instance to attempt the connection. + * @param transport the type of transport over which to connect. + * @param address the Bolt server address. + * @param trust the trust settings to be used for \ref BOLT_TRANSPORT_ENCRYPTED connections. + * @param log the logger to be used for logging purposes. + * @param sock_opts the socket options to be applied to the underlying socket. + * @return \ref BOLT_SUCCESS if the connection was opened successfully, + * an error code otherwise. + */ +int32_t BoltConnection_open(BoltConnection* connection, BoltTransport transport, + BoltAddress* address, BoltTrust* trust, BoltLog* log, BoltSocketOptions* sock_opts); + +/** + * Closes the connection. + * + * @param connection the instance to be closed. + */ +void BoltConnection_close(BoltConnection* connection); + +/** + * Initialise the connection and authenticate using the provided authentication token. + * + * Returns 0 on success and -1 in case of an error. More information about the underlying error can be gathered + * through \ref BoltStatus for which you can get a reference by calling \ref BoltConnection_status. + * + * @param connection the instance to be initialised and authenticated. + * @param user_agent the user agent string to present to the server. + * @param auth_token dictionary that contains an authentication token. + * @return 0 on success, + * -1 on error. + */ +int32_t +BoltConnection_init(BoltConnection* connection, const char* user_agent, const BoltValue* auth_token); + /** * Take an exact amount of data from the receive buffer, deferring to * the socket if not enough data is available. diff --git a/src/seabolt/src/bolt/connection.c b/src/seabolt/src/bolt/connection.c index ff0e6023..2ca44b17 100644 --- a/src/seabolt/src/bolt/connection.c +++ b/src/seabolt/src/bolt/connection.c @@ -30,6 +30,7 @@ #include "v3.h" #include "atomic.h" #include "communication-plain.h" +#include "communication-secure.h" #define INITIAL_TX_BUFFER_SIZE 8192 #define INITIAL_RX_BUFFER_SIZE 8192 @@ -241,6 +242,9 @@ BoltConnection_open(BoltConnection* connection, BoltTransport transport, struct connection->comm = BoltCommunication_create_secure(connection->sec_context, trust, sock_opts, log, connection->address->host, connection->id); break; + case BOLT_TRANSPORT_MOCKED: + // Expect connection->comm to be explicitly set by the caller + break; } int status = BoltCommunication_open(connection->comm, address, connection->id); diff --git a/src/seabolt/src/bolt/connection.h b/src/seabolt/src/bolt/connection.h index 5d6c0334..042a63f1 100644 --- a/src/seabolt/src/bolt/connection.h +++ b/src/seabolt/src/bolt/connection.h @@ -38,68 +38,6 @@ typedef uint64_t BoltRequest; */ typedef struct BoltConnection BoltConnection; -/** - * Creates a new instance of \ref BoltConnection. - * - * @return the pointer to the newly allocated \ref BoltConnection instance. - */ -SEABOLT_EXPORT BoltConnection* BoltConnection_create(); - -/** - * Destroys the passed \ref BoltConnection instance. - * - * @param connection the instance to be destroyed. - */ -SEABOLT_EXPORT void BoltConnection_destroy(BoltConnection* connection); - -/** - * Opens a connection to a Bolt server. - * - * This function attempts to connect a BoltConnection to _address_ over - * _transport_. The `address` should be a pointer to a `BoltAddress` struct - * that has been successfully resolved. - * - * This function blocks until the connection attempt succeeds or fails. - * On returning, the connection status will be set to either \ref BOLT_CONNECTION_STATE_CONNECTED - * (if successful) or \ref BOLT_CONNECTION_STATE_DEFUNCT (if not). If defunct, the actual error code will - * be returned. - * - * In case an error code is returned, more information can be gathered through \ref BoltStatus for which you can - * get a reference by calling \ref BoltConnection_status. - * - * @param connection the instance to attempt the connection. - * @param transport the type of transport over which to connect. - * @param address the Bolt server address. - * @param trust the trust settings to be used for \ref BOLT_TRANSPORT_ENCRYPTED connections. - * @param log the logger to be used for logging purposes. - * @param sock_opts the socket options to be applied to the underlying socket. - * @return \ref BOLT_SUCCESS if the connection was opened successfully, - * an error code otherwise. - */ -SEABOLT_EXPORT int32_t BoltConnection_open(BoltConnection* connection, BoltTransport transport, - BoltAddress* address, BoltTrust* trust, BoltLog* log, BoltSocketOptions* sock_opts); - -/** - * Closes the connection. - * - * @param connection the instance to be closed. - */ -SEABOLT_EXPORT void BoltConnection_close(BoltConnection* connection); - -/** - * Initialise the connection and authenticate using the provided authentication token. - * - * Returns 0 on success and -1 in case of an error. More information about the underlying error can be gathered - * through \ref BoltStatus for which you can get a reference by calling \ref BoltConnection_status. - * - * @param connection the instance to be initialised and authenticated. - * @param user_agent the user agent string to present to the server. - * @param auth_token dictionary that contains an authentication token. - * @return 0 on success, - * -1 on error. - */ -SEABOLT_EXPORT int32_t -BoltConnection_init(BoltConnection* connection, const char* user_agent, const BoltValue* auth_token); /** * Sends all of the queued requests. From 61e8cd7c18ee12288b897f5ff5aa2a1e52216261 Mon Sep 17 00:00:00 2001 From: Ali Ince <ali.ince@neotechnology.com> Date: Tue, 5 Mar 2019 19:42:46 +0000 Subject: [PATCH 2/7] Introduce mock communication and unpooled connection pool --- src/seabolt/src/CMakeLists.txt | 2 + src/seabolt/src/bolt/communication-mock.c | 188 ++++++++++++++++++++++ src/seabolt/src/bolt/communication-mock.h | 35 ++++ src/seabolt/src/bolt/config-private.h | 7 + src/seabolt/src/bolt/no-pool.c | 173 ++++++++++++++++++++ src/seabolt/src/bolt/no-pool.h | 57 +++++++ 6 files changed, 462 insertions(+) create mode 100644 src/seabolt/src/bolt/communication-mock.c create mode 100644 src/seabolt/src/bolt/communication-mock.h create mode 100644 src/seabolt/src/bolt/no-pool.c create mode 100644 src/seabolt/src/bolt/no-pool.h diff --git a/src/seabolt/src/CMakeLists.txt b/src/seabolt/src/CMakeLists.txt index 0cdc9732..f3818ec3 100644 --- a/src/seabolt/src/CMakeLists.txt +++ b/src/seabolt/src/CMakeLists.txt @@ -21,12 +21,14 @@ list(APPEND private_source_files ${CMAKE_CURRENT_LIST_DIR}/bolt/connector.c ${CMAKE_CURRENT_LIST_DIR}/bolt/communication.c ${CMAKE_CURRENT_LIST_DIR}/bolt/communication-plain.c + ${CMAKE_CURRENT_LIST_DIR}/bolt/communication-mock.c ${CMAKE_CURRENT_LIST_DIR}/bolt/direct-pool.c ${CMAKE_CURRENT_LIST_DIR}/bolt/error.c ${CMAKE_CURRENT_LIST_DIR}/bolt/lifecycle.c ${CMAKE_CURRENT_LIST_DIR}/bolt/log.c ${CMAKE_CURRENT_LIST_DIR}/bolt/mem.c ${CMAKE_CURRENT_LIST_DIR}/bolt/name.c + ${CMAKE_CURRENT_LIST_DIR}/bolt/no-pool.c ${CMAKE_CURRENT_LIST_DIR}/bolt/packstream.c ${CMAKE_CURRENT_LIST_DIR}/bolt/protocol.c ${CMAKE_CURRENT_LIST_DIR}/bolt/routing-pool.c diff --git a/src/seabolt/src/bolt/communication-mock.c b/src/seabolt/src/bolt/communication-mock.c new file mode 100644 index 00000000..92cf56fb --- /dev/null +++ b/src/seabolt/src/bolt/communication-mock.c @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "communication-mock.h" +#include "bolt-private.h" +#include "config-private.h" +#include "mem.h" +#include "name.h" +#include "status-private.h" +#include "log-private.h" + +#define MAX_IPADDR_LEN 64 + +int mock_last_error(BoltCommunication* comm) +{ + UNUSED(comm); + return BOLT_SUCCESS; +} + +int mock_transform_error(BoltCommunication* comm, int error_code) +{ + UNUSED(comm); + UNUSED(error_code); + return BOLT_SUCCESS; +} + +int mock_socket_ignore_sigpipe(BoltCommunication* comm) +{ + BoltLog_debug(comm->log, "socket_ignore_sigpipe"); + return BOLT_SUCCESS; +} + +int mock_socket_restore_sigpipe(BoltCommunication* comm) +{ + BoltLog_debug(comm->log, "socket_restore_sigpipe"); + return BOLT_SUCCESS; +} + +int mock_socket_open(BoltCommunication* comm, const struct sockaddr_storage* address) +{ + BoltLog_debug(comm->log, "socket_open"); + + MockCommunicationContext* context = comm->context; + + char resolved_host[MAX_IPADDR_LEN], resolved_port[6]; + int status = get_address_components(address, resolved_host, MAX_IPADDR_LEN, resolved_port, 6); + if (status!=0) { + BoltStatus_set_error_with_ctx(comm->status, BOLT_ADDRESS_NAME_INFO_FAILED, + "mock_socket_open(%s:%d), remote get_address_components error code: %d", __FILE__, __LINE__, status); + return BOLT_STATUS_SET; + } + context->remote_endpoint = BoltAddress_create(resolved_host, resolved_port); + context->local_endpoint = BoltAddress_create("localhost", "65000"); + + BoltLog_debug(comm->log, "socket_open: connected"); + + return BOLT_SUCCESS; +} + +int mock_socket_close(BoltCommunication* comm) +{ + BoltLog_debug(comm->log, "socket_close"); + + MockCommunicationContext* context = comm->context; + + if (context->local_endpoint!=NULL) { + BoltAddress_destroy(context->local_endpoint); + context->local_endpoint = NULL; + } + + if (context->remote_endpoint!=NULL) { + BoltAddress_destroy(context->remote_endpoint); + context->remote_endpoint = NULL; + } + + return BOLT_SUCCESS; +} + +int mock_socket_send(BoltCommunication* comm, char* buffer, int length, int* sent) +{ + UNUSED(buffer); + BoltLog_debug(comm->log, "socket_send: %d bytes", length); + + *sent = length; + + return BOLT_SUCCESS; +} + +int mock_socket_recv(BoltCommunication* comm, char* buffer, int length, int* received) +{ + BoltLog_debug(comm->log, "socket_recv: %d bytes", length); + + MockCommunicationContext* context = comm->context; + if (!context->protocol_version_sent) { + memcpy_be(buffer, &context->protocol_version, sizeof(int32_t)); + context->protocol_version_sent = 1; + } + + *received = length; + + return BOLT_SUCCESS; +} + +int mock_socket_destroy(BoltCommunication* comm) +{ + BoltLog_debug(comm->log, "socket_destroy"); + + MockCommunicationContext* context = comm->context; + + if (context!=NULL) { + if (context->local_endpoint!=NULL) { + BoltAddress_destroy(context->local_endpoint); + context->local_endpoint = NULL; + } + if (context->remote_endpoint!=NULL) { + BoltAddress_destroy(context->remote_endpoint); + context->remote_endpoint = NULL; + } + + BoltMem_deallocate(context, sizeof(MockCommunicationContext)); + comm->context = NULL; + } + + return BOLT_SUCCESS; +} + +BoltAddress* mock_socket_local_endpoint(BoltCommunication* comm) +{ + MockCommunicationContext* context = comm->context; + return context->local_endpoint; +} + +BoltAddress* mock_socket_remote_endpoint(BoltCommunication* comm) +{ + MockCommunicationContext* context = comm->context; + return context->remote_endpoint; +} + +BoltCommunication* +BoltCommunication_create_mock(int32_t version, BoltSocketOptions* sock_opts, BoltLog* log) +{ + BoltCommunication* comm = BoltMem_allocate(sizeof(BoltCommunication)); + comm->open = &mock_socket_open; + comm->close = &mock_socket_close; + comm->send = &mock_socket_send; + comm->recv = &mock_socket_recv; + comm->destroy = &mock_socket_destroy; + + comm->get_local_endpoint = &mock_socket_local_endpoint; + comm->get_remote_endpoint = &mock_socket_remote_endpoint; + + comm->ignore_sigpipe = &mock_socket_ignore_sigpipe; + comm->restore_sigpipe = &mock_socket_restore_sigpipe; + + comm->last_error = &mock_last_error; + comm->transform_error = &mock_transform_error; + + comm->status_owned = 1; + comm->status = BoltStatus_create_with_ctx(1024); + comm->sock_opts_owned = sock_opts==NULL; + comm->sock_opts = sock_opts==NULL ? BoltSocketOptions_create() : sock_opts; + comm->log = log; + + MockCommunicationContext* context = BoltMem_allocate(sizeof(MockCommunicationContext)); + context->local_endpoint = NULL; + context->remote_endpoint = NULL; + context->protocol_version = version; + context->protocol_version_sent = 0; + + comm->context = context; + + return comm; +} diff --git a/src/seabolt/src/bolt/communication-mock.h b/src/seabolt/src/bolt/communication-mock.h new file mode 100644 index 00000000..167063ff --- /dev/null +++ b/src/seabolt/src/bolt/communication-mock.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef SEABOLT_COMMUNICATION_MOCK_H +#define SEABOLT_COMMUNICATION_MOCK_H + +#include "communication.h" +#include "config.h" + +typedef struct MockCommunicationContext { + BoltAddress* local_endpoint; + BoltAddress* remote_endpoint; + + int32_t protocol_version; + int protocol_version_sent; +} MockCommunicationContext; + +BoltCommunication* BoltCommunication_create_mock(int32_t version, BoltSocketOptions* socket_options, BoltLog* log); + +#endif //SEABOLT_COMMUNICATION_MOCK_H diff --git a/src/seabolt/src/bolt/config-private.h b/src/seabolt/src/bolt/config-private.h index e7d9076e..935d6bac 100644 --- a/src/seabolt/src/bolt/config-private.h +++ b/src/seabolt/src/bolt/config-private.h @@ -21,6 +21,13 @@ #include "config.h" +/** + * Use BOLT_SCHEME_CONNECTION to establish connections on-demand to a single server without + * any connection pooling kicking in. The returned connection will behave as + * BOLT_SCHEME_DIRECT. + */ +#define BOLT_SCHEME_DIRECT_UNPOOLED 2 + struct BoltTrust { char* certs; uint64_t certs_len; diff --git a/src/seabolt/src/bolt/no-pool.c b/src/seabolt/src/bolt/no-pool.c new file mode 100644 index 00000000..9da40d10 --- /dev/null +++ b/src/seabolt/src/bolt/no-pool.c @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "bolt-private.h" +#include "address-private.h" +#include "atomic.h" +#include "config-private.h" +#include "connection-private.h" +#include "no-pool.h" +#include "log-private.h" +#include "mem.h" +#include "protocol.h" +#include "sync.h" +#include "time.h" + +#define MAX_ID_LEN 16 + +static int64_t pool_seq = 0; + +int find_open_connection(struct BoltNoPool* pool, struct BoltConnection* connection) +{ + for (int i = 0; i<pool->size; i++) { + struct BoltConnection* candidate = pool->connections[i]; + if (candidate==connection) { + return i; + } + } + return -1; +} + +struct BoltNoPool* BoltNoPool_create(const struct BoltAddress* address, const struct BoltValue* auth_token, + const struct BoltConfig* config) +{ + char* id = BoltMem_allocate(MAX_ID_LEN); + snprintf(id, MAX_ID_LEN, "pool-%" PRId64, BoltAtomic_increment(&pool_seq)); + BoltLog_info(config->log, "[%s]: Creating pool towards %s:%s", id, address->host, address->port); + struct BoltNoPool* pool = (struct BoltNoPool*) BoltMem_allocate(SIZE_OF_NO_POOL); + BoltSync_mutex_create(&pool->mutex); + pool->id = id; + pool->config = config; + pool->address = BoltAddress_create_with_lock(address->host, address->port); + pool->auth_token = auth_token; + pool->size = 0; + pool->connections = NULL; + return pool; +} + +void BoltNoPool_destroy(struct BoltNoPool* pool) +{ + BoltLog_info(pool->config->log, "[%s]: Destroying non-released connections towards %s:%s", pool->id, + pool->address->host, + pool->address->port); + for (int index = 0; index<pool->size; index++) { + BoltConnection_close(pool->connections[index]); + BoltConnection_destroy(pool->connections[index]); + } + BoltMem_deallocate(pool->connections, pool->size*sizeof(BoltConnection*)); + BoltAddress_destroy(pool->address); + BoltMem_deallocate(pool->id, MAX_ID_LEN); + BoltSync_mutex_destroy(&pool->mutex); + BoltMem_deallocate(pool, SIZE_OF_NO_POOL); +} + +BoltConnection* BoltNoPool_acquire(struct BoltNoPool* pool, BoltStatus* status) +{ + int pool_error = BOLT_SUCCESS; + BoltConnection* connection = NULL; + + BoltLog_info(pool->config->log, "[%s]: Acquiring connection towards %s:%s", pool->id, + pool->address->host, pool->address->port); + + connection = BoltConnection_create(); + + switch (BoltAddress_resolve(pool->address, NULL, pool->config->log)) { + case 0: + break; + default: + pool_error = BOLT_ADDRESS_NOT_RESOLVED; // Could not resolve address + } + + if (pool_error==BOLT_SUCCESS) { + switch (BoltConnection_open(connection, pool->config->transport, pool->address, pool->config->trust, + pool->config->log, pool->config->socket_options)) { + case 0: + break; + default: + pool_error = BOLT_CONNECTION_HAS_MORE_INFO; // Could not open socket + } + } + + if (pool_error==BOLT_SUCCESS) { + switch (BoltConnection_init(connection, pool->config->user_agent, pool->auth_token)) { + case 0: + break; + default: + pool_error = BOLT_CONNECTION_HAS_MORE_INFO; + } + } + + switch (pool_error) { + case BOLT_SUCCESS: { + status->state = connection->status->state; + status->error = BOLT_SUCCESS; + status->error_ctx = NULL; + status->error_ctx_size = 0; + + BoltSync_mutex_lock(&pool->mutex); + int index = pool->size; + pool->connections = BoltMem_reallocate(pool->connections, pool->size*sizeof(BoltConnection*), + (pool->size+1)*sizeof(BoltConnection*)); + pool->size = pool->size+1; + pool->connections[index] = connection; + BoltSync_mutex_unlock(&pool->mutex); + + break; + } + case BOLT_CONNECTION_HAS_MORE_INFO: + status->state = connection->status->state; + status->error = connection->status->error; + status->error_ctx_size = connection->status->error_ctx_size; + status->error_ctx = BoltMem_duplicate(connection->status->error_ctx, connection->status->error_ctx_size); + break; + default: + status->state = BOLT_CONNECTION_STATE_DISCONNECTED; + status->error = pool_error; + status->error_ctx = NULL; + status->error_ctx_size = 0; + break; + } + + return connection; +} + +int BoltNoPool_release(struct BoltNoPool* pool, struct BoltConnection* connection) +{ + BoltLog_info(pool->config->log, "[%s]: Closing connection towards %s:%s", pool->id, + pool->address->host, + pool->address->port); + + BoltSync_mutex_lock(&pool->mutex); + int index = find_open_connection(pool, connection); + if (index>=0) { + for (int i = index; i<pool->size-1; i++) { + pool->connections[i] = pool->connections[i+1]; + } + pool->connections = BoltMem_reallocate(pool->connections, pool->size*sizeof(BoltConnection*), + (pool->size-1)*sizeof(BoltConnection*)); + pool->size = pool->size-1; + } + BoltSync_mutex_unlock(&pool->mutex); + + BoltConnection_close(connection); + BoltConnection_destroy(connection); + + return index; +} + diff --git a/src/seabolt/src/bolt/no-pool.h b/src/seabolt/src/bolt/no-pool.h new file mode 100644 index 00000000..7d6d4f19 --- /dev/null +++ b/src/seabolt/src/bolt/no-pool.h @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + */ + +#ifndef SEABOLT_NO_POOL_H +#define SEABOLT_NO_POOL_H + +#include "communication-secure.h" +#include "connector.h" +#include "sync.h" + +/** + * Pooling contract for a no-pooling connection acquisition + */ +struct BoltNoPool { + mutex_t mutex; + char* id; + struct BoltAddress* address; + const struct BoltValue* auth_token; + const struct BoltConfig* config; + int size; + BoltConnection** connections; +}; + +#define SIZE_OF_NO_POOL sizeof(struct BoltNoPool) +#define SIZE_OF_NO_POOL_PTR sizeof(struct BoltNoPool*) + +struct BoltNoPool* +BoltNoPool_create(const struct BoltAddress* address, const struct BoltValue* auth_token, + const struct BoltConfig* config); + +void BoltNoPool_destroy(struct BoltNoPool* pool); + +BoltConnection* BoltNoPool_acquire(struct BoltNoPool* pool, BoltStatus* status); + +int BoltNoPool_release(struct BoltNoPool* pool, struct BoltConnection* connection); + +#endif //SEABOLT_POOLING_H From 14dcdb9280c575600605c1f525c1dd3a48f6475e Mon Sep 17 00:00:00 2001 From: Ali Ince <ali.ince@neotechnology.com> Date: Tue, 5 Mar 2019 19:44:00 +0000 Subject: [PATCH 3/7] Rename mode config to scheme --- docs/source/api.rst | 14 +++--- src/seabolt-cli/src/main.c | 2 +- src/seabolt/src/bolt/config-private.h | 2 +- src/seabolt/src/bolt/config.c | 12 +++--- src/seabolt/src/bolt/config.h | 30 ++++++++----- src/seabolt/src/bolt/connector.c | 61 ++++++++++++++++++--------- src/seabolt/tests/test-pooling.cpp | 8 ++-- 7 files changed, 81 insertions(+), 48 deletions(-) diff --git a/docs/source/api.rst b/docs/source/api.rst index ee9f4d39..dafad042 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -74,9 +74,9 @@ BoltConfig .. doxygenfunction:: BoltConfig_destroy -.. doxygenfunction:: BoltConfig_get_mode +.. doxygenfunction:: BoltConfig_get_scheme -.. doxygenfunction:: BoltConfig_set_mode +.. doxygenfunction:: BoltConfig_set_scheme .. doxygenfunction:: BoltConfig_get_transport @@ -286,14 +286,16 @@ BoltLog .. doxygenfunction:: BoltLog_set_debug_func -BoltMode +BoltScheme -------- -.. doxygentypedef:: BoltMode +.. doxygentypedef:: BoltScheme -.. doxygendefine:: BOLT_MODE_DIRECT +.. doxygendefine:: BOLT_SCHEME_DIRECT -.. doxygendefine:: BOLT_MODE_ROUTING +.. doxygendefine:: BOLT_SCHEME_ROUTING + +.. doxygendefine:: BOLT_SCHEME_NEO4J BoltSocketOptions diff --git a/src/seabolt-cli/src/main.c b/src/seabolt-cli/src/main.c index b75524db..41204c25 100644 --- a/src/seabolt-cli/src/main.c +++ b/src/seabolt-cli/src/main.c @@ -213,7 +213,7 @@ struct Application* app_create(int argc, char** argv) BoltLog* log = create_logger(app->command==CMD_DEBUG); BoltConfig* config = BoltConfig_create(); - BoltConfig_set_mode(config, (strcmp(BOLT_CONFIG_ROUTING, "1")==0) ? BOLT_MODE_ROUTING : BOLT_MODE_DIRECT); + BoltConfig_set_scheme(config, (strcmp(BOLT_CONFIG_ROUTING, "1")==0) ? BOLT_SCHEME_NEO4J : BOLT_SCHEME_DIRECT); BoltConfig_set_transport(config, (strcmp(BOLT_CONFIG_SECURE, "1")==0) ? BOLT_TRANSPORT_ENCRYPTED : BOLT_TRANSPORT_PLAINTEXT); BoltConfig_set_user_agent(config, "seabolt/" SEABOLT_VERSION); diff --git a/src/seabolt/src/bolt/config-private.h b/src/seabolt/src/bolt/config-private.h index 935d6bac..babc090f 100644 --- a/src/seabolt/src/bolt/config-private.h +++ b/src/seabolt/src/bolt/config-private.h @@ -43,7 +43,7 @@ struct BoltSocketOptions { }; struct BoltConfig { - BoltMode mode; + BoltScheme scheme; BoltTransport transport; struct BoltTrust* trust; char* user_agent; diff --git a/src/seabolt/src/bolt/config.c b/src/seabolt/src/bolt/config.c index 147a8acb..2e7d54b0 100644 --- a/src/seabolt/src/bolt/config.c +++ b/src/seabolt/src/bolt/config.c @@ -143,7 +143,7 @@ int32_t BoltTrust_set_skip_verify_hostname(BoltTrust* trust, int32_t skip_verify BoltConfig* BoltConfig_create() { BoltConfig* config = BoltMem_allocate(sizeof(BoltConfig)); - config->mode = BOLT_MODE_DIRECT; + config->scheme = BOLT_SCHEME_DIRECT; config->transport = BOLT_TRANSPORT_ENCRYPTED; config->trust = NULL; config->user_agent = NULL; @@ -161,7 +161,7 @@ BoltConfig* BoltConfig_clone(BoltConfig* config) { BoltConfig* clone = BoltConfig_create(); if (config!=NULL) { - BoltConfig_set_mode(clone, config->mode); + BoltConfig_set_scheme(clone, config->scheme); BoltConfig_set_transport(clone, config->transport); BoltConfig_set_trust(clone, config->trust); BoltConfig_set_user_agent(clone, config->user_agent); @@ -199,14 +199,14 @@ void BoltConfig_destroy(BoltConfig* config) BoltMem_deallocate(config, sizeof(BoltConfig)); } -BoltMode BoltConfig_get_mode(BoltConfig* config) +BoltScheme BoltConfig_get_scheme(BoltConfig* config) { - return config->mode; + return config->scheme; } -int32_t BoltConfig_set_mode(BoltConfig* config, BoltMode mode) +int32_t BoltConfig_set_scheme(BoltConfig* config, BoltScheme scheme) { - config->mode = mode; + config->scheme = scheme; return BOLT_SUCCESS; } diff --git a/src/seabolt/src/bolt/config.h b/src/seabolt/src/bolt/config.h index 3cbebb67..2085beed 100644 --- a/src/seabolt/src/bolt/config.h +++ b/src/seabolt/src/bolt/config.h @@ -26,15 +26,23 @@ /** * The operating mode of the connector. */ -typedef int32_t BoltMode; +typedef int32_t BoltScheme; /** - * Use BOLT_MODE_DIRECT to establish direct connections towards a single server + * Use BOLT_SCHEME_DIRECT to establish direct connections towards a single server */ -#define BOLT_MODE_DIRECT 0 +#define BOLT_SCHEME_DIRECT 0 + +#ifndef SEABOLT_NO_DEPRECATED +/** + * Use BOLT_SCHEME_ROUTING to establish routing connections towards a casual cluster. + * This is deprecated, please use BOLT_SCHEME_NEO4J instead. + */ +#define BOLT_SCHEME_ROUTING 1 +#endif /** - * Use BOLT_MODE_ROUTING to establish routing connections towards a casual cluster. + * Use BOLT_SCHEME_NEO4J to establish routing first connections towards a neo4j server. */ -#define BOLT_MODE_ROUTING 1 +#define BOLT_SCHEME_NEO4J 1 /** * The transport to use for established connections. @@ -206,21 +214,21 @@ SEABOLT_EXPORT BoltConfig* BoltConfig_create(); SEABOLT_EXPORT void BoltConfig_destroy(BoltConfig* config); /** - * Gets the configured \ref BoltMode "mode". + * Gets the configured \ref BoltScheme "scheme". * * @param config the config instance to query. - * @return the configured \ref BoltMode "mode". + * @return the configured \ref BoltScheme "scheme". */ -SEABOLT_EXPORT BoltMode BoltConfig_get_mode(BoltConfig* config); +SEABOLT_EXPORT BoltScheme BoltConfig_get_scheme(BoltConfig* config); /** - * Sets the configured \ref BoltMode "mode". + * Sets the configured \ref BoltScheme "scheme". * * @param config the config instance to modify. - * @param mode the \ref BoltMode "mode" to set. + * @param mode the \ref BoltScheme "scheme" to set. * @returns \ref BOLT_SUCCESS when the operation is successful, or another positive error code identifying the reason. */ -SEABOLT_EXPORT int32_t BoltConfig_set_mode(BoltConfig* config, BoltMode mode); +SEABOLT_EXPORT int32_t BoltConfig_set_scheme(BoltConfig* config, BoltScheme scheme); /** * Gets the configured \ref BoltTransport "transport". diff --git a/src/seabolt/src/bolt/connector.c b/src/seabolt/src/bolt/connector.c index ab666028..928c8ee6 100644 --- a/src/seabolt/src/bolt/connector.c +++ b/src/seabolt/src/bolt/connector.c @@ -24,9 +24,11 @@ #include "connector-private.h" #include "log-private.h" #include "direct-pool.h" +#include "no-pool.h" #include "mem.h" #include "routing-pool.h" #include "status-private.h" +#include "connection-private.h" BoltConfig* BoltConnector_apply_defaults(BoltConfig* config) { @@ -50,13 +52,16 @@ BoltConnector_create(BoltAddress* address, BoltValue* auth_token, struct BoltCon BoltLog_info(connector->config->log, "[connector]: Version %s [%s]", SEABOLT_VERSION, SEABOLT_VERSION_HASH); - switch (connector->config->mode) { - case BOLT_MODE_DIRECT: + switch (connector->config->scheme) { + case BOLT_SCHEME_DIRECT: connector->pool_state = BoltDirectPool_create(address, connector->auth_token, connector->config); break; - case BOLT_MODE_ROUTING: + case BOLT_SCHEME_NEO4J: connector->pool_state = BoltRoutingPool_create(address, connector->auth_token, connector->config); break; + case BOLT_SCHEME_DIRECT_UNPOOLED: + connector->pool_state = BoltNoPool_create(address, connector->auth_token, connector->config); + break; default: // TODO: Set some status connector->pool_state = NULL; @@ -67,13 +72,16 @@ BoltConnector_create(BoltAddress* address, BoltValue* auth_token, struct BoltCon void BoltConnector_destroy(BoltConnector* connector) { - switch (connector->config->mode) { - case BOLT_MODE_DIRECT: + switch (connector->config->scheme) { + case BOLT_SCHEME_DIRECT: BoltDirectPool_destroy((struct BoltDirectPool*) connector->pool_state); break; - case BOLT_MODE_ROUTING: + case BOLT_SCHEME_NEO4J: BoltRoutingPool_destroy((struct BoltRoutingPool*) connector->pool_state); break; + case BOLT_SCHEME_DIRECT_UNPOOLED: + BoltNoPool_destroy((struct BoltNoPool*) connector->pool_state); + break; } BoltConfig_destroy((struct BoltConfig*) connector->config); @@ -84,29 +92,44 @@ void BoltConnector_destroy(BoltConnector* connector) BoltConnection* BoltConnector_acquire(BoltConnector* connector, BoltAccessMode mode, BoltStatus* status) { - switch (connector->config->mode) { - case BOLT_MODE_DIRECT: - return BoltDirectPool_acquire((struct BoltDirectPool*) connector->pool_state, status); - case BOLT_MODE_ROUTING: - return BoltRoutingPool_acquire((struct BoltRoutingPool*) connector->pool_state, mode, status); + BoltConnection* connection = NULL; + + switch (connector->config->scheme) { + case BOLT_SCHEME_DIRECT: + connection = BoltDirectPool_acquire((struct BoltDirectPool*) connector->pool_state, status); + break; + case BOLT_SCHEME_NEO4J: + connection = BoltRoutingPool_acquire((struct BoltRoutingPool*) connector->pool_state, mode, status); + break; + case BOLT_SCHEME_DIRECT_UNPOOLED: + connection = BoltNoPool_acquire((struct BoltNoPool*) connector->pool_state, status); + break; + default: + status->state = BOLT_CONNECTION_STATE_DISCONNECTED; + status->error = BOLT_UNSUPPORTED; + status->error_ctx = NULL; + status->error_ctx_size = 0; + break; } - status->state = BOLT_CONNECTION_STATE_DISCONNECTED; - status->error = BOLT_UNSUPPORTED; - status->error_ctx = NULL; - status->error_ctx_size = 0; + if (connection!=NULL) { + connection->access_mode = mode; + } - return NULL; + return connection; } void BoltConnector_release(BoltConnector* connector, BoltConnection* connection) { - switch (connector->config->mode) { - case BOLT_MODE_DIRECT: + switch (connector->config->scheme) { + case BOLT_SCHEME_DIRECT: BoltDirectPool_release((struct BoltDirectPool*) connector->pool_state, connection); break; - case BOLT_MODE_ROUTING: + case BOLT_SCHEME_NEO4J: BoltRoutingPool_release((struct BoltRoutingPool*) connector->pool_state, connection); break; + case BOLT_SCHEME_DIRECT_UNPOOLED: + BoltNoPool_release((struct BoltNoPool*) connector->pool_state, connection); + break; } } diff --git a/src/seabolt/tests/test-pooling.cpp b/src/seabolt/tests/test-pooling.cpp index 9f564573..399f7a8e 100644 --- a/src/seabolt/tests/test-pooling.cpp +++ b/src/seabolt/tests/test-pooling.cpp @@ -27,7 +27,7 @@ SCENARIO("Test using a pooled connection", "[integration][ipv6][secure][pooling] GIVEN("a new connection pool") { const auto auth_token = BoltAuth_basic(BOLT_USER, BOLT_PASSWORD, NULL); struct BoltTrust trust{nullptr, 0, 1, 1}; - struct BoltConfig config{BOLT_MODE_DIRECT, BOLT_TRANSPORT_ENCRYPTED, &trust, BOLT_USER_AGENT, nullptr, nullptr, + struct BoltConfig config{BOLT_SCHEME_DIRECT, BOLT_TRANSPORT_ENCRYPTED, &trust, BOLT_USER_AGENT, nullptr, nullptr, nullptr, 10, 0, 0, NULL}; struct BoltConnector* connector = BoltConnector_create(&BOLT_IPV6_ADDRESS, auth_token, &config); @@ -55,7 +55,7 @@ SCENARIO("Test reusing a pooled connection", "[integration][ipv6][secure][poolin GIVEN("a new connection pool with one entry") { const auto auth_token = BoltAuth_basic(BOLT_USER, BOLT_PASSWORD, NULL); struct BoltTrust trust{nullptr, 0, 1, 1}; - struct BoltConfig config{BOLT_MODE_DIRECT, BOLT_TRANSPORT_ENCRYPTED, &trust, BOLT_USER_AGENT, nullptr, nullptr, + struct BoltConfig config{BOLT_SCHEME_DIRECT, BOLT_TRANSPORT_ENCRYPTED, &trust, BOLT_USER_AGENT, nullptr, nullptr, nullptr, 1, 0, 0, NULL}; struct BoltConnector* connector = BoltConnector_create(&BOLT_IPV6_ADDRESS, auth_token, &config); @@ -89,7 +89,7 @@ SCENARIO("Test reusing a pooled connection that was abandoned", "[integration][i GIVEN("a new connection pool with one entry") { const auto auth_token = BoltAuth_basic(BOLT_USER, BOLT_PASSWORD, NULL); struct BoltTrust trust{nullptr, 0, 1, 1}; - struct BoltConfig config{BOLT_MODE_DIRECT, BOLT_TRANSPORT_ENCRYPTED, &trust, BOLT_USER_AGENT, nullptr, nullptr, + struct BoltConfig config{BOLT_SCHEME_DIRECT, BOLT_TRANSPORT_ENCRYPTED, &trust, BOLT_USER_AGENT, nullptr, nullptr, nullptr, 1, 0, 0, NULL}; struct BoltConnector* connector = BoltConnector_create(&BOLT_IPV6_ADDRESS, auth_token, &config); WHEN("a connection is acquired, released and acquired again") { @@ -134,7 +134,7 @@ SCENARIO("Test running out of connections", "[integration][ipv6][secure][pooling GIVEN("a new connection pool with one entry") { const auto auth_token = BoltAuth_basic(BOLT_USER, BOLT_PASSWORD, NULL); struct BoltTrust trust{nullptr, 0, 1, 1}; - struct BoltConfig config{BOLT_MODE_DIRECT, BOLT_TRANSPORT_ENCRYPTED, &trust, BOLT_USER_AGENT, nullptr, nullptr, + struct BoltConfig config{BOLT_SCHEME_DIRECT, BOLT_TRANSPORT_ENCRYPTED, &trust, BOLT_USER_AGENT, nullptr, nullptr, nullptr, 1, 0, 0, NULL}; struct BoltConnector* connector = BoltConnector_create(&BOLT_IPV6_ADDRESS, auth_token, &config); From c739092369600058836d791f37ddaec110b406f6 Mon Sep 17 00:00:00 2001 From: Ali Ince <ali.ince@neotechnology.com> Date: Tue, 5 Mar 2019 19:44:13 +0000 Subject: [PATCH 4/7] Pass access mode as part of statement metadata --- src/seabolt/src/CMakeLists.txt | 3 +- src/seabolt/src/bolt/connection-private.h | 1 + src/seabolt/src/bolt/v3.c | 30 +++- src/seabolt/tests/integration.hpp | 3 + src/seabolt/tests/seabolt.cpp | 12 ++ src/seabolt/tests/test-v3.cpp | 166 ++++++++++++++++++---- 6 files changed, 179 insertions(+), 36 deletions(-) diff --git a/src/seabolt/src/CMakeLists.txt b/src/seabolt/src/CMakeLists.txt index f3818ec3..4fb062bd 100644 --- a/src/seabolt/src/CMakeLists.txt +++ b/src/seabolt/src/CMakeLists.txt @@ -242,8 +242,7 @@ endif () include(GenerateExportHeader) generate_export_header(${SEABOLT_SHARED} BASE_NAME seabolt - EXPORT_FILE_NAME "${CMAKE_BINARY_DIR}/${INSTALL_INCLUDEDIR}/bolt/bolt-exports.h" - DEFINE_NO_DEPRECATED) + EXPORT_FILE_NAME "${CMAKE_BINARY_DIR}/${INSTALL_INCLUDEDIR}/bolt/bolt-exports.h") target_compile_definitions(${SEABOLT_STATIC} PUBLIC SEABOLT_STATIC_DEFINE) diff --git a/src/seabolt/src/bolt/connection-private.h b/src/seabolt/src/bolt/connection-private.h index 9c541307..6b88ff97 100644 --- a/src/seabolt/src/bolt/connection-private.h +++ b/src/seabolt/src/bolt/connection-private.h @@ -48,6 +48,7 @@ struct BoltConnectionMetrics { struct BoltConnection { /// The agent currently responsible for using this connection const void* agent; + BoltAccessMode access_mode; /// Transport type for this connection BoltTransport transport; diff --git a/src/seabolt/src/bolt/v3.c b/src/seabolt/src/bolt/v3.c index 3eaa8f24..a759920d 100644 --- a/src/seabolt/src/bolt/v3.c +++ b/src/seabolt/src/bolt/v3.c @@ -37,6 +37,10 @@ #define TX_TIMEOUT_KEY_SIZE 10 #define TX_METADATA_KEY "tx_metadata" #define TX_METADATA_KEY_SIZE 11 +#define MODE_KEY "mode" +#define MODE_KEY_SIZE 4 +#define READ_MODE_VALUE "r" +#define READ_MODE_VALUE_SIZE 1 #define BOOKMARK_KEY "bookmark" #define BOOKMARK_KEY_SIZE 8 #define FIELDS_KEY "fields" @@ -494,8 +498,7 @@ int _set_tx_timeout(struct BoltValue* metadata, int64_t tx_timeout) } if (tx_timeout_value==NULL) { - int32_t - index = metadata->size; + int32_t index = metadata->size; BoltValue_format_as_Dictionary(metadata, metadata->size+1); BoltDictionary_set_key(metadata, index, TX_TIMEOUT_KEY, TX_TIMEOUT_KEY_SIZE); tx_timeout_value = BoltDictionary_value(metadata, index); @@ -544,6 +547,23 @@ int _set_tx_metadata(struct BoltValue* metadata, struct BoltValue* tx_metadata) return BOLT_SUCCESS; } +int _set_access_mode(struct BoltValue* metadata, BoltAccessMode access_mode) +{ + if (access_mode==BOLT_ACCESS_MODE_READ) { + struct BoltValue* mode_value = BoltDictionary_value_by_key(metadata, MODE_KEY, MODE_KEY_SIZE); + if (mode_value==NULL) { + int32_t index = metadata->size; + BoltValue_format_as_Dictionary(metadata, metadata->size+1); + BoltDictionary_set_key(metadata, index, MODE_KEY, MODE_KEY_SIZE); + mode_value = BoltDictionary_value(metadata, index); + } + + BoltValue_format_as_String(mode_value, READ_MODE_VALUE, READ_MODE_VALUE_SIZE); + + } + return BOLT_SUCCESS; +} + int BoltProtocolV3_set_begin_tx_bookmark(struct BoltConnection* connection, struct BoltValue* bookmark_list) { struct BoltProtocolV3State* state = BoltProtocolV3_state(connection); @@ -569,6 +589,8 @@ int BoltProtocolV3_set_begin_tx_metadata(struct BoltConnection* connection, stru int BoltProtocolV3_load_begin_tx(struct BoltConnection* connection) { struct BoltProtocolV3State* state = BoltProtocolV3_state(connection); + struct BoltValue* metadata = BoltMessage_param(state->begin_request, 0); + TRY(_set_access_mode(metadata, connection->access_mode)); TRY(BoltProtocolV3_load_message(connection, state->begin_request, 0)); return BOLT_SUCCESS; } @@ -643,6 +665,8 @@ BoltProtocolV3_set_run_cypher_parameter(struct BoltConnection* connection, int32 int BoltProtocolV3_load_run(struct BoltConnection* connection) { struct BoltProtocolV3State* state = BoltProtocolV3_state(connection); + struct BoltValue* metadata = BoltMessage_param(state->run_request, 2); + TRY(_set_access_mode(metadata, connection->access_mode)); TRY(BoltProtocolV3_load_message(connection, state->run_request, 0)); return BOLT_SUCCESS; } @@ -906,7 +930,7 @@ void BoltProtocolV3_extract_metadata(struct BoltConnection* connection, struct B * point to the same buffer (and that is undefined in `snprintf`): */ while (*old_connection_id) { - *state_connection_id++=*old_connection_id++; + *state_connection_id++ = *old_connection_id++; } snprintf(state_connection_id, MAX_CONNECTION_ID_SIZE, "%s%s", CONNECTION_ID_SEPARATOR, new_connection_id); diff --git a/src/seabolt/tests/integration.hpp b/src/seabolt/tests/integration.hpp index 274641bd..813c414e 100644 --- a/src/seabolt/tests/integration.hpp +++ b/src/seabolt/tests/integration.hpp @@ -36,6 +36,7 @@ extern "C" #include "bolt/direct-pool.h" #include "bolt/v3.h" #include "bolt/communication.h" +#include "bolt/communication-mock.h" } #define SETTING(name, default_value) ((char*)((getenv(name) == nullptr) ? (default_value) : getenv(name))) @@ -61,6 +62,8 @@ struct BoltConnection* bolt_open_init_b(BoltTransport transport, const char* hos struct BoltConnection* bolt_open_init_default(); +struct BoltConnection* bolt_open_init_mocked(int32_t bolt_version, BoltLog* logger); + struct BoltValue* bolt_basic_auth(const char* username, const char* password); void bolt_close_and_destroy_b(struct BoltConnection* connection); diff --git a/src/seabolt/tests/seabolt.cpp b/src/seabolt/tests/seabolt.cpp index 41ccda97..056e6a87 100644 --- a/src/seabolt/tests/seabolt.cpp +++ b/src/seabolt/tests/seabolt.cpp @@ -104,6 +104,18 @@ struct BoltConnection* bolt_open_init_default() return connection; } +struct BoltConnection* bolt_open_init_mocked(int32_t bolt_version, BoltLog* logger) +{ + BoltAddress* address = bolt_get_address("localhost", "7687"); + BoltConnection* connection = BoltConnection_create(); + connection->comm = BoltCommunication_create_mock(bolt_version, NULL, logger); + BoltConnection_open(connection, BOLT_TRANSPORT_MOCKED, address, NULL, logger, NULL); + strcpy(connection->id, "id-0"); + connection->status->state = BOLT_CONNECTION_STATE_READY; + BoltAddress_destroy(address); + return connection; +} + void bolt_close_and_destroy_b(struct BoltConnection* connection) { BoltConnection_close(connection); diff --git a/src/seabolt/tests/test-v3.cpp b/src/seabolt/tests/test-v3.cpp index 16f1dab9..c36aa9d6 100644 --- a/src/seabolt/tests/test-v3.cpp +++ b/src/seabolt/tests/test-v3.cpp @@ -18,6 +18,7 @@ */ #include <string> +#include <utils/test-context.h> #include "integration.hpp" #include "catch.hpp" @@ -26,56 +27,159 @@ using Catch::Matchers::Equals; #define CONNECTION_ID_KEY "connection_id" #define CONNECTION_ID_KEY_SIZE 13 -TEST_CASE("Extract metadata", "[unit]") { +TEST_CASE("Extract metadata", "[unit]") +{ GIVEN("an open and initialised connection") { - struct BoltConnection* connection = bolt_open_init_default(); - if (connection->protocol_version == 3) { + TestContext* test_ctx = new TestContext(); + struct BoltConnection* connection = bolt_open_init_mocked(3, test_ctx->log()); + + WHEN("new connection_id would not overrun buffer") { BoltValue* metadata = BoltValue_create(); BoltValue_format_as_Dictionary(metadata, 1); BoltDictionary_set_key(metadata, 0, CONNECTION_ID_KEY, CONNECTION_ID_KEY_SIZE); - WHEN("new connection_id would not overrun buffer") { - std::string old_connection_id = BoltConnection_id(connection); + std::string old_connection_id = BoltConnection_id(connection); - std::string value = "foo"; - BoltValue_format_as_String( + std::string value = "foo"; + BoltValue_format_as_String( BoltDictionary_value(metadata, 0), value.c_str(), (int32_t) value.length()); - BoltProtocolV3_extract_metadata(connection, metadata); + BoltProtocolV3_extract_metadata(connection, metadata); - std::string connection_id = BoltConnection_id(connection); + std::string connection_id = BoltConnection_id(connection); - REQUIRE(old_connection_id.length() > 0); - THEN("it should not be concatenated to a blank with comma") { - REQUIRE_THAT(connection_id, Equals( - old_connection_id + ", " + value - )); - } + REQUIRE(old_connection_id.length()>0); + THEN("it should not be concatenated to a blank with comma") { + REQUIRE_THAT(connection_id, Equals( + old_connection_id+", "+value + )); } - WHEN("new connection_id would overrun buffer") { - THEN("new connection_id should be ignored and the original connection_id should persist") { - std::string old_connection_id = BoltConnection_id(connection); - std::string value = - "0123456789""0123456789""0123456789""0123456789""0123456789" + + BoltValue_destroy(metadata); + } + + WHEN("new connection_id would overrun buffer") { + BoltValue* metadata = BoltValue_create(); + BoltValue_format_as_Dictionary(metadata, 1); + BoltDictionary_set_key(metadata, 0, CONNECTION_ID_KEY, CONNECTION_ID_KEY_SIZE); + + THEN("new connection_id should be ignored and the original connection_id should persist") { + std::string old_connection_id = BoltConnection_id(connection); + std::string value = "0123456789""0123456789""0123456789""0123456789""0123456789" "0123456789""0123456789""0123456789""0123456789""0123456789" "0123456789""0123456789""0123456789""0123456789""0123456789" - ; - BoltValue_format_as_String(BoltDictionary_value(metadata, 0), value.c_str(), + "0123456789""0123456789""0123456789""0123456789""0123456789"; + BoltValue_format_as_String(BoltDictionary_value(metadata, 0), value.c_str(), (int32_t) value.length()); - BoltProtocolV3_extract_metadata(connection, metadata); + BoltProtocolV3_extract_metadata(connection, metadata); - std::string connection_id = BoltConnection_id(connection); + std::string connection_id = BoltConnection_id(connection); - REQUIRE(connection_id.find(value) == std::string::npos); - REQUIRE(connection_id == old_connection_id); - } + REQUIRE(connection_id.find(value)==std::string::npos); + REQUIRE(connection_id==old_connection_id); } BoltValue_destroy(metadata); } - else { - WARN("Test is skipped because functionality is only available for Bolt V3."); - } - bolt_close_and_destroy_b(connection); + + BoltConnection_close(connection); + BoltConnection_destroy(connection); } } + +TEST_CASE("Pass access mode", "[unit]") +{ + GIVEN("an open and initialised connection") { + TestContext* test_ctx = new TestContext(); + struct BoltConnection* connection = bolt_open_init_mocked(3, test_ctx->log()); + + BoltValue* tx_metadata = BoltValue_create(); + BoltValue_format_as_Dictionary(tx_metadata, 2); + BoltDictionary_set_key(tx_metadata, 0, "m1", 2); + BoltValue_format_as_Integer(BoltDictionary_value(tx_metadata, 0), 10); + BoltDictionary_set_key(tx_metadata, 1, "m2", 2); + BoltValue_format_as_Boolean(BoltDictionary_value(tx_metadata, 1), 1); + + BoltValue* bookmarks = BoltValue_create(); + BoltValue_format_as_List(bookmarks, 3); + BoltValue_format_as_String(BoltList_value(bookmarks, 0), "bookmark-1", 10); + BoltValue_format_as_String(BoltList_value(bookmarks, 1), "bookmark-2", 10); + BoltValue_format_as_String(BoltList_value(bookmarks, 2), "bookmark-3", 10); + + WHEN("connection access mode is READ") { + connection->access_mode = BOLT_ACCESS_MODE_READ; + + AND_WHEN("a BEGIN message is loaded") { + BoltConnection_clear_begin(connection); + BoltConnection_set_begin_tx_timeout(connection, 1000); + BoltConnection_set_begin_tx_metadata(connection, tx_metadata); + BoltConnection_set_begin_bookmarks(connection, bookmarks); + BoltConnection_load_begin_request(connection); + + THEN("mode=r should be present in the statement metadata") { + REQUIRE_THAT(*test_ctx, + ContainsLog( + "DEBUG: [id-0]: C[0] BEGIN [{tx_timeout: 1000, tx_metadata: {m1: 10, m2: true}, " + "bookmarks: [bookmark-1, bookmark-2, bookmark-3], mode: r}]")); + } + } + + AND_WHEN("a RUN message is loaded") { + BoltConnection_clear_run(connection); + BoltConnection_set_run_cypher(connection, "RETURN $x", 9, 1); + BoltValue_format_as_Integer(BoltConnection_set_run_cypher_parameter(connection, 0, "x", 1), 5); + BoltConnection_set_run_tx_timeout(connection, 5000); + BoltConnection_set_run_tx_metadata(connection, tx_metadata); + BoltConnection_set_run_bookmarks(connection, bookmarks); + BoltConnection_load_run_request(connection); + + THEN("mode=r should be present in the statement metadata") { + REQUIRE_THAT(*test_ctx, ContainsLog( + "DEBUG: [id-0]: C[0] RUN [RETURN $x, {x: 5}, {tx_timeout: 5000, tx_metadata: {m1: 10, m2: true}, " + "bookmarks: [bookmark-1, bookmark-2, bookmark-3], mode: r}]")); + } + } + } + + WHEN("connection access mode is WRITE") { + connection->access_mode = BOLT_ACCESS_MODE_WRITE; + + AND_WHEN("a BEGIN message is loaded") { + BoltConnection_clear_begin(connection); + BoltConnection_set_begin_tx_timeout(connection, 1000); + BoltConnection_set_begin_tx_metadata(connection, tx_metadata); + BoltConnection_set_begin_bookmarks(connection, bookmarks); + BoltConnection_load_begin_request(connection); + + THEN("no access mode should be present in the statement metadata") { + REQUIRE_THAT(*test_ctx, + ContainsLog( + "DEBUG: [id-0]: C[0] BEGIN [{tx_timeout: 1000, tx_metadata: {m1: 10, m2: true}, " + "bookmarks: [bookmark-1, bookmark-2, bookmark-3]}]")); + } + } + + AND_WHEN("a RUN message is loaded") { + BoltConnection_clear_run(connection); + BoltConnection_set_run_cypher(connection, "RETURN $x", 9, 1); + BoltValue_format_as_Integer(BoltConnection_set_run_cypher_parameter(connection, 0, "x", 1), 5); + BoltConnection_set_run_tx_timeout(connection, 5000); + BoltConnection_set_run_tx_metadata(connection, tx_metadata); + BoltConnection_set_run_bookmarks(connection, bookmarks); + BoltConnection_load_run_request(connection); + + THEN("no access mode should be present in the statement metadata") { + REQUIRE_THAT(*test_ctx, + ContainsLog( + "DEBUG: [id-0]: C[0] RUN [RETURN $x, {x: 5}, {tx_timeout: 5000, tx_metadata: {m1: 10, m2: true}, " + "bookmarks: [bookmark-1, bookmark-2, bookmark-3]}]")); + } + } + } + + BoltValue_destroy(tx_metadata); + BoltValue_destroy(bookmarks); + BoltConnection_close(connection); + BoltConnection_destroy(connection); + } +} \ No newline at end of file From 349d9a674345eb6b16e2060422929e274909d22a Mon Sep 17 00:00:00 2001 From: Ali Ince <ali.ince@neotechnology.com> Date: Wed, 6 Mar 2019 09:17:04 +0000 Subject: [PATCH 5/7] Set WRITE as the default access mode on connection --- src/seabolt/src/bolt/connection.c | 1 + src/seabolt/src/bolt/connector.c | 4 ++ src/seabolt/tests/CMakeLists.txt | 1 + src/seabolt/tests/test-connection.cpp | 54 +++++++++++++++++++++++++++ 4 files changed, 60 insertions(+) create mode 100644 src/seabolt/tests/test-connection.cpp diff --git a/src/seabolt/src/bolt/connection.c b/src/seabolt/src/bolt/connection.c index 2ca44b17..315aeb64 100644 --- a/src/seabolt/src/bolt/connection.c +++ b/src/seabolt/src/bolt/connection.c @@ -203,6 +203,7 @@ BoltConnection* BoltConnection_create() const size_t size = sizeof(BoltConnection); BoltConnection* connection = BoltMem_allocate(size); memset(connection, 0, size); + connection->access_mode = BOLT_ACCESS_MODE_WRITE; connection->status = BoltStatus_create_with_ctx(ERROR_CTX_SIZE); connection->metrics = BoltMem_allocate(sizeof(BoltConnectionMetrics)); memset(connection->metrics, 0, sizeof(BoltConnectionMetrics)); diff --git a/src/seabolt/src/bolt/connector.c b/src/seabolt/src/bolt/connector.c index 928c8ee6..f2968c2d 100644 --- a/src/seabolt/src/bolt/connector.c +++ b/src/seabolt/src/bolt/connector.c @@ -112,6 +112,7 @@ BoltConnection* BoltConnector_acquire(BoltConnector* connector, BoltAccessMode m break; } + // Assign access mode to the returned connection if (connection!=NULL) { connection->access_mode = mode; } @@ -121,6 +122,9 @@ BoltConnection* BoltConnector_acquire(BoltConnector* connector, BoltAccessMode m void BoltConnector_release(BoltConnector* connector, BoltConnection* connection) { + // Reset access mode stored in connection to its default value + connection->access_mode = BOLT_ACCESS_MODE_WRITE; + switch (connector->config->scheme) { case BOLT_SCHEME_DIRECT: BoltDirectPool_release((struct BoltDirectPool*) connector->pool_state, connection); diff --git a/src/seabolt/tests/CMakeLists.txt b/src/seabolt/tests/CMakeLists.txt index 41d783d7..f69faeda 100644 --- a/src/seabolt/tests/CMakeLists.txt +++ b/src/seabolt/tests/CMakeLists.txt @@ -6,6 +6,7 @@ target_sources(seabolt-test ${CMAKE_CURRENT_LIST_DIR}/test-addressing.cpp ${CMAKE_CURRENT_LIST_DIR}/test-chunking-v1.cpp ${CMAKE_CURRENT_LIST_DIR}/test-communication.cpp + ${CMAKE_CURRENT_LIST_DIR}/test-connection.cpp ${CMAKE_CURRENT_LIST_DIR}/test-direct.cpp ${CMAKE_CURRENT_LIST_DIR}/test-pooling.cpp ${CMAKE_CURRENT_LIST_DIR}/test-values.cpp diff --git a/src/seabolt/tests/test-connection.cpp b/src/seabolt/tests/test-connection.cpp new file mode 100644 index 00000000..b208327d --- /dev/null +++ b/src/seabolt/tests/test-connection.cpp @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "catch.hpp" +#include "integration.hpp" + +SCENARIO("BoltConnection") +{ + GIVEN("a new constructed BoltConnection") { + BoltConnection* connection = BoltConnection_create(); + + THEN("access mode should be set to WRITE") { + REQUIRE(connection->access_mode==BOLT_ACCESS_MODE_WRITE); + } + + THEN("status is not null") { + REQUIRE(connection->status!=NULL); + } + + THEN("status->state should be DISCONNECTED") { + REQUIRE(connection->status->state==BOLT_CONNECTION_STATE_DISCONNECTED); + } + + THEN("status->error should be SUCCESS") { + REQUIRE(connection->status->error==BOLT_SUCCESS); + } + + THEN("status->error_ctx should not be NULL") { + REQUIRE(connection->status->error_ctx!=NULL); + } + + THEN("metrics is not null") { + REQUIRE(connection->metrics!=NULL); + } + + BoltConnection_destroy(connection); + } +} \ No newline at end of file From 5a3c301518328568e2fa84175b974e7905769edd Mon Sep 17 00:00:00 2001 From: Ali Ince <ali.ince@neotechnology.com> Date: Wed, 6 Mar 2019 10:07:35 +0000 Subject: [PATCH 6/7] Prevent segmentation fault --- src/seabolt/src/bolt/connector.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/seabolt/src/bolt/connector.c b/src/seabolt/src/bolt/connector.c index f2968c2d..7ea18bdc 100644 --- a/src/seabolt/src/bolt/connector.c +++ b/src/seabolt/src/bolt/connector.c @@ -123,7 +123,9 @@ BoltConnection* BoltConnector_acquire(BoltConnector* connector, BoltAccessMode m void BoltConnector_release(BoltConnector* connector, BoltConnection* connection) { // Reset access mode stored in connection to its default value - connection->access_mode = BOLT_ACCESS_MODE_WRITE; + if (connection!=NULL) { + connection->access_mode = BOLT_ACCESS_MODE_WRITE; + } switch (connector->config->scheme) { case BOLT_SCHEME_DIRECT: From 5e8b32705c25c983e6e40e977f7acaa675782cf5 Mon Sep 17 00:00:00 2001 From: Ali Ince <ali.ince@neotechnology.com> Date: Wed, 6 Mar 2019 14:35:19 +0000 Subject: [PATCH 7/7] Fix windows compilation error --- src/seabolt/tests/test-connection.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/seabolt/tests/test-connection.cpp b/src/seabolt/tests/test-connection.cpp index b208327d..834a8cfe 100644 --- a/src/seabolt/tests/test-connection.cpp +++ b/src/seabolt/tests/test-connection.cpp @@ -30,7 +30,7 @@ SCENARIO("BoltConnection") } THEN("status is not null") { - REQUIRE(connection->status!=NULL); + REQUIRE(connection->status!=nullptr); } THEN("status->state should be DISCONNECTED") { @@ -42,11 +42,11 @@ SCENARIO("BoltConnection") } THEN("status->error_ctx should not be NULL") { - REQUIRE(connection->status->error_ctx!=NULL); + REQUIRE(connection->status->error_ctx!=nullptr); } THEN("metrics is not null") { - REQUIRE(connection->metrics!=NULL); + REQUIRE(connection->metrics!=nullptr); } BoltConnection_destroy(connection);