View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.channels.Channel;
33  import java.nio.channels.SelectionKey;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.http.annotation.Contract;
37  import org.apache.http.annotation.ThreadingBehavior;
38  import org.apache.http.nio.reactor.IOSession;
39  import org.apache.http.nio.reactor.SessionRequest;
40  import org.apache.http.nio.reactor.SessionRequestCallback;
41  import org.apache.http.util.Args;
42  
43  /**
44   * Default implementation of {@link SessionRequest}.
45   *
46   * @since 4.0
47   */
48  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
49  public class SessionRequestImpl implements SessionRequest {
50  
51      enum SessionRequestState {
52          ACTIVE,
53          SUCCESSFUL,
54          TIMEDOUT,
55          CANCELLED,
56          FAILED,
57      }
58  
59      private final SocketAddress remoteAddress;
60      private final SocketAddress localAddress;
61      private final Object attachment;
62      private final SessionRequestCallback callback;
63      private final AtomicReference<SessionRequestState> state;
64  
65      private volatile SelectionKey key;
66  
67      private volatile int connectTimeout;
68      private volatile IOSession session = null;
69      private volatile IOException exception = null;
70  
71      public SessionRequestImpl(
72              final SocketAddress remoteAddress,
73              final SocketAddress localAddress,
74              final Object attachment,
75              final SessionRequestCallback callback) {
76          super();
77          Args.notNull(remoteAddress, "Remote address");
78          this.remoteAddress = remoteAddress;
79          this.localAddress = localAddress;
80          this.attachment = attachment;
81          this.callback = callback;
82          this.state = new AtomicReference<SessionRequestState>(SessionRequestState.ACTIVE);
83      }
84  
85      @Override
86      public SocketAddress getRemoteAddress() {
87          return this.remoteAddress;
88      }
89  
90      @Override
91      public SocketAddress getLocalAddress() {
92          return this.localAddress;
93      }
94  
95      @Override
96      public Object getAttachment() {
97          return this.attachment;
98      }
99  
100     @Override
101     public boolean isCompleted() {
102         return this.state.get().compareTo(SessionRequestState.ACTIVE) != 0;
103     }
104 
105     boolean isTerminated() {
106         return this.state.get().compareTo(SessionRequestState.SUCCESSFUL) > 0;
107     }
108 
109     protected void setKey(final SelectionKey key) {
110         this.key = key;
111         if (this.isCompleted()) {
112             key.cancel();
113             final Channel channel = key.channel();
114             if (channel.isOpen()) {
115                 try {
116                     channel.close();
117                 } catch (final IOException ignore) {}
118             }
119         }
120     }
121 
122     @Override
123     public void waitFor() throws InterruptedException {
124         if (this.isCompleted()) {
125             return;
126         }
127         synchronized (this) {
128             while (!this.isCompleted()) {
129                 wait();
130             }
131         }
132     }
133 
134     @Override
135     public IOSession getSession() {
136         synchronized (this) {
137             return this.session;
138         }
139     }
140 
141     @Override
142     public IOException getException() {
143         synchronized (this) {
144             return this.exception;
145         }
146     }
147 
148     public void completed(final IOSession session) {
149         Args.notNull(session, "Session");
150         if (this.state.compareAndSet(SessionRequestState.ACTIVE, SessionRequestState.SUCCESSFUL)) {
151             synchronized (this) {
152                 this.session = session;
153                 if (this.callback != null) {
154                     this.callback.completed(this);
155                 }
156                 notifyAll();
157             }
158         }
159     }
160 
161     public void failed(final IOException exception) {
162         if (exception == null) {
163             return;
164         }
165         if (this.state.compareAndSet(SessionRequestState.ACTIVE, SessionRequestState.FAILED)) {
166             final SelectionKey key = this.key;
167             if (key != null) {
168                 key.cancel();
169                 final Channel channel = key.channel();
170                 try {
171                     channel.close();
172                 } catch (final IOException ignore) {}
173             }
174             synchronized (this) {
175                 this.exception = exception;
176                 if (this.callback != null) {
177                     this.callback.failed(this);
178                 }
179                 notifyAll();
180             }
181         }
182     }
183 
184     public void timeout() {
185         if (this.state.compareAndSet(SessionRequestState.ACTIVE, SessionRequestState.TIMEDOUT)) {
186             final SelectionKey key = this.key;
187             if (key != null) {
188                 key.cancel();
189                 final Channel channel = key.channel();
190                 if (channel.isOpen()) {
191                     try {
192                         channel.close();
193                     } catch (final IOException ignore) {}
194                 }
195             }
196             synchronized (this) {
197                 if (this.callback != null) {
198                     this.callback.timeout(this);
199                 }
200             }
201         }
202     }
203 
204     @Override
205     public int getConnectTimeout() {
206         return this.connectTimeout;
207     }
208 
209     @Override
210     public void setConnectTimeout(final int timeout) {
211         if (this.connectTimeout != timeout) {
212             this.connectTimeout = timeout;
213             final SelectionKey key = this.key;
214             if (key != null) {
215                 key.selector().wakeup();
216             }
217         }
218     }
219 
220     @Override
221     public void cancel() {
222         if (this.state.compareAndSet(SessionRequestState.ACTIVE, SessionRequestState.CANCELLED)) {
223             final SelectionKey key = this.key;
224             if (key != null) {
225                 key.cancel();
226                 final Channel channel = key.channel();
227                 if (channel.isOpen()) {
228                     try {
229                         channel.close();
230                     } catch (final IOException ignore) {}
231                 }
232             }
233             synchronized (this) {
234                 if (this.callback != null) {
235                     this.callback.cancelled(this);
236                 }
237                 notifyAll();
238             }
239         }
240     }
241 
242 }