1 package org.apache.onami.lifecycle.core;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.Closeable;
23 import java.lang.annotation.Annotation;
24 import java.util.ArrayDeque;
25 import java.util.Collections;
26 import java.util.Queue;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.TimeUnit;
29
30
31
32
33 public class DefaultStager<A extends Annotation>
34 implements DisposingStager<A>
35 {
36 private final Class<A> stage;
37
38
39
40
41 private final Queue<Stageable> stageables;
42
43
44
45
46 public DefaultStager( Class<A> stage )
47 {
48 this( stage, Order.FIRST_IN_FIRST_OUT );
49 }
50
51
52
53
54
55 public DefaultStager( Class<A> stage, Order mode )
56 {
57 this.stage = stage;
58
59 Queue<Stageable> localStageables;
60 switch ( mode )
61 {
62 case FIRST_IN_FIRST_OUT:
63 {
64 localStageables = new ArrayDeque<Stageable>();
65 break;
66 }
67
68 case FIRST_IN_LAST_OUT:
69 {
70 localStageables = Collections.asLifoQueue( new ArrayDeque<Stageable>() );
71 break;
72 }
73
74 default:
75 {
76 throw new IllegalArgumentException( "Unknown mode: " + mode );
77 }
78 }
79 stageables = localStageables;
80 }
81
82
83
84
85 @Override
86 public void register( Stageable stageable )
87 {
88 synchronized ( stageables )
89 {
90 stageables.add( stageable );
91 }
92 }
93
94
95
96
97 @Override
98 public <T extends ExecutorService> T register( T executorService )
99 {
100 register( new ExecutorServiceStageable( executorService ) );
101 return executorService;
102 }
103
104
105
106
107 @Override
108 public <T extends Closeable> T register( T closeable )
109 {
110 register( new CloseableStageable( closeable ) );
111 return closeable;
112 }
113
114
115
116
117 @Override
118 public void stage()
119 {
120 stage( null );
121 }
122
123
124
125
126 @Override
127 public void stage( StageHandler stageHandler )
128 {
129 if ( stageHandler == null )
130 {
131 stageHandler = new NoOpStageHandler();
132 }
133
134 while ( true )
135 {
136 Stageable stageable;
137 synchronized ( stageables )
138 {
139 stageable = stageables.poll();
140 }
141 if ( stageable == null )
142 {
143 break;
144 }
145 stageable.stage( stageHandler );
146 }
147 }
148
149
150
151
152 @Override
153 public Class<A> getStage()
154 {
155 return stage;
156 }
157
158
159
160
161 public static enum Order
162 {
163
164
165
166 FIRST_IN_FIRST_OUT,
167
168
169
170
171 FIRST_IN_LAST_OUT
172 }
173
174 private static class CloseableStageable extends AbstractStageable<Closeable>
175 {
176
177 public CloseableStageable( Closeable closeable )
178 {
179 super( closeable );
180 }
181
182 @Override
183 protected void doStage() throws Exception
184 {
185 object.close();
186 }
187
188 }
189
190 private static class ExecutorServiceStageable extends AbstractStageable<ExecutorService>
191 {
192
193 public ExecutorServiceStageable( ExecutorService executor )
194 {
195 super( executor );
196 }
197
198 @Override
199 protected void doStage() throws Exception
200 {
201 object.shutdown();
202 try
203 {
204 if ( !object.awaitTermination( 1, TimeUnit.MINUTES ) )
205 {
206 object.shutdownNow();
207 }
208 }
209 catch ( InterruptedException e )
210 {
211 object.shutdownNow();
212 Thread.currentThread().interrupt();
213 }
214 }
215
216 }
217
218 }