1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.Closeable;
31 import java.io.IOException;
32 import java.nio.channels.ClosedSelectorException;
33 import java.nio.channels.SelectionKey;
34 import java.nio.channels.Selector;
35 import java.util.Set;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39 import java.util.concurrent.locks.Condition;
40 import java.util.concurrent.locks.ReentrantLock;
41
42 import org.apache.hc.core5.function.Callback;
43 import org.apache.hc.core5.io.CloseMode;
44 import org.apache.hc.core5.io.Closer;
45 import org.apache.hc.core5.util.Args;
46 import org.apache.hc.core5.util.TimeValue;
47 import org.apache.hc.core5.util.Timeout;
48
49 abstract class AbstractSingleCoreIOReactor implements IOReactor {
50
51 private final Callback<Exception> exceptionCallback;
52 private final AtomicReference<IOReactorStatus> status;
53 private final AtomicBoolean terminated;
54 private final Condition condition;
55
56 private final ReentrantLock lock;
57
58 final Selector selector;
59
60 AbstractSingleCoreIOReactor(final Callback<Exception> exceptionCallback) {
61 super();
62 this.exceptionCallback = exceptionCallback;
63 this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
64 this.terminated = new AtomicBoolean();
65 try {
66 this.selector = Selector.open();
67 } catch (final IOException ex) {
68 throw new IllegalStateException("Unexpected failure opening I/O selector", ex);
69 }
70 this.lock = new ReentrantLock();
71 this.condition = lock.newCondition();
72 }
73
74 @Override
75 public final IOReactorStatus getStatus() {
76 return this.status.get();
77 }
78
79 void logException(final Exception ex) {
80 if (exceptionCallback != null) {
81 exceptionCallback.execute(ex);
82 }
83 }
84
85 abstract void doExecute() throws IOException;
86
87 abstract void doTerminate() throws IOException;
88
89 public void execute() {
90 if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE)) {
91 try {
92 doExecute();
93 } catch (final ClosedSelectorException ignore) {
94
95 } catch (final Exception ex) {
96 logException(ex);
97 } finally {
98 try {
99 doTerminate();
100 } catch (final Exception ex) {
101 logException(ex);
102 } finally {
103 close(CloseMode.IMMEDIATE);
104 }
105 }
106 }
107 }
108
109 @Override
110 public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
111 Args.notNull(waitTime, "Wait time");
112 final long deadline = System.currentTimeMillis() + waitTime.toMilliseconds();
113 long remaining = waitTime.toMilliseconds();
114 lock.lock();
115 try {
116 while (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
117 condition.await(remaining, TimeUnit.MILLISECONDS);
118 remaining = deadline - System.currentTimeMillis();
119 if (remaining <= 0) {
120 return;
121 }
122 }
123 } finally {
124 lock.unlock();
125 }
126 }
127
128 @Override
129 public final void initiateShutdown() {
130 if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.SHUT_DOWN)) {
131 lock.lock();
132 try {
133 condition.signalAll();
134 } finally {
135 lock.unlock();
136 }
137 } else if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) {
138 this.selector.wakeup();
139 }
140 }
141
142 @Override
143 public final void close(final CloseMode closeMode) {
144 close(closeMode, Timeout.ofSeconds(5));
145 }
146
147
148
149
150
151
152
153
154
155
156
157
158 public void close(final CloseMode closeMode, final Timeout timeout) {
159 if (closeMode == CloseMode.GRACEFUL) {
160 initiateShutdown();
161 try {
162 awaitShutdown(timeout);
163 } catch (final InterruptedException e) {
164 Thread.currentThread().interrupt();
165 }
166 }
167 this.status.set(IOReactorStatus.SHUT_DOWN);
168 if (terminated.compareAndSet(false, true)) {
169 try {
170 final Set<SelectionKey> keys = this.selector.keys();
171 for (final SelectionKey key : keys) {
172 try {
173 Closer.close((Closeable) key.attachment());
174 } catch (final IOException ex) {
175 logException(ex);
176 }
177 key.channel().close();
178 }
179 selector.close();
180 } catch (final Exception ex) {
181 logException(ex);
182 }
183 }
184 lock.lock();
185 try {
186 condition.signalAll();
187 } finally {
188 lock.unlock();
189 }
190 }
191
192 @Override
193 public final void close() {
194 close(CloseMode.GRACEFUL);
195 }
196
197 @Override
198 public String toString() {
199 return super.toString() + " [status=" + status + "]";
200 }
201
202 }