View Javadoc
1   package org.apache.onami.lifecycle.core;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.Closeable;
23  import java.lang.annotation.Annotation;
24  import java.util.ArrayDeque;
25  import java.util.Collections;
26  import java.util.Queue;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.TimeUnit;
29  
30  /**
31   * Default {@link Stager} implementation.
32   */
33  public class DefaultStager<A extends Annotation>
34      implements DisposingStager<A>
35  {
36      private final Class<A> stage;
37  
38      /**
39       * Stack of elements have to be disposed.
40       */
41      private final Queue<Stageable> stageables;
42  
43      /**
44       * @param stage the annotation that specifies this stage
45       */
46      public DefaultStager( Class<A> stage )
47      {
48          this( stage, Order.FIRST_IN_FIRST_OUT );
49      }
50  
51  	/**
52       * @param stage the annotation that specifies this stage
53       * @param mode  execution order
54       */
55      public DefaultStager( Class<A> stage, Order mode )
56      {
57          this.stage = stage;
58  
59          Queue<Stageable> localStageables;
60          switch ( mode )
61          {
62              case FIRST_IN_FIRST_OUT:
63              {
64                  localStageables = new ArrayDeque<Stageable>();
65                  break;
66              }
67  
68              case FIRST_IN_LAST_OUT:
69              {
70                  localStageables = Collections.asLifoQueue( new ArrayDeque<Stageable>() );
71                  break;
72              }
73  
74              default:
75              {
76                  throw new IllegalArgumentException( "Unknown mode: " + mode );
77              }
78          }
79          stageables = localStageables;
80      }
81  
82      /**
83       * {@inheritDoc}
84       */
85      @Override
86      public void register( Stageable stageable )
87      {
88          synchronized ( stageables )
89          {
90              stageables.add( stageable );
91          }
92      }
93  
94      /**
95       * {@inheritDoc}
96       */
97      @Override
98      public <T extends ExecutorService> T register( T executorService )
99      {
100         register( new ExecutorServiceStageable( executorService ) );
101         return executorService;
102     }
103 
104     /**
105      * {@inheritDoc}
106      */
107     @Override
108     public <T extends Closeable> T register( T closeable )
109     {
110         register( new CloseableStageable( closeable ) );
111         return closeable;
112     }
113 
114     /**
115      * {@inheritDoc}
116      */
117     @Override
118     public void stage()
119     {
120         stage( null );
121     }
122 
123     /**
124      * {@inheritDoc}
125      */
126     @Override
127     public void stage( StageHandler stageHandler )
128     {
129         if ( stageHandler == null )
130         {
131             stageHandler = new NoOpStageHandler();
132         }
133 
134         while ( true )
135         {
136             Stageable stageable;
137             synchronized ( stageables )
138             {
139                 stageable = stageables.poll();
140             }
141             if ( stageable == null )
142             {
143                 break;
144             }
145             stageable.stage( stageHandler );
146         }
147     }
148 
149     /**
150      * {@inheritDoc}
151      */
152     @Override
153     public Class<A> getStage()
154     {
155         return stage;
156     }
157 
158     /**
159      * specifies ordering for a {@link DefaultStager}
160      */
161     public static enum Order
162     {
163         /**
164          * FIFO
165          */
166         FIRST_IN_FIRST_OUT,
167 
168         /**
169          * FILO/LIFO
170          */
171         FIRST_IN_LAST_OUT
172     }
173 
174     private static class CloseableStageable extends AbstractStageable<Closeable>
175     {
176 
177         public CloseableStageable( Closeable closeable )
178         {
179             super( closeable );
180         }
181 
182         @Override
183         protected void doStage() throws Exception
184         {
185             object.close();
186         }
187 
188     }
189 
190     private static class ExecutorServiceStageable extends AbstractStageable<ExecutorService>
191     {
192 
193         public ExecutorServiceStageable( ExecutorService executor )
194         {
195             super( executor );
196         }
197 
198         @Override
199         protected void doStage() throws Exception
200         {
201             object.shutdown();
202             try
203             {
204                 if ( !object.awaitTermination( 1, TimeUnit.MINUTES ) )
205                 {
206                     object.shutdownNow();
207                 }
208             }
209             catch ( InterruptedException e )
210             {
211                 object.shutdownNow();
212                 Thread.currentThread().interrupt();
213             }
214         }
215 
216     }
217 
218 }