Skip to content

Commit

Permalink
Merge branch 'main' into issues/729
Browse files Browse the repository at this point in the history
  • Loading branch information
zambrinf authored Jan 15, 2025
2 parents cda283f + ed49499 commit a4b6378
Show file tree
Hide file tree
Showing 24 changed files with 531 additions and 55 deletions.
16 changes: 5 additions & 11 deletions .dev/dev_arm64.yaml → .dev/dev.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
# This is a compose file designed for arm64/Apple Silicon systems
# To adapt this to x86 please find and replace ".arm64" with empty

# ARM64 supported images for kafka can be found here
# https://hub.docker.com/r/confluentinc/cp-kafka/tags?page=1&name=arm64
---
version: '3.8'
name: "kafbat-ui-dev"

Expand Down Expand Up @@ -32,7 +26,7 @@ services:
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'

kafka0:
image: confluentinc/cp-kafka:7.8.0.arm64
image: confluentinc/cp-kafka:7.8.0
user: "0:0"
hostname: kafka0
container_name: kafka0
Expand Down Expand Up @@ -60,7 +54,7 @@ services:
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

schema-registry0:
image: confluentinc/cp-schema-registry:7.8.0.arm64
image: confluentinc/cp-schema-registry:7.8.0
ports:
- 8085:8085
depends_on:
Expand All @@ -76,7 +70,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

kafka-connect0:
image: confluentinc/cp-kafka-connect:7.8.0.arm64
image: confluentinc/cp-kafka-connect:7.8.0
ports:
- 8083:8083
depends_on:
Expand All @@ -101,7 +95,7 @@ services:
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors"

ksqldb0:
image: confluentinc/cp-ksqldb-server:7.8.0.arm64
image: confluentinc/cp-ksqldb-server:7.8.0
depends_on:
- kafka0
- kafka-connect0
Expand All @@ -119,7 +113,7 @@ services:
KSQL_CACHE_MAX_BYTES_BUFFERING: 0

kafka-init-topics:
image: confluentinc/cp-kafka:7.8.0.arm64
image: confluentinc/cp-kafka:7.8.0
volumes:
- ../documentation/compose/data/message.json:/data/message.json
depends_on:
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/cve_checks.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
name: "Infra: CVE checks"
on:
pull_request:
types: [ "opened", "reopened", "synchronize" ]
push:
branches: [ "main" ]
workflow_dispatch:
schedule:
# * is a special character in YAML so you have to quote this string
Expand Down Expand Up @@ -71,7 +75,7 @@ jobs:

notify:
needs: check-cves
if: ${{ always() && needs.build-and-test.result == 'failure' }}
if: ${{ always() && needs.build-and-test.result == 'failure' && github.event_name == 'schedule' }}
uses: ./.github/workflows/infra_discord_hook.yml
with:
message: "Attention! CVE checks run failed! Please fix them CVEs :("
Expand Down
1 change: 1 addition & 0 deletions .mvn/jvm.config
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-Djava.net.useSystemProxies=true
1 change: 1 addition & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@
</goals>
<configuration>
<arguments>build</arguments>
<pnpmInheritsProxyConfigFromMaven>false</pnpmInheritsProxyConfigFromMaven>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,16 @@ public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorNam
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Void> stopConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.stopConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> stopConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.stopConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
throws WebClientResponseException {
Expand All @@ -261,6 +271,18 @@ public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connec
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
}

@Override
public Mono<Void> resetConnectorOffsets(String connectorName)
throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.resetConnectorOffsets(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> resetConnectorOffsetsWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.resetConnectorOffsetsWithHttpInfo(connectorName));
}

@Override
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
return withRetryOnRebalance(super.resumeConnector(connectorName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public AbstractLdapAuthenticationProvider authenticationProvider(LdapAuthorities
}

@Bean
@ConditionalOnProperty(value = "oauth2.ldap.activeDirectory", havingValue = "false")
@ConditionalOnProperty(value = "oauth2.ldap.activeDirectory", havingValue = "false", matchIfMissing = true)
public BindAuthenticator ldapBindAuthentication(LdapContextSource ldapContextSource) {
BindAuthenticator ba = new BindAuthenticator(ldapContextSource);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART;
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
import static io.kafbat.ui.model.rbac.permission.ConnectAction.RESET_OFFSETS;
import static io.kafbat.ui.model.rbac.permission.ConnectAction.VIEW;

import io.kafbat.ui.api.KafkaConnectApi;
import io.kafbat.ui.model.ConnectDTO;
Expand Down Expand Up @@ -285,4 +287,23 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
default -> defaultComparator;
};
}

@Override
public Mono<ResponseEntity<Void>> resetConnectorOffsets(String clusterName, String connectName,
String connectorName,
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, VIEW, RESET_OFFSETS)
.operationName("resetConnectorOffsets")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return validateAccess(context).then(
kafkaConnectService
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.kafbat.ui.exception;

public class ConnectorOffsetsResetException extends CustomBaseException {

public ConnectorOffsetsResetException(String message) {
super(message);
}

@Override
public ErrorCode getErrorCode() {
return ErrorCode.CONNECTOR_OFFSETS_RESET_ERROR;
}
}
1 change: 1 addition & 0 deletions api/src/main/java/io/kafbat/ui/exception/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public enum ErrorCode {
TOPIC_ANALYSIS_ERROR(4018, HttpStatus.BAD_REQUEST),
FILE_UPLOAD_EXCEPTION(4019, HttpStatus.INTERNAL_SERVER_ERROR),
CEL_ERROR(4020, HttpStatus.BAD_REQUEST),
CONNECTOR_OFFSETS_RESET_ERROR(4021, HttpStatus.BAD_REQUEST),
;

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ public enum ConnectAction implements PermissibleAction {
EDIT(VIEW),
CREATE(VIEW),
RESTART(VIEW),
DELETE(VIEW)
DELETE(VIEW),
RESET_OFFSETS(VIEW)

;

Expand All @@ -20,7 +21,7 @@ public enum ConnectAction implements PermissibleAction {
this.dependantActions = dependantActions;
}

public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART);
public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART, RESET_OFFSETS);

@Nullable
public static ConnectAction fromString(String name) {
Expand Down
18 changes: 18 additions & 0 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
import io.kafbat.ui.connect.model.ConnectorTopics;
import io.kafbat.ui.connect.model.TaskStatus;
import io.kafbat.ui.exception.ConnectorOffsetsResetException;
import io.kafbat.ui.exception.NotFoundException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.mapper.ClusterMapper;
Expand Down Expand Up @@ -213,6 +214,7 @@ public Mono<Void> updateConnectorState(KafkaCluster cluster, String connectName,
case RESTART_FAILED_TASKS -> restartTasks(cluster, connectName, connectorName,
t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
case PAUSE -> client.pauseConnector(connectorName);
case STOP -> client.stopConnector(connectorName);
case RESUME -> client.resumeConnector(connectorName);
});
}
Expand Down Expand Up @@ -272,4 +274,20 @@ private ReactiveFailover<KafkaConnectClientApi> api(KafkaCluster cluster, String
}
return client;
}

public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName,
String connectorName) {
return api(cluster, connectName)
.mono(client -> client.resetConnectorOffsets(connectorName))
.onErrorResume(WebClientResponseException.NotFound.class,
e -> {
throw new NotFoundException("Connector %s not found in %s".formatted(connectorName, connectName));
})
.onErrorResume(WebClientResponseException.BadRequest.class,
e -> {
throw new ConnectorOffsetsResetException(
"Failed to reset offsets of connector %s of %s. Make sure it is STOPPED first."
.formatted(connectorName, connectName));
});
}
}
60 changes: 57 additions & 3 deletions api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.test.web.reactive.server.ExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;

@Slf4j
Expand All @@ -45,6 +47,7 @@ public class KafkaConnectServiceTests extends AbstractIntegrationTest {

@BeforeEach
public void setUp() {

webTestClient.post()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
.bodyValue(new NewConnectorDTO()
Expand All @@ -54,11 +57,10 @@ public void setUp() {
"tasks.max", "1",
"topics", "output-topic",
"file", "/tmp/test",
"test.password", "test-credentials"
))
)
"test.password", "test-credentials")))
.exchange()
.expectStatus().isOk();

}

@AfterEach
Expand Down Expand Up @@ -418,4 +420,56 @@ public void shouldReturn400WhenTryingToCreateConnectorWithExistingName() {
.expectStatus()
.isBadRequest();
}

@Test
public void shouldResetConnectorWhenInStoppedState() {

webTestClient.get()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk()
.expectBody(ConnectorDTO.class)
.value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.RUNNING));

webTestClient.post()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/action/STOP",
LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk();

webTestClient.get()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk()
.expectBody(ConnectorDTO.class)
.value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.STOPPED));

webTestClient.delete()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/offsets",
LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk();

}

@Test
public void shouldReturn400WhenResettingConnectorInRunningState() {

webTestClient.get()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk()
.expectBody(ConnectorDTO.class)
.value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.RUNNING));

webTestClient.delete()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/offsets", LOCAL,
connectName, connectorName)
.exchange()
.expectStatus().isBadRequest();

}
}
1 change: 1 addition & 0 deletions contract/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@
</goals>
<configuration>
<arguments>gen:sources</arguments>
<pnpmInheritsProxyConfigFromMaven>false</pnpmInheritsProxyConfigFromMaven>
</configuration>
</execution>
</executions>
Expand Down
Loading

0 comments on commit a4b6378

Please sign in to comment.