Skip to content

Commit

Permalink
增加nop_sys_lock和nop_sys_cluster_leader表
Browse files Browse the repository at this point in the history
  • Loading branch information
entropy-cloud committed Jul 19, 2023
1 parent d3c05a9 commit ec3197e
Show file tree
Hide file tree
Showing 78 changed files with 3,236 additions and 98 deletions.
4 changes: 4 additions & 0 deletions nop-api-core/src/main/java/io/nop/api/core/ApiConfigs.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public interface ApiConfigs {
IConfigReference<Boolean> CFG_DEBUG =
varRef("nop.debug", Boolean.class, false);

@Description("每次应用启动都会被分配唯一ID")
IConfigReference<String> CFG_HOST_ID =
varRef("nop.server.host-id", String.class, null);

@Description("每一个启动的Nop应用程序应该指定一个唯一名称用于区分")
IConfigReference<String> CFG_APPLICATION_NAME =
varRef("nop.application.name", String.class, "nop-app");
Expand Down
17 changes: 17 additions & 0 deletions nop-api-core/src/main/java/io/nop/api/core/config/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
package io.nop.api.core.config;

import io.nop.api.core.annotations.core.GlobalInstance;
import io.nop.api.core.util.ApiStringHelper;

import java.util.UUID;

import static io.nop.api.core.ApiConfigs.CFG_APPLICATION_LOCALE;
import static io.nop.api.core.ApiConfigs.CFG_APPLICATION_NAME;
import static io.nop.api.core.ApiConfigs.CFG_APPLICATION_TIMEZONE;
import static io.nop.api.core.ApiConfigs.CFG_APPLICATION_VERSION;
import static io.nop.api.core.ApiConfigs.CFG_DEBUG;
import static io.nop.api.core.ApiConfigs.CFG_HOST_ID;
import static io.nop.api.core.ApiConfigs.CFG_PROFILE;

@SuppressWarnings("PMD.TooManyStaticImports")
Expand All @@ -37,6 +41,19 @@ public static String appVersion() {
return CFG_APPLICATION_VERSION.get();
}

public static String hostId() {
String hostId = CFG_HOST_ID.get();
if (ApiStringHelper.isEmpty(hostId)) {
synchronized (CFG_HOST_ID) {
hostId = CFG_HOST_ID.get();
if (ApiStringHelper.isEmpty(hostId)) {
hostId = UUID.randomUUID().toString();
s_provider.updateConfigValue(CFG_HOST_ID, hostId);
}
}
}
return hostId;
}

public static String appLocale() {
return CFG_APPLICATION_LOCALE.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,8 @@ public static <T> void bindResult(CompletionStage<T> promise, CompletableFuture<
*/
public static void bindCancel(CompletionStage<?> promise, Future<?> future) {
promise.whenComplete((value, ex) -> {
if (isCancellationException(ex)) {
if (!future.isDone())
future.cancel(true);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package io.nop.cluster.elector;

import io.nop.api.core.annotations.ioc.InjectValue;
import io.nop.api.core.config.AppConfig;
import io.nop.commons.concurrent.executor.GlobalExecutors;
import io.nop.commons.concurrent.executor.IScheduledExecutor;
import io.nop.commons.io.net.IServerAddrFinder;
import io.nop.commons.util.NetHelper;
import io.nop.commons.util.StringHelper;

import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public abstract class AbstractLeaderElector implements ILeaderElector {

private String addr;
private int port;
private IServerAddrFinder serverAddrFinder;

private volatile LeaderEpoch leaderEpoch;
protected volatile CompletableFuture<LeaderEpoch> electionPromise = new CompletableFuture<>();

protected IScheduledExecutor scheduledExecutor;

private String clusterId;

private int leaseMs = 10000;
private int checkIntervalMs = 2000;

private final CopyOnWriteArrayList<ILeaderElectionListener> listeners = new CopyOnWriteArrayList<>();

private volatile boolean active;

@InjectValue("@cfg:nop.leader.lease-time-ms|10000")
public void setLeaseMs(int leaseMs) {
this.leaseMs = leaseMs;
}

@InjectValue("@cfg:nop.leader.check-interval-ms|2000")
public void setCheckIntervalMs(int checkIntervalMs) {
this.checkIntervalMs = checkIntervalMs;
}

public int getCheckIntervalMs() {
return checkIntervalMs;
}

@InjectValue("@cfg:nop.cluster.id,nop.application.name")
public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}

public String getClusterId() {
return clusterId;
}

public int getLeaseMs() {
return leaseMs;
}

public int getPort() {
return port;
}

public String getAddr() {
return addr;
}

@InjectValue("@cfg:nop.server.addr|")
public void setAddr(String addr) {
this.addr = addr;
}

@InjectValue("@cfg:nop.server.port|0")
public void setPort(int port) {
this.port = port;
}

public IServerAddrFinder getServerAddrFinder() {
return serverAddrFinder;
}

@Nullable
@Inject
public void setServerAddrFinder(IServerAddrFinder serverAddrFinder) {
this.serverAddrFinder = serverAddrFinder;
}

public void setScheduledExecutor(IScheduledExecutor scheduledExecutor) {
this.scheduledExecutor = scheduledExecutor;
}

@PostConstruct
public void init() {
active = true;
if (StringHelper.isEmpty(addr)) {
if (serverAddrFinder != null) {
addr = serverAddrFinder.findAddr();
} else {
addr = NetHelper.findLocalIp();
}
}

if (scheduledExecutor == null) {
scheduledExecutor = GlobalExecutors.globalTimer().executeOn(GlobalExecutors.globalWorker());
}

scheduledExecutor.schedule(this::checkLeader, 0, TimeUnit.MILLISECONDS);
}

public boolean isActive() {
return active;
}

@PreDestroy
public void destroy() {
active = false;
electionPromise.cancel(false);
}

protected String getLeaderAddr() {
if (port > 0)
return addr + ':' + port;
return addr;
}

@Override
public String getHostId() {
return AppConfig.hostId();
}

@Override
public LeaderEpoch getLeaderEpoch() {
return leaderEpoch;
}

@Override
public AutoCloseable addElectionListener(ILeaderElectionListener listener) {
listeners.add(listener);
return () -> {
listeners.remove(listener);
};
}

@Override
public boolean isLeader() {
LeaderEpoch leaderEpoch = this.leaderEpoch;
return leaderEpoch == null ? false : leaderEpoch.getLeaderId().equals(getHostId());
}

@Override
public CompletionStage<LeaderEpoch> whenElectionCompleted() {
return electionPromise;
}

protected void newElection() {
this.electionPromise = new CompletableFuture<>();
}

protected void updateLeader(LeaderEpoch leader) {
LeaderEpoch current = this.leaderEpoch;
if (current == null) {

}
}

protected abstract Void checkLeader();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ public void refreshConfig() {
timer.refreshConfig();
}

@Override
public IScheduledExecutor executeOn(Executor executor) {
if (this.executor == executor)
return this;

return new BindScheduledExecutor(timer, executor);
}

@Override
public <V> CompletableFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
CompletableFuture<V> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.nop.api.core.util.FutureHelper;
import io.nop.commons.concurrent.thread.NamedThreadFactory;
import io.nop.commons.metrics.GlobalMeterRegistry;
import io.nop.commons.util.MathHelper;
import io.nop.commons.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ExecutorHelper {
static Executor SYNC_EXECUTOR = task -> task.run();
Expand Down Expand Up @@ -193,4 +195,34 @@ public static void registerMetrics(ExecutorService executor, ThreadPoolConfig co
}
}
}

public static <T> CompletableFuture<T> scheduleWithRandomDelay(
IScheduledExecutor executor, Runnable task, long initialDelay,
long minDelay, long maxDelay,
TimeUnit timeUnit) {
CompletableFuture<T> future = new CompletableFuture<>();

AtomicReference<Future<?>> ref = new AtomicReference<>();
Callable<Void> command = new Callable<>() {
@Override
public Void call() {
try {
task.run();
ref.set(executor.schedule(this, MathHelper.random().nextLong(minDelay, maxDelay), timeUnit));
} catch (Throwable e) {
future.completeExceptionally(e);
}
return null;
}
};

Future<?> f = executor.schedule(command, initialDelay, timeUnit);
ref.set(f);

future.exceptionally(ex -> {
ref.get().cancel(false);
return null;
});
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,30 @@

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public interface IScheduledExecutor extends IThreadPoolExecutor {

/**
* 返回一个包装后的executor,它再调度任务时投递到Executor上执行
*
* @param executor 任务执行器
* @return 一个包装后的Executor
*/
default IScheduledExecutor executeOn(Executor executor) {
return new BindScheduledExecutor(this, executor);
}

<V> CompletableFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

Future<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

Future<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

default Future<?> scheduleWithRandomDelay(Runnable command, long initialDelay,
long minDelay, long maxDelay, TimeUnit unit) {
return ExecutorHelper.scheduleWithRandomDelay(this, command, initialDelay, minDelay, maxDelay, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.nop.api.core.time.CoreMetrics;
import io.nop.api.core.util.Guard;
import io.nop.commons.concurrent.executor.GlobalExecutors;
import io.nop.commons.concurrent.executor.IScheduledExecutor;
import io.nop.commons.concurrent.lock.IResourceLock;
import io.nop.commons.concurrent.lock.IResourceLockManager;
Expand All @@ -19,7 +20,6 @@
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.sql.Timestamp;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -39,7 +39,6 @@ public class LocalResourceLockManager implements IResourceLockManager, IResource

private volatile long lastCleanUpTime;

@Resource(name = "defaultTimer")
private IScheduledExecutor timer;

Future<?> schedulePromise;
Expand Down Expand Up @@ -100,6 +99,8 @@ LocalResourceLockState newLock(String resourceId, String lockerId) {
@PostConstruct
public void init() {
Guard.checkArgument(schedulePromise == null, "nop.lock.lock-manager-already-inited");
if (timer == null)
timer = GlobalExecutors.globalTimer();
schedulePromise = timer.scheduleWithFixedDelay(this::checkTimeout, expireInterval, expireInterval,
TimeUnit.MILLISECONDS);
}
Expand Down
14 changes: 14 additions & 0 deletions nop-dao/src/main/java/io/nop/dao/api/IEntityDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.nop.api.core.beans.PageBean;
import io.nop.api.core.beans.query.OrderFieldBean;
import io.nop.api.core.beans.query.QueryBean;
import io.nop.api.core.time.IEstimatedClock;
import io.nop.commons.collections.iterator.FindNextPageIterator;
import io.nop.commons.collections.iterator.SelectNextIterator;
import io.nop.commons.util.StringHelper;
Expand Down Expand Up @@ -87,6 +88,17 @@ public interface IEntityDao<T extends IDaoEntity> {

void updateEntity(T entity);

/**
* 立刻保存到数据库中,不存入session
*
* @param entity
*/
void saveEntityDirectly(T entity);

void updateEntityDirectly(T entity);

void deleteEntityDirectly(T entity);

void saveOrUpdateEntity(T entity);

void deleteEntity(T entity);
Expand Down Expand Up @@ -271,4 +283,6 @@ default void forEachEntity(QueryBean query, Consumer<T> consumer) {
* 清空本实体类对应的二级缓存
*/
void clearEntityGlobalCache();

IEstimatedClock getDbEstimatedClock();
}
Loading

0 comments on commit ec3197e

Please sign in to comment.