View Javadoc
1   package org.eclipse.aether.util.concurrency;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   * 
12   *  http://www.apache.org/licenses/LICENSE-2.0
13   * 
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import static java.util.Objects.requireNonNull;
23  
24  import java.util.concurrent.atomic.AtomicInteger;
25  import java.util.concurrent.atomic.AtomicReference;
26  import java.util.concurrent.locks.LockSupport;
27  
28  /**
29   * A utility class to forward any uncaught {@link Error} or {@link RuntimeException} from a {@link Runnable} executed in
30   * a worker thread back to the parent thread. The simplified usage pattern looks like this:
31   * 
32   * <pre>
33   * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
34   * for ( Runnable task : tasks )
35   * {
36   *     executor.execute( errorForwarder.wrap( task ) );
37   * }
38   * errorForwarder.await();
39   * </pre>
40   */
41  public final class RunnableErrorForwarder
42  {
43  
44      private final Thread thread = Thread.currentThread();
45  
46      private final AtomicInteger counter = new AtomicInteger();
47  
48      private final AtomicReference<Throwable> error = new AtomicReference<>();
49  
50      /**
51       * Creates a new error forwarder for worker threads spawned by the current thread.
52       */
53      public RunnableErrorForwarder()
54      {
55      }
56  
57      /**
58       * Wraps the specified runnable into an equivalent runnable that will allow forwarding of uncaught errors.
59       *
60       * @param runnable The runnable from which to forward errors, must not be {@code null}.
61       * @return The error-forwarding runnable to eventually execute, never {@code null}.
62       */
63      public Runnable wrap( final Runnable runnable )
64      {
65          requireNonNull( runnable, "runnable cannot be null" );
66  
67          counter.incrementAndGet();
68  
69          return new Runnable()
70          {
71              public void run()
72              {
73                  try
74                  {
75                      runnable.run();
76                  }
77                  catch ( RuntimeException | Error e )
78                  {
79                      error.compareAndSet( null, e );
80                      throw e;
81                  }
82                  finally
83                  {
84                      counter.decrementAndGet();
85                      LockSupport.unpark( thread );
86                  }
87              }
88          };
89      }
90  
91      /**
92       * Causes the current thread to wait until all previously {@link #wrap(Runnable) wrapped} runnables have terminated
93       * and potentially re-throws an uncaught {@link RuntimeException} or {@link Error} from any of the runnables. In
94       * case multiple runnables encountered uncaught errors, one error is arbitrarily selected. <em>Note:</em> This
95       * method must be called from the same thread that created this error forwarder instance.
96       */
97      public void await()
98      {
99          awaitTerminationOfAllRunnables();
100 
101         Throwable error = this.error.get();
102         if ( error != null )
103         {
104             if ( error instanceof RuntimeException )
105             {
106                 throw (RuntimeException) error;
107             }
108             else if ( error instanceof ThreadDeath )
109             {
110                 throw new IllegalStateException( error );
111             }
112             else if ( error instanceof Error )
113             {
114                 throw (Error) error;
115             }
116             throw new IllegalStateException( error );
117         }
118     }
119 
120     private void awaitTerminationOfAllRunnables()
121     {
122         if ( !thread.equals( Thread.currentThread() ) )
123         {
124             throw new IllegalStateException( "wrong caller thread, expected " + thread + " and not "
125                 + Thread.currentThread() );
126         }
127 
128         boolean interrupted = false;
129 
130         while ( counter.get() > 0 )
131         {
132             LockSupport.park();
133 
134             if ( Thread.interrupted() )
135             {
136                 interrupted = true;
137             }
138         }
139 
140         if ( interrupted )
141         {
142             Thread.currentThread().interrupt();
143         }
144     }
145 
146 }