Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Executor ID to provide more unique executor identifier #42

Merged
merged 4 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.agorapulse.worker.JobConfiguration;
import com.agorapulse.worker.JobManager;
import com.agorapulse.worker.executor.DistributedJobExecutor;
import com.agorapulse.worker.executor.ExecutorId;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
Expand All @@ -31,7 +32,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Named;
import jakarta.inject.Singleton;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -71,13 +71,13 @@ public class RedisJobExecutor implements DistributedJobExecutor {
private static final String DECREASE_JOB_COUNT = "return redis.call('decr', KEYS[1])";

private final StatefulRedisConnection<String, String> connection;
private final String hostname;
private final ExecutorId executorId;
private final BeanContext beanContext;
private final JobManager jobManager;

public RedisJobExecutor(StatefulRedisConnection<String, String> connection, @Named(HOSTNAME_PARAMETER_NAME) String hostname, BeanContext beanContext, JobManager jobManager) {
public RedisJobExecutor(StatefulRedisConnection<String, String> connection, ExecutorId executorId, BeanContext beanContext, JobManager jobManager) {
this.connection = connection;
this.hostname = hostname;
this.executorId = executorId;
this.beanContext = beanContext;
this.jobManager = jobManager;
}
Expand All @@ -87,7 +87,7 @@ public <R> Publisher<R> executeOnlyOnLeader(String jobName, Callable<R> supplier
RedisAsyncCommands<String, String> commands = connection.async();

return readMasterHostname(jobName, commands).flatMap(h -> {
if (hostname.equals(h)) {
if (executorId.id().equals(h)) {
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName)));
}
return Mono.empty();
Expand All @@ -114,7 +114,7 @@ public <R> Publisher<R> executeConcurrently(String jobName, int maxConcurrency,
public <R> Publisher<R> executeOnlyOnFollower(String jobName, Callable<R> supplier) {
RedisAsyncCommands<String, String> commands = connection.async();
return readMasterHostname(jobName, commands).flatMap(h -> {
if (!"".equals(h) && h.equals(hostname)) {
if (!"".equals(h) && h.equals(executorId.id())) {
return Mono.empty();
}
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName)));
Expand Down Expand Up @@ -142,7 +142,7 @@ private Mono<Object> readMasterHostname(String jobName, RedisAsyncCommands<Strin
return Mono.fromFuture(commands.eval(
LEADER_CHECK,
ScriptOutputType.VALUE,
PREFIX_LEADER + jobName, hostname, String.valueOf(LEADER_INACTIVITY_TIMEOUT)
PREFIX_LEADER + jobName, executorId.id(), String.valueOf(LEADER_INACTIVITY_TIMEOUT)
).toCompletableFuture()).defaultIfEmpty("");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2021-2024 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.worker.executor;

/**
* Unique identifier of the executor.
* @param id unique identifier of the executor
*/
public record ExecutorId(String id) { }
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.worker.redis;
package com.agorapulse.worker.executor;

import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;

import jakarta.inject.Named;
import io.micronaut.context.annotation.Secondary;
import jakarta.inject.Singleton;

import java.net.InetAddress;
import java.net.UnknownHostException;

@Factory
public class HostnameFactory {
public class HostnameExecutorIdFactory {

@Bean
@Singleton
@Named(RedisJobExecutor.HOSTNAME_PARAMETER_NAME)
public String hostname() {
@Secondary
public ExecutorId hostname() {
try {
return InetAddress.getLocalHost().getHostName();
return new ExecutorId(InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException ignored) {
return "localhost";
return new ExecutorId("localhost");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2021-2024 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.worker.executor.ecs;

import com.agorapulse.worker.executor.ExecutorId;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Singleton;

@Factory
@Requires(property = "ecs.agent.uri")
public class EcsExecutorIdFactory {

@Bean
@Singleton
public ExecutorId ecsExecutorId(@Value("${ecs.agent.uri}") String ecsAgentUri) {
return new ExecutorId(extractId(ecsAgentUri));
}

private static String extractId(String ecsAgentUri) {
return ecsAgentUri.substring(ecsAgentUri.lastIndexOf('/') + 1, ecsAgentUri.lastIndexOf("-"));
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2021-2024 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.worker.executor.ecs

import com.agorapulse.worker.executor.ExecutorId
import spock.lang.Specification

class EcsExecutorIdFactorySpec extends Specification {

void 'extract id'() {
given:
String id = 'http://169.254.170.2/api/c613006db31d4ac192bbb07b1577c40e-1280352332'
expect:
new EcsExecutorIdFactory().ecsExecutorId(id) == new ExecutorId('c613006db31d4ac192bbb07b1577c40e')
}

}
Loading