View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.Closeable;
31  import java.io.IOException;
32  import java.nio.channels.ClosedSelectorException;
33  import java.nio.channels.SelectionKey;
34  import java.nio.channels.Selector;
35  import java.util.Set;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicReference;
39  import java.util.concurrent.locks.Condition;
40  import java.util.concurrent.locks.ReentrantLock;
41  
42  import org.apache.hc.core5.function.Callback;
43  import org.apache.hc.core5.io.CloseMode;
44  import org.apache.hc.core5.io.Closer;
45  import org.apache.hc.core5.util.Args;
46  import org.apache.hc.core5.util.TimeValue;
47  import org.apache.hc.core5.util.Timeout;
48  
49  abstract class AbstractSingleCoreIOReactor implements IOReactor {
50  
51      private final Callback<Exception> exceptionCallback;
52      private final AtomicReference<IOReactorStatus> status;
53      private final AtomicBoolean terminated;
54      private final Condition condition;
55  
56      private final ReentrantLock lock;
57  
58      final Selector selector;
59  
60      AbstractSingleCoreIOReactor(final Callback<Exception> exceptionCallback) {
61          super();
62          this.exceptionCallback = exceptionCallback;
63          this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
64          this.terminated = new AtomicBoolean();
65          try {
66              this.selector = Selector.open();
67          } catch (final IOException ex) {
68              throw new IllegalStateException("Unexpected failure opening I/O selector", ex);
69          }
70          this.lock = new ReentrantLock();
71          this.condition = lock.newCondition();
72      }
73  
74      @Override
75      public final IOReactorStatus getStatus() {
76          return this.status.get();
77      }
78  
79      void logException(final Exception ex) {
80          if (exceptionCallback != null) {
81              exceptionCallback.execute(ex);
82          }
83      }
84  
85      abstract void doExecute() throws IOException;
86  
87      abstract void doTerminate() throws IOException;
88  
89      public void execute() {
90          if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE)) {
91              try {
92                  doExecute();
93              } catch (final ClosedSelectorException ignore) {
94                  // ignore
95              } catch (final Exception ex) {
96                  logException(ex);
97              } finally {
98                  try {
99                      doTerminate();
100                 } catch (final Exception ex) {
101                     logException(ex);
102                 } finally {
103                     close(CloseMode.IMMEDIATE);
104                 }
105             }
106         }
107     }
108 
109     @Override
110     public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
111         Args.notNull(waitTime, "Wait time");
112         final long deadline = System.currentTimeMillis() + waitTime.toMilliseconds();
113         long remaining = waitTime.toMilliseconds();
114         lock.lock();
115         try {
116             while (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
117                 condition.await(remaining, TimeUnit.MILLISECONDS);
118                 remaining = deadline - System.currentTimeMillis();
119                 if (remaining <= 0) {
120                     return;
121                 }
122             }
123         } finally {
124             lock.unlock();
125         }
126     }
127 
128     @Override
129     public final void initiateShutdown() {
130         if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.SHUT_DOWN)) {
131             lock.lock();
132             try {
133                 condition.signalAll();
134             } finally {
135                 lock.unlock();
136             }
137         } else if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) {
138             this.selector.wakeup();
139         }
140     }
141 
142     @Override
143     public final void close(final CloseMode closeMode) {
144         close(closeMode, Timeout.ofSeconds(5));
145     }
146 
147     /**
148      * Shuts down the I/O reactor either gracefully or immediately.
149      * During graceful shutdown individual I/O sessions should be
150      * informed about imminent termination and be given a grace period
151      * to complete the ongoing I/O sessions. During immediate shutdown
152      * all ongoing I/O sessions get aborted immediately.
153      *
154      * @param closeMode How to close the IO reactor.
155      * @param timeout  How long to wait for the IO reactor to close gracefully.
156      * @since 5.2
157      */
158     public void close(final CloseMode closeMode, final Timeout timeout) {
159         if (closeMode == CloseMode.GRACEFUL) {
160             initiateShutdown();
161             try {
162                 awaitShutdown(timeout);
163             } catch (final InterruptedException e) {
164                 Thread.currentThread().interrupt();
165             }
166         }
167         this.status.set(IOReactorStatus.SHUT_DOWN);
168         if (terminated.compareAndSet(false, true)) {
169             try {
170                 final Set<SelectionKey> keys = this.selector.keys();
171                 for (final SelectionKey key : keys) {
172                     try {
173                         Closer.close((Closeable) key.attachment());
174                     } catch (final IOException ex) {
175                         logException(ex);
176                     }
177                     key.channel().close();
178                 }
179                 selector.close();
180             } catch (final Exception ex) {
181                 logException(ex);
182             }
183         }
184         lock.lock();
185         try {
186             condition.signalAll();
187         } finally {
188             lock.unlock();
189         }
190     }
191 
192     @Override
193     public final void close() {
194         close(CloseMode.GRACEFUL);
195     }
196 
197     @Override
198     public String toString() {
199         return super.toString() + " [status=" + status + "]";
200     }
201 
202 }