package org.eclipse.aether.util.concurrency;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *  http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import static java.util.Objects.requireNonNull;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/**
 * A utility class to forward any uncaught {@link Error} or {@link RuntimeException} from a {@link Runnable} executed in
 * a worker thread back to the parent thread. The simplified usage pattern looks like this:
 *
 * <pre>
 * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
 * for ( Runnable task : tasks )
 * {
 *     executor.execute( errorForwarder.wrap( task ) );
 * }
 * errorForwarder.await();
 * </pre>
 *
 * <p>
 * The thread that constructs an instance is recorded as the parent thread and is the only thread allowed to call
 * {@link #await()}. Every runnable returned by {@link #wrap(Runnable)} is counted as outstanding from the moment it is
 * wrapped until it has finished running, so each wrapped runnable must be run exactly once: a wrapper that is never
 * run keeps {@link #await()} blocked, and a wrapper that is run more than once decrements the count more than once.
 */
public final class RunnableErrorForwarder
{

    /** The thread that created this forwarder; it is unparked by finishing runnables and is the only valid waiter. */
    private final Thread thread = Thread.currentThread();

    /** Number of wrapped runnables that have not finished running yet. */
    private final AtomicInteger counter = new AtomicInteger();

    /** The first uncaught error recorded by a wrapped runnable, or {@code null} while none has failed. */
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    /**
     * Creates a new error forwarder for worker threads spawned by the current thread.
     */
    public RunnableErrorForwarder()
    {
    }

    /**
     * Wraps the specified runnable into an equivalent runnable that will allow forwarding of uncaught errors. The
     * wrapped runnable is counted as outstanding immediately, i.e. before it is actually run.
     *
     * @param runnable The runnable from which to forward errors, must not be {@code null}.
     * @return The error-forwarding runnable to eventually execute, never {@code null}.
     * @throws NullPointerException If the given runnable is {@code null}.
     */
    public Runnable wrap( final Runnable runnable )
    {
        requireNonNull( runnable, "runnable cannot be null" );

        counter.incrementAndGet();

        return () ->
        {
            try
            {
                runnable.run();
            }
            catch ( RuntimeException | Error e )
            {
                // only the first failure is kept; it is still propagated to the executing thread as usual
                error.compareAndSet( null, e );
                throw e;
            }
            finally
            {
                // decrement before waking the parent so that it observes the updated count when it re-checks
                counter.decrementAndGet();
                LockSupport.unpark( thread );
            }
        };
    }

    /**
     * Causes the current thread to wait until all previously {@link #wrap(Runnable) wrapped} runnables have terminated
     * and potentially re-throws an uncaught {@link RuntimeException} or {@link Error} from any of the runnables. In
     * case multiple runnables encountered uncaught errors, the one that was recorded first is re-thrown. <em>Note:</em>
     * This method must be called from the same thread that created this error forwarder instance.
     *
     * @throws IllegalStateException If called from a thread other than the one that created this instance, or if the
     *             recorded error is a {@link ThreadDeath}, which is wrapped instead of being re-thrown directly.
     * @throws RuntimeException If a wrapped runnable failed with this runtime exception.
     * @throws Error If a wrapped runnable failed with this error.
     */
    public void await()
    {
        awaitTerminationOfAllRunnables();

        Throwable failure = error.get();
        if ( failure == null )
        {
            return;
        }
        if ( failure instanceof RuntimeException )
        {
            throw (RuntimeException) failure;
        }
        if ( failure instanceof Error && !( failure instanceof ThreadDeath ) )
        {
            throw (Error) failure;
        }
        // ThreadDeath (and, defensively, any other throwable type) is reported wrapped rather than re-thrown as-is
        throw new IllegalStateException( failure );
    }

    /**
     * Blocks the calling thread until the count of outstanding runnables has dropped to zero. An interrupt received
     * while waiting does not abort the wait; the interrupt status is restored once waiting has finished.
     *
     * @throws IllegalStateException If the calling thread is not the thread that created this instance.
     */
    private void awaitTerminationOfAllRunnables()
    {
        if ( !thread.equals( Thread.currentThread() ) )
        {
            throw new IllegalStateException( "wrong caller thread, expected " + thread + " and not "
                + Thread.currentThread() );
        }

        boolean interrupted = false;

        // park() may return early (stale permit, interrupt, spurious wakeup), hence the count is re-checked in a loop
        while ( counter.get() > 0 )
        {
            LockSupport.park();

            // clear the interrupt status so that the next park() can block again, but remember it for later
            if ( Thread.interrupted() )
            {
                interrupted = true;
            }
        }

        if ( interrupted )
        {
            Thread.currentThread().interrupt();
        }
    }

}
143 }