1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.hc.core5.io.CloseMode;
35 import org.apache.hc.core5.io.Closer;
36 import org.apache.hc.core5.util.Args;
37 import org.apache.hc.core5.util.TimeValue;
38 import org.apache.hc.core5.util.Timeout;
39
40 class MultiCoreIOReactor implements IOReactor {
41
42 private final IOReactor[] ioReactors;
43 private final Thread[] threads;
44 private final AtomicReference<IOReactorStatus> status;
45 private final AtomicBoolean terminated;
46
47 MultiCoreIOReactor(final IOReactor[] ioReactors, final Thread[] threads) {
48 super();
49 this.ioReactors = ioReactors.clone();
50 this.threads = threads.clone();
51 this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
52 this.terminated = new AtomicBoolean();
53 }
54
55 @Override
56 public IOReactorStatus getStatus() {
57 return this.status.get();
58 }
59
60
61
62
63
64
65
66
67 public final void start() {
68 if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE)) {
69 for (int i = 0; i < this.threads.length; i++) {
70 this.threads[i].start();
71 }
72 }
73 }
74
75 @Override
76 public final void initiateShutdown() {
77 if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.SHUT_DOWN) ||
78 this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) {
79 for (int i = 0; i < this.ioReactors.length; i++) {
80 final IOReactor ioReactor = this.ioReactors[i];
81 ioReactor.initiateShutdown();
82 }
83 }
84 }
85
86 @Override
87 public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
88 Args.notNull(waitTime, "Wait time");
89 final long deadline = System.currentTimeMillis() + waitTime.toMilliseconds();
90 long remaining = waitTime.toMilliseconds();
91 for (int i = 0; i < this.ioReactors.length; i++) {
92 final IOReactor ioReactor = this.ioReactors[i];
93 if (ioReactor.getStatus().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
94 ioReactor.awaitShutdown(TimeValue.of(remaining, TimeUnit.MILLISECONDS));
95 remaining = deadline - System.currentTimeMillis();
96 if (remaining <= 0) {
97 return;
98 }
99 }
100 }
101 for (int i = 0; i < this.threads.length; i++) {
102 final Thread thread = this.threads[i];
103 thread.join(remaining);
104 remaining = deadline - System.currentTimeMillis();
105 if (remaining <= 0) {
106 return;
107 }
108 }
109 }
110
111 @Override
112 public final void close(final CloseMode closeMode) {
113 close(closeMode, Timeout.ofSeconds(5));
114 }
115
116
117
118
119
120
121
122
123
124
125
126
127 public void close(final CloseMode closeMode, final Timeout timeout) {
128 if (closeMode == CloseMode.GRACEFUL) {
129 initiateShutdown();
130 try {
131 awaitShutdown(timeout);
132 } catch (final InterruptedException e) {
133 Thread.currentThread().interrupt();
134 }
135 }
136 this.status.set(IOReactorStatus.SHUT_DOWN);
137 if (this.terminated.compareAndSet(false, true)) {
138 for (int i = 0; i < this.ioReactors.length; i++) {
139 Closer.close(this.ioReactors[i], CloseMode.IMMEDIATE);
140 }
141 for (int i = 0; i < this.threads.length; i++) {
142 this.threads[i].interrupt();
143 }
144 }
145 }
146
147 @Override
148 public final void close() {
149 close(CloseMode.GRACEFUL);
150 }
151
152 @Override
153 public String toString() {
154 return getClass().getSimpleName() + " [status=" + status + "]";
155 }
156
157 }