Skip to content

Commit

Permalink
Executor ID to provide more unique executor identifier
Browse files Browse the repository at this point in the history
with initial implementation for AWS ECS
  • Loading branch information
musketyr committed Nov 27, 2024
1 parent d9c3c0a commit 584da90
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.agorapulse.worker.JobConfiguration;
import com.agorapulse.worker.JobManager;
import com.agorapulse.worker.executor.DistributedJobExecutor;
import com.agorapulse.worker.executor.ExecutorId;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
Expand All @@ -31,7 +32,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Named;
import jakarta.inject.Singleton;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -71,13 +71,13 @@ public class RedisJobExecutor implements DistributedJobExecutor {
private static final String DECREASE_JOB_COUNT = "return redis.call('decr', KEYS[1])";

private final StatefulRedisConnection<String, String> connection;
private final String hostname;
private final ExecutorId executorId;
private final BeanContext beanContext;
private final JobManager jobManager;

public RedisJobExecutor(StatefulRedisConnection<String, String> connection, @Named(HOSTNAME_PARAMETER_NAME) String hostname, BeanContext beanContext, JobManager jobManager) {
public RedisJobExecutor(StatefulRedisConnection<String, String> connection, ExecutorId executorId, BeanContext beanContext, JobManager jobManager) {
this.connection = connection;
this.hostname = hostname;
this.executorId = executorId;
this.beanContext = beanContext;
this.jobManager = jobManager;
}
Expand All @@ -87,7 +87,7 @@ public <R> Publisher<R> executeOnlyOnLeader(String jobName, Callable<R> supplier
RedisAsyncCommands<String, String> commands = connection.async();

return readMasterHostname(jobName, commands).flatMap(h -> {
if (hostname.equals(h)) {
if (executorId.id().equals(h)) {
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName)));
}
return Mono.empty();
Expand All @@ -114,7 +114,7 @@ public <R> Publisher<R> executeConcurrently(String jobName, int maxConcurrency,
public <R> Publisher<R> executeOnlyOnFollower(String jobName, Callable<R> supplier) {
RedisAsyncCommands<String, String> commands = connection.async();
return readMasterHostname(jobName, commands).flatMap(h -> {
if (!"".equals(h) && h.equals(hostname)) {
if (!"".equals(h) && h.equals(executorId.id())) {
return Mono.empty();
}
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName)));
Expand Down Expand Up @@ -142,7 +142,7 @@ private Mono<Object> readMasterHostname(String jobName, RedisAsyncCommands<Strin
return Mono.fromFuture(commands.eval(
LEADER_CHECK,
ScriptOutputType.VALUE,
PREFIX_LEADER + jobName, hostname, String.valueOf(LEADER_INACTIVITY_TIMEOUT)
PREFIX_LEADER + jobName, executorId.id(), String.valueOf(LEADER_INACTIVITY_TIMEOUT)
).toCompletableFuture()).defaultIfEmpty("");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.agorapulse.worker.executor;

/**
* Unique identifier of the executor.
* @param id unique identifier of the executor
*/
public record ExecutorId(String id) { }
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.worker.redis;
package com.agorapulse.worker.executor;

import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;

import jakarta.inject.Named;
import io.micronaut.context.annotation.Secondary;
import jakarta.inject.Singleton;

import java.net.InetAddress;
import java.net.UnknownHostException;

@Factory
public class HostnameFactory {
public class HostnameExecutorIdFactory {

@Bean
@Singleton
@Named(RedisJobExecutor.HOSTNAME_PARAMETER_NAME)
public String hostname() {
@Secondary
public ExecutorId hostname() {
try {
return InetAddress.getLocalHost().getHostName();
return new ExecutorId(InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException ignored) {
return "localhost";
return new ExecutorId("localhost");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.agorapulse.worker.executor.ecs;

import com.agorapulse.worker.executor.ExecutorId;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Singleton;

@Factory
@Requires(property = "ecs.agent.uri")
public class EcsExecutorIdFactory {

@Bean
@Singleton
public ExecutorId ecsExecutorId(@Value("${ecs.agent.uri}") String ecsAgentUri) {
return new ExecutorId(extractId(ecsAgentUri));
}

private static String extractId(String ecsAgentUri) {
return ecsAgentUri.substring(ecsAgentUri.lastIndexOf('/') + 1, ecsAgentUri.lastIndexOf("-"));
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.agorapulse.worker.executor.ecs

import com.agorapulse.worker.executor.ExecutorId
import spock.lang.Specification

class EcsExecutorIdFactorySpec extends Specification {

void 'extract id'() {
given:
String id = 'http://169.254.170.2/api/c613006db31d4ac192bbb07b1577c40e-1280352332'
expect:
new EcsExecutorIdFactory().ecsExecutorId(id) == new ExecutorId('c613006db31d4ac192bbb07b1577c40e')
}

}

0 comments on commit 584da90

Please sign in to comment.