Skip to content

Commit

Permalink
[CDAP-21079] Refactor SPI/TMS for default context before spanner exte…
Browse files Browse the repository at this point in the history
…nsion implementation
  • Loading branch information
sidhdirenge committed Nov 19, 2024
1 parent 8771c9c commit 3e37e7d
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@
*/
public interface MessagingService {

/**
* Initialize the messaging service. This method is guaranteed to be called before any other
* method is called. It will only be called once for the lifetime of the messaging service.
*
* @param context the context that can be used to initialize the messaging service.
*/
void initialize(MessagingServiceContext context) throws IOException;

/**
* Returns the name of this MessagingService. The name needs to match with the configuration
* provided through {@code messaging.service.name}.
Expand All @@ -46,7 +54,7 @@ public interface MessagingService {
*
* @param topicMetadata topic to be created
* @throws TopicAlreadyExistsException if the topic to be created already exist
* @throws IOException if failed to create the topic
* @throws IOException if failed to create the topic
* @throws ServiceUnavailableException if the messaging service is not available
*/
void createTopic(TopicMetadata topicMetadata)
Expand All @@ -56,8 +64,8 @@ void createTopic(TopicMetadata topicMetadata)
* Updates the metadata of a topic.
*
* @param topicMetadata the topic metadata to be updated
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to update the topic metadata
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to update the topic metadata
* @throws ServiceUnavailableException if the messaging service is not available
*/
void updateTopic(TopicMetadata topicMetadata)
Expand All @@ -67,8 +75,8 @@ void updateTopic(TopicMetadata topicMetadata)
* Deletes a topic
*
* @param topicId the topic to be deleted
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to delete the topic
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to delete the topic
* @throws ServiceUnavailableException if the messaging service is not available
*/
void deleteTopic(TopicId topicId)
Expand All @@ -79,8 +87,8 @@ void deleteTopic(TopicId topicId)
*
* @param topicId message topic
* @return the {@link TopicMetadata} of the given topic.
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to retrieve topic metadata.
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to retrieve topic metadata.
* @throws ServiceUnavailableException if the messaging service is not available
*/
Map<String, String> getTopicMetadataProperties(TopicId topicId)
Expand All @@ -91,7 +99,7 @@ Map<String, String> getTopicMetadataProperties(TopicId topicId)
*
* @param namespaceId the namespace to list topics under it
* @return a {@link List} of {@link TopicId}.
* @throws IOException if failed to retrieve topics.
* @throws IOException if failed to retrieve topics.
* @throws ServiceUnavailableException if the messaging service is not available
*/
List<TopicId> listTopics(NamespaceId namespaceId) throws IOException, UnauthorizedException;
Expand All @@ -101,9 +109,9 @@ Map<String, String> getTopicMetadataProperties(TopicId topicId)
*
* @param request the {@link StoreRequest} containing messages to be published
* @return if the store request is transactional, then returns a {@link RollbackDetail} containing
* information for rollback; otherwise {@code null} will be returned.
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to publish messages
* information for rollback; otherwise {@code null} will be returned.
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to publish messages
* @throws ServiceUnavailableException if the messaging service is not available
*/
@Nullable
Expand All @@ -115,8 +123,8 @@ RollbackDetail publish(StoreRequest request)
* publishing use case.
*
* @param request the {@link StoreRequest} containing messages to be stored
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to store messages
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to store messages
* @throws ServiceUnavailableException if the messaging service is not available
*/
@Deprecated
Expand All @@ -126,11 +134,12 @@ void storePayload(StoreRequest request)
/**
* Rollbacks messages published to the given topic with the given transaction.
*
* @param topicId the topic where the messages were published under
* @param rollbackDetail the {@link RollbackDetail} as returned by the {@link
* #publish(StoreRequest)} call, which contains information needed for the rollback
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to rollback changes
* @param topicId the topic where the messages were published under
* @param rollbackDetail the {@link RollbackDetail} as returned by the
* {@link #publish(StoreRequest)} call, which contains information needed
* for the rollback
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to rollback changes
* @throws ServiceUnavailableException if the messaging service is not available
*/
@Deprecated
Expand All @@ -142,8 +151,8 @@ void rollback(TopicId topicId, RollbackDetail rollbackDetail)
* system.
*
* @param messageFetchRequest the request for fetching messages
* @throws TopicNotFoundException if the topic does not exist
* @throws IOException if it fails to create the iterator
* @throws TopicNotFoundException if the topic does not exist
* @throws IOException if it fails to create the iterator
* @throws ServiceUnavailableException if the messaging service is not available
*/
CloseableIterator<RawMessage> fetch(MessageFetchRequest messageFetchRequest)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.messaging.spi;

import java.util.Map;

/**
* The context object available to {@link MessagingService} for access to CDAP resources.
*/
public interface MessagingServiceContext {

/**
* System properties are derived from the CDAP configuration. Anything in the CDAP configuration
* will be added as an entry in the system properties.
*
* @return unmodifiable system properties for the messaging service.
*/
Map<String, String> getProperties();

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.cdap.cdap.common.internal.remote.RemoteClientFactory;
import io.cdap.cdap.internal.io.ExposedByteArrayOutputStream;
import io.cdap.cdap.messaging.spi.MessageFetchRequest;
import io.cdap.cdap.messaging.spi.MessagingServiceContext;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.spi.RollbackDetail;
import io.cdap.cdap.messaging.Schemas;
Expand Down Expand Up @@ -117,6 +118,10 @@ public ClientMessagingService(RemoteClientFactory remoteClientFactory, boolean c
this.compressPayload = compressPayload;
}

@Override
public void initialize(MessagingServiceContext context) throws IOException {
}

@Override
public String getName() {
return this.getClass().getSimpleName();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.cdap.messaging.client;

import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.messaging.spi.MessagingServiceContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* Default implementation for {@link MessagingServiceContext}.
*/
public class DefaultMessagingServiceContext implements MessagingServiceContext {

private final CConfiguration cConf;

private static final String storageImpl = "gcp-spanner";

DefaultMessagingServiceContext(CConfiguration cConf) {
this.cConf = cConf;
}

@Override
public Map<String, String> getProperties() {
// TODO: cdap-tms module refactoring will remove this dependency on spanner.
String spannerPropertiesPrefix =
Constants.Dataset.STORAGE_EXTENSION_PROPERTY_PREFIX + storageImpl + ".";
Map<String, String> propertiesMap = new HashMap<>(
cConf.getPropsWithPrefix(spannerPropertiesPrefix));
return Collections.unmodifiableMap(propertiesMap);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants.MessagingSystem;
import io.cdap.cdap.messaging.spi.MessageFetchRequest;
import io.cdap.cdap.messaging.spi.MessagingServiceContext;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.spi.RawMessage;
import io.cdap.cdap.messaging.spi.RollbackDetail;
Expand All @@ -38,7 +39,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Delegates {@link io.cdap.cdap.messaging.spi.MessagingService} based on configured extension */
/**
* Delegates {@link io.cdap.cdap.messaging.spi.MessagingService} based on configured extension
*/
public class DelegatingMessagingService implements MessagingService {

private static final Logger LOG = LoggerFactory.getLogger(DelegatingMessagingService.class);
Expand Down Expand Up @@ -71,6 +74,10 @@ public void deleteTopic(TopicId topicId)
getDelegate().deleteTopic(topicId);
}

@Override
public void initialize(MessagingServiceContext context) throws IOException {
}

@Override
public String getName() {
return cConf.get(MessagingSystem.MESSAGING_SERVICE_NAME);
Expand Down Expand Up @@ -130,6 +137,12 @@ private MessagingService getDelegate() {
"Unsupported messaging service implementation " + getName());
}
LOG.info("Messaging service {} is loaded", messagingService.getName());
try {
messagingService.initialize(new DefaultMessagingServiceContext(this.cConf));
} catch (IOException e) {
throw new RuntimeException(e);
}
LOG.info("Messaging service {} is initialized", messagingService.getName());

this.delegate = messagingService;
return messagingService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.util.Collections;
import java.util.Set;

/** A extension loader for {@link MessagingService}. */
/**
* A extension loader for {@link MessagingService}.
*/
final class MessagingServiceExtensionLoader
extends AbstractExtensionLoader<String, MessagingService> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@
import io.cdap.cdap.api.service.ServiceUnavailableException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.messaging.server.MessagingHttpService;
import io.cdap.cdap.messaging.service.CoreMessagingService;
import io.cdap.cdap.messaging.spi.MessageFetchRequest;
import io.cdap.cdap.messaging.spi.MessagingServiceContext;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.spi.RawMessage;
import io.cdap.cdap.messaging.spi.RollbackDetail;
import io.cdap.cdap.messaging.spi.StoreRequest;
import io.cdap.cdap.messaging.spi.TopicMetadata;
import io.cdap.cdap.messaging.spi.RawMessage;
import io.cdap.cdap.messaging.server.MessagingHttpService;
import io.cdap.cdap.messaging.service.CoreMessagingService;
import io.cdap.cdap.messaging.store.ForwardingTableFactory;
import io.cdap.cdap.messaging.store.TableFactory;
import io.cdap.cdap.messaging.store.cache.MessageTableCacheProvider;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.TopicId;
Expand Down Expand Up @@ -83,6 +82,10 @@ public class LeaderElectionMessagingService extends AbstractIdleService implemen
this.delegate = new AtomicMarkableReference<>(null, false);
}

@Override
public void initialize(MessagingServiceContext context) throws IOException {
}

@Override
public String getName() {
return this.getClass().getSimpleName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
import com.google.inject.Scopes;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants.MessagingSystem;
import io.cdap.cdap.messaging.client.ClientMessagingService;
import io.cdap.cdap.messaging.client.DelegatingMessagingService;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.client.ClientMessagingService;

/**
* The Guice module to bind messaging service based on {@link
* io.cdap.cdap.common.conf.Constants.MessagingSystem#MESSAGING_SERVICE_NAME} if set in cConf.
* Otherwise, binds to {@link ClientMessagingService}.
* The Guice module to bind messaging service based on
* {@link MessagingSystem#MESSAGING_SERVICE_NAME} if set in cConf. Otherwise, binds to
* {@link ClientMessagingService}.
*/
public class MessagingServiceModule extends AbstractModule {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.cdap.cdap.common.utils.TimeProvider;
import io.cdap.cdap.messaging.DefaultTopicMetadata;
import io.cdap.cdap.messaging.spi.MessageFetchRequest;
import io.cdap.cdap.messaging.spi.MessagingServiceContext;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.MessagingServiceUtils;
import io.cdap.cdap.messaging.MessagingUtils;
Expand Down Expand Up @@ -130,6 +131,10 @@ protected CoreMessagingService(
TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
}

@Override
public void initialize(MessagingServiceContext context) throws IOException {
}

@Override
public String getName() {
return this.getClass().getSimpleName();
Expand Down

0 comments on commit 3e37e7d

Please sign in to comment.