Skip to content

Commit

Permalink
Implement AuthenticationProviderMTls (#1441)
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc authored Sep 9, 2024
1 parent 9817118 commit a7f2688
Show file tree
Hide file tree
Showing 13 changed files with 982 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oidc.broker.common;

/**
* Constant values related to Apache Pulsar broker OIDC options.
*/
public final class OIDCConstants {

/**
* Timeout value, in seconds, for metadata resource synchronization operations.
*/
public static final int RESOURCE_SYNC_OPERATION_TIMEOUT_SEC = 30;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oidc.broker.common;

import static io.streamnative.oidc.broker.common.OIDCConstants.RESOURCE_SYNC_OPERATION_TIMEOUT_SEC;
import com.fasterxml.jackson.core.type.TypeReference;
import io.streamnative.oidc.broker.common.pojo.Pool;
import io.streamnative.oidc.broker.common.utils.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.apache.pulsar.broker.resources.BaseResources;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;


@SuppressWarnings("UnstableApiUsage")
public final class OIDCPoolResources extends BaseResources<Pool> {
private static final String BASE_PATH = "/sn-oidc/pools";

public OIDCPoolResources(@NotNull MetadataStore metadataStore) {
super(metadataStore, new TypeReference<>() { }, RESOURCE_SYNC_OPERATION_TIMEOUT_SEC);
}

public @NotNull Optional<Pool> getPool(@NotNull String poolName) throws MetadataStoreException {
return get(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName)));
}

public @NotNull CompletableFuture<Optional<Pool>> getPoolAsync(@NotNull String poolName) {
return getAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName)));
}

public void createPool(@NotNull Pool pool) throws MetadataStoreException {
create(joinPath(BASE_PATH, Paths.getUrlEncodedPath(pool.name())), pool);
}

public @NotNull CompletableFuture<Void> createPoolAsync(@NotNull Pool pool) {
return createAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(pool.name())), pool);
}

public @NotNull CompletableFuture<Boolean> existsAsync(@NotNull String poolName) {
return super.existsAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName)));
}

public void deletePool(@NotNull String poolName) throws MetadataStoreException {
super.delete(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName)));
}

public @NotNull CompletableFuture<Void> deletePoolAsync(@NotNull String poolName) {
return super.deleteIfExistsAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName)));
}

public @NotNull CompletableFuture<Void> updatePoolAsync(@NotNull Pool pool) {
return super.setAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(pool.name())), __ -> pool);
}

public @NotNull CompletableFuture<List<String>> listPoolNamesAsync() {
return super.getChildrenAsync(joinPath(BASE_PATH));
}

public @NotNull CompletableFuture<List<Pool>> listPoolsAsync() {
return super.getChildrenAsync(joinPath(BASE_PATH))
.thenCompose(poolNames -> {
List<CompletableFuture<Optional<Pool>>> pools = new ArrayList<>();
for (String name : poolNames) {
pools.add(getAsync(joinPath(BASE_PATH, name)));
}
return FutureUtil.waitForAll(pools)
.thenApply(__ -> pools.stream().map(f -> f.join())
.filter(f -> f.isPresent())
.map(f -> f.get())
.collect(Collectors.toList()));
});
}

public static boolean pathIsFromPool(String path) {
return path.startsWith(BASE_PATH + "/");
}

public static String poolFromPath(String path) {
return path.substring(BASE_PATH.length() + 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Copy from sn-pulsar-plugins, only used to compile stage
package io.streamnative.oidc.broker.common;
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oidc.broker.common.pojo;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
import javax.validation.constraints.NotNull;

public record Pool(@JsonProperty(value = "name", required = true) @NotNull String name,
@JsonProperty(value = "auth_type", defaultValue = AUTH_TYPE_TOKEN) @NotNull String authType,
@JsonProperty(value = "description", required = true) @NotNull String description,
@JsonProperty(value = "provider_name") @NotNull String providerName,
@JsonProperty(value = "expression", required = true) @NotNull String expression) {

public static final String AUTH_TYPE_TOKEN = "token";
public static final String AUTH_TYPE_MTLS = "mtls";

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Pool pool = (Pool) o;
return Objects.equals(name, pool.name);
}

@Override
public int hashCode() {
return Objects.hash(name);
}

public String authType() {
return (authType == null || authType.isEmpty()) ? AUTH_TYPE_TOKEN : authType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oidc.broker.common.utils;

import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import javax.validation.constraints.NotNull;
import lombok.experimental.UtilityClass;

@UtilityClass
public final class Paths {

public String getUrlEncodedPath(@NotNull String name) {
return URLEncoder.encode(name, StandardCharsets.UTF_8);
}

public String getUrlDecodedPath(@NotNull String name) {
return URLDecoder.decode(name, StandardCharsets.UTF_8);
}
}
Loading

0 comments on commit a7f2688

Please sign in to comment.