diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/controller/PeerTableImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/controller/PeerTableImpl.java index cb3b19c15..55368da86 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/controller/PeerTableImpl.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/controller/PeerTableImpl.java @@ -55,8 +55,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ThreadPoolExecutor; import org.jdiameter.api.Avp; import org.jdiameter.api.AvpDataException; @@ -340,12 +343,12 @@ public void stopped() { // boolean interrupted = false; long remWaitTime = 2000; logger.debug("Stopping thread group and waiting a max of {}ms for all threads to finish", remWaitTime); - while (concurrentFactory.getThreadGroup().activeCount() > 0 && remWaitTime > 0) { + while (((ThreadPoolExecutor) concurrentFactory.getThreadPool()).getActiveCount() > 0 && remWaitTime > 0) { long waitTime = 250; Thread.sleep(waitTime); remWaitTime -= waitTime; logger.debug("Waited {}ms. Time remaining to wait: {}ms. {} Thread still active.", - new Object[]{waitTime, remWaitTime, concurrentFactory.getThreadGroup().activeCount()}); + new Object[]{waitTime, remWaitTime, ((ThreadPoolExecutor) concurrentFactory.getThreadPool()).getActiveCount()}); // it did not terminated, let's interrupt // FIXME: remove ASAP, this is very bad, it kills threads in middle of op, // killing FSM of peer for instance, after that its not usable. @@ -390,7 +393,7 @@ public void destroy() { catch (IllegalThreadStateException itse) { if (logger.isDebugEnabled()) { logger.debug("Failure trying to destroy ThreadGroup probably due to existing active threads. Use stop() before destroy(). (nr_threads={})", - concurrentFactory.getThreadGroup().activeCount()); + ((ThreadPoolExecutor) concurrentFactory.getThreadPool()).getActiveCount()); } } catch (ThreadDeath td) { @@ -419,26 +422,15 @@ public T unwrap(Class aClass) throws InternalException { return null; } - protected class PeerTableThreadFactory implements ThreadFactory { + protected class PeerTableThreadFactory { public final AtomicLong sequence = new AtomicLong(0); private int priority = Thread.NORM_PRIORITY; - private ThreadGroup factoryThreadGroup = new ThreadGroup("JDiameterThreadGroup[" + sequence.incrementAndGet() + "]"); - + private ExecutorService threadPoolExecutor = Executors.newCachedThreadPool(); + public PeerTableThreadFactory(int priority) { super(); this.priority = priority; } - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(this.factoryThreadGroup, r); - if (logger.isDebugEnabled()) { - logger.debug("Creating new thread in thread group JDiameterThreadGroup. Thread name is [{}]", t.getName()); - } - t.setPriority(this.priority); - // TODO ? t.start(); - return t; - } } } diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/fsm/PeerFSMImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/fsm/PeerFSMImpl.java index 1fae431f0..b062221e6 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/fsm/PeerFSMImpl.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/fsm/PeerFSMImpl.java @@ -273,8 +273,7 @@ public void run() { //PCB added FSM multithread for (int i = 1; i <= FSM_THREAD_COUNT; i++) { logger.debug("Starting FSM Thread {} of {}", i, FSM_THREAD_COUNT); - Thread executor = concurrentFactory.getThread("FSM-" + context.getPeerDescription() + "_" + i, fsmQueueProcessor); - executor.start(); + concurrentFactory.getThreadPool().execute(fsmQueueProcessor); } } finally { diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPTransportClient.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPTransportClient.java index 88d619226..b8ac1464c 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPTransportClient.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPTransportClient.java @@ -162,12 +162,7 @@ public void start() throws NotInitializedException { throw new NotInitializedException("No parent connection is set is set"); } if (selfThread == null || !selfThread.isAlive()) { - selfThread = concurrentFactory.getThread("TCPReader", this); - } - - if (!selfThread.isAlive()) { - selfThread.setDaemon(true); - selfThread.start(); + concurrentFactory.getThreadPool().execute(this); } } diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tls/TLSTransportClient.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tls/TLSTransportClient.java index d2532bda6..be84183e6 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tls/TLSTransportClient.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tls/TLSTransportClient.java @@ -196,12 +196,7 @@ public void start() throws NotInitializedException { throw new NotInitializedException("No parent connection is set is set"); } if (this.readThread == null || !this.readThread.isAlive()) { - this.readThread = this.concurrentFactory.getThread("TLSReader", this.readTash); - } - - if (!this.readThread.isAlive()) { - this.readThread.setDaemon(true); - this.readThread.start(); + this.concurrentFactory.getThreadPool().execute(this.readTash); } } diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/concurrent/DummyConcurrentFactory.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/concurrent/DummyConcurrentFactory.java index e8e6e618a..bef740386 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/concurrent/DummyConcurrentFactory.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/concurrent/DummyConcurrentFactory.java @@ -47,6 +47,7 @@ import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import org.jdiameter.common.api.statistic.IStatistic; @@ -58,23 +59,9 @@ */ public class DummyConcurrentFactory implements IConcurrentFactory { + @Override - public Thread getThread(Runnable runnuble) { - return new Thread(runnuble); - } - - @Override - public Thread getThread(String namePrefix, Runnable runnuble) { - return new Thread(runnuble, namePrefix); - } - - @Override - public List getThreads() { - return new ArrayList(); - } - - @Override - public ThreadGroup getThreadGroup() { + public ThreadPoolExecutor getThreadPool() { return null; } diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/concurrent/IConcurrentEntityFactory.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/concurrent/IConcurrentEntityFactory.java index 3bd0305ee..53ce65cb3 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/concurrent/IConcurrentEntityFactory.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/concurrent/IConcurrentEntityFactory.java @@ -57,7 +57,7 @@ */ public interface IConcurrentEntityFactory { - ThreadFactory newThreadFactory(String threadPoolName); + ThreadFactory newThreadFactory(); RejectedExecutionHandler newRejectedExecutionHandler(IStatisticRecord rejectedCount); diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/concurrent/IConcurrentFactory.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/concurrent/IConcurrentFactory.java index fba6facc2..00bf0a39a 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/concurrent/IConcurrentFactory.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/concurrent/IConcurrentFactory.java @@ -44,7 +44,9 @@ import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import org.jdiameter.common.api.statistic.IStatistic; @@ -66,14 +68,7 @@ enum ScheduledExecServices { ApplicationSession } - // Thread - Thread getThread(Runnable runnuble); - - Thread getThread(String namePrefix, Runnable runnuble); - - List getThreads(); - - ThreadGroup getThreadGroup(); + ExecutorService getThreadPool(); // ScheduledExecutorService ScheduledExecutorService getScheduledExecutorService(String name); diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/BaseThreadFactory.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/BaseThreadFactory.java index dc87710f3..f61a6bedd 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/BaseThreadFactory.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/BaseThreadFactory.java @@ -42,7 +42,14 @@ package org.jdiameter.common.impl.concurrent; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -52,29 +59,22 @@ */ class BaseThreadFactory implements ThreadFactory { - public static final String ENTITY_NAME = "ThreadGroup"; + public static final String ENTITY_NAME = "ThreadPool"; - private ThreadGroup threadGroup; - private String threadPoolName; - private AtomicInteger count = new AtomicInteger(0); + private ExecutorService threadPoolExecutor; - BaseThreadFactory(String threadPoolName) { - this.threadPoolName = threadPoolName; + BaseThreadFactory() { + this.threadPoolExecutor = Executors.newCachedThreadPool(); + } - this.threadGroup = new ThreadGroup("jd " + threadPoolName + " group"); + public ExecutorService getThreadPool() { + return this.threadPoolExecutor; } @Override public Thread newThread(Runnable runnable) { - return new Thread(threadGroup, runnable, threadPoolName + "-" + count.getAndIncrement()); - } - - public Thread newThread(String namePrefix, Runnable runnable) { - return new Thread(threadGroup, runnable, namePrefix + "-" + count.getAndIncrement()); + return null; } - public ThreadGroup getThreadGroup() { - return threadGroup; - } } diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/CommonScheduledExecutorService.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/CommonScheduledExecutorService.java index c77c24f61..d6fc84eaf 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/CommonScheduledExecutorService.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/CommonScheduledExecutorService.java @@ -98,13 +98,7 @@ class CommonScheduledExecutorService extends ScheduledThreadPoolExecutor { statistic.appendCounter(statisticFactory.newCounterRecord(WorkingThread), statisticFactory.newCounterRecord(CanceledTasks), statisticFactory.newCounterRecord(BrokenTasks), execTimeCounter, waitTimeCounter, statisticFactory.newCounterRecord(WaitTimeTask)); - - if (config == null) { - this.setThreadFactory(entityFactory.newThreadFactory(name)); - } else { - this.setThreadFactory(entityFactory.newThreadFactory(config.getStringValue(Parameters.ConcurrentEntityDescription.ordinal(), name))); - } - + super.setRejectedExecutionHandler(entityFactory.newRejectedExecutionHandler(rejectedCount)); } diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/ConcurrentEntityFactory.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/ConcurrentEntityFactory.java index d54ca2209..3ef5174a2 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/ConcurrentEntityFactory.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/ConcurrentEntityFactory.java @@ -62,8 +62,8 @@ public ConcurrentEntityFactory() { } @Override - public ThreadFactory newThreadFactory(String threadPoolName) { - return new BaseThreadFactory(threadPoolName); + public ThreadFactory newThreadFactory() { + return new BaseThreadFactory(); } @Override diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/ConcurrentFactory.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/ConcurrentFactory.java index b77269555..3f20bba9a 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/ConcurrentFactory.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/concurrent/ConcurrentFactory.java @@ -50,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import org.jdiameter.api.Configuration; import org.jdiameter.client.impl.helpers.Parameters; @@ -82,7 +83,7 @@ public ConcurrentFactory(Configuration config, IStatisticManager statisticFactor dgConfig.getStringValue(Parameters.ConcurrentEntityDescription.ordinal(), (String) Parameters.ConcurrentEntityDescription.defValue()) : (String) Parameters.ConcurrentEntityDescription.defValue(); - threadFactory = (BaseThreadFactory) entityFactory.newThreadFactory(defThreadGroupName); + threadFactory = (BaseThreadFactory) entityFactory.newThreadFactory(); scheduledExecutorServices = new ConcurrentHashMap(); IStatisticRecord threadCount = statisticFactory.newCounterRecord( @@ -95,7 +96,7 @@ public String getValueAsString() { @Override public int getValueAsInt() { - return getThreadGroup().activeCount(); + return ((ThreadPoolExecutor) getThreadPool()).getActiveCount(); } }); @@ -133,25 +134,8 @@ private Configuration getConfigByName(String name) { } @Override - public Thread getThread(Runnable runnable) { - return threadFactory.newThread(runnable); - } - - @Override - public Thread getThread(String namePrefix, Runnable runnuble) { - return threadFactory.newThread(namePrefix, runnuble); - } - - @Override - public List getThreads() { - Thread[] threads = new Thread[threadFactory.getThreadGroup().activeCount()]; - threadFactory.getThreadGroup().enumerate(threads); - return Arrays.asList(threads); - } - - @Override - public ThreadGroup getThreadGroup() { - return threadFactory.getThreadGroup(); + public ExecutorService getThreadPool() { + return threadFactory.getThreadPool(); } @Override @@ -162,6 +146,7 @@ public ScheduledExecutorService getScheduledExecutorService(String name) { synchronized (ConcurrentFactory.class) { if (!scheduledExecutorServices.containsKey(name)) { service = new CommonScheduledExecutorService(name, getConfigByName(name), this.entityFactory, statisticFactory); + scheduledExecutorServices.put(name, service); } } diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/app/cca/ServerCCASessionImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/app/cca/ServerCCASessionImpl.java index c3e2cf37e..9cf05a5a3 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/app/cca/ServerCCASessionImpl.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/app/cca/ServerCCASessionImpl.java @@ -517,6 +517,7 @@ private class RequestDelivery implements Runnable { public void run() { try { switch (request.getCommandCode()) { + case JCreditControlAnswer.code: handleEvent(new Event(true, factory.createCreditControlRequest(request), null)); break; diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/io/tcp/NetworkGuard.java b/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/io/tcp/NetworkGuard.java index 2b21036d7..317a28f51 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/io/tcp/NetworkGuard.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/io/tcp/NetworkGuard.java @@ -119,15 +119,10 @@ public NetworkGuard(InetAddress[] inetAddress, int port, try { for (int addrIdx = 0; addrIdx < inetAddress.length; addrIdx++) { GuardTask guardTask = new GuardTask(new InetSocketAddress(inetAddress[addrIdx], port)); - Thread t = this.concurrentFactory.getThread(guardTask); - guardTask.thread = t; + this.concurrentFactory.getThreadPool().execute(guardTask); tasks.add(guardTask); } isWork = true; - for (GuardTask gt : this.tasks) { - gt.start(); - } - //thread.start(); } catch (Exception exc) { destroy(); diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/io/tls/NetworkGuard.java b/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/io/tls/NetworkGuard.java index 2606cfbe1..9c1f75a62 100644 --- a/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/io/tls/NetworkGuard.java +++ b/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/io/tls/NetworkGuard.java @@ -86,7 +86,7 @@ public NetworkGuard(InetAddress inetAddress, int port, IConcurrentFactory concur this.port = port; this.parser = parser; this.concurrentFactory = concurrentFactory == null ? new DummyConcurrentFactory() : concurrentFactory; - this.thread = this.concurrentFactory.getThread("NetworkGuard", this); + // this.thread = this.concurrentFactory.getThread("NetworkGuard", this); // extract sec_ref from local peer; Configuration conf = data.getConfiguration(); @@ -112,7 +112,7 @@ public NetworkGuard(InetAddress inetAddress, int port, IConcurrentFactory concur this.isWork = true; logger.info("Open server socket {} ", serverSocket); - this.thread.start(); + this.concurrentFactory.getThreadPool().execute(this); } catch (Exception exc) { destroy();