View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.impl.nio.conn;
28  
29  import java.io.IOException;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.TimeUnit;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.http.concurrent.BasicFuture;
36  import org.apache.http.concurrent.FutureCallback;
37  import org.apache.http.conn.routing.HttpRoute;
38  import org.apache.http.nio.conn.ClientAsyncConnectionFactory;
39  import org.apache.http.nio.conn.ClientAsyncConnectionManager;
40  import org.apache.http.nio.conn.ManagedClientAsyncConnection;
41  import org.apache.http.nio.conn.scheme.AsyncSchemeRegistry;
42  import org.apache.http.nio.reactor.ConnectingIOReactor;
43  import org.apache.http.nio.reactor.IOEventDispatch;
44  import org.apache.http.nio.reactor.IOReactorException;
45  import org.apache.http.nio.reactor.IOReactorStatus;
46  import org.apache.http.pool.ConnPoolControl;
47  import org.apache.http.pool.PoolStats;
48  import org.apache.http.util.Args;
49  
50  @Deprecated
51  public class PoolingClientAsyncConnectionManager
52                                implements ClientAsyncConnectionManager, ConnPoolControl<HttpRoute> {
53  
54      private final Log log = LogFactory.getLog(getClass());
55  
56      private final ConnectingIOReactor ioreactor;
57      private final HttpNIOConnPool pool;
58      private final AsyncSchemeRegistry schemeRegistry;
59      private final ClientAsyncConnectionFactory connFactory;
60  
61      public PoolingClientAsyncConnectionManager(
62              final ConnectingIOReactor ioreactor,
63              final AsyncSchemeRegistry schemeRegistry,
64              final long timeToLive, final TimeUnit tunit) {
65          super();
66          Args.notNull(ioreactor, "I/O reactor");
67          Args.notNull(schemeRegistry, "Scheme registory");
68          Args.notNull(tunit, "Time unit");
69          this.ioreactor = ioreactor;
70          this.pool = new HttpNIOConnPool(this.log, ioreactor, schemeRegistry, timeToLive, tunit);
71          this.schemeRegistry = schemeRegistry;
72          this.connFactory = createClientAsyncConnectionFactory();
73      }
74  
75      public PoolingClientAsyncConnectionManager(
76              final ConnectingIOReactor ioreactor,
77              final AsyncSchemeRegistry schemeRegistry) throws IOReactorException {
78          this(ioreactor, schemeRegistry, -1, TimeUnit.MILLISECONDS);
79      }
80  
81      public PoolingClientAsyncConnectionManager(
82              final ConnectingIOReactor ioreactor) throws IOReactorException {
83          this(ioreactor, AsyncSchemeRegistryFactory.createDefault());
84      }
85  
86      @Override
87      protected void finalize() throws Throwable {
88          try {
89              shutdown();
90          } finally {
91              super.finalize();
92          }
93      }
94  
95      protected ClientAsyncConnectionFactory createClientAsyncConnectionFactory() {
96          return new DefaultClientAsyncConnectionFactory();
97      }
98  
99      @Override
100     public AsyncSchemeRegistry getSchemeRegistry() {
101         return this.schemeRegistry;
102     }
103 
104     @Override
105     public void execute(final IOEventDispatch eventDispatch) throws IOException {
106         this.ioreactor.execute(eventDispatch);
107     }
108 
109     @Override
110     public IOReactorStatus getStatus() {
111         return this.ioreactor.getStatus();
112     }
113 
114     @Override
115     public void shutdown(final long waitMs) throws IOException {
116         this.log.debug("Connection manager is shutting down");
117         this.pool.shutdown(waitMs);
118         this.log.debug("Connection manager shut down");
119     }
120 
121     @Override
122     public void shutdown() throws IOException {
123         this.log.debug("Connection manager is shutting down");
124         this.pool.shutdown(2000);
125         this.log.debug("Connection manager shut down");
126     }
127 
128     private String format(final HttpRoute route, final Object state) {
129         final StringBuilder buf = new StringBuilder();
130         buf.append("[route: ").append(route).append("]");
131         if (state != null) {
132             buf.append("[state: ").append(state).append("]");
133         }
134         return buf.toString();
135     }
136 
137     private String formatStats(final HttpRoute route) {
138         final StringBuilder buf = new StringBuilder();
139         final PoolStats totals = this.pool.getTotalStats();
140         final PoolStats stats = this.pool.getStats(route);
141         buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
142         buf.append("route allocated: ").append(stats.getLeased() + stats.getAvailable());
143         buf.append(" of ").append(stats.getMax()).append("; ");
144         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
145         buf.append(" of ").append(totals.getMax()).append("]");
146         return buf.toString();
147     }
148 
149     private String format(final HttpPoolEntry entry) {
150         final StringBuilder buf = new StringBuilder();
151         buf.append("[id: ").append(entry.getId()).append("]");
152         buf.append("[route: ").append(entry.getRoute()).append("]");
153         final Object state = entry.getState();
154         if (state != null) {
155             buf.append("[state: ").append(state).append("]");
156         }
157         return buf.toString();
158     }
159 
160     @Override
161     public Future<ManagedClientAsyncConnection> leaseConnection(
162             final HttpRoute route,
163             final Object state,
164             final long connectTimeout,
165             final TimeUnit tunit,
166             final FutureCallback<ManagedClientAsyncConnection> callback) {
167         Args.notNull(route, "HTTP route");
168         Args.notNull(tunit, "Time unit");
169         if (this.log.isDebugEnabled()) {
170             this.log.debug("Connection request: " + format(route, state) + formatStats(route));
171         }
172         final BasicFuture<ManagedClientAsyncConnection> future = new BasicFuture<ManagedClientAsyncConnection>(
173                 callback);
174         this.pool.lease(route, state, connectTimeout, tunit, new InternalPoolEntryCallback(future));
175         return future;
176     }
177 
178     @Override
179     public void releaseConnection(
180             final ManagedClientAsyncConnection conn,
181             final long keepalive,
182             final TimeUnit tunit) {
183         Args.notNull(conn, "HTTP connection");
184         if (!(conn instanceof ManagedClientAsyncConnectionImpl)) {
185             throw new IllegalArgumentException("Connection class mismatch, " +
186                  "connection not obtained from this manager");
187         }
188         Args.notNull(tunit, "Time unit");
189         final ManagedClientAsyncConnectionImpl managedConn = (ManagedClientAsyncConnectionImpl) conn;
190         final ClientAsyncConnectionManager manager = managedConn.getManager();
191         if (manager != null && manager != this) {
192             throw new IllegalArgumentException("Connection not obtained from this manager");
193         }
194         if (this.pool.isShutdown()) {
195             return;
196         }
197 
198         synchronized (managedConn) {
199             final HttpPoolEntry entry = managedConn.getPoolEntry();
200             if (entry == null) {
201                 return;
202             }
203             try {
204                 if (managedConn.isOpen() && !managedConn.isMarkedReusable()) {
205                     try {
206                         managedConn.shutdown();
207                     } catch (final IOException iox) {
208                         if (this.log.isDebugEnabled()) {
209                             this.log.debug("I/O exception shutting down released connection", iox);
210                         }
211                     }
212                 }
213                 if (managedConn.isOpen()) {
214                     entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
215                     if (this.log.isDebugEnabled()) {
216                         final String s;
217                         if (keepalive > 0) {
218                             s = "for " + keepalive + " " + tunit;
219                         } else {
220                             s = "indefinitely";
221                         }
222                         this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
223                     }
224                     // Do not time out pooled connection
225                     managedConn.setSocketTimeout(0);
226                 }
227             } finally {
228                 this.pool.release(managedConn.detach(), managedConn.isMarkedReusable());
229             }
230             if (this.log.isDebugEnabled()) {
231                 this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
232             }
233         }
234     }
235 
236     @Override
237     public PoolStats getTotalStats() {
238         return this.pool.getTotalStats();
239     }
240 
241     @Override
242     public PoolStats getStats(final HttpRoute route) {
243         return this.pool.getStats(route);
244     }
245 
246     @Override
247     public void setMaxTotal(final int max) {
248         this.pool.setMaxTotal(max);
249     }
250 
251     @Override
252     public void setDefaultMaxPerRoute(final int max) {
253         this.pool.setDefaultMaxPerRoute(max);
254     }
255 
256     @Override
257     public void setMaxPerRoute(final HttpRoute route, final int max) {
258         this.pool.setMaxPerRoute(route, max);
259     }
260 
261     @Override
262     public int getMaxTotal() {
263         return this.pool.getMaxTotal();
264     }
265 
266     @Override
267     public int getDefaultMaxPerRoute() {
268         return this.pool.getDefaultMaxPerRoute();
269     }
270 
271     @Override
272     public int getMaxPerRoute(final HttpRoute route) {
273         return this.pool.getMaxPerRoute(route);
274     }
275 
276     public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
277         if (log.isDebugEnabled()) {
278             log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
279         }
280         this.pool.closeIdle(idleTimeout, tunit);
281     }
282 
283     public void closeExpiredConnections() {
284         log.debug("Closing expired connections");
285         this.pool.closeExpired();
286     }
287 
288     class InternalPoolEntryCallback implements FutureCallback<HttpPoolEntry> {
289 
290         private final BasicFuture<ManagedClientAsyncConnection> future;
291 
292         public InternalPoolEntryCallback(
293                 final BasicFuture<ManagedClientAsyncConnection> future) {
294             super();
295             this.future = future;
296         }
297 
298         @Override
299         public void completed(final HttpPoolEntry entry) {
300             if (log.isDebugEnabled()) {
301                 log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
302             }
303             final ManagedClientAsyncConnection conn = new ManagedClientAsyncConnectionImpl(
304                     PoolingClientAsyncConnectionManager.this,
305                     PoolingClientAsyncConnectionManager.this.connFactory,
306                     entry);
307             if (!this.future.completed(conn)) {
308                 pool.release(entry, true);
309             }
310         }
311 
312         @Override
313         public void failed(final Exception ex) {
314             if (log.isDebugEnabled()) {
315                 log.debug("Connection request failed", ex);
316             }
317             this.future.failed(ex);
318         }
319 
320         @Override
321         public void cancelled() {
322             log.debug("Connection request cancelled");
323             this.future.cancel(true);
324         }
325 
326     }
327 
328 }