1 /*
2 * ====================================================================
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 * ====================================================================
20 *
21 * This software consists of voluntary contributions made by many
22 * individuals on behalf of the Apache Software Foundation. For more
23 * information on the Apache Software Foundation, please see
24 * <http://www.apache.org/>.
25 *
26 */
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.InterruptedIOException;
31 import java.nio.channels.CancelledKeyException;
32 import java.nio.channels.SelectionKey;
33 import java.util.HashSet;
34 import java.util.Iterator;
35 import java.util.Set;
36
37 import org.apache.http.nio.reactor.EventMask;
38 import org.apache.http.nio.reactor.IOEventDispatch;
39 import org.apache.http.nio.reactor.IOReactorException;
40 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
41 import org.apache.http.nio.reactor.IOSession;
42 import org.apache.http.util.Args;
43
44 /**
45 * Default implementation of {@link AbstractIOReactor} that serves as a base
46 * for more advanced {@link org.apache.http.nio.reactor.IOReactor}
47 * implementations. This class adds support for the I/O event dispatching
48 * using {@link IOEventDispatch}, management of buffering sessions, and
49 * session timeout handling.
50 *
51 * @since 4.0
52 */
53 public class BaseIOReactor extends AbstractIOReactor {
54
55 private final long timeoutCheckInterval;
56 private final Set<IOSession> bufferingSessions;
57
58 private long lastTimeoutCheck;
59
60 private IOReactorExceptionHandler exceptionHandler = null;
61 private IOEventDispatch eventDispatch = null;
62
63 /**
64 * Creates new BaseIOReactor instance.
65 *
66 * @param selectTimeout the select timeout.
67 * @throws IOReactorException in case if a non-recoverable I/O error.
68 */
69 public BaseIOReactor(final long selectTimeout) throws IOReactorException {
70 this(selectTimeout, false);
71 }
72
73 /**
74 * Creates new BaseIOReactor instance.
75 *
76 * @param selectTimeout the select timeout.
77 * @param interestOpsQueueing Ops queueing flag.
78 *
79 * @throws IOReactorException in case if a non-recoverable I/O error.
80 *
81 * @since 4.1
82 */
83 public BaseIOReactor(
84 final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
85 super(selectTimeout, interestOpsQueueing);
86 this.bufferingSessions = new HashSet<IOSession>();
87 this.timeoutCheckInterval = selectTimeout;
88 this.lastTimeoutCheck = System.currentTimeMillis();
89 }
90
91 /**
92 * Activates the I/O reactor. The I/O reactor will start reacting to I/O
93 * events and dispatch I/O event notifications to the given
94 * {@link IOEventDispatch}.
95 *
96 * @throws InterruptedIOException if the dispatch thread is interrupted.
97 * @throws IOReactorException in case if a non-recoverable I/O error.
98 */
99 @Override
100 public void execute(
101 final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
102 Args.notNull(eventDispatch, "Event dispatcher");
103 this.eventDispatch = eventDispatch;
104 execute();
105 }
106
107 /**
108 * Sets exception handler for this I/O reactor.
109 *
110 * @param exceptionHandler the exception handler.
111 */
112 public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
113 this.exceptionHandler = exceptionHandler;
114 }
115
116 /**
117 * Handles the given {@link RuntimeException}. This method delegates
118 * handling of the exception to the {@link IOReactorExceptionHandler},
119 * if available.
120 *
121 * @param ex the runtime exception.
122 */
123 protected void handleRuntimeException(final RuntimeException ex) {
124 if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
125 throw ex;
126 }
127 }
128
129 /**
130 * This I/O reactor implementation does not react to the
131 * {@link SelectionKey#OP_ACCEPT} event.
132 * <p>
133 * Super-classes can override this method to react to the event.
134 */
135 @Override
136 protected void acceptable(final SelectionKey key) {
137 }
138
139 /**
140 * This I/O reactor implementation does not react to the
141 * {@link SelectionKey#OP_CONNECT} event.
142 * <p>
143 * Super-classes can override this method to react to the event.
144 */
145 @Override
146 protected void connectable(final SelectionKey key) {
147 }
148
149 /**
150 * Processes {@link SelectionKey#OP_READ} event on the given selection key.
151 * This method dispatches the event notification to the
152 * {@link IOEventDispatch#inputReady(IOSession)} method.
153 */
154 @Override
155 protected void readable(final SelectionKey key) {
156 final IOSession session = getSession(key);
157 try {
158 // Try to gently feed more data to the event dispatcher
159 // if the session input buffer has not been fully exhausted
160 // (the choice of 5 iterations is purely arbitrary)
161 for (int i = 0; i < 5; i++) {
162 this.eventDispatch.inputReady(session);
163 if (!session.hasBufferedInput()
164 || (session.getEventMask() & SelectionKey.OP_READ) == 0) {
165 break;
166 }
167 }
168 if (session.hasBufferedInput()) {
169 this.bufferingSessions.add(session);
170 }
171 } catch (final CancelledKeyException ex) {
172 throw ex;
173 } catch (final RuntimeException ex) {
174 handleRuntimeException(ex);
175 }
176 }
177
178 /**
179 * Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
180 * This method dispatches the event notification to the
181 * {@link IOEventDispatch#outputReady(IOSession)} method.
182 */
183 @Override
184 protected void writable(final SelectionKey key) {
185 final IOSession session = getSession(key);
186 try {
187 this.eventDispatch.outputReady(session);
188 } catch (final CancelledKeyException ex) {
189 throw ex;
190 } catch (final RuntimeException ex) {
191 handleRuntimeException(ex);
192 }
193 }
194
195 /**
196 * Verifies whether any of the sessions associated with the given selection
197 * keys timed out by invoking the {@link #timeoutCheck(SelectionKey, long)}
198 * method.
199 * <p>
200 * This method will also invoke the
201 * {@link IOEventDispatch#inputReady(IOSession)} method on all sessions
202 * that have buffered input data.
203 */
204 @Override
205 protected void validate(final Set<SelectionKey> keys) {
206 final long currentTime = System.currentTimeMillis();
207 if( (currentTime - this.lastTimeoutCheck) >= this.timeoutCheckInterval) {
208 this.lastTimeoutCheck = currentTime;
209 if (keys != null) {
210 for (final SelectionKey key : keys) {
211 timeoutCheck(key, currentTime);
212 }
213 }
214 }
215 if (!this.bufferingSessions.isEmpty()) {
216 for (final Iterator<IOSession> it = this.bufferingSessions.iterator(); it.hasNext(); ) {
217 final IOSession session = it.next();
218 if (!session.hasBufferedInput()) {
219 it.remove();
220 continue;
221 }
222 try {
223 if ((session.getEventMask() & EventMask.READ) > 0) {
224 this.eventDispatch.inputReady(session);
225 if (!session.hasBufferedInput()) {
226 it.remove();
227 }
228 }
229 } catch (final CancelledKeyException ex) {
230 it.remove();
231 session.close();
232 } catch (final RuntimeException ex) {
233 handleRuntimeException(ex);
234 }
235 }
236 }
237 }
238
239 /**
240 * Processes newly created I/O session. This method dispatches the event
241 * notification to the {@link IOEventDispatch#connected(IOSession)} method.
242 */
243 @Override
244 protected void sessionCreated(final SelectionKey key, final IOSession session) {
245 try {
246 this.eventDispatch.connected(session);
247 } catch (final CancelledKeyException ex) {
248 throw ex;
249 } catch (final RuntimeException ex) {
250 handleRuntimeException(ex);
251 }
252 }
253
254 /**
255 * Processes timed out I/O session. This method dispatches the event
256 * notification to the {@link IOEventDispatch#timeout(IOSession)} method.
257 */
258 @Override
259 protected void sessionTimedOut(final IOSession session) {
260 try {
261 this.eventDispatch.timeout(session);
262 } catch (final CancelledKeyException ex) {
263 throw ex;
264 } catch (final RuntimeException ex) {
265 handleRuntimeException(ex);
266 }
267 }
268
269 /**
270 * Processes closed I/O session. This method dispatches the event
271 * notification to the {@link IOEventDispatch#disconnected(IOSession)}
272 * method.
273 */
274 @Override
275 protected void sessionClosed(final IOSession session) {
276 try {
277 this.eventDispatch.disconnected(session);
278 } catch (final CancelledKeyException ex) {
279 // ignore
280 } catch (final RuntimeException ex) {
281 handleRuntimeException(ex);
282 }
283 }
284
285 }