Skip to content

Commit

Permalink
Optimize driver agent to support multiple data sources (#32612)
Browse files Browse the repository at this point in the history
* Improve agent for driver

* Fix test error

* Rename ShardingSphereDataSourceHolder to ShardingSphereDataSourceContextHolder

* Update java doc

* Fix test error
  • Loading branch information
jiangML authored Aug 22, 2024
1 parent 61a773a commit 705ba96
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.agent.plugin.core.holder.ContextManagerHolder;
import org.apache.shardingsphere.agent.plugin.core.holder.ShardingSphereDataSourceContextHolder;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
Expand Down Expand Up @@ -61,17 +61,12 @@ public boolean isPluginEnabled() {
return null == contextManager || contextManager.getMetaDataContexts().getMetaData().getProps().<Boolean>getValue(ConfigurationPropertyKey.AGENT_PLUGINS_ENABLED);
}

/**
* Get context manager.
*
* @return context manager
*/
public Optional<ContextManager> getContextManager() {
private Optional<ContextManager> getContextManager() {
if (isEnhancedForProxy) {
return Optional.ofNullable(ProxyContext.getInstance().getContextManager());
}
return ContextManagerHolder.getDatabaseContextManager().isEmpty()
return ShardingSphereDataSourceContextHolder.getShardingSphereDataSourceContexts().isEmpty()
? Optional.empty()
: Optional.of(ContextManagerHolder.getDatabaseContextManager().values().iterator().next());
: Optional.of(ShardingSphereDataSourceContextHolder.getShardingSphereDataSourceContexts().values().iterator().next().getContextManager());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.agent.plugin.core.context;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.ContextManager;

/**
* ShardingSphere data source context.
*/
@RequiredArgsConstructor
@Getter
public final class ShardingSphereDataSourceContext {

private final String databaseName;

private final ContextManager contextManager;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,44 @@

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.agent.plugin.core.context.ShardingSphereDataSourceContext;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Context manager holder.
* ShardingSphere data source context holder.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ContextManagerHolder {
public final class ShardingSphereDataSourceContextHolder {

private static final Map<String, ContextManager> DATABASE_CONTEXT_MANAGER_CACHE = new ConcurrentHashMap<>();
private static final Map<String, ShardingSphereDataSourceContext> DATA_SOURCE_CONTEXTS = new ConcurrentHashMap<>();

/**
* put.
* Put.
*
* @param database database
* @param contextManager context manager
* @param instanceId instance Id
* @param dataSourceContext sharding sphere data source context
*/
public static void put(final String database, final ContextManager contextManager) {
DATABASE_CONTEXT_MANAGER_CACHE.put(database, contextManager);
public static void put(final String instanceId, final ShardingSphereDataSourceContext dataSourceContext) {
DATA_SOURCE_CONTEXTS.put(instanceId, dataSourceContext);
}

/**
* remove.
* Remove.
*
* @param database database
* @param instanceId instance id
*/
public static void remove(final String database) {
DATABASE_CONTEXT_MANAGER_CACHE.remove(database);
public static void remove(final String instanceId) {
DATA_SOURCE_CONTEXTS.remove(instanceId);
}

/**
* Get database context manager.
* Get sharding sphere data source contexts.
*
* @return database context manager
* @return sharding sphere data source contexts
*/
public static Map<String, ContextManager> getDatabaseContextManager() {
return DATABASE_CONTEXT_MANAGER_CACHE;
public static Map<String, ShardingSphereDataSourceContext> getShardingSphereDataSourceContexts() {
return DATA_SOURCE_CONTEXTS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import org.apache.shardingsphere.agent.api.advice.TargetAdviceMethod;
import org.apache.shardingsphere.agent.api.advice.TargetAdviceObject;
import org.apache.shardingsphere.agent.plugin.core.advice.AbstractInstanceMethodAdvice;
import org.apache.shardingsphere.agent.plugin.core.holder.ContextManagerHolder;
import org.apache.shardingsphere.agent.plugin.core.context.ShardingSphereDataSourceContext;
import org.apache.shardingsphere.agent.plugin.core.holder.ShardingSphereDataSourceContextHolder;
import org.apache.shardingsphere.agent.plugin.core.util.AgentReflectionUtils;
import org.apache.shardingsphere.mode.manager.ContextManager;

Expand All @@ -32,18 +33,16 @@ public final class ShardingSphereDataSourceAdvice extends AbstractInstanceMethod
@Override
public void beforeMethod(final TargetAdviceObject target, final TargetAdviceMethod method, final Object[] args, final String pluginType) {
if ("close".equals(method.getName())) {
ContextManagerHolder.remove(getDatabaseName(target));
ContextManager contextManager = AgentReflectionUtils.getFieldValue(target, "contextManager");
ShardingSphereDataSourceContextHolder.remove(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId());
}
}

@Override
public void afterMethod(final TargetAdviceObject target, final TargetAdviceMethod method, final Object[] args, final Object result, final String pluginType) {
if ("createContextManager".equals(method.getName())) {
ContextManagerHolder.put(getDatabaseName(target), (ContextManager) result);
ShardingSphereDataSourceContextHolder.put(((ContextManager) result).getComputeNodeInstanceContext().getInstance().getMetaData().getId(),
new ShardingSphereDataSourceContext(AgentReflectionUtils.getFieldValue(target, "databaseName"), (ContextManager) result));
}
}

private String getDatabaseName(final TargetAdviceObject target) {
return AgentReflectionUtils.getFieldValue(target, "databaseName");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@

package org.apache.shardingsphere.agent.plugin.metrics.core.exporter.impl.jdbc;

import org.apache.shardingsphere.agent.plugin.core.holder.ContextManagerHolder;
import org.apache.shardingsphere.agent.plugin.core.context.ShardingSphereDataSourceContext;
import org.apache.shardingsphere.agent.plugin.core.holder.ShardingSphereDataSourceContextHolder;
import org.apache.shardingsphere.agent.plugin.metrics.core.collector.MetricsCollectorRegistry;
import org.apache.shardingsphere.agent.plugin.metrics.core.collector.type.GaugeMetricFamilyMetricsCollector;
import org.apache.shardingsphere.agent.plugin.metrics.core.config.MetricCollectorType;
import org.apache.shardingsphere.agent.plugin.metrics.core.config.MetricConfiguration;
import org.apache.shardingsphere.agent.plugin.metrics.core.exporter.MetricsExporter;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.mode.manager.ContextManager;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map.Entry;
import java.util.Optional;

Expand All @@ -36,24 +34,18 @@
*/
public final class JDBCMetaDataInfoExporter implements MetricsExporter {

private final MetricConfiguration config = new MetricConfiguration("jdbc_meta_data_info",
MetricCollectorType.GAUGE_METRIC_FAMILY, "Meta data information of ShardingSphere-JDBC",
Arrays.asList("database", "type"), Collections.emptyMap());
private final MetricConfiguration config = new MetricConfiguration("jdbc_meta_data_info", MetricCollectorType.GAUGE_METRIC_FAMILY,
"Meta data information of ShardingSphere-JDBC",
Arrays.asList("driver_instance", "database", "type"));

@Override
public Optional<GaugeMetricFamilyMetricsCollector> export(final String pluginType) {
GaugeMetricFamilyMetricsCollector result = MetricsCollectorRegistry.get(config, pluginType);
result.cleanMetrics();
for (Entry<String, ContextManager> entry : ContextManagerHolder.getDatabaseContextManager().entrySet()) {
addMetric(result, entry.getKey(), entry.getValue());
for (Entry<String, ShardingSphereDataSourceContext> entry : ShardingSphereDataSourceContextHolder.getShardingSphereDataSourceContexts().entrySet()) {
Optional.ofNullable(entry.getValue().getContextManager().getDatabase(entry.getValue().getDatabaseName()))
.ifPresent(optional -> result.addMetric(Arrays.asList(entry.getKey(), optional.getName(), "storage_unit_count"), optional.getResourceMetaData().getStorageUnits().size()));
}
return Optional.of(result);
}

private void addMetric(final GaugeMetricFamilyMetricsCollector collector, final String database, final ContextManager contextManager) {
ShardingSphereDatabase shardingSphereDatabase = contextManager.getDatabase(database);
if (null != shardingSphereDatabase) {
collector.addMetric(Arrays.asList(database, "storage_unit_count"), shardingSphereDatabase.getResourceMetaData().getStorageUnits().size());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@

package org.apache.shardingsphere.agent.plugin.metrics.core.exporter.impl.jdbc;

import org.apache.shardingsphere.agent.plugin.core.holder.ContextManagerHolder;
import org.apache.shardingsphere.agent.plugin.core.context.ShardingSphereDataSourceContext;
import org.apache.shardingsphere.agent.plugin.core.holder.ShardingSphereDataSourceContextHolder;
import org.apache.shardingsphere.agent.plugin.metrics.core.collector.MetricsCollectorRegistry;
import org.apache.shardingsphere.agent.plugin.metrics.core.collector.type.GaugeMetricFamilyMetricsCollector;
import org.apache.shardingsphere.agent.plugin.metrics.core.config.MetricCollectorType;
import org.apache.shardingsphere.agent.plugin.metrics.core.config.MetricConfiguration;
import org.apache.shardingsphere.agent.plugin.metrics.core.exporter.MetricsExporter;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.mode.manager.ContextManager;

import java.util.Collections;
import java.util.Arrays;
import java.util.Map.Entry;
import java.util.Optional;

Expand All @@ -36,22 +35,17 @@
public final class JDBCStateExporter implements MetricsExporter {

private final MetricConfiguration config = new MetricConfiguration("jdbc_state", MetricCollectorType.GAUGE_METRIC_FAMILY,
"State of ShardingSphere-JDBC. 0 is OK; 1 is CIRCUIT BREAK", Collections.singletonList("database"));
"State of ShardingSphere-JDBC. 0 is OK; 1 is CIRCUIT BREAK", Arrays.asList("driver_instance", "database"));

@Override
public Optional<GaugeMetricFamilyMetricsCollector> export(final String pluginType) {
GaugeMetricFamilyMetricsCollector result = MetricsCollectorRegistry.get(config, pluginType);
result.cleanMetrics();
for (Entry<String, ContextManager> entry : ContextManagerHolder.getDatabaseContextManager().entrySet()) {
addMetric(result, entry.getKey(), entry.getValue());
for (Entry<String, ShardingSphereDataSourceContext> entry : ShardingSphereDataSourceContextHolder.getShardingSphereDataSourceContexts().entrySet()) {
Optional.ofNullable(entry.getValue().getContextManager().getDatabase(entry.getValue().getDatabaseName()))
.ifPresent(optional -> result.addMetric(Arrays.asList(entry.getKey(), optional.getName()),
entry.getValue().getContextManager().getComputeNodeInstanceContext().getInstance().getState().getCurrentState().ordinal()));
}
return Optional.of(result);
}

private void addMetric(final GaugeMetricFamilyMetricsCollector collector, final String database, final ContextManager contextManager) {
ShardingSphereDatabase shardingSphereDatabase = contextManager.getDatabase(database);
if (null != shardingSphereDatabase) {
collector.addMetric(Collections.singletonList(database), contextManager.getComputeNodeInstanceContext().getInstance().getState().getCurrentState().ordinal());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@
package org.apache.shardingsphere.agent.plugin.metrics.core.advice.jdbc;

import org.apache.shardingsphere.agent.api.advice.TargetAdviceMethod;
import org.apache.shardingsphere.agent.plugin.core.holder.ContextManagerHolder;
import org.apache.shardingsphere.agent.plugin.core.context.ShardingSphereDataSourceContext;
import org.apache.shardingsphere.agent.plugin.core.holder.ShardingSphereDataSourceContextHolder;
import org.apache.shardingsphere.agent.plugin.core.util.AgentReflectionUtils;
import org.apache.shardingsphere.agent.plugin.metrics.core.fixture.TargetAdviceObjectFixture;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.UUID;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
Expand All @@ -43,35 +45,43 @@ class ShardingSphereDataSourceAdviceTest {

private final String databaseName = "sharding_db";

@BeforeEach
void setup() {
when(AgentReflectionUtils.getFieldValue(fixture, "databaseName")).thenReturn(databaseName);
}
private final String instanceId = UUID.randomUUID().toString();

@AfterEach
void clean() {
ContextManagerHolder.getDatabaseContextManager().clear();
ShardingSphereDataSourceContextHolder.getShardingSphereDataSourceContexts().clear();
}

@Test
void assertBeforeMethod() {
ContextManagerHolder.put(databaseName, mock(ContextManager.class, RETURNS_DEEP_STUBS));
assertThat(ContextManagerHolder.getDatabaseContextManager().size(), is(1));
ContextManager contextManager = mockContextManager();
when(AgentReflectionUtils.getFieldValue(fixture, "contextManager")).thenReturn(contextManager);
ShardingSphereDataSourceContextHolder.put(instanceId, new ShardingSphereDataSourceContext(databaseName, mock(ContextManager.class, RETURNS_DEEP_STUBS)));
assertThat(ShardingSphereDataSourceContextHolder.getShardingSphereDataSourceContexts().size(), is(1));
TargetAdviceMethod method = mock(TargetAdviceMethod.class);
when(method.getName()).thenReturn("close");
ShardingSphereDataSourceAdvice advice = new ShardingSphereDataSourceAdvice();
advice.beforeMethod(fixture, method, new Object[]{}, "FIXTURE");
assertThat(ContextManagerHolder.getDatabaseContextManager().size(), is(0));
assertThat(ShardingSphereDataSourceContextHolder.getShardingSphereDataSourceContexts().size(), is(0));
}

@Test
void assertAfterMethod() {
assertThat(ContextManagerHolder.getDatabaseContextManager().size(), is(0));
assertThat(ShardingSphereDataSourceContextHolder.getShardingSphereDataSourceContexts().size(), is(0));
when(AgentReflectionUtils.getFieldValue(fixture, "databaseName")).thenReturn(databaseName);
TargetAdviceMethod method = mock(TargetAdviceMethod.class);
when(method.getName()).thenReturn("createContextManager");
ShardingSphereDataSourceAdvice advice = new ShardingSphereDataSourceAdvice();
advice.afterMethod(fixture, method, new Object[]{}, mock(ContextManager.class, RETURNS_DEEP_STUBS), "FIXTURE");
assertThat(ContextManagerHolder.getDatabaseContextManager().size(), is(1));
assertThat(ContextManagerHolder.getDatabaseContextManager().keySet().iterator().next(), is(databaseName));
ContextManager contextManager = mockContextManager();
advice.afterMethod(fixture, method, new Object[]{}, contextManager, "FIXTURE");
assertThat(ShardingSphereDataSourceContextHolder.getShardingSphereDataSourceContexts().size(), is(1));
assertThat(ShardingSphereDataSourceContextHolder.getShardingSphereDataSourceContexts().keySet().iterator().next(), is(instanceId));
assertThat(ShardingSphereDataSourceContextHolder.getShardingSphereDataSourceContexts().get(instanceId).getDatabaseName(), is(databaseName));
}

private ContextManager mockContextManager() {
ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS);
when(result.getComputeNodeInstanceContext().getInstance().getMetaData().getId()).thenReturn(instanceId);
return result;
}
}
Loading

0 comments on commit 705ba96

Please sign in to comment.