1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.Closeable;
31 import java.io.IOException;
32 import java.nio.channels.ClosedSelectorException;
33 import java.nio.channels.SelectionKey;
34 import java.nio.channels.Selector;
35 import java.util.Set;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import org.apache.hc.core5.function.Callback;
40 import org.apache.hc.core5.io.CloseMode;
41 import org.apache.hc.core5.io.Closer;
42 import org.apache.hc.core5.util.Args;
43 import org.apache.hc.core5.util.TimeValue;
44 import org.apache.hc.core5.util.Timeout;
45
46 abstract class AbstractSingleCoreIOReactor implements IOReactor {
47
48 private final Callback<Exception> exceptionCallback;
49 private final AtomicReference<IOReactorStatus> status;
50 private final AtomicBoolean terminated;
51 private final Object shutdownMutex;
52
53 final Selector selector;
54
55 AbstractSingleCoreIOReactor(final Callback<Exception> exceptionCallback) {
56 super();
57 this.exceptionCallback = exceptionCallback;
58 this.shutdownMutex = new Object();
59 this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
60 this.terminated = new AtomicBoolean();
61 try {
62 this.selector = Selector.open();
63 } catch (final IOException ex) {
64 throw new IllegalStateException("Unexpected failure opening I/O selector", ex);
65 }
66 }
67
68 @Override
69 public final IOReactorStatus getStatus() {
70 return this.status.get();
71 }
72
73 void logException(final Exception ex) {
74 if (exceptionCallback != null) {
75 exceptionCallback.execute(ex);
76 }
77 }
78
79 abstract void doExecute() throws IOException;
80
81 abstract void doTerminate() throws IOException;
82
83 public void execute() {
84 if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE)) {
85 try {
86 doExecute();
87 } catch (final ClosedSelectorException ignore) {
88
89 } catch (final Exception ex) {
90 logException(ex);
91 } finally {
92 try {
93 doTerminate();
94 } catch (final Exception ex) {
95 logException(ex);
96 } finally {
97 close(CloseMode.IMMEDIATE);
98 }
99 }
100 }
101 }
102
103 @Override
104 public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
105 Args.notNull(waitTime, "Wait time");
106 final long deadline = System.currentTimeMillis() + waitTime.toMilliseconds();
107 long remaining = waitTime.toMilliseconds();
108 synchronized (this.shutdownMutex) {
109 while (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
110 this.shutdownMutex.wait(remaining);
111 remaining = deadline - System.currentTimeMillis();
112 if (remaining <= 0) {
113 return;
114 }
115 }
116 }
117 }
118
119 @Override
120 public final void initiateShutdown() {
121 if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.SHUT_DOWN)) {
122 synchronized (this.shutdownMutex) {
123 this.shutdownMutex.notifyAll();
124 }
125 } else if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) {
126 this.selector.wakeup();
127 }
128 }
129
130 @Override
131 public final void close(final CloseMode closeMode) {
132 close(closeMode, Timeout.ofSeconds(5));
133 }
134
135
136
137
138
139
140
141
142
143
144
145
146 public void close(final CloseMode closeMode, final Timeout timeout) {
147 if (closeMode == CloseMode.GRACEFUL) {
148 initiateShutdown();
149 try {
150 awaitShutdown(timeout);
151 } catch (final InterruptedException e) {
152 Thread.currentThread().interrupt();
153 }
154 }
155 this.status.set(IOReactorStatus.SHUT_DOWN);
156 if (terminated.compareAndSet(false, true)) {
157 try {
158 final Set<SelectionKey> keys = this.selector.keys();
159 for (final SelectionKey key : keys) {
160 try {
161 Closer.close((Closeable) key.attachment());
162 } catch (final IOException ex) {
163 logException(ex);
164 }
165 key.channel().close();
166 }
167 selector.close();
168 } catch (final Exception ex) {
169 logException(ex);
170 }
171 }
172 synchronized (this.shutdownMutex) {
173 this.shutdownMutex.notifyAll();
174 }
175 }
176
177 @Override
178 public final void close() {
179 close(CloseMode.GRACEFUL);
180 }
181
182 @Override
183 public String toString() {
184 return super.toString() + " [status=" + status + "]";
185 }
186
187 }