View Javadoc
1   package org.eclipse.aether.util.concurrency;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   * 
12   *  http://www.apache.org/licenses/LICENSE-2.0
13   * 
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.concurrent.atomic.AtomicInteger;
23  import java.util.concurrent.atomic.AtomicReference;
24  import java.util.concurrent.locks.LockSupport;
25  
26  /**
27   * A utility class to forward any uncaught {@link Error} or {@link RuntimeException} from a {@link Runnable} executed in
28   * a worker thread back to the parent thread. The simplified usage pattern looks like this:
29   * 
30   * <pre>
31   * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
32   * for ( Runnable task : tasks )
33   * {
34   *     executor.execute( errorForwarder.wrap( task ) );
35   * }
36   * errorForwarder.await();
37   * </pre>
38   */
39  public final class RunnableErrorForwarder
40  {
41  
42      private final Thread thread = Thread.currentThread();
43  
44      private final AtomicInteger counter = new AtomicInteger();
45  
46      private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
47  
48      /**
49       * Creates a new error forwarder for worker threads spawned by the current thread.
50       */
51      public RunnableErrorForwarder()
52      {
53      }
54  
55      /**
56       * Wraps the specified runnable into an equivalent runnable that will allow forwarding of uncaught errors.
57       * 
58       * @param runnable The runnable from which to forward errors, must not be {@code null}.
59       * @return The error-forwarding runnable to eventually execute, never {@code null}.
60       */
61      public Runnable wrap( final Runnable runnable )
62      {
63          if ( runnable == null )
64          {
65              throw new IllegalArgumentException( "runnable missing" );
66          }
67  
68          counter.incrementAndGet();
69  
70          return new Runnable()
71          {
72              public void run()
73              {
74                  try
75                  {
76                      runnable.run();
77                  }
78                  catch ( RuntimeException e )
79                  {
80                      error.compareAndSet( null, e );
81                      throw e;
82                  }
83                  catch ( Error e )
84                  {
85                      error.compareAndSet( null, e );
86                      throw e;
87                  }
88                  finally
89                  {
90                      counter.decrementAndGet();
91                      LockSupport.unpark( thread );
92                  }
93              }
94          };
95      }
96  
97      /**
98       * Causes the current thread to wait until all previously {@link #wrap(Runnable) wrapped} runnables have terminated
99       * and potentially re-throws an uncaught {@link RuntimeException} or {@link Error} from any of the runnables. In
100      * case multiple runnables encountered uncaught errors, one error is arbitrarily selected. <em>Note:</em> This
101      * method must be called from the same thread that created this error forwarder instance.
102      */
103     public void await()
104     {
105         awaitTerminationOfAllRunnables();
106 
107         Throwable error = this.error.get();
108         if ( error != null )
109         {
110             if ( error instanceof RuntimeException )
111             {
112                 throw (RuntimeException) error;
113             }
114             else if ( error instanceof ThreadDeath )
115             {
116                 throw new IllegalStateException( error );
117             }
118             else if ( error instanceof Error )
119             {
120                 throw (Error) error;
121             }
122             throw new IllegalStateException( error );
123         }
124     }
125 
126     private void awaitTerminationOfAllRunnables()
127     {
128         if ( !thread.equals( Thread.currentThread() ) )
129         {
130             throw new IllegalStateException( "wrong caller thread, expected " + thread + " and not "
131                 + Thread.currentThread() );
132         }
133 
134         boolean interrupted = false;
135 
136         while ( counter.get() > 0 )
137         {
138             LockSupport.park();
139 
140             if ( Thread.interrupted() )
141             {
142                 interrupted = true;
143             }
144         }
145 
146         if ( interrupted )
147         {
148             Thread.currentThread().interrupt();
149         }
150     }
151 
152 }