From d4f9e3eabd08b14819d1f5290bbeaa81137a6c0b Mon Sep 17 00:00:00 2001 From: Philipp Dallig Date: Wed, 28 Aug 2024 07:22:58 +0200 Subject: [PATCH] [ZEPPELIN-6066] Cleanups around Job and Scheduler (#4804) * Simple changes * Rename SchedulerThreadFactory to NamedThreadFactory * some small change in job - SimpleDateFormat is not threadsafe, and there is no need to keep the object in memmory * Remove change in InterpreterContext --- .../interpreter/InterpreterContext.java | 2 +- .../remote/RemoteInterpreterServer.java | 4 +-- .../zeppelin/scheduler/AbstractScheduler.java | 3 +- .../zeppelin/scheduler/ExecutorFactory.java | 34 +++++++----------- .../zeppelin/scheduler/FIFOScheduler.java | 2 +- .../org/apache/zeppelin/scheduler/Job.java | 9 ++--- ...adFactory.java => NamedThreadFactory.java} | 15 ++++---- .../zeppelin/scheduler/ParallelScheduler.java | 6 +--- .../zeppelin/scheduler/SchedulerFactory.java | 35 +++++++++---------- .../interpreter/ManagedInterpreterGroup.java | 2 +- .../zeppelin/interpreter/YarnAppMonitor.java | 4 +-- .../notebook/NoteEventAsyncListener.java | 4 +-- .../apache/zeppelin/notebook/Notebook.java | 4 +-- .../zeppelin/scheduler/RemoteScheduler.java | 2 +- 14 files changed, 56 insertions(+), 70 deletions(-) rename zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/{SchedulerThreadFactory.java => NamedThreadFactory.java} (74%) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java index 89b1e542af5..347e5bc57e9 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -295,7 +295,7 @@ public void setProgress(int n) { if (progressMap != null) { n = Math.max(n, 0); n = Math.min(n, 100); - progressMap.put(paragraphId, new Integer(n)); + progressMap.put(paragraphId, Integer.valueOf(n)); } } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index f84aceb9bcb..6ef35eec74a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -941,7 +941,7 @@ public void cancel(String sessionId, LOGGER.info("cancel {} {}", className, interpreterContext.getParagraphId()); Interpreter intp = getInterpreter(sessionId, className); String jobId = interpreterContext.getParagraphId(); - Job job = intp.getScheduler().getJob(jobId); + Job job = intp.getScheduler().getJob(jobId); if (job != null && job.getStatus() == Status.PENDING) { job.setStatus(Status.ABORT); @@ -1105,7 +1105,7 @@ public String getStatus(String sessionId, String jobId) for (Interpreter intp : interpreters) { Scheduler scheduler = intp.getScheduler(); if (scheduler != null) { - Job job = scheduler.getJob(jobId); + Job job = scheduler.getJob(jobId); if (job != null) { return job.getStatus().name(); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java index ad21c3c63f3..7e99095b7ff 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java @@ -39,10 +39,11 @@ public abstract class AbstractScheduler implements Scheduler { protected final String name; protected volatile boolean terminate = false; protected BlockingQueue> queue = new LinkedBlockingQueue<>(); + // JobId -> Job protected Map> jobs = new ConcurrentHashMap<>(); private Thread schedulerThread; - public AbstractScheduler(String name) { + protected AbstractScheduler(String name) { this.name = name; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java index b20ccc75c4f..00e1976bec4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java @@ -47,25 +47,19 @@ public static ExecutorFactory singleton() { return InstanceHolder.INSTANCE; } - public ExecutorService createOrGet(String name, int numThread) { + public ExecutorService createOrGet(final String name, final int numThread) { synchronized (executors) { - if (!executors.containsKey(name)) { - executors.put(name, Executors.newScheduledThreadPool( - numThread, - new SchedulerThreadFactory(name))); - } - return executors.get(name); + return executors.computeIfAbsent(name, k -> (Executors.newScheduledThreadPool( + numThread, + new NamedThreadFactory(k)))); } } - public ScheduledExecutorService createOrGetScheduled(String name, int numThread) { + public ScheduledExecutorService createOrGetScheduled(final String name, final int numThread) { synchronized (scheduledExecutors) { - if (!scheduledExecutors.containsKey(name)) { - scheduledExecutors.put(name, Executors.newScheduledThreadPool( - numThread, - new SchedulerThreadFactory(name))); - } - return scheduledExecutors.get(name); + return scheduledExecutors.computeIfAbsent(name, k -> (Executors.newScheduledThreadPool( + numThread, + new NamedThreadFactory(k)))); } } @@ -75,22 +69,20 @@ public ScheduledExecutorService createOrGetScheduled(String name, int numThread) * @return */ public ExecutorService getNoteJobExecutor() { - return createOrGet("NoteJobThread-", 50); + return createOrGet("NoteJobThread", 50); } public void shutdown(String name) { synchronized (executors) { - if (executors.containsKey(name)) { - ExecutorService e = executors.get(name); + ExecutorService e = executors.remove(name); + if (e != null) { ExecutorUtil.softShutdown(name, e, 1, TimeUnit.MINUTES); - executors.remove(name); } } synchronized (scheduledExecutors) { - if (scheduledExecutors.containsKey(name)) { - ExecutorService e = scheduledExecutors.get(name); + ExecutorService e = scheduledExecutors.remove(name); + if (e != null) { ExecutorUtil.softShutdown(name, e, 1, TimeUnit.MINUTES); - scheduledExecutors.remove(name); } } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java index 05519c3a492..f6c5dd5a293 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java @@ -33,7 +33,7 @@ public class FIFOScheduler extends AbstractScheduler { FIFOScheduler(String name) { super(name); this.executor = Executors.newSingleThreadExecutor( - new SchedulerThreadFactory("FIFOScheduler-" + name + "-Worker-")); + new NamedThreadFactory("FIFOScheduler-" + name + "-Worker")); } @Override diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 44014ad8c3f..b0ed600f45a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -38,8 +38,9 @@ * Changing/adding/deleting non transitive field name need consideration of that. */ public abstract class Job { + private static final Logger LOGGER = LoggerFactory.getLogger(Job.class); - private static SimpleDateFormat JOB_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd-HHmmss"); + private static final String DATE_FORMAT = "yyyyMMdd-HHmmss"; /** * Job status. @@ -93,15 +94,15 @@ public boolean isFailed() { private transient volatile Throwable exception; private transient JobListener listener; - public Job(String jobName, JobListener listener) { + protected Job(String jobName, JobListener listener) { this.jobName = jobName; this.listener = listener; dateCreated = new Date(); - id = JOB_DATE_FORMAT.format(dateCreated) + "_" + jobName; + id = new SimpleDateFormat(DATE_FORMAT).format(dateCreated) + "_" + jobName; setStatus(Status.READY); } - public Job(String jobId, String jobName, JobListener listener) { + protected Job(String jobId, String jobName, JobListener listener) { this.jobName = jobName; this.listener = listener; dateCreated = new Date(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/NamedThreadFactory.java similarity index 74% rename from zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/NamedThreadFactory.java index fe0711e6850..cd53160a7ba 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/NamedThreadFactory.java @@ -21,19 +21,18 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; -public class SchedulerThreadFactory implements ThreadFactory { +public class NamedThreadFactory implements ThreadFactory { - private String namePrefix; - private AtomicLong count = new AtomicLong(1); + private final String name; + private final AtomicLong count = new AtomicLong(1); - public SchedulerThreadFactory(String namePrefix) { - this.namePrefix = namePrefix; + public NamedThreadFactory(String name) { + this.name = name; } @Override public Thread newThread(Runnable r) { - Thread thread = new Thread(r); - thread.setName(namePrefix + count.getAndIncrement()); - return thread; + return new Thread(r, name + "-" + count.getAndIncrement()); + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java index a58e349f90e..9f0c27ccccb 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java @@ -22,22 +22,18 @@ import java.util.concurrent.TimeUnit; import org.apache.zeppelin.util.ExecutorUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Parallel scheduler runs submitted job concurrently. */ public class ParallelScheduler extends AbstractScheduler { - private static final Logger LOGGER = LoggerFactory.getLogger(ParallelScheduler.class); - private ExecutorService executor; ParallelScheduler(String name, int maxConcurrency) { super(name); this.executor = Executors.newFixedThreadPool(maxConcurrency, - new SchedulerThreadFactory("ParallelScheduler-Worker-")); + new NamedThreadFactory("ParallelScheduler-Worker")); } @Override diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index 34609b49169..e3213467fc9 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -69,38 +69,35 @@ public void destroy() { ExecutorUtil.softShutdown("SchedulerFactoryExecutor", executor, 60, TimeUnit.SECONDS); } - public Scheduler createOrGetFIFOScheduler(String name) { + public Scheduler createOrGetFIFOScheduler(final String name) { synchronized (schedulers) { - if (!schedulers.containsKey(name)) { - LOGGER.info("Create FIFOScheduler: {}", name); - FIFOScheduler s = new FIFOScheduler(name); - schedulers.put(name, s); + return schedulers.computeIfAbsent(name, k -> { + LOGGER.info("Create FIFOScheduler: {}", k); + FIFOScheduler s = new FIFOScheduler(k); executor.execute(s); - } - return schedulers.get(name); + return s; + }); } } - public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) { + public Scheduler createOrGetParallelScheduler(final String name, final int maxConcurrency) { synchronized (schedulers) { - if (!schedulers.containsKey(name)) { - LOGGER.info("Create ParallelScheduler: {} with maxConcurrency: {}", name, maxConcurrency); - ParallelScheduler s = new ParallelScheduler(name, maxConcurrency); - schedulers.put(name, s); + return schedulers.computeIfAbsent(name, k -> { + LOGGER.info("Create ParallelScheduler: {} with maxConcurrency: {}", k, maxConcurrency); + ParallelScheduler s = new ParallelScheduler(k, maxConcurrency); executor.execute(s); - } - return schedulers.get(name); + return s; + }); } } - public Scheduler createOrGetScheduler(Scheduler scheduler) { + public Scheduler createOrGetScheduler(final Scheduler scheduler) { synchronized (schedulers) { - if (!schedulers.containsKey(scheduler.getName())) { - schedulers.put(scheduler.getName(), scheduler); + return schedulers.computeIfAbsent(scheduler.getName(), k -> { executor.execute(scheduler); - } - return schedulers.get(scheduler.getName()); + return scheduler; + }); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java index c1b5076167b..8f2c16c0743 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java @@ -150,7 +150,7 @@ private void closeInterpreter(Interpreter interpreter) { try { if (Boolean.parseBoolean( interpreter.getProperty("zeppelin.interpreter.close.cancel_job", "true"))) { - for (final Job job : scheduler.getAllJobs()) { + for (final Job job : scheduler.getAllJobs()) { if (!job.isTerminated()) { job.abort(); job.setStatus(Job.Status.ABORT); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java index 56dc7ea912a..48956b337be 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java @@ -25,7 +25,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; -import org.apache.zeppelin.scheduler.SchedulerThreadFactory; +import org.apache.zeppelin.scheduler.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +65,7 @@ private YarnAppMonitor() { yarnConf.set("yarn.timeline-service.enabled", "false"); yarnClient.init(yarnConf); yarnClient.start(); - this.executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory("YarnAppsMonitor-Thread")); + this.executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("YarnAppsMonitor-Thread")); this.apps = new ConcurrentHashMap<>(); this.executor.scheduleAtFixedRate(() -> { try { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java index dabc5dde409..a565b02ec33 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java @@ -18,7 +18,7 @@ package org.apache.zeppelin.notebook; import org.apache.zeppelin.scheduler.Job; -import org.apache.zeppelin.scheduler.SchedulerThreadFactory; +import org.apache.zeppelin.scheduler.NamedThreadFactory; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.util.ExecutorUtil; import org.slf4j.Logger; @@ -43,7 +43,7 @@ public abstract class NoteEventAsyncListener implements NoteEventListener, Close protected NoteEventAsyncListener(String name) { this.name = name; executor = new ThreadPoolExecutor(0, 1, 1, TimeUnit.MINUTES, - new LinkedBlockingQueue<>(), new SchedulerThreadFactory(name)); + new LinkedBlockingQueue<>(), new NamedThreadFactory(name)); } public abstract void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 33d468931b1..1f98c716f19 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -57,7 +57,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision; import org.apache.zeppelin.scheduler.Job; -import org.apache.zeppelin.scheduler.SchedulerThreadFactory; +import org.apache.zeppelin.scheduler.NamedThreadFactory; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.user.Credentials; import org.apache.zeppelin.util.ExecutorUtil; @@ -138,7 +138,7 @@ public void addInitConsumer(Consumer initConsumer) { public void initNotebook() { if (initExecutor == null || initExecutor.isShutdown() || initExecutor.isTerminated()) { initExecutor = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(), 1, TimeUnit.MINUTES, - new LinkedBlockingQueue<>(), new SchedulerThreadFactory("NotebookInit")); + new LinkedBlockingQueue<>(), new NamedThreadFactory("NotebookInit")); } for (NoteInfo noteInfo : getNotesInfo()) { initExecutor.execute(() -> { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index 6836eb33d1e..e5807877f92 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -43,7 +43,7 @@ public RemoteScheduler(String name, RemoteInterpreter remoteInterpreter) { super(name); this.executor = - Executors.newSingleThreadExecutor(new SchedulerThreadFactory("FIFO-" + name + "-")); + Executors.newSingleThreadExecutor(new NamedThreadFactory("FIFO-" + name)); this.remoteInterpreter = remoteInterpreter; }