Skip to content

Commit

Permalink
[#2612] feat(client): Add Java client support for messaging catalog (#…
Browse files Browse the repository at this point in the history
…2722)

### What changes were proposed in this pull request?

Add Java client support for the messaging catalog

### Why are the changes needed?

Fix: #2612 

### Does this PR introduce _any_ user-facing change?

Add Java client support for the messaging catalog


### How was this patch tested?

UTs
  • Loading branch information
mchades authored Apr 1, 2024
1 parent a1c1d53 commit 4d423a7
Show file tree
Hide file tree
Showing 9 changed files with 641 additions and 3 deletions.
5 changes: 4 additions & 1 deletion api/src/main/java/com/datastrato/gravitino/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ enum Type {
FILESET,

/** Catalog Type for Message Queue, like kafka://topic */
MESSAGING
MESSAGING,

/** Catalog Type for test only. */
UNSUPPORTED
}

/**
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/com/datastrato/gravitino/NameIdentifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,17 @@ public static void checkFileset(NameIdentifier ident) {
Namespace.checkFileset(ident.namespace);
}

/**
* Check the given {@link NameIdentifier} is a topic identifier. Throw an {@link
* IllegalNameIdentifierException} if it's not.
*
* @param ident The topic {@link NameIdentifier} to check.
*/
public static void checkTopic(NameIdentifier ident) {
check(ident != null, "Topic identifier must not be null");
Namespace.checkTopic(ident.namespace);
}

/**
* Create a {@link NameIdentifier} from the given identifier string.
*
Expand Down
13 changes: 13 additions & 0 deletions api/src/main/java/com/datastrato/gravitino/Namespace.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,19 @@ public static void checkFileset(Namespace namespace) {
namespace);
}

/**
* Check if the given topic namespace is legal, throw an {@link IllegalNamespaceException} if it's
* illegal.
*
* @param namespace The topic namespace
*/
public static void checkTopic(Namespace namespace) {
check(
namespace != null && namespace.length() == 3,
"Topic namespace must be non-null and have 3 levels, the input namespace is %s",
namespace);
}

private Namespace(String[] levels) {
this.levels = levels;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import com.datastrato.gravitino.dto.requests.MetalakeUpdateRequest;
import com.datastrato.gravitino.dto.requests.SchemaUpdateRequest;
import com.datastrato.gravitino.dto.requests.TableUpdateRequest;
import com.datastrato.gravitino.dto.requests.TopicUpdateRequest;
import com.datastrato.gravitino.file.FilesetChange;
import com.datastrato.gravitino.messaging.TopicChange;
import com.datastrato.gravitino.rel.SchemaChange;
import com.datastrato.gravitino.rel.TableChange;

Expand Down Expand Up @@ -82,6 +84,15 @@ static Catalog toCatalog(CatalogDTO catalog, RESTClient client) {
.build();

case MESSAGING:
return MessagingCatalog.builder()
.withName(catalog.name())
.withType(catalog.type())
.withProvider(catalog.provider())
.withComment(catalog.comment())
.withProperties(catalog.properties())
.withAudit((AuditDTO) catalog.auditInfo())
.withRestClient(client)
.build();
default:
throw new UnsupportedOperationException("Unsupported catalog type: " + catalog.type());
}
Expand Down Expand Up @@ -183,6 +194,23 @@ static FilesetUpdateRequest toFilesetUpdateRequest(FilesetChange change) {
}
}

static TopicUpdateRequest toTopicUpdateRequest(TopicChange change) {
if (change instanceof TopicChange.UpdateTopicComment) {
return new TopicUpdateRequest.UpdateTopicCommentRequest(
((TopicChange.UpdateTopicComment) change).getNewComment());
} else if (change instanceof TopicChange.SetProperty) {
return new TopicUpdateRequest.SetTopicPropertyRequest(
((TopicChange.SetProperty) change).getProperty(),
((TopicChange.SetProperty) change).getValue());
} else if (change instanceof TopicChange.RemoveProperty) {
return new TopicUpdateRequest.RemoveTopicPropertyRequest(
((TopicChange.RemoveProperty) change).getProperty());
} else {
throw new IllegalArgumentException(
"Unknown change type: " + change.getClass().getSimpleName());
}
}

private static TableUpdateRequest toColumnUpdateRequest(TableChange.ColumnChange change) {
if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import com.datastrato.gravitino.exceptions.NoSuchPartitionException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.exceptions.NoSuchTopicException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.NotFoundException;
import com.datastrato.gravitino.exceptions.PartitionAlreadyExistsException;
import com.datastrato.gravitino.exceptions.RESTException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException;
import com.datastrato.gravitino.exceptions.UnauthorizedException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
Expand Down Expand Up @@ -112,6 +114,15 @@ public static Consumer<ErrorResponse> filesetErrorHandler() {
return FilesetErrorHandler.INSTANCE;
}

/**
* Creates an error handler specific to Topic operations.
*
* @return A Consumer representing the Topic error handler.
*/
public static Consumer<ErrorResponse> topicErrorHandler() {
return TopicErrorHandler.INSTANCE;
}

private ErrorHandlers() {}

/**
Expand Down Expand Up @@ -410,6 +421,41 @@ public void accept(ErrorResponse errorResponse) {
}
}

/** Error handler specific to Topic operations. */
@SuppressWarnings("FormatStringAnnotation")
private static class TopicErrorHandler extends RestErrorHandler {

private static final TopicErrorHandler INSTANCE = new TopicErrorHandler();

@Override
public void accept(ErrorResponse errorResponse) {
String errorMessage = formatErrorMessage(errorResponse);

switch (errorResponse.getCode()) {
case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
throw new IllegalArgumentException(errorMessage);

case ErrorConstants.NOT_FOUND_CODE:
if (errorResponse.getType().equals(NoSuchSchemaException.class.getSimpleName())) {
throw new NoSuchSchemaException(errorMessage);
} else if (errorResponse.getType().equals(NoSuchTopicException.class.getSimpleName())) {
throw new NoSuchTopicException(errorMessage);
} else {
throw new NotFoundException(errorMessage);
}

case ErrorConstants.ALREADY_EXISTS_CODE:
throw new TopicAlreadyExistsException(errorMessage);

case ErrorConstants.INTERNAL_ERROR_CODE:
throw new RuntimeException(errorMessage);

default:
super.accept(errorResponse);
}
}
}

/** Generic error handler for REST requests. */
private static class RestErrorHandler extends ErrorHandler {
private static final ErrorHandler INSTANCE = new RestErrorHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
.map(DTOConverters::toFilesetUpdateRequest)
.collect(Collectors.toList());
FilesetUpdatesRequest req = new FilesetUpdatesRequest(updates);
req.validate();

FilesetResponse resp =
restClient.put(
Expand Down
Loading

0 comments on commit 4d423a7

Please sign in to comment.