Skip to content

Commit

Permalink
Merge pull request #15843 from cdapio/cherrypick/CDAP-21118-sidhdiren…
Browse files Browse the repository at this point in the history
…ge-taskworker-fix

[cherry-pick][CDAP-21118] Task workers should communicate with Spanner Messaging Service only via App fabric
sidhdirenge authored Jan 31, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 92b8740 + bff7122 commit 5e9780a
Showing 28 changed files with 332 additions and 97 deletions.
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@
import io.cdap.cdap.app.DefaultAppConfigurer;
import io.cdap.cdap.app.DefaultApplicationContext;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule.ServiceType;
import io.cdap.cdap.app.guice.AuthorizationModule;
import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule;
import io.cdap.cdap.app.guice.TwillModule;
@@ -56,7 +55,7 @@
import io.cdap.cdap.internal.app.runtime.SimpleProgramOptions;
import io.cdap.cdap.internal.app.runtime.SystemArguments;
import io.cdap.cdap.logging.guice.LocalLogAppenderModule;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
import io.cdap.cdap.operations.guice.OperationalStatsModule;
@@ -70,7 +69,6 @@
import io.cdap.cdap.security.guice.SecureStoreServerModule;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.api.Configs;
@@ -284,7 +282,7 @@ private static ProgramRunnerFactory createProgramRunnerFactory(CConfiguration cC
new DataSetsModules().getDistributedModules(),
new MetricsClientRuntimeModule().getDistributedModules(),
new MetricsStoreModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new AuditModule(),
CoreSecurityRuntimeModule.getDistributedModule(cConf),
new AuthenticationContextModules().getNoOpModule(),
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.AppFabric;
import io.cdap.cdap.common.conf.Constants.MessagingSystem;
import io.cdap.cdap.common.conf.Constants.Service;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.guice.RemoteAuthenticatorModules;
@@ -149,6 +150,9 @@
import io.cdap.cdap.internal.tethering.TetheringClientHandler;
import io.cdap.cdap.internal.tethering.TetheringHandler;
import io.cdap.cdap.internal.tethering.TetheringServerHandler;
import io.cdap.cdap.messaging.server.FetchHandler;
import io.cdap.cdap.messaging.server.MetadataHandler;
import io.cdap.cdap.messaging.server.StoreHandler;
import io.cdap.cdap.metadata.LocalPreferencesFetcherInternal;
import io.cdap.cdap.metadata.PreferencesFetcher;
import io.cdap.cdap.pipeline.PipelineFactory;
@@ -532,6 +536,14 @@ protected void configure() {
handlerBinder.addBinding().to(ProgramLifecycleHttpHandlerInternal.class);
handlerBinder.addBinding().to(WorkflowHttpHandler.class);

if (!cConf.getBoolean(MessagingSystem.MESSAGING_SERVICE_ENABLED)) {
// Add these handlers only if messaging service endpoint doesn't exist and task workers need to
// communicate with messaging service via AppFabric.
handlerBinder.addBinding().to(MetadataHandler.class);
handlerBinder.addBinding().to(StoreHandler.class);
handlerBinder.addBinding().to(FetchHandler.class);
}

FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
handlerBinder.addBinding().to(GcpWorkloadIdentityHttpHandler.class);
Original file line number Diff line number Diff line change
@@ -64,8 +64,8 @@
import io.cdap.cdap.logging.guice.TMSLogAppenderModule;
import io.cdap.cdap.master.environment.MasterEnvironments;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.messaging.client.ClientMessagingService;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.client.DefaultClientMessagingService;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.messaging.guice.MessagingServiceModule;
import io.cdap.cdap.metadata.MetadataReaderWriterModules;
import io.cdap.cdap.metadata.PreferencesFetcher;
@@ -327,7 +327,7 @@ private Module getMessagingModules() {
return new MessagingServiceModule(cConf);
}

return new MessagingClientModule();
return new DefaultMessagingClientModule();
}

/**
@@ -389,7 +389,7 @@ public ProgramStatePublisher get() {
internalAuthenticator);

return new MessagingProgramStatePublisher(cConf,
new ClientMessagingService(cConf, remoteClientFactory));
new DefaultClientMessagingService(cConf, remoteClientFactory));
}
}
}
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@
import com.google.inject.multibindings.Multibinder;
import com.google.inject.name.Names;
import io.cdap.cdap.app.store.preview.PreviewStore;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants.MessagingSystem;
import io.cdap.cdap.data.runtime.DataSetsModules;
import io.cdap.cdap.data2.datafabric.dataset.RemoteDatasetFramework;
import io.cdap.cdap.data2.dataset2.DatasetDefinitionRegistryFactory;
@@ -37,6 +39,9 @@
import io.cdap.cdap.internal.app.preview.PreviewDataCleanupService;
import io.cdap.cdap.internal.app.preview.PreviewRunStopper;
import io.cdap.cdap.internal.app.store.preview.DefaultPreviewStore;
import io.cdap.cdap.messaging.server.FetchHandler;
import io.cdap.cdap.messaging.server.MetadataHandler;
import io.cdap.cdap.messaging.server.StoreHandler;
import io.cdap.http.HttpHandler;

/**
@@ -46,7 +51,10 @@ public class PreviewManagerModule extends PrivateModule {

private final boolean distributedRunner;

public PreviewManagerModule(boolean distributedRunner) {
private final CConfiguration cConf;

public PreviewManagerModule(CConfiguration cConf, boolean distributedRunner) {
this.cConf = cConf;
this.distributedRunner = distributedRunner;
}

@@ -76,6 +84,15 @@ protected void configure() {
handlerBinder.addBinding().to(PreviewHttpHandler.class);
handlerBinder.addBinding().to(PreviewErrorClassificationHttpHandler.class);
handlerBinder.addBinding().to(PreviewHttpHandlerInternal.class);

if (!cConf.getBoolean(MessagingSystem.MESSAGING_SERVICE_ENABLED)) {
// Add these handlers only if messaging service endpoint doesn't exist and preview runners need to
// communicate with messaging service via preview manager.
handlerBinder.addBinding().to(MetadataHandler.class);
handlerBinder.addBinding().to(StoreHandler.class);
handlerBinder.addBinding().to(FetchHandler.class);
}

CommonHandlers.add(handlerBinder);

bind(PreviewHttpServer.class);
Original file line number Diff line number Diff line change
@@ -67,7 +67,7 @@
import io.cdap.cdap.master.environment.MasterEnvironments;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.master.spi.twill.ExtendedTwillContext;
import io.cdap.cdap.messaging.guice.MessagingServiceModule;
import io.cdap.cdap.messaging.guice.client.PreviewRunnerMessagingClientModule;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule;
@@ -237,7 +237,7 @@ protected void configure() {
}

modules.add(new PreviewRunnerManagerModule().getDistributedModules());
modules.add(new MessagingServiceModule(cConf));
modules.add(new PreviewRunnerMessagingClientModule(cConf));
modules.add(new SecureStoreClientModule());
// Needed for InMemoryProgramRunnerModule. We use local metadata reader/publisher to avoid conflicting with
// metadata stored in AppFabric.
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@
import io.cdap.cdap.logging.guice.RemoteLogAppenderModule;
import io.cdap.cdap.master.environment.MasterEnvironments;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.messaging.guice.MessagingServiceModule;
import io.cdap.cdap.messaging.guice.client.TaskWorkerMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
@@ -95,7 +95,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf) {
modules.add(new IOModule());
modules.add(new AuthenticationContextModules().getMasterWorkerModule());
modules.add(coreSecurityModule);
modules.add(new MessagingServiceModule(cConf));
modules.add(new TaskWorkerMessagingClientModule(cConf));
modules.add(new SystemAppModule());
modules.add(new MetricsClientRuntimeModule().getDistributedModules());
modules.add(new AuditLogWriterModule(cConf).getDistributedModules());
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@
import io.cdap.cdap.internal.metadata.MetadataConsumerSubscriberService;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metadata.MetadataService;
import io.cdap.cdap.metadata.MetadataServiceModule;
import io.cdap.cdap.metadata.MetadataSubscriberService;
@@ -111,7 +111,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf, String
new ZkClientModule(),
new ZkDiscoveryModule(),
new KafkaClientModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new MetricsClientRuntimeModule().getDistributedModules(),
new DFSLocationModule(),
new NamespaceQueryAdminModule(),
Original file line number Diff line number Diff line change
@@ -42,7 +42,7 @@
import io.cdap.cdap.logging.guice.DistributedLogFrameworkModule;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.logging.service.LogSaverStatusService;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule;
@@ -118,7 +118,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf,
new AuditModule(),
new AuthorizationEnforcementModule().getDistributedModules(),
new AuthenticationContextModules().getMasterModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new NoOpAuditLogModule(),
new AbstractModule() {
@Override
Original file line number Diff line number Diff line change
@@ -73,7 +73,7 @@
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.master.startup.ServiceResourceKeys;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
import io.cdap.cdap.operations.OperationalStatsService;
@@ -538,7 +538,7 @@ protected void configure() {
new DataSetsModules().getDistributedModules(),
new MetricsClientRuntimeModule().getDistributedModules(),
new MetricsStoreModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new AuditModule(),
new AuditLogWriterModule(cConf).getDistributedModules(),
CoreSecurityRuntimeModule.getDistributedModule(cConf),
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@
import io.cdap.cdap.data2.audit.AuditModule;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsProcessorStatusServiceModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
@@ -121,7 +121,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf, S
new ZkClientModule(),
new ZkDiscoveryModule(),
new KafkaClientModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new MetricsClientRuntimeModule().getDistributedModules(),
new MetricsStoreModule(),
new KafkaLogAppenderModule(),
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@
import io.cdap.cdap.logging.guice.LogQueryRuntimeModule;
import io.cdap.cdap.logging.guice.LogReaderRuntimeModules;
import io.cdap.cdap.logging.service.LogQueryService;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsHandlerModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
@@ -110,7 +110,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf,
new ZkClientModule(),
new ZkDiscoveryModule(),
new KafkaClientModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new DataFabricModules(txClientId).getDistributedModules(),
new DataSetsModules().getDistributedModules(),
// For the injection of DatasetDefinition of MetricsTable directly
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@
import io.cdap.cdap.data2.audit.AuditModule;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
@@ -105,7 +105,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf,
new ZkClientModule(),
new ZkDiscoveryModule(),
new KafkaClientModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new DataFabricModules(txClientId).getDistributedModules(),
new DataSetsModules().getDistributedModules(),
new SystemDatasetRuntimeModule().getDistributedModules(),
Original file line number Diff line number Diff line change
@@ -33,7 +33,6 @@
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
import io.cdap.cdap.api.schedule.Trigger;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule.ServiceType;
import io.cdap.cdap.app.guice.AuthorizationModule;
import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule;
import io.cdap.cdap.app.guice.TwillModule;
@@ -63,7 +62,7 @@
import io.cdap.cdap.internal.schedule.constraint.Constraint;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.messaging.data.MessageId;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
@@ -74,7 +73,6 @@
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.cli.BasicParser;
@@ -361,7 +359,7 @@ private static Injector createInjector() throws Exception {
new AuthorizationModule(),
new AuthorizationEnforcementModule().getMasterModule(),
new SecureStoreServerModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new AbstractModule() {
@Override
protected void configure() {
Original file line number Diff line number Diff line change
@@ -52,7 +52,6 @@
import io.cdap.cdap.security.guice.SecureStoreClientModule;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -124,9 +123,9 @@ protected void configure() {
));

if (cConf.getInt(Constants.Preview.CONTAINER_COUNT) > 0) {
modules.add(new PreviewManagerModule(true));
modules.add(new PreviewManagerModule(cConf, true));
} else {
modules.add(new PreviewManagerModule(false));
modules.add(new PreviewManagerModule(cConf, false));
modules.add(new PreviewRunnerManagerModule().getStandaloneModules());
}

Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@
import io.cdap.cdap.logging.gateway.handlers.ProgramRunRecordFetcher;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.master.spi.environment.MasterEnvironmentContext;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule;
import java.util.Arrays;
@@ -66,7 +66,7 @@ protected List<Module> getServiceModules(MasterEnvironment masterEnv,
RemoteAuthenticatorModules.getDefaultModule(
TetheringAgentService.REMOTE_TETHERING_AUTHENTICATOR,
Constants.Tethering.CLIENT_AUTHENTICATOR_NAME),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new NamespaceQueryAdminModule(),
getDataFabricModule(),
// Always use local table implementations, which use LevelDB.
Original file line number Diff line number Diff line change
@@ -21,9 +21,9 @@
import io.cdap.cdap.common.internal.remote.RemoteClientFactory;
import io.cdap.cdap.messaging.DefaultMessageFetchRequest;
import io.cdap.cdap.messaging.DefaultTopicMetadata;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.client.ClientMessagingService;
import io.cdap.cdap.messaging.client.DefaultClientMessagingService;
import io.cdap.cdap.messaging.client.StoreRequestBuilder;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.spi.RawMessage;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.TopicId;
@@ -50,7 +50,7 @@ public void testMessagingService() throws Exception {

// Use a separate TMS client to create topic, then publish and then poll some messages
TopicId topicId = NamespaceId.SYSTEM.topic("test");
MessagingService messagingService = new ClientMessagingService(remoteClientFactory, true);
MessagingService messagingService = new DefaultClientMessagingService(remoteClientFactory, true);
messagingService.createTopic(new DefaultTopicMetadata(topicId));

// Publish 10 messages
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@
import io.cdap.cdap.master.environment.MasterEnvironments;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.master.spi.environment.MasterEnvironmentContext;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
@@ -259,7 +259,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf, Master
modules.add(new IOModule());
modules.add(new AuthenticationContextModules().getMasterWorkerModule());
modules.add(coreSecurityModule);
modules.add(new MessagingClientModule());
modules.add(new DefaultMessagingClientModule());
modules.add(new MetricsClientRuntimeModule().getDistributedModules());
//Need for guice binding, but No Audit Log action required.
modules.add(new NoOpAuditLogModule());
Loading

0 comments on commit 5e9780a

Please sign in to comment.