Posted by Unknown on 9:59 AM

A thread can be in one of the following states: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, or TERMINATED. How Spring's ThreadPoolTaskExecutor moves its worker threads between these states after execute() is invoked is an internal implementation detail, and tasks can start or finish while the count is being taken. That is why getActiveCount() returns an approximate number, not an exact one.
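To make the two ideas above concrete, here is a minimal sketch using only the JDK (no Spring). The class and method names (ActiveCountDemo, statesBeforeAndAfter, activeCountWhileTaskRuns) are hypothetical and exist only for this illustration. It reads a thread's state before it starts and after it finishes, and samples getActiveCount() while a single task is deliberately held inside run(). In this controlled setup the count should be 1, but in a busy pool the value can already be stale by the time it is returned.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class ActiveCountDemo {

    // Returns the state of a thread before start() and after join().
    static Thread.State[] statesBeforeAndAfter() {
        Thread t = new Thread(() -> { });
        Thread.State before = t.getState();      // not started yet
        t.start();
        try {
            t.join();                            // wait until the thread has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Thread.State[] { before, t.getState() };
    }

    // Samples getActiveCount() while exactly one task is blocked inside run().
    static int activeCountWhileTaskRuns() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                started.countDown();             // signal that run() has begun
                try {
                    release.await();             // stay "active" until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            return pool.getActiveCount();        // a snapshot, documented as approximate
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            release.countDown();                 // let the task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Thread.State[] states = statesBeforeAndAfter();
        System.out.println(states[0] + " -> " + states[1]);
        System.out.println("active while running: " + activeCountWhileTaskRuns());
    }
}
```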


Instead of injecting a ThreadPoolTaskExecutor object, I am extending the ThreadPoolExecutor class, which provides an extra callback method, afterExecute(), that can be used to track the number of active threads or to catch exceptions.

A ThreadPoolExecutor object can also be injected through the Spring context.
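As a rough illustration of the "catching exceptions" use of afterExecute(), the sketch below counts failed tasks. The names ExceptionAwareExecutor, failureCount and demoFailureCount are hypothetical. It follows the pattern shown in the ThreadPoolExecutor Javadoc: for tasks passed to execute(), the Throwable argument carries the failure, while for tasks passed to submit() the exception is stored inside the Future and has to be extracted with get().

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical executor for illustration; not the TaskExecutor from this post.
class ExceptionAwareExecutor extends ThreadPoolExecutor {

    private final AtomicInteger failures = new AtomicInteger();

    ExceptionAwareExecutor(int poolSize) {
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Tasks passed to submit() are wrapped in a Future, so t is null here
        // and the failure, if any, must be pulled out of the Future.
        if (t == null && r instanceof Future<?>) {
            Future<?> future = (Future<?>) r;
            try {
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            failures.incrementAndGet();
        }
    }

    int failureCount() {
        return failures.get();
    }

    // Runs two failing tasks and one successful task, then reports the failures seen.
    static int demoFailureCount() {
        ExceptionAwareExecutor pool = new ExceptionAwareExecutor(2);
        Runnable failing = () -> {
            throw new IllegalStateException("simulated failure");
        };
        pool.execute(failing);   // failure arrives as the Throwable argument
        pool.submit(failing);    // failure is captured inside the Future
        pool.execute(() -> { }); // completes normally
        pool.shutdown();         // already queued tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.failureCount();
    }
}
```

Note that the task passed to execute() still propagates its exception after afterExecute() returns, so the worker thread ends and the default handler prints a stack trace to standard error.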

Client/Caller Class

package test.hiren;

/**
*
* @author Hiren
*/
public class GenerateReportServiceHelperImpl {

// volatile: written by a pool thread in callbackInvoker() and read by the main thread
public volatile boolean needProgress;
TaskExecutor threadPool;

public static void main(String[] args) throws Exception {
GenerateReportServiceHelperImpl obj = new GenerateReportServiceHelperImpl();
obj.reportExecution();
}

public void reportExecution() throws Exception {
int listSize = 4; // in your case, this is the size of the rpt list
threadPool = new TaskExecutor();
threadPool.setCallbackInvoker(this);
threadPool.setComparableThreadCount(listSize);
for (int i = 0; i < listSize; i++) {
threadPool.execute(new RetrieveDataFromDataPowerTask());

}
if(needProgress == false){
Thread.sleep(200);
}
System.out.println("I am here");
}

public void callbackInvoker() {
needProgress = true;
System.out.println("Counter =>" + needProgress);
threadPool.shutdownNow();

}

}

Runnable Class

package test.hiren;

/**
*
* @author Hiren
*/
class RetrieveDataFromDataPowerTask implements Runnable {
public void run() {
System.out.println("I am at Run");
}
}

Executor Class
The callback method is synchronized so that the decrement performed after each task completes is not subject to race conditions. afterExecute() is a hook that ThreadPoolExecutor itself invokes (it is not a Spring callback handler). It is called on the worker thread that ran the task once the task has completed, either normally or by throwing an exception.

package test.hiren;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
*
* @author Hiren
*/
public class TaskExecutor extends ThreadPoolExecutor {

private GenerateReportServiceHelperImpl callbackInvoker;
private int comparableThreadCount;
TaskExecutor() {
super(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
callback(r, t);
}

public synchronized void callback(Runnable r, Throwable t) {
comparableThreadCount = comparableThreadCount - 1;
System.out.println("comparableThreadCount=>" + comparableThreadCount);
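if (t != null) {
System.out.println("ThreadPool Runnable's Exception caught!");
} else {
System.out.println("ThreadPool Runnable's gave no Exception");
}
// Notify the caller once every task has finished, even if the last one failed;
// otherwise shutdownNow() is never called and the pool threads keep the JVM alive.
if (comparableThreadCount == 0) {
System.out.println("Returning to Callback");
callbackInvoker.callbackInvoker();
}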
}

public void setCallbackInvoker(GenerateReportServiceHelperImpl callbackInvoker) {
this.callbackInvoker = callbackInvoker;
}

public void setComparableThreadCount(int comparableThreadCount) {
this.comparableThreadCount = comparableThreadCount;
}
}
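The counting done in callback() above can also be sketched with a java.util.concurrent.CountDownLatch. This is a swapped-in technique, not the code from this post: the latch replaces both the synchronized counter and the Thread.sleep(200) in the caller, because the caller simply blocks until every task has passed through afterExecute(). The names LatchTaskExecutor, awaitAll and LatchDemo are hypothetical, and the sketch assumes the number of tasks is known up front, as it is with listSize.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of the executor above, for illustration only.
class LatchTaskExecutor extends ThreadPoolExecutor {

    private final CountDownLatch remaining;

    LatchTaskExecutor(int poolSize, int expectedTasks) {
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        this.remaining = new CountDownLatch(expectedTasks);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Count the task as finished whether it succeeded or threw.
        remaining.countDown();
    }

    // Blocks until all expected tasks have finished or the timeout elapses.
    boolean awaitAll(long timeout, TimeUnit unit) {
        try {
            return remaining.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class LatchDemo {

    // Runs taskCount trivial tasks and returns how many ran, or -1 on timeout.
    static int runTasks(int taskCount) {
        LatchTaskExecutor pool = new LatchTaskExecutor(10, taskCount);
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> ran.incrementAndGet());
        }
        boolean finished = pool.awaitAll(5, TimeUnit.SECONDS);
        pool.shutdown();
        return finished ? ran.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("tasks completed: " + runTasks(4));
    }
}
```

With this shape the caller needs no callback reference to the pool's owner, and the pool can be shut down from the calling thread instead of from inside afterExecute().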
