diff --git a/.dev/dev_arm64.yaml b/.dev/dev.yaml
similarity index 91%
rename from .dev/dev_arm64.yaml
rename to .dev/dev.yaml
index dc1a8726e..8c2ba5e74 100644
--- a/.dev/dev_arm64.yaml
+++ b/.dev/dev.yaml
@@ -1,9 +1,3 @@
-# This is a compose file designed for arm64/Apple Silicon systems
-# To adapt this to x86 please find and replace ".arm64" with empty
-
-# ARM64 supported images for kafka can be found here
-# https://hub.docker.com/r/confluentinc/cp-kafka/tags?page=1&name=arm64
----
version: '3.8'
name: "kafbat-ui-dev"
@@ -32,7 +26,7 @@ services:
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'
kafka0:
- image: confluentinc/cp-kafka:7.8.0.arm64
+ image: confluentinc/cp-kafka:7.8.0
user: "0:0"
hostname: kafka0
container_name: kafka0
@@ -60,7 +54,7 @@ services:
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
schema-registry0:
- image: confluentinc/cp-schema-registry:7.8.0.arm64
+ image: confluentinc/cp-schema-registry:7.8.0
ports:
- 8085:8085
depends_on:
@@ -76,7 +70,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
kafka-connect0:
- image: confluentinc/cp-kafka-connect:7.8.0.arm64
+ image: confluentinc/cp-kafka-connect:7.8.0
ports:
- 8083:8083
depends_on:
@@ -101,7 +95,7 @@ services:
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors"
ksqldb0:
- image: confluentinc/cp-ksqldb-server:7.8.0.arm64
+ image: confluentinc/cp-ksqldb-server:7.8.0
depends_on:
- kafka0
- kafka-connect0
@@ -119,7 +113,7 @@ services:
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
kafka-init-topics:
- image: confluentinc/cp-kafka:7.8.0.arm64
+ image: confluentinc/cp-kafka:7.8.0
volumes:
- ../documentation/compose/data/message.json:/data/message.json
depends_on:
diff --git a/.github/workflows/cve_checks.yml b/.github/workflows/cve_checks.yml
index e9c90ac14..cfdc40a60 100644
--- a/.github/workflows/cve_checks.yml
+++ b/.github/workflows/cve_checks.yml
@@ -1,5 +1,9 @@
name: "Infra: CVE checks"
on:
+ pull_request:
+ types: [ "opened", "reopened", "synchronize" ]
+ push:
+ branches: [ "main" ]
workflow_dispatch:
schedule:
# * is a special character in YAML so you have to quote this string
@@ -71,7 +75,7 @@ jobs:
notify:
needs: check-cves
- if: ${{ always() && needs.build-and-test.result == 'failure' }}
+ if: ${{ always() && needs.build-and-test.result == 'failure' && github.event_name == 'schedule' }}
uses: ./.github/workflows/infra_discord_hook.yml
with:
message: "Attention! CVE checks run failed! Please fix them CVEs :("
diff --git a/.mvn/jvm.config b/.mvn/jvm.config
new file mode 100644
index 000000000..2bf66750a
--- /dev/null
+++ b/.mvn/jvm.config
@@ -0,0 +1 @@
+-Djava.net.useSystemProxies=true
\ No newline at end of file
diff --git a/api/pom.xml b/api/pom.xml
index dc774c09a..bbeb9dff8 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -492,6 +492,7 @@
build
+ false
diff --git a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
index 474a0c159..df2da3e55 100644
--- a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
+++ b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
@@ -238,6 +238,16 @@ public Mono> pauseConnectorWithHttpInfo(String connectorNam
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
}
+ @Override
+ public Mono stopConnector(String connectorName) throws WebClientResponseException {
+ return withRetryOnConflictOrRebalance(super.stopConnector(connectorName));
+ }
+
+ @Override
+ public Mono> stopConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
+ return withRetryOnConflictOrRebalance(super.stopConnectorWithHttpInfo(connectorName));
+ }
+
@Override
public Mono restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
throws WebClientResponseException {
@@ -261,6 +271,18 @@ public Mono> restartConnectorTaskWithHttpInfo(String connec
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
}
+ @Override
+ public Mono resetConnectorOffsets(String connectorName)
+ throws WebClientResponseException {
+ return withRetryOnConflictOrRebalance(super.resetConnectorOffsets(connectorName));
+ }
+
+ @Override
+ public Mono> resetConnectorOffsetsWithHttpInfo(String connectorName)
+ throws WebClientResponseException {
+ return withRetryOnConflictOrRebalance(super.resetConnectorOffsetsWithHttpInfo(connectorName));
+ }
+
@Override
public Mono resumeConnector(String connectorName) throws WebClientResponseException {
return withRetryOnRebalance(super.resumeConnector(connectorName));
diff --git a/api/src/main/java/io/kafbat/ui/config/auth/LdapSecurityConfig.java b/api/src/main/java/io/kafbat/ui/config/auth/LdapSecurityConfig.java
index 9b1445507..4b7473942 100644
--- a/api/src/main/java/io/kafbat/ui/config/auth/LdapSecurityConfig.java
+++ b/api/src/main/java/io/kafbat/ui/config/auth/LdapSecurityConfig.java
@@ -80,7 +80,7 @@ public AbstractLdapAuthenticationProvider authenticationProvider(LdapAuthorities
}
@Bean
- @ConditionalOnProperty(value = "oauth2.ldap.activeDirectory", havingValue = "false")
+ @ConditionalOnProperty(value = "oauth2.ldap.activeDirectory", havingValue = "false", matchIfMissing = true)
public BindAuthenticator ldapBindAuthentication(LdapContextSource ldapContextSource) {
BindAuthenticator ba = new BindAuthenticator(ldapContextSource);
diff --git a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
index 08eb304c0..328a7353e 100644
--- a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
+++ b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
@@ -3,6 +3,8 @@
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART;
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
+import static io.kafbat.ui.model.rbac.permission.ConnectAction.RESET_OFFSETS;
+import static io.kafbat.ui.model.rbac.permission.ConnectAction.VIEW;
import io.kafbat.ui.api.KafkaConnectApi;
import io.kafbat.ui.model.ConnectDTO;
@@ -285,4 +287,23 @@ private Comparator getConnectorsComparator(ConnectorColumn
default -> defaultComparator;
};
}
+
+ @Override
+ public Mono> resetConnectorOffsets(String clusterName, String connectName,
+ String connectorName,
+ ServerWebExchange exchange) {
+
+ var context = AccessContext.builder()
+ .cluster(clusterName)
+ .connectActions(connectName, VIEW, RESET_OFFSETS)
+ .operationName("resetConnectorOffsets")
+ .operationParams(Map.of(CONNECTOR_NAME, connectorName))
+ .build();
+
+ return validateAccess(context).then(
+ kafkaConnectService
+ .resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
+ .map(ResponseEntity::ok))
+ .doOnEach(sig -> audit(context, sig));
+ }
}
diff --git a/api/src/main/java/io/kafbat/ui/exception/ConnectorOffsetsResetException.java b/api/src/main/java/io/kafbat/ui/exception/ConnectorOffsetsResetException.java
new file mode 100644
index 000000000..f76feddc3
--- /dev/null
+++ b/api/src/main/java/io/kafbat/ui/exception/ConnectorOffsetsResetException.java
@@ -0,0 +1,13 @@
+package io.kafbat.ui.exception;
+
+public class ConnectorOffsetsResetException extends CustomBaseException {
+
+ public ConnectorOffsetsResetException(String message) {
+ super(message);
+ }
+
+ @Override
+ public ErrorCode getErrorCode() {
+ return ErrorCode.CONNECTOR_OFFSETS_RESET_ERROR;
+ }
+}
diff --git a/api/src/main/java/io/kafbat/ui/exception/ErrorCode.java b/api/src/main/java/io/kafbat/ui/exception/ErrorCode.java
index a1b499aff..6d4a732e3 100644
--- a/api/src/main/java/io/kafbat/ui/exception/ErrorCode.java
+++ b/api/src/main/java/io/kafbat/ui/exception/ErrorCode.java
@@ -32,6 +32,7 @@ public enum ErrorCode {
TOPIC_ANALYSIS_ERROR(4018, HttpStatus.BAD_REQUEST),
FILE_UPLOAD_EXCEPTION(4019, HttpStatus.INTERNAL_SERVER_ERROR),
CEL_ERROR(4020, HttpStatus.BAD_REQUEST),
+ CONNECTOR_OFFSETS_RESET_ERROR(4021, HttpStatus.BAD_REQUEST),
;
static {
diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/permission/ConnectAction.java b/api/src/main/java/io/kafbat/ui/model/rbac/permission/ConnectAction.java
index 7634e89c0..a357245bc 100644
--- a/api/src/main/java/io/kafbat/ui/model/rbac/permission/ConnectAction.java
+++ b/api/src/main/java/io/kafbat/ui/model/rbac/permission/ConnectAction.java
@@ -10,7 +10,8 @@ public enum ConnectAction implements PermissibleAction {
EDIT(VIEW),
CREATE(VIEW),
RESTART(VIEW),
- DELETE(VIEW)
+ DELETE(VIEW),
+ RESET_OFFSETS(VIEW)
;
@@ -20,7 +21,7 @@ public enum ConnectAction implements PermissibleAction {
this.dependantActions = dependantActions;
}
- public static final Set ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART);
+ public static final Set ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART, RESET_OFFSETS);
@Nullable
public static ConnectAction fromString(String name) {
diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
index 815069d07..31e4268a0 100644
--- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
+++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
@@ -6,6 +6,7 @@
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
import io.kafbat.ui.connect.model.ConnectorTopics;
import io.kafbat.ui.connect.model.TaskStatus;
+import io.kafbat.ui.exception.ConnectorOffsetsResetException;
import io.kafbat.ui.exception.NotFoundException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.mapper.ClusterMapper;
@@ -213,6 +214,7 @@ public Mono updateConnectorState(KafkaCluster cluster, String connectName,
case RESTART_FAILED_TASKS -> restartTasks(cluster, connectName, connectorName,
t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
case PAUSE -> client.pauseConnector(connectorName);
+ case STOP -> client.stopConnector(connectorName);
case RESUME -> client.resumeConnector(connectorName);
});
}
@@ -272,4 +274,20 @@ private ReactiveFailover api(KafkaCluster cluster, String
}
return client;
}
+
+ public Mono resetConnectorOffsets(KafkaCluster cluster, String connectName,
+ String connectorName) {
+ return api(cluster, connectName)
+ .mono(client -> client.resetConnectorOffsets(connectorName))
+ .onErrorResume(WebClientResponseException.NotFound.class,
+ e -> {
+ throw new NotFoundException("Connector %s not found in %s".formatted(connectorName, connectName));
+ })
+ .onErrorResume(WebClientResponseException.BadRequest.class,
+ e -> {
+ throw new ConnectorOffsetsResetException(
+ "Failed to reset offsets of connector %s of %s. Make sure it is STOPPED first."
+ .formatted(connectorName, connectName));
+ });
+ }
}
diff --git a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
index 26d55b5ad..c5fbb14b4 100644
--- a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
+++ b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
@@ -24,6 +24,8 @@
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpStatus;
+import org.springframework.test.web.reactive.server.ExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;
@Slf4j
@@ -45,6 +47,7 @@ public class KafkaConnectServiceTests extends AbstractIntegrationTest {
@BeforeEach
public void setUp() {
+
webTestClient.post()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
.bodyValue(new NewConnectorDTO()
@@ -54,11 +57,10 @@ public void setUp() {
"tasks.max", "1",
"topics", "output-topic",
"file", "/tmp/test",
- "test.password", "test-credentials"
- ))
- )
+ "test.password", "test-credentials")))
.exchange()
.expectStatus().isOk();
+
}
@AfterEach
@@ -418,4 +420,56 @@ public void shouldReturn400WhenTryingToCreateConnectorWithExistingName() {
.expectStatus()
.isBadRequest();
}
+
+ @Test
+ public void shouldResetConnectorWhenInStoppedState() {
+
+ webTestClient.get()
+ .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
+ LOCAL, connectName, connectorName)
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody(ConnectorDTO.class)
+ .value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.RUNNING));
+
+ webTestClient.post()
+ .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/action/STOP",
+ LOCAL, connectName, connectorName)
+ .exchange()
+ .expectStatus().isOk();
+
+ webTestClient.get()
+ .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
+ LOCAL, connectName, connectorName)
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody(ConnectorDTO.class)
+ .value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.STOPPED));
+
+ webTestClient.delete()
+ .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/offsets",
+ LOCAL, connectName, connectorName)
+ .exchange()
+ .expectStatus().isOk();
+
+ }
+
+ @Test
+ public void shouldReturn400WhenResettingConnectorInRunningState() {
+
+ webTestClient.get()
+ .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
+ LOCAL, connectName, connectorName)
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody(ConnectorDTO.class)
+ .value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.RUNNING));
+
+ webTestClient.delete()
+ .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/offsets", LOCAL,
+ connectName, connectorName)
+ .exchange()
+ .expectStatus().isBadRequest();
+
+ }
}
diff --git a/contract/pom.xml b/contract/pom.xml
index 8d7e76cea..55d86d40b 100644
--- a/contract/pom.xml
+++ b/contract/pom.xml
@@ -201,6 +201,7 @@
gen:sources
+ false
diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml
index 315c4a17e..24530bcb4 100644
--- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml
+++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml
@@ -1565,7 +1565,7 @@ paths:
post:
tags:
- Kafka Connect
- summary: update connector state (restart, pause or resume)
+ summary: update connector state (restart, pause, stop or resume)
operationId: updateConnectorState
parameters:
- name: clusterName
@@ -1722,6 +1722,31 @@ paths:
200:
description: OK
+ /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/offsets:
+ delete:
+ tags:
+ - Kafka Connect
+ summary: reset the offsets for the specified connector
+ operationId: resetConnectorOffsets
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
/api/clusters/{clusterName}/ksql/v2:
post:
@@ -3567,6 +3592,7 @@ components:
- RESTART_FAILED_TASKS
- PAUSE
- RESUME
+ - STOP
TaskAction:
type: string
@@ -3953,7 +3979,16 @@ components:
KafkaAcl:
type: object
- required: [resourceType, resourceName, namePatternType, principal, host, operation, permission]
+ required:
+ [
+ resourceType,
+ resourceName,
+ namePatternType,
+ principal,
+ host,
+ operation,
+ permission,
+ ]
properties:
resourceType:
$ref: '#/components/schemas/KafkaAclResourceType'
diff --git a/contract/src/main/resources/swagger/kafka-connect-api.yaml b/contract/src/main/resources/swagger/kafka-connect-api.yaml
index e014d5529..5fa8dc230 100644
--- a/contract/src/main/resources/swagger/kafka-connect-api.yaml
+++ b/contract/src/main/resources/swagger/kafka-connect-api.yaml
@@ -144,6 +144,42 @@ paths:
500:
description: Internal server error
+ /connectors/{connector}/offsets:
+ delete:
+ tags:
+ - KafkaConnectClient
+ summary: Reset the offsets for the specified connector
+ operationId: resetConnectorOffsets
+ parameters:
+ - in: path
+ name: connector
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ 400:
+ description: Bad request
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ConnectorOffsetsError'
+
+ get:
+ tags:
+ - KafkaConnectClient
+ summary: Get the offsets for the specified connector
+ operationId: getConnectorOffsets
+ parameters:
+ - in: path
+ name: connector
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
/connectors/{connectorName}/status:
get:
@@ -230,6 +266,22 @@ paths:
202:
description: Accepted
+ /connectors/{connectorName}/stop:
+ put:
+ tags:
+ - KafkaConnectClient
+ summary: stop the connector
+ operationId: stopConnector
+ parameters:
+ - name: connectorName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 204:
+ description: No Content
+
/connectors/{connectorName}/tasks:
get:
tags:
@@ -432,6 +484,14 @@ components:
trace:
type: string
+ ConnectorOffsetsError:
+ type: object
+ properties:
+ error_code:
+ type: number
+ message:
+ type: string
+
ConnectorStatus:
type: object
properties:
diff --git a/documentation/compose/DOCKER_COMPOSE.md b/documentation/compose/DOCKER_COMPOSE.md
index 57a5cf4d0..186c15c45 100644
--- a/documentation/compose/DOCKER_COMPOSE.md
+++ b/documentation/compose/DOCKER_COMPOSE.md
@@ -1,7 +1,7 @@
# Descriptions of docker-compose configurations (*.yaml)
1. [kafka-ui.yaml](./kafbat-ui.yaml) - Default configuration with 2 kafka clusters with two nodes of Schema Registry, one kafka-connect and a few dummy topics.
-2. [kafka-ui-arm64.yaml](../../.dev/dev_arm64.yaml) - Default configuration for ARM64(Mac M1) architecture with 1 kafka cluster without zookeeper with one node of Schema Registry, one kafka-connect and a few dummy topics.
+2. [kafka-ui.yaml](../../.dev/dev_arm64.yaml) - Default configuration with 1 kafka cluster without zookeeper with one node of Schema Registry, one kafka-connect and a few dummy topics.
3. [kafka-ui-ssl.yml](./kafka-ssl.yml) - Connect to Kafka via TLS/SSL
4. [kafka-cluster-sr-auth.yaml](./cluster-sr-auth.yaml) - Schema registry with authentication.
5. [kafka-ui-auth-context.yaml](./auth-context.yaml) - Basic (username/password) authentication with custom path (URL) (issue 861).
diff --git a/frontend/package.json b/frontend/package.json
index b42a0d12e..6c55031f5 100644
--- a/frontend/package.json
+++ b/frontend/package.json
@@ -103,8 +103,8 @@
"whatwg-fetch": "3.6.20"
},
"engines": {
- "node": "^22.12.0",
- "pnpm": "^9.15.0"
+ "node": "^22",
+ "pnpm": "^9"
},
"pnpm": {
"overrides": {
diff --git a/frontend/src/components/Connect/Details/Actions/Actions.tsx b/frontend/src/components/Connect/Details/Actions/Actions.tsx
index 61eeabda4..6909b4617 100644
--- a/frontend/src/components/Connect/Details/Actions/Actions.tsx
+++ b/frontend/src/components/Connect/Details/Actions/Actions.tsx
@@ -11,6 +11,7 @@ import useAppParams from 'lib/hooks/useAppParams';
import {
useConnector,
useDeleteConnector,
+ useResetConnectorOffsets,
useUpdateConnectorState,
} from 'lib/hooks/api/kafkaConnect';
import {
@@ -37,7 +38,7 @@ const Actions: React.FC = () => {
const deleteConnectorHandler = () =>
confirm(
<>
- Are you sure you want to remove {routerProps.connectorName}{' '}
+ Are you sure you want to remove the {routerProps.connectorName}{' '}
connector?
>,
async () => {
@@ -59,11 +60,25 @@ const Actions: React.FC = () => {
stateMutation.mutateAsync(ConnectorAction.RESTART_FAILED_TASKS);
const pauseConnectorHandler = () =>
stateMutation.mutateAsync(ConnectorAction.PAUSE);
+ const stopConnectorHandler = () =>
+ stateMutation.mutateAsync(ConnectorAction.STOP);
const resumeConnectorHandler = () =>
stateMutation.mutateAsync(ConnectorAction.RESUME);
+
+ const resetConnectorOffsetsMutation = useResetConnectorOffsets(routerProps);
+ const resetConnectorOffsetsHandler = () =>
+ confirm(
+ <>
+ Are you sure you want to reset the {routerProps.connectorName}{' '}
+ connector offsets?
+ >,
+ () => resetConnectorOffsetsMutation.mutateAsync()
+ );
+
return (
Restart
@@ -74,7 +89,6 @@ const Actions: React.FC = () => {
{connector?.status.state === ConnectorState.RUNNING && (
{
Pause
)}
- {connector?.status.state === ConnectorState.PAUSED && (
+ {connector?.status.state === ConnectorState.RUNNING && (
+
+ Stop
+
+ )}
+ {(connector?.status.state === ConnectorState.PAUSED ||
+ connector?.status.state === ConnectorState.STOPPED) && (
{
)}
{
{
{
+
+ Reset Offsets
+
({
@@ -34,12 +35,14 @@ jest.mock('lib/hooks/api/kafkaConnect', () => ({
useConnector: jest.fn(),
useDeleteConnector: jest.fn(),
useUpdateConnectorState: jest.fn(),
+ useResetConnectorOffsets: jest.fn(),
}));
const expectActionButtonsExists = () => {
expect(screen.getByText('Restart Connector')).toBeInTheDocument();
expect(screen.getByText('Restart All Tasks')).toBeInTheDocument();
expect(screen.getByText('Restart Failed Tasks')).toBeInTheDocument();
+ expect(screen.getByText('Reset Offsets')).toBeInTheDocument();
expect(screen.getByText('Delete')).toBeInTheDocument();
};
const afterClickDropDownButton = async () => {
@@ -55,6 +58,7 @@ describe('Actions', () => {
mockHistoryPush.mockClear();
deleteConnector.mockClear();
cancelMock.mockClear();
+ resetConnectorOffsets.mockClear();
});
describe('view', () => {
@@ -82,6 +86,30 @@ describe('Actions', () => {
expect(screen.getAllByRole('menuitem').length).toEqual(4);
expect(screen.getByText('Resume')).toBeInTheDocument();
expect(screen.queryByText('Pause')).not.toBeInTheDocument();
+ expect(screen.queryByText('Stop')).not.toBeInTheDocument();
+ await afterClickDropDownButton();
+ expect(screen.getByText('Reset Offsets')).toBeInTheDocument();
+ expect(
+ screen.getByRole('menuitem', { name: 'Reset Offsets' })
+ ).toHaveAttribute('aria-disabled');
+ expectActionButtonsExists();
+ });
+
+ it('renders buttons when stopped', async () => {
+ (useConnector as jest.Mock).mockImplementation(() => ({
+ data: setConnectorStatus(connector, ConnectorState.STOPPED),
+ }));
+ renderComponent();
+ await afterClickRestartButton();
+ expect(screen.getAllByRole('menuitem').length).toEqual(4);
+ expect(screen.getByText('Resume')).toBeInTheDocument();
+ expect(screen.queryByText('Pause')).not.toBeInTheDocument();
+ expect(screen.queryByText('Stop')).not.toBeInTheDocument();
+ await afterClickDropDownButton();
+ expect(screen.getByText('Reset Offsets')).toBeInTheDocument();
+ expect(
+ screen.getByRole('menuitem', { name: 'Reset Offsets' })
+ ).not.toHaveAttribute('aria-disabled');
expectActionButtonsExists();
});
@@ -94,6 +122,12 @@ describe('Actions', () => {
expect(screen.getAllByRole('menuitem').length).toEqual(3);
expect(screen.queryByText('Resume')).not.toBeInTheDocument();
expect(screen.queryByText('Pause')).not.toBeInTheDocument();
+ expect(screen.queryByText('Stop')).not.toBeInTheDocument();
+ await afterClickDropDownButton();
+ expect(screen.getByText('Reset Offsets')).toBeInTheDocument();
+ expect(
+ screen.getByRole('menuitem', { name: 'Reset Offsets' })
+ ).toHaveAttribute('aria-disabled');
expectActionButtonsExists();
});
@@ -106,6 +140,12 @@ describe('Actions', () => {
expect(screen.getAllByRole('menuitem').length).toEqual(3);
expect(screen.queryByText('Resume')).not.toBeInTheDocument();
expect(screen.queryByText('Pause')).not.toBeInTheDocument();
+ expect(screen.queryByText('Stop')).not.toBeInTheDocument();
+ await afterClickDropDownButton();
+ expect(screen.getByText('Reset Offsets')).toBeInTheDocument();
+ expect(
+ screen.getByRole('menuitem', { name: 'Reset Offsets' })
+ ).toHaveAttribute('aria-disabled');
expectActionButtonsExists();
});
@@ -115,9 +155,15 @@ describe('Actions', () => {
}));
renderComponent();
await afterClickRestartButton();
- expect(screen.getAllByRole('menuitem').length).toEqual(4);
+ expect(screen.getAllByRole('menuitem').length).toEqual(5);
expect(screen.queryByText('Resume')).not.toBeInTheDocument();
expect(screen.getByText('Pause')).toBeInTheDocument();
+ expect(screen.getByText('Stop')).toBeInTheDocument();
+ await afterClickDropDownButton();
+ expect(screen.getByText('Reset Offsets')).toBeInTheDocument();
+ expect(
+ screen.getByRole('menuitem', { name: 'Reset Offsets' })
+ ).toHaveAttribute('aria-disabled');
expectActionButtonsExists();
});
@@ -137,6 +183,20 @@ describe('Actions', () => {
expect(screen.getByRole('dialog')).toBeInTheDocument();
});
+ it('opens confirmation modal when reset offsets button clicked on a STOPPED connector', async () => {
+ (useConnector as jest.Mock).mockImplementation(() => ({
+ data: setConnectorStatus(connector, ConnectorState.STOPPED),
+ }));
+ renderComponent();
+ await afterClickDropDownButton();
+ await waitFor(async () =>
+ userEvent.click(
+ screen.getByRole('menuitem', { name: 'Reset Offsets' })
+ )
+ );
+ expect(screen.getByRole('dialog')).toBeInTheDocument();
+ });
+
it('calls restartConnector when restart button clicked', async () => {
const restartConnector = jest.fn();
(useUpdateConnectorState as jest.Mock).mockImplementation(() => ({
@@ -191,7 +251,18 @@ describe('Actions', () => {
expect(pauseConnector).toHaveBeenCalledWith(ConnectorAction.PAUSE);
});
- it('calls resumeConnector when resume button clicked', async () => {
+ it('calls stopConnector when stop button clicked', async () => {
+ const stopConnector = jest.fn();
+ (useUpdateConnectorState as jest.Mock).mockImplementation(() => ({
+ mutateAsync: stopConnector,
+ }));
+ renderComponent();
+ await afterClickRestartButton();
+ await userEvent.click(screen.getByRole('menuitem', { name: 'Stop' }));
+ expect(stopConnector).toHaveBeenCalledWith(ConnectorAction.STOP);
+ });
+
+ it('calls resumeConnector when resume button clicked from PAUSED state', async () => {
const resumeConnector = jest.fn();
(useConnector as jest.Mock).mockImplementation(() => ({
data: setConnectorStatus(connector, ConnectorState.PAUSED),
@@ -204,6 +275,20 @@ describe('Actions', () => {
await userEvent.click(screen.getByRole('menuitem', { name: 'Resume' }));
expect(resumeConnector).toHaveBeenCalledWith(ConnectorAction.RESUME);
});
+
+ it('calls resumeConnector when resume button clicked from STOPPED state', async () => {
+ const resumeConnector = jest.fn();
+ (useConnector as jest.Mock).mockImplementation(() => ({
+ data: setConnectorStatus(connector, ConnectorState.STOPPED),
+ }));
+ (useUpdateConnectorState as jest.Mock).mockImplementation(() => ({
+ mutateAsync: resumeConnector,
+ }));
+ renderComponent();
+ await afterClickRestartButton();
+ await userEvent.click(screen.getByRole('menuitem', { name: 'Resume' }));
+ expect(resumeConnector).toHaveBeenCalledWith(ConnectorAction.RESUME);
+ });
});
});
});
diff --git a/frontend/src/components/Connect/List/ActionsCell.tsx b/frontend/src/components/Connect/List/ActionsCell.tsx
index 9b219f20e..4ad50c00e 100644
--- a/frontend/src/components/Connect/List/ActionsCell.tsx
+++ b/frontend/src/components/Connect/List/ActionsCell.tsx
@@ -9,9 +9,10 @@ import {
import { CellContext } from '@tanstack/react-table';
import { ClusterNameRoute } from 'lib/paths';
import useAppParams from 'lib/hooks/useAppParams';
-import { Dropdown, DropdownItem } from 'components/common/Dropdown';
+import { Dropdown } from 'components/common/Dropdown';
import {
useDeleteConnector,
+ useResetConnectorOffsets,
useUpdateConnectorState,
} from 'lib/hooks/api/kafkaConnect';
import { useConfirm } from 'lib/hooks/useConfirm';
@@ -36,10 +37,15 @@ const ActionsCell: React.FC> = ({
connectName: connect,
connectorName: name,
});
+ const resetConnectorOffsetsMutation = useResetConnectorOffsets({
+ clusterName,
+ connectName: connect,
+ connectorName: name,
+ });
const handleDelete = () => {
confirm(
<>
- Are you sure want to remove {name} connector?
+ Are you sure you want to remove the {name} connector?
>,
async () => {
await deleteMutation.mutateAsync();
@@ -58,9 +64,25 @@ const ActionsCell: React.FC> = ({
const restartFailedTasksHandler = () =>
stateMutation.mutateAsync(ConnectorAction.RESTART_FAILED_TASKS);
+ const pauseConnectorHandler = () =>
+ stateMutation.mutateAsync(ConnectorAction.PAUSE);
+
+ const stopConnectorHandler = () =>
+ stateMutation.mutateAsync(ConnectorAction.STOP);
+
+ const resetOffsetsHandler = () => {
+ confirm(
+ <>
+ Are you sure you want to reset the {name} connector offsets?
+ >,
+ () => resetConnectorOffsetsMutation.mutateAsync()
+ );
+ };
+
return (
- {status.state === ConnectorState.PAUSED && (
+ {(status.state === ConnectorState.PAUSED ||
+ status.state === ConnectorState.STOPPED) && (
> = ({
Resume
)}
+ {status.state === ConnectorState.RUNNING && (
+
+ Pause
+
+ )}
+ {status.state === ConnectorState.RUNNING && (
+
+ Stop
+
+ )}
> = ({
>
Restart Failed Tasks
-
- Remove Connector
-
+
+ Reset Offsets
+
+
+ Delete
+
);
};
diff --git a/frontend/src/components/Connect/List/__tests__/List.spec.tsx b/frontend/src/components/Connect/List/__tests__/List.spec.tsx
index 82b4aab21..194d97246 100644
--- a/frontend/src/components/Connect/List/__tests__/List.spec.tsx
+++ b/frontend/src/components/Connect/List/__tests__/List.spec.tsx
@@ -12,11 +12,13 @@ import { clusterConnectConnectorPath, clusterConnectorsPath } from 'lib/paths';
import {
useConnectors,
useDeleteConnector,
+ useResetConnectorOffsets,
useUpdateConnectorState,
} from 'lib/hooks/api/kafkaConnect';
const mockedUsedNavigate = jest.fn();
const mockDelete = jest.fn();
+const mockResetOffsets = jest.fn();
jest.mock('react-router-dom', () => ({
...jest.requireActual('react-router-dom'),
@@ -27,6 +29,7 @@ jest.mock('lib/hooks/api/kafkaConnect', () => ({
useConnectors: jest.fn(),
useDeleteConnector: jest.fn(),
useUpdateConnectorState: jest.fn(),
+ useResetConnectorOffsets: jest.fn(),
}));
const clusterName = 'local';
@@ -56,7 +59,7 @@ describe('Connectors List', () => {
it('renders', async () => {
renderComponent();
expect(screen.getByRole('table')).toBeInTheDocument();
- expect(screen.getAllByRole('row').length).toEqual(3);
+ expect(screen.getAllByRole('row').length).toEqual(4);
});
it('opens broker when row clicked', async () => {
@@ -94,7 +97,7 @@ describe('Connectors List', () => {
});
});
- describe('when remove connector modal is open', () => {
+ describe('when delete modal is open', () => {
beforeEach(() => {
(useConnectors as jest.Mock).mockImplementation(() => ({
data: connectors,
@@ -104,10 +107,10 @@ describe('Connectors List', () => {
}));
});
- it('calls removeConnector on confirm', async () => {
+ it('calls deleteConnector on confirm', async () => {
renderComponent();
- const removeButton = screen.getAllByText('Remove Connector')[0];
- await waitFor(() => userEvent.click(removeButton));
+ const deleteButton = screen.getAllByText('Delete')[0];
+ await waitFor(() => userEvent.click(deleteButton));
const submitButton = screen.getAllByRole('button', {
name: 'Confirm',
@@ -118,8 +121,43 @@ describe('Connectors List', () => {
it('closes the modal when cancel button is clicked', async () => {
renderComponent();
- const removeButton = screen.getAllByText('Remove Connector')[0];
- await waitFor(() => userEvent.click(removeButton));
+ const deleteButton = screen.getAllByText('Delete')[0];
+ await waitFor(() => userEvent.click(deleteButton));
+
+ const cancelButton = screen.getAllByRole('button', {
+ name: 'Cancel',
+ })[0];
+ await waitFor(() => userEvent.click(cancelButton));
+ expect(cancelButton).not.toBeInTheDocument();
+ });
+ });
+
+ describe('when reset connector offsets modal is open', () => {
+ beforeEach(() => {
+ (useConnectors as jest.Mock).mockImplementation(() => ({
+ data: connectors,
+ }));
+ (useResetConnectorOffsets as jest.Mock).mockImplementation(() => ({
+ mutateAsync: mockResetOffsets,
+ }));
+ });
+
+ it('calls resetConnectorOffsets on confirm', async () => {
+ renderComponent();
+ const resetButton = screen.getAllByText('Reset Offsets')[2];
+ await waitFor(() => userEvent.click(resetButton));
+
+ const submitButton = screen.getAllByRole('button', {
+ name: 'Confirm',
+ })[0];
+ await userEvent.click(submitButton);
+ expect(mockResetOffsets).toHaveBeenCalledWith();
+ });
+
+ it('closes the modal when cancel button is clicked', async () => {
+ renderComponent();
+ const resetButton = screen.getAllByText('Reset Offsets')[2];
+ await waitFor(() => userEvent.click(resetButton));
const cancelButton = screen.getAllByRole('button', {
name: 'Cancel',
diff --git a/frontend/src/lib/fixtures/kafkaConnect.ts b/frontend/src/lib/fixtures/kafkaConnect.ts
index 8a79760e6..4185695ee 100644
--- a/frontend/src/lib/fixtures/kafkaConnect.ts
+++ b/frontend/src/lib/fixtures/kafkaConnect.ts
@@ -38,6 +38,18 @@ export const connectors: FullConnectorInfo[] = [
tasksCount: 3,
failedTasksCount: 1,
},
+ {
+ connect: 'third',
+ name: 'hdfs3-source-connector',
+ connectorClass: 'FileStreamSource',
+ type: ConnectorType.SINK,
+ topics: ['test-topic'],
+ status: {
+ state: ConnectorState.STOPPED,
+ },
+ tasksCount: 0,
+ failedTasksCount: 0,
+ },
];
export const connector: Connector = {
diff --git a/frontend/src/lib/hooks/api/kafkaConnect.ts b/frontend/src/lib/hooks/api/kafkaConnect.ts
index 225e72165..743d78307 100644
--- a/frontend/src/lib/hooks/api/kafkaConnect.ts
+++ b/frontend/src/lib/hooks/api/kafkaConnect.ts
@@ -98,7 +98,10 @@ export function useUpdateConnectorState(props: UseConnectorProps) {
(action: ConnectorAction) => api.updateConnectorState({ ...props, action }),
{
onSuccess: () =>
- client.invalidateQueries(['clusters', props.clusterName, 'connectors']),
+ Promise.all([
+ client.invalidateQueries(connectorsKey(props.clusterName)),
+ client.invalidateQueries(connectorKey(props)),
+ ]),
}
);
}
@@ -161,3 +164,11 @@ export function useDeleteConnector(props: UseConnectorProps) {
onSuccess: () => client.invalidateQueries(connectorsKey(props.clusterName)),
});
}
+
+export function useResetConnectorOffsets(props: UseConnectorProps) {
+ const client = useQueryClient();
+
+ return useMutation(() => api.resetConnectorOffsets(props), {
+ onSuccess: () => client.invalidateQueries(connectorKey(props)),
+ });
+}
diff --git a/pom.xml b/pom.xml
index 64b10b982..5fe46b1fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,10 +73,6 @@
-
- confluent
- https://packages.confluent.io/maven/
-
central
Central Repository
@@ -86,13 +82,13 @@
false
+
+ confluent
+ https://packages.confluent.io/maven/
+
-
- confluent
- https://packages.confluent.io/maven/
-
central
Central Repository
@@ -105,6 +101,10 @@
never
+
+ confluent
+ https://packages.confluent.io/maven/
+