1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.hc.core5.io.CloseMode;
35 import org.apache.hc.core5.io.Closer;
36 import org.apache.hc.core5.util.Args;
37 import org.apache.hc.core5.util.TimeValue;
38
39 class MultiCoreIOReactor implements IOReactor {
40
41 private final IOReactor[] ioReactors;
42 private final Thread[] threads;
43 private final AtomicReference<IOReactorStatus> status;
44 private final AtomicBoolean terminated;
45
46 MultiCoreIOReactor(final IOReactor[] ioReactors, final Thread[] threads) {
47 super();
48 this.ioReactors = ioReactors.clone();
49 this.threads = threads.clone();
50 this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
51 this.terminated = new AtomicBoolean();
52 }
53
54 @Override
55 public IOReactorStatus getStatus() {
56 return this.status.get();
57 }
58
59
60
61
62
63
64
65
66 public final void start() {
67 if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE)) {
68 for (int i = 0; i < this.threads.length; i++) {
69 this.threads[i].start();
70 }
71 }
72 }
73
74 @Override
75 public final void initiateShutdown() {
76 if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.SHUT_DOWN) ||
77 this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) {
78 for (int i = 0; i < this.ioReactors.length; i++) {
79 final IOReactor ioReactor = this.ioReactors[i];
80 ioReactor.initiateShutdown();
81 }
82 }
83 }
84
85 @Override
86 public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
87 Args.notNull(waitTime, "Wait time");
88 final long deadline = System.currentTimeMillis() + waitTime.toMilliseconds();
89 long remaining = waitTime.toMilliseconds();
90 for (int i = 0; i < this.ioReactors.length; i++) {
91 final IOReactor ioReactor = this.ioReactors[i];
92 if (ioReactor.getStatus().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
93 ioReactor.awaitShutdown(TimeValue.of(remaining, TimeUnit.MILLISECONDS));
94 remaining = deadline - System.currentTimeMillis();
95 if (remaining <= 0) {
96 return;
97 }
98 }
99 }
100 for (int i = 0; i < this.threads.length; i++) {
101 final Thread thread = this.threads[i];
102 thread.join(remaining);
103 remaining = deadline - System.currentTimeMillis();
104 if (remaining <= 0) {
105 return;
106 }
107 }
108 }
109
110 @Override
111 public final void close(final CloseMode closeMode) {
112 if (closeMode == CloseMode.GRACEFUL) {
113 initiateShutdown();
114 try {
115 awaitShutdown(TimeValue.ofSeconds(5));
116 } catch (final InterruptedException e) {
117 Thread.currentThread().interrupt();
118 }
119 }
120 this.status.set(IOReactorStatus.SHUT_DOWN);
121 if (this.terminated.compareAndSet(false, true)) {
122 for (int i = 0; i < this.ioReactors.length; i++) {
123 Closer.close(this.ioReactors[i], CloseMode.IMMEDIATE);
124 }
125 for (int i = 0; i < this.threads.length; i++) {
126 this.threads[i].interrupt();
127 }
128 }
129 }
130
131 @Override
132 public final void close() {
133 close(CloseMode.GRACEFUL);
134 }
135
136 @Override
137 public String toString() {
138 return getClass().getSimpleName() + " [status=" + status + "]";
139 }
140
141 }