1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.channels.Channel;
33 import java.nio.channels.SelectionKey;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.http.annotation.Contract;
37 import org.apache.http.annotation.ThreadingBehavior;
38 import org.apache.http.nio.reactor.IOSession;
39 import org.apache.http.nio.reactor.SessionRequest;
40 import org.apache.http.nio.reactor.SessionRequestCallback;
41 import org.apache.http.util.Args;
42
43
44
45
46
47
48 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
49 public class SessionRequestImpl implements SessionRequest {
50
51 enum SessionRequestState {
52 ACTIVE,
53 SUCCESSFUL,
54 TIMEDOUT,
55 CANCELLED,
56 FAILED,
57 }
58
59 private final SocketAddress remoteAddress;
60 private final SocketAddress localAddress;
61 private final Object attachment;
62 private final SessionRequestCallback callback;
63 private final AtomicReference<SessionRequestState> state;
64
65 private volatile SelectionKey key;
66
67 private volatile int connectTimeout;
68 private volatile IOSession session = null;
69 private volatile IOException exception = null;
70
71 public SessionRequestImpl(
72 final SocketAddress remoteAddress,
73 final SocketAddress localAddress,
74 final Object attachment,
75 final SessionRequestCallback callback) {
76 super();
77 Args.notNull(remoteAddress, "Remote address");
78 this.remoteAddress = remoteAddress;
79 this.localAddress = localAddress;
80 this.attachment = attachment;
81 this.callback = callback;
82 this.state = new AtomicReference<SessionRequestState>(SessionRequestState.ACTIVE);
83 }
84
85 @Override
86 public SocketAddress getRemoteAddress() {
87 return this.remoteAddress;
88 }
89
90 @Override
91 public SocketAddress getLocalAddress() {
92 return this.localAddress;
93 }
94
95 @Override
96 public Object getAttachment() {
97 return this.attachment;
98 }
99
100 @Override
101 public boolean isCompleted() {
102 return this.state.get().compareTo(SessionRequestState.ACTIVE) != 0;
103 }
104
105 boolean isTerminated() {
106 return this.state.get().compareTo(SessionRequestState.SUCCESSFUL) > 0;
107 }
108
109 protected void setKey(final SelectionKey key) {
110 this.key = key;
111 if (this.isCompleted()) {
112 key.cancel();
113 final Channel channel = key.channel();
114 if (channel.isOpen()) {
115 try {
116 channel.close();
117 } catch (final IOException ignore) {}
118 }
119 }
120 }
121
122 @Override
123 public void waitFor() throws InterruptedException {
124 if (this.isCompleted()) {
125 return;
126 }
127 synchronized (this) {
128 while (!this.isCompleted()) {
129 wait();
130 }
131 }
132 }
133
134 @Override
135 public IOSession getSession() {
136 synchronized (this) {
137 return this.session;
138 }
139 }
140
141 @Override
142 public IOException getException() {
143 synchronized (this) {
144 return this.exception;
145 }
146 }
147
148 public void completed(final IOSession session) {
149 Args.notNull(session, "Session");
150 if (this.state.compareAndSet(SessionRequestState.ACTIVE, SessionRequestState.SUCCESSFUL)) {
151 synchronized (this) {
152 this.session = session;
153 if (this.callback != null) {
154 this.callback.completed(this);
155 }
156 notifyAll();
157 }
158 }
159 }
160
161 public void failed(final IOException exception) {
162 if (exception == null) {
163 return;
164 }
165 if (this.state.compareAndSet(SessionRequestState.ACTIVE, SessionRequestState.FAILED)) {
166 final SelectionKey key = this.key;
167 if (key != null) {
168 key.cancel();
169 final Channel channel = key.channel();
170 try {
171 channel.close();
172 } catch (final IOException ignore) {}
173 }
174 synchronized (this) {
175 this.exception = exception;
176 if (this.callback != null) {
177 this.callback.failed(this);
178 }
179 notifyAll();
180 }
181 }
182 }
183
184 public void timeout() {
185 if (this.state.compareAndSet(SessionRequestState.ACTIVE, SessionRequestState.TIMEDOUT)) {
186 final SelectionKey key = this.key;
187 if (key != null) {
188 key.cancel();
189 final Channel channel = key.channel();
190 if (channel.isOpen()) {
191 try {
192 channel.close();
193 } catch (final IOException ignore) {}
194 }
195 }
196 synchronized (this) {
197 if (this.callback != null) {
198 this.callback.timeout(this);
199 }
200 }
201 }
202 }
203
204 @Override
205 public int getConnectTimeout() {
206 return this.connectTimeout;
207 }
208
209 @Override
210 public void setConnectTimeout(final int timeout) {
211 if (this.connectTimeout != timeout) {
212 this.connectTimeout = timeout;
213 final SelectionKey key = this.key;
214 if (key != null) {
215 key.selector().wakeup();
216 }
217 }
218 }
219
220 @Override
221 public void cancel() {
222 if (this.state.compareAndSet(SessionRequestState.ACTIVE, SessionRequestState.CANCELLED)) {
223 final SelectionKey key = this.key;
224 if (key != null) {
225 key.cancel();
226 final Channel channel = key.channel();
227 if (channel.isOpen()) {
228 try {
229 channel.close();
230 } catch (final IOException ignore) {}
231 }
232 }
233 synchronized (this) {
234 if (this.callback != null) {
235 this.callback.cancelled(this);
236 }
237 notifyAll();
238 }
239 }
240 }
241
242 }