1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.net.SocketException;
33 import java.nio.ByteBuffer;
34 import java.nio.channels.ByteChannel;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.SocketChannel;
37 import java.util.Deque;
38 import java.util.concurrent.ConcurrentLinkedDeque;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.concurrent.atomic.AtomicReference;
41 import java.util.concurrent.locks.Lock;
42 import java.util.concurrent.locks.ReentrantLock;
43
44 import org.apache.hc.core5.io.CloseMode;
45 import org.apache.hc.core5.io.Closer;
46 import org.apache.hc.core5.util.Args;
47 import org.apache.hc.core5.util.Timeout;
48
49 class IOSessionImpl implements IOSession {
50
51
52 private final static AtomicLong COUNT = new AtomicLong(0);
53
54 private final SelectionKey key;
55 private final SocketChannel channel;
56 private final Deque<Command> commandQueue;
57 private final Lock lock;
58 private final String id;
59 private final AtomicReference<IOEventHandler> handlerRef;
60 private final AtomicReference<IOSession.Status> status;
61
62 private volatile Timeout socketTimeout;
63 private volatile long lastReadTime;
64 private volatile long lastWriteTime;
65 private volatile long lastEventTime;
66
67 public IOSessionImpl(final String type, final SelectionKey key, final SocketChannel socketChannel) {
68 super();
69 this.key = Args.notNull(key, "Selection key");
70 this.channel = Args.notNull(socketChannel, "Socket channel");
71 this.commandQueue = new ConcurrentLinkedDeque<>();
72 this.lock = new ReentrantLock();
73 this.socketTimeout = Timeout.DISABLED;
74 this.id = String.format(type + "-%08X", COUNT.getAndIncrement());
75 this.handlerRef = new AtomicReference<>();
76 this.status = new AtomicReference<>(Status.ACTIVE);
77 final long currentTimeMillis = System.currentTimeMillis();
78 this.lastReadTime = currentTimeMillis;
79 this.lastWriteTime = currentTimeMillis;
80 this.lastEventTime = currentTimeMillis;
81 }
82
83 @Override
84 public String getId() {
85 return id;
86 }
87
88 @Override
89 public IOEventHandler getHandler() {
90 return handlerRef.get();
91 }
92
93 @Override
94 public void upgrade(final IOEventHandler handler) {
95 handlerRef.set(handler);
96 }
97
98 @Override
99 public Lock getLock() {
100 return lock;
101 }
102
103 @Override
104 public void enqueue(final Command command, final Command.Priority priority) {
105 if (priority == Command.Priority.IMMEDIATE) {
106 commandQueue.addFirst(command);
107 } else {
108 commandQueue.add(command);
109 }
110 setEvent(SelectionKey.OP_WRITE);
111 }
112
113 @Override
114 public boolean hasCommands() {
115 return !commandQueue.isEmpty();
116 }
117
118 @Override
119 public Command poll() {
120 return commandQueue.poll();
121 }
122
123 @Override
124 public ByteChannel channel() {
125 return this.channel;
126 }
127
128 @Override
129 public SocketAddress getLocalAddress() {
130 return this.channel.socket().getLocalSocketAddress();
131 }
132
133 @Override
134 public SocketAddress getRemoteAddress() {
135 return this.channel.socket().getRemoteSocketAddress();
136 }
137
138 @Override
139 public int getEventMask() {
140 return this.key.interestOps();
141 }
142
143 @Override
144 public void setEventMask(final int newValue) {
145 if (isStatusClosed()) {
146 return;
147 }
148 this.key.interestOps(newValue);
149 this.key.selector().wakeup();
150 }
151
152 @Override
153 public void setEvent(final int op) {
154 if (isStatusClosed()) {
155 return;
156 }
157 lock.lock();
158 try {
159 this.key.interestOps(this.key.interestOps() | op);
160 } finally {
161 lock.unlock();
162 }
163 this.key.selector().wakeup();
164 }
165
166 @Override
167 public void clearEvent(final int op) {
168 if (isStatusClosed()) {
169 return;
170 }
171 lock.lock();
172 try {
173 this.key.interestOps(this.key.interestOps() & ~op);
174 } finally {
175 lock.unlock();
176 }
177 this.key.selector().wakeup();
178 }
179
180 @Override
181 public Timeout getSocketTimeout() {
182 return this.socketTimeout;
183 }
184
185 @Override
186 public void setSocketTimeout(final Timeout timeout) {
187 this.socketTimeout = Timeout.defaultsToDisabled(timeout);
188 this.lastEventTime = System.currentTimeMillis();
189 }
190
191 @Override
192 public int read(final ByteBuffer dst) throws IOException {
193 return this.channel.read(dst);
194 }
195
196 @Override
197 public int write(final ByteBuffer src) throws IOException {
198 return this.channel.write(src);
199 }
200
201 @Override
202 public void updateReadTime() {
203 lastReadTime = System.currentTimeMillis();
204 lastEventTime = lastReadTime;
205 }
206
207 @Override
208 public void updateWriteTime() {
209 lastWriteTime = System.currentTimeMillis();
210 lastEventTime = lastWriteTime;
211 }
212
213 @Override
214 public long getLastReadTime() {
215 return lastReadTime;
216 }
217
218 @Override
219 public long getLastWriteTime() {
220 return lastWriteTime;
221 }
222
223 @Override
224 public long getLastEventTime() {
225 return lastEventTime;
226 }
227
228 @Override
229 public Status getStatus() {
230 return this.status.get();
231 }
232
233 private boolean isStatusClosed() {
234 return this.status.get() == Status.CLOSED;
235 }
236
237 @Override
238 public boolean isOpen() {
239 return this.status.get() == Status.ACTIVE && this.channel.isOpen();
240 }
241
242 @Override
243 public void close() {
244 close(CloseMode.GRACEFUL);
245 }
246
247 @Override
248 public void close(final CloseMode closeMode) {
249 if (this.status.compareAndSet(Status.ACTIVE, Status.CLOSED)) {
250 if (closeMode == CloseMode.IMMEDIATE) {
251 try {
252 this.channel.socket().setSoLinger(true, 0);
253 } catch (final SocketException e) {
254
255 }
256 }
257 this.key.cancel();
258 this.key.attach(null);
259 Closer.closeQuietly(this.key.channel());
260 if (this.key.selector().isOpen()) {
261 this.key.selector().wakeup();
262 }
263 }
264 }
265
266 private static void formatOps(final StringBuilder buffer, final int ops) {
267 if ((ops & SelectionKey.OP_READ) > 0) {
268 buffer.append('r');
269 }
270 if ((ops & SelectionKey.OP_WRITE) > 0) {
271 buffer.append('w');
272 }
273 if ((ops & SelectionKey.OP_ACCEPT) > 0) {
274 buffer.append('a');
275 }
276 if ((ops & SelectionKey.OP_CONNECT) > 0) {
277 buffer.append('c');
278 }
279 }
280
281 @Override
282 public String toString() {
283 final StringBuilder buffer = new StringBuilder();
284 buffer.append(id).append("[");
285 buffer.append(this.status);
286 buffer.append("][");
287 if (this.key.isValid()) {
288 formatOps(buffer, this.key.interestOps());
289 buffer.append(":");
290 formatOps(buffer, this.key.readyOps());
291 }
292 buffer.append("]");
293 return buffer.toString();
294 }
295
296 }