Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace ThreadGroup by ThreadPool #76

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ThreadPoolExecutor;

import org.jdiameter.api.Avp;
import org.jdiameter.api.AvpDataException;
Expand Down Expand Up @@ -340,12 +343,12 @@ public void stopped() {
// boolean interrupted = false;
long remWaitTime = 2000;
logger.debug("Stopping thread group and waiting a max of {}ms for all threads to finish", remWaitTime);
while (concurrentFactory.getThreadGroup().activeCount() > 0 && remWaitTime > 0) {
while (((ThreadPoolExecutor) concurrentFactory.getThreadPool()).getActiveCount() > 0 && remWaitTime > 0) {
long waitTime = 250;
Thread.sleep(waitTime);
remWaitTime -= waitTime;
logger.debug("Waited {}ms. Time remaining to wait: {}ms. {} Thread still active.",
new Object[]{waitTime, remWaitTime, concurrentFactory.getThreadGroup().activeCount()});
new Object[]{waitTime, remWaitTime, ((ThreadPoolExecutor) concurrentFactory.getThreadPool()).getActiveCount()});
// it did not terminated, let's interrupt
// FIXME: remove ASAP, this is very bad, it kills threads in middle of op,
// killing FSM of peer for instance, after that its not usable.
Expand Down Expand Up @@ -390,7 +393,7 @@ public void destroy() {
catch (IllegalThreadStateException itse) {
if (logger.isDebugEnabled()) {
logger.debug("Failure trying to destroy ThreadGroup probably due to existing active threads. Use stop() before destroy(). (nr_threads={})",
concurrentFactory.getThreadGroup().activeCount());
((ThreadPoolExecutor) concurrentFactory.getThreadPool()).getActiveCount());
}
}
catch (ThreadDeath td) {
Expand Down Expand Up @@ -419,26 +422,15 @@ public <T> T unwrap(Class<T> aClass) throws InternalException {
return null;
}

protected class PeerTableThreadFactory implements ThreadFactory {
protected class PeerTableThreadFactory {

public final AtomicLong sequence = new AtomicLong(0);
private int priority = Thread.NORM_PRIORITY;
private ThreadGroup factoryThreadGroup = new ThreadGroup("JDiameterThreadGroup[" + sequence.incrementAndGet() + "]");

private ExecutorService threadPoolExecutor = Executors.newCachedThreadPool();
public PeerTableThreadFactory(int priority) {
super();
this.priority = priority;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(this.factoryThreadGroup, r);
if (logger.isDebugEnabled()) {
logger.debug("Creating new thread in thread group JDiameterThreadGroup. Thread name is [{}]", t.getName());
}
t.setPriority(this.priority);
// TODO ? t.start();
return t;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,7 @@ public void run() {
//PCB added FSM multithread
for (int i = 1; i <= FSM_THREAD_COUNT; i++) {
logger.debug("Starting FSM Thread {} of {}", i, FSM_THREAD_COUNT);
Thread executor = concurrentFactory.getThread("FSM-" + context.getPeerDescription() + "_" + i, fsmQueueProcessor);
executor.start();
concurrentFactory.getThreadPool().execute(fsmQueueProcessor);
}
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,7 @@ public void start() throws NotInitializedException {
throw new NotInitializedException("No parent connection is set is set");
}
if (selfThread == null || !selfThread.isAlive()) {
selfThread = concurrentFactory.getThread("TCPReader", this);
}

if (!selfThread.isAlive()) {
selfThread.setDaemon(true);
selfThread.start();
concurrentFactory.getThreadPool().execute(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,7 @@ public void start() throws NotInitializedException {
throw new NotInitializedException("No parent connection is set is set");
}
if (this.readThread == null || !this.readThread.isAlive()) {
this.readThread = this.concurrentFactory.getThread("TLSReader", this.readTash);
}

if (!this.readThread.isAlive()) {
this.readThread.setDaemon(true);
this.readThread.start();
this.concurrentFactory.getThreadPool().execute(this.readTash);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import org.jdiameter.common.api.statistic.IStatistic;

Expand All @@ -58,23 +59,9 @@
*/
public class DummyConcurrentFactory implements IConcurrentFactory {


@Override
public Thread getThread(Runnable runnuble) {
return new Thread(runnuble);
}

@Override
public Thread getThread(String namePrefix, Runnable runnuble) {
return new Thread(runnuble, namePrefix);
}

@Override
public List<Thread> getThreads() {
return new ArrayList<Thread>();
}

@Override
public ThreadGroup getThreadGroup() {
public ThreadPoolExecutor getThreadPool() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
*/
public interface IConcurrentEntityFactory {

ThreadFactory newThreadFactory(String threadPoolName);
ThreadFactory newThreadFactory();

RejectedExecutionHandler newRejectedExecutionHandler(IStatisticRecord rejectedCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@

import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import org.jdiameter.common.api.statistic.IStatistic;

Expand All @@ -66,14 +68,7 @@ enum ScheduledExecServices {
ApplicationSession
}

// Thread
Thread getThread(Runnable runnuble);

Thread getThread(String namePrefix, Runnable runnuble);

List<Thread> getThreads();

ThreadGroup getThreadGroup();
ExecutorService getThreadPool();

// ScheduledExecutorService
ScheduledExecutorService getScheduledExecutorService(String name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@

package org.jdiameter.common.impl.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -52,29 +59,22 @@
*/
class BaseThreadFactory implements ThreadFactory {

public static final String ENTITY_NAME = "ThreadGroup";
public static final String ENTITY_NAME = "ThreadPool";

private ThreadGroup threadGroup;
private String threadPoolName;
private AtomicInteger count = new AtomicInteger(0);
private ExecutorService threadPoolExecutor;

BaseThreadFactory(String threadPoolName) {
this.threadPoolName = threadPoolName;
BaseThreadFactory() {
this.threadPoolExecutor = Executors.newCachedThreadPool();
}

this.threadGroup = new ThreadGroup("jd " + threadPoolName + " group");
public ExecutorService getThreadPool() {
return this.threadPoolExecutor;
}

@Override
public Thread newThread(Runnable runnable) {
return new Thread(threadGroup, runnable, threadPoolName + "-" + count.getAndIncrement());
}

public Thread newThread(String namePrefix, Runnable runnable) {
return new Thread(threadGroup, runnable, namePrefix + "-" + count.getAndIncrement());
return null;
}

public ThreadGroup getThreadGroup() {
return threadGroup;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,7 @@ class CommonScheduledExecutorService extends ScheduledThreadPoolExecutor {

statistic.appendCounter(statisticFactory.newCounterRecord(WorkingThread), statisticFactory.newCounterRecord(CanceledTasks),
statisticFactory.newCounterRecord(BrokenTasks), execTimeCounter, waitTimeCounter, statisticFactory.newCounterRecord(WaitTimeTask));

if (config == null) {
this.setThreadFactory(entityFactory.newThreadFactory(name));
} else {
this.setThreadFactory(entityFactory.newThreadFactory(config.getStringValue(Parameters.ConcurrentEntityDescription.ordinal(), name)));
}


super.setRejectedExecutionHandler(entityFactory.newRejectedExecutionHandler(rejectedCount));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public ConcurrentEntityFactory() {
}

@Override
public ThreadFactory newThreadFactory(String threadPoolName) {
return new BaseThreadFactory(threadPoolName);
public ThreadFactory newThreadFactory() {
return new BaseThreadFactory();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import org.jdiameter.api.Configuration;
import org.jdiameter.client.impl.helpers.Parameters;
Expand Down Expand Up @@ -82,7 +83,7 @@ public ConcurrentFactory(Configuration config, IStatisticManager statisticFactor
dgConfig.getStringValue(Parameters.ConcurrentEntityDescription.ordinal(), (String) Parameters.ConcurrentEntityDescription.defValue()) :
(String) Parameters.ConcurrentEntityDescription.defValue();

threadFactory = (BaseThreadFactory) entityFactory.newThreadFactory(defThreadGroupName);
threadFactory = (BaseThreadFactory) entityFactory.newThreadFactory();

scheduledExecutorServices = new ConcurrentHashMap<String, CommonScheduledExecutorService>();
IStatisticRecord threadCount = statisticFactory.newCounterRecord(
Expand All @@ -95,7 +96,7 @@ public String getValueAsString() {

@Override
public int getValueAsInt() {
return getThreadGroup().activeCount();
return ((ThreadPoolExecutor) getThreadPool()).getActiveCount();
}
});

Expand Down Expand Up @@ -133,25 +134,8 @@ private Configuration getConfigByName(String name) {
}

@Override
public Thread getThread(Runnable runnable) {
return threadFactory.newThread(runnable);
}

@Override
public Thread getThread(String namePrefix, Runnable runnuble) {
return threadFactory.newThread(namePrefix, runnuble);
}

@Override
public List<Thread> getThreads() {
Thread[] threads = new Thread[threadFactory.getThreadGroup().activeCount()];
threadFactory.getThreadGroup().enumerate(threads);
return Arrays.asList(threads);
}

@Override
public ThreadGroup getThreadGroup() {
return threadFactory.getThreadGroup();
public ExecutorService getThreadPool() {
return threadFactory.getThreadPool();
}

@Override
Expand All @@ -162,6 +146,7 @@ public ScheduledExecutorService getScheduledExecutorService(String name) {
synchronized (ConcurrentFactory.class) {
if (!scheduledExecutorServices.containsKey(name)) {
service = new CommonScheduledExecutorService(name, getConfigByName(name), this.entityFactory, statisticFactory);

scheduledExecutorServices.put(name, service);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ private class RequestDelivery implements Runnable {
public void run() {
try {
switch (request.getCommandCode()) {

case JCreditControlAnswer.code:
handleEvent(new Event(true, factory.createCreditControlRequest(request), null));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,10 @@ public NetworkGuard(InetAddress[] inetAddress, int port,
try {
for (int addrIdx = 0; addrIdx < inetAddress.length; addrIdx++) {
GuardTask guardTask = new GuardTask(new InetSocketAddress(inetAddress[addrIdx], port));
Thread t = this.concurrentFactory.getThread(guardTask);
guardTask.thread = t;
this.concurrentFactory.getThreadPool().execute(guardTask);
tasks.add(guardTask);
}
isWork = true;
for (GuardTask gt : this.tasks) {
gt.start();
}
//thread.start();
}
catch (Exception exc) {
destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public NetworkGuard(InetAddress inetAddress, int port, IConcurrentFactory concur
this.port = port;
this.parser = parser;
this.concurrentFactory = concurrentFactory == null ? new DummyConcurrentFactory() : concurrentFactory;
this.thread = this.concurrentFactory.getThread("NetworkGuard", this);
// this.thread = this.concurrentFactory.getThread("NetworkGuard", this);
// extract sec_ref from local peer;
Configuration conf = data.getConfiguration();

Expand All @@ -112,7 +112,7 @@ public NetworkGuard(InetAddress inetAddress, int port, IConcurrentFactory concur

this.isWork = true;
logger.info("Open server socket {} ", serverSocket);
this.thread.start();
this.concurrentFactory.getThreadPool().execute(this);
}
catch (Exception exc) {
destroy();
Expand Down