1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.Closeable;
31 import java.io.IOException;
32 import java.nio.channels.ClosedSelectorException;
33 import java.nio.channels.SelectionKey;
34 import java.nio.channels.Selector;
35 import java.util.Set;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import org.apache.hc.core5.function.Callback;
40 import org.apache.hc.core5.io.CloseMode;
41 import org.apache.hc.core5.io.Closer;
42 import org.apache.hc.core5.util.Args;
43 import org.apache.hc.core5.util.TimeValue;
44
45 abstract class AbstractSingleCoreIOReactor implements IOReactor {
46
47 private final Callback<Exception> exceptionCallback;
48 private final AtomicReference<IOReactorStatus> status;
49 private final AtomicBoolean terminated;
50 private final Object shutdownMutex;
51
52 final Selector selector;
53
54 AbstractSingleCoreIOReactor(final Callback<Exception> exceptionCallback) {
55 super();
56 this.exceptionCallback = exceptionCallback;
57 this.shutdownMutex = new Object();
58 this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
59 this.terminated = new AtomicBoolean();
60 try {
61 this.selector = Selector.open();
62 } catch (final IOException ex) {
63 throw new IllegalStateException("Unexpected failure opening I/O selector", ex);
64 }
65 }
66
67 @Override
68 public final IOReactorStatus getStatus() {
69 return this.status.get();
70 }
71
72 void logException(final Exception ex) {
73 if (exceptionCallback != null) {
74 exceptionCallback.execute(ex);
75 }
76 }
77
78 abstract void doExecute() throws IOException;
79
80 abstract void doTerminate() throws IOException;
81
82 public void execute() {
83 if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE)) {
84 try {
85 doExecute();
86 } catch (final ClosedSelectorException ignore) {
87
88 } catch (final Exception ex) {
89 logException(ex);
90 } finally {
91 try {
92 doTerminate();
93 } catch (final Exception ex) {
94 logException(ex);
95 } finally {
96 close(CloseMode.IMMEDIATE);
97 }
98 }
99 }
100 }
101
102 @Override
103 public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
104 Args.notNull(waitTime, "Wait time");
105 final long deadline = System.currentTimeMillis() + waitTime.toMilliseconds();
106 long remaining = waitTime.toMilliseconds();
107 synchronized (this.shutdownMutex) {
108 while (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
109 this.shutdownMutex.wait(remaining);
110 remaining = deadline - System.currentTimeMillis();
111 if (remaining <= 0) {
112 return;
113 }
114 }
115 }
116 }
117
118 @Override
119 public final void initiateShutdown() {
120 if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.SHUT_DOWN)) {
121 synchronized (this.shutdownMutex) {
122 this.shutdownMutex.notifyAll();
123 }
124 } else if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) {
125 this.selector.wakeup();
126 }
127 }
128
129 @Override
130 public final void close(final CloseMode closeMode) {
131 if (closeMode == CloseMode.GRACEFUL) {
132 initiateShutdown();
133 try {
134 awaitShutdown(TimeValue.ofSeconds(5));
135 } catch (final InterruptedException e) {
136 Thread.currentThread().interrupt();
137 }
138 }
139 this.status.set(IOReactorStatus.SHUT_DOWN);
140 if (terminated.compareAndSet(false, true)) {
141 try {
142 final Set<SelectionKey> keys = this.selector.keys();
143 for (final SelectionKey key : keys) {
144 try {
145 Closer.close((Closeable) key.attachment());
146 } catch (final IOException ex) {
147 logException(ex);
148 }
149 key.channel().close();
150 }
151 selector.close();
152 } catch (final Exception ex) {
153 logException(ex);
154 }
155 }
156 synchronized (this.shutdownMutex) {
157 this.shutdownMutex.notifyAll();
158 }
159 }
160
161 @Override
162 public final void close() {
163 close(CloseMode.GRACEFUL);
164 }
165
166 @Override
167 public String toString() {
168 return super.toString() + " [status=" + status + "]";
169 }
170
171 }