1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty;
20
21 import java.net.InetSocketAddress;
22 import java.util.List;
23 import com.google.common.collect.Lists;
24
25 import io.netty.channel.Channel;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import org.apache.log4j.Logger;
29
30
31
32
33
34 public class ChannelRotater {
35
36 private static final Logger LOG = Logger.getLogger(ChannelRotater.class);
37
38 private int index = 0;
39
40 private final List<Channel> channelList = Lists.newArrayList();
41
42 private final Integer taskId;
43
44 private final InetSocketAddress address;
45
46
47
48
49
50
51
52 public ChannelRotater(Integer taskId, InetSocketAddress address) {
53 this.taskId = taskId;
54 this.address = address;
55 }
56
57 public Integer getTaskId() {
58 return taskId;
59 }
60
61
62
63
64
65
66 public synchronized void addChannel(Channel channel) {
67 synchronized (channelList) {
68 channelList.add(channel);
69 }
70 }
71
72
73
74
75
76
77 public synchronized Channel nextChannel() {
78 if (channelList.isEmpty()) {
79 LOG.warn("nextChannel: No channels exist for hostname " +
80 address.getHostName());
81 return null;
82 }
83
84 ++index;
85 if (index >= channelList.size()) {
86 index = 0;
87 }
88 return channelList.get(index);
89 }
90
91
92
93
94
95
96
97 public synchronized boolean removeChannel(Channel channel) {
98 boolean success = channelList.remove(channel);
99 if (index >= channelList.size()) {
100 index = 0;
101 }
102 return success;
103 }
104
105
106
107
108
109
110 public synchronized int size() {
111 return channelList.size();
112 }
113
114
115
116
117
118
119 public synchronized void closeChannels(
120 ChannelFutureListener channelFutureListener) {
121 for (Channel channel : channelList) {
122 ChannelFuture channelFuture = channel.close();
123 if (channelFutureListener != null) {
124 channelFuture.addListener(channelFutureListener);
125 }
126 }
127 }
128
129
130
131
132
133
134 public synchronized Iterable<Channel> getChannels() {
135 return Lists.newArrayList(channelList);
136 }
137 }