@RequestScoped
public class ManagedService {
@Resource
private ManagedExecutorService executor;
public CompletableFuture<Integer> asyncTask(final int value) {
return CompletableFuture
.supplyAsync(longTask(value, 100, null), executor)
.thenApply(i -> i + 1);
}
public CompletableFuture<Integer> asyncTaskWithException(final int value) {
return CompletableFuture
.supplyAsync(longTask(value, 100, "Planned exception"), executor)
.thenApply(i -> i + 1);
}
private Supplier<Integer> longTask(final int value,
final int taskDurationMs,
final String errorMessage) {
return () -> {
if (nonNull(errorMessage)) {
throw new RuntimeException(errorMessage);
}
try {
TimeUnit.MILLISECONDS.sleep(taskDurationMs);
} catch (InterruptedException e) {
throw new RuntimeException("Problem while waiting");
}
return value + 1;
};
}
}
Concurrency Utilities for Java EE
In this example will be used Concurrency Utilities for Java EE, or JSR 236.
This standard allows application developers to use concurrency utilities managed by the application server. In this way, the developer no longer has the responsibility to manually manage thread polls or threads. Also, in a non-managed Thread object, the container cannot guarantee that other Java EE platform services work correctly. For these reasons, it is recommended the usage of managed threads whenever the need arise. More information can be found here.
Main Components of the Concurrency Utilities
The standard specifies main components of the concurrency utilities. In short, these components are managed objects that offer concurrency facilities. These objects, since are managed by the application, can be injected either using CDI, either JNDI. More information can be found here.
ManagedExecutorService
A ManagedExecutorService
is an object that allows application developers to submit tasks asynchronously. Tasks are executed on threads that are managed by the container.
Example
Here is a class that uses a ManagedExecutorService
(full code can be found here):
The ManagedExecutorService
object, being an managed object, is injected using the @Resource
annotation.
This example simulates a long running computation, defined in the longTask
method.
The capabilities of ManagedExecutorService
are exemplified in the asyncTask
and asyncTaskWithException
methods.
Both methods invoke the longTask
method defined above; each execution of longTask
is performed in a thread managed by the application.
The method asyncTask
simulates a successful execution, while the asyncTaskWithException
simulates a execution that will throw an exception.
The methods are used in the following test class (full example can be found here):
@RunWith(Arquillian.class)
public class ManagedServiceTest {
@Inject
private ManagedService managedService;
@Deployment()
public static final WebArchive app() {
return ShrinkWrap.create(WebArchive.class, "example.war")
.addClasses(ManagedService.class)
.addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
}
@Test
public void managedInvocationTest() {
final CompletableFuture<Integer> future = managedService.asyncTask(1);
try {
assertEquals(3, future.get(200, TimeUnit.MILLISECONDS).intValue());
} catch (Exception e) {
fail("Unexpected exception" + e);
}
}
@Test(expected = TimeoutException.class)
public void managedInvocationTestWithTimeout() throws InterruptedException, ExecutionException, TimeoutException {
final CompletableFuture<Integer> future = managedService.asyncTask(1);
future.get(10, TimeUnit.MILLISECONDS);
}
@Test
public void managedInvocationTestWithException() {
final CompletableFuture<Integer> future = managedService.asyncTaskWithException(1);
try {
future.get(200, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
assertEquals("Planned exception", e.getCause().getMessage());
} catch (Exception e) {
fail("Unexpected exception" + e);
}
}
}
ManagedScheduledExecutorService
A ManagedScheduledExecutorService
is an object that allows developers to execute tasks asynchronously at specific times. The tasks are executed on threads started by the container.
Example
Full example can be found here:
@RequestScoped
public class ManagedScheduledService {
@Resource
private ManagedScheduledExecutorService executor;
public Future<Integer> singleFixedDelayTask(final int value,
final String errorMessage) {
return executor.schedule(
longCallableTask(value, 10, errorMessage), 100, TimeUnit.MILLISECONDS);
}
public ScheduledFuture<?> periodicFixedDelayTask(final int value,
final String errorMessage,
final CountDownLatch countDownLatch) {
return executor.scheduleAtFixedRate(
longRunnableTask(value, 10, errorMessage, countDownLatch), 0, 100, TimeUnit.MILLISECONDS);
}
private Runnable longRunnableTask(final int value,
final int taskDurationMs,
final String errorMessage,
final CountDownLatch countDownLatch) {
return () -> {
failOrWait(taskDurationMs, errorMessage);
Integer result = value + 1;
countDownLatch.countDown();
};
}
private Callable<Integer> longCallableTask(final int value,
final int taskDurationMs,
final String errorMessage) {
return () -> {
failOrWait(taskDurationMs, errorMessage);
return value + 1;
};
}
private void failOrWait(final int taskDurationMs,
final String errorMessage) {
if (nonNull(errorMessage)) {
throw new RuntimeException(errorMessage);
}
try {
TimeUnit.MILLISECONDS.sleep(taskDurationMs);
} catch (InterruptedException e) {
throw new RuntimeException("Problem while waiting");
}
}
}
This example also defines a method, longCallableTask
, simulating the execution of a long running computation.
The method singleFixedDelayTask
schedules a long running task (by calling longCallableTask
), but the execution will start after 100 ms.
The method periodicFixedDelayTask
schedules tasks to be run periodically, after each 100 ms, with an initial delay of 0.
The methods are used in the following test class (full code can be found here):
@RunWith(Arquillian.class)
public class ManagedScheduledServiceTest {
@Inject
private ManagedScheduledService scheduledService;
@Deployment()
public static final WebArchive app() {
return ShrinkWrap.create(WebArchive.class, "example.war")
.addClasses(ManagedScheduledService.class)
.addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
}
@Test
public void singleFixedDelayTask() throws InterruptedException, ExecutionException, TimeoutException {
final Future<Integer> futureA = scheduledService.singleFixedDelayTask(1, null);
final Future<Integer> futureB = scheduledService.singleFixedDelayTask(50, null);
assertEquals(2, futureA.get(200, TimeUnit.MILLISECONDS).intValue());
assertEquals(51, futureB.get(200, TimeUnit.MILLISECONDS).intValue());
}
@Test
public void periodicFixedDelayTask() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(4); // execute 4 times
final ScheduledFuture<?> scheduledFuture = scheduledService.periodicFixedDelayTask(1, null, countDownLatch);
countDownLatch.await(500, TimeUnit.MILLISECONDS);
if (!scheduledFuture.isCancelled()) {
scheduledFuture.cancel(true);
}
}
@Test
public void singleFixedDelayTaskWithException() {
final Future<Integer> future = scheduledService.singleFixedDelayTask(1, "Planned exception");
try {
future.get(200, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
assertEquals("Planned exception", e.getCause().getMessage());
} catch (Exception e) {
fail("Unexpected exception" + e);
}
}
@Test
public void periodicFixedDelayTaskWithException() {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final ScheduledFuture<?> scheduledFuture = scheduledService.periodicFixedDelayTask(1, "Planned exception", countDownLatch);
try {
countDownLatch.await(200, TimeUnit.MILLISECONDS);
scheduledFuture.get(200, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
assertEquals("Planned exception", e.getCause().getMessage());
} catch (Exception e) {
fail("Unexpected exception" + e);
}
if (!scheduledFuture.isCancelled()) {
scheduledFuture.cancel(true);
}
}
}
ManagedThreadFactory
A ManagedThreadFactory
is an object that allows developers to create container managed threads.
Example
Full example can be found here:
@RequestScoped
public class ThreadFactoryService {
@Resource
private ManagedThreadFactory factory;
public void asyncTask(final LongTask longTask) throws InterruptedException {
final Thread thread = factory.newThread(longTask);
thread.setName("pretty asyncTask");
thread.start();
}
public void asyncHangingTask(final Runnable longTask) {
final Thread thread = factory.newThread(longTask);
thread.setName("pretty asyncHangingTask");
thread.start();
if (thread.isAlive()) {
thread.interrupt();
}
}
public static class LongTask implements Runnable {
private final int value;
private final long taskDurationMs;
private final CountDownLatch countDownLatch;
private int result;
private AtomicBoolean isTerminated = new AtomicBoolean(false);
public LongTask(final int value,
final long taskDurationMs,
final CountDownLatch countDownLatch) {
this.value = value;
this.taskDurationMs = taskDurationMs;
this.countDownLatch = countDownLatch;
}
public int getResult() {
return result;
}
public boolean getIsTerminated() {
return isTerminated.get();
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(taskDurationMs);
} catch (InterruptedException e) {
isTerminated.set(true);
countDownLatch.countDown();
throw new RuntimeException("Problem while waiting");
}
result = value + 1;
countDownLatch.countDown();
}
}
}
This example defines a class implementing Runnable
, executing a long running task in the run
method.
The method asyncTask
just creates a managed thread (using the injected ManagedThreadFactory
) then starts it.
The method asyncHangingTask
also creates a managed thread, starts it, but then stops it.
The following class tests these methods (full code can be found here):
@RunWith(Arquillian.class)
public class ThreadFactoryServiceTest {
@Inject
private ThreadFactoryService factoryService;
@Deployment()
public static final WebArchive app() {
return ShrinkWrap.create(WebArchive.class, "example.war")
.addClasses(ThreadFactoryService.class)
.addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
}
@Test
public void asyncTask() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final LongTask longTask = new LongTask(1, 50, countDownLatch);
factoryService.asyncTask(longTask);
countDownLatch.await(200, TimeUnit.MILLISECONDS);
assertEquals(2, longTask.getResult());
}
@Test
public void asyncHangingTask() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final LongTask longTask = new LongTask(1, 1000000, countDownLatch);
factoryService.asyncHangingTask(longTask);
countDownLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue(longTask.getIsTerminated());
}
}
Full project example can be found here.
It’s a Maven project, and all the tests can be executed by running mvn clean install
command.