1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
package org.apache.giraph.block_app.framework.output; |
19 | |
|
20 | |
import java.util.HashMap; |
21 | |
import java.util.Map; |
22 | |
import java.util.Queue; |
23 | |
import java.util.concurrent.Callable; |
24 | |
import java.util.concurrent.ConcurrentLinkedQueue; |
25 | |
|
26 | |
import org.apache.giraph.block_app.framework.api.BlockOutputApi; |
27 | |
import org.apache.giraph.conf.GiraphConstants; |
28 | |
import org.apache.giraph.utils.CallableFactory; |
29 | |
import org.apache.giraph.utils.ProgressableUtils; |
30 | |
import org.apache.hadoop.conf.Configuration; |
31 | |
import org.apache.hadoop.util.Progressable; |
32 | |
|
33 | |
|
34 | |
|
35 | |
|
36 | |
@SuppressWarnings("unchecked") |
37 | |
public class BlockOutputHandle implements BlockOutputApi { |
38 | |
private transient Configuration conf; |
39 | |
private transient Progressable progressable; |
40 | |
private final Map<String, BlockOutputDesc> outputDescMap; |
41 | 0 | private final Map<String, Queue<BlockOutputWriter>> freeWriters = |
42 | |
new HashMap<>(); |
43 | 0 | private final Map<String, Queue<BlockOutputWriter>> occupiedWriters = |
44 | |
new HashMap<>(); |
45 | |
|
46 | 0 | public BlockOutputHandle() { |
47 | 0 | outputDescMap = null; |
48 | 0 | } |
49 | |
|
50 | |
public BlockOutputHandle(String jobIdentifier, Configuration conf, |
51 | 0 | Progressable hadoopProgressable) { |
52 | 0 | outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap( |
53 | |
conf, jobIdentifier); |
54 | 0 | for (Map.Entry<String, BlockOutputDesc> entry : outputDescMap.entrySet()) { |
55 | 0 | entry.getValue().preWriting(); |
56 | 0 | freeWriters.put(entry.getKey(), |
57 | |
new ConcurrentLinkedQueue<BlockOutputWriter>()); |
58 | 0 | occupiedWriters.put(entry.getKey(), |
59 | |
new ConcurrentLinkedQueue<BlockOutputWriter>()); |
60 | 0 | } |
61 | 0 | initialize(conf, hadoopProgressable); |
62 | 0 | } |
63 | |
|
64 | |
public void initialize(Configuration conf, Progressable progressable) { |
65 | 0 | this.conf = conf; |
66 | 0 | this.progressable = progressable; |
67 | 0 | } |
68 | |
|
69 | |
|
70 | |
@Override |
71 | |
public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>> |
72 | |
OD getOutputDesc(String confOption) { |
73 | 0 | if (outputDescMap == null) { |
74 | 0 | throw new IllegalArgumentException( |
75 | |
"Output cannot be used with checkpointing"); |
76 | |
} |
77 | 0 | return (OD) outputDescMap.get(confOption); |
78 | |
} |
79 | |
|
80 | |
@Override |
81 | |
public <OW extends BlockOutputWriter> OW getWriter(String confOption) { |
82 | 0 | if (outputDescMap == null) { |
83 | 0 | throw new IllegalArgumentException( |
84 | |
"Output cannot be used with checkpointing"); |
85 | |
} |
86 | 0 | OW outputWriter = (OW) freeWriters.get(confOption).poll(); |
87 | 0 | if (outputWriter == null) { |
88 | 0 | outputWriter = (OW) outputDescMap.get(confOption).createOutputWriter( |
89 | |
conf, progressable); |
90 | |
} |
91 | 0 | occupiedWriters.get(confOption).add(outputWriter); |
92 | 0 | return outputWriter; |
93 | |
} |
94 | |
|
95 | |
public void returnAllWriters() { |
96 | |
for (Map.Entry<String, Queue<BlockOutputWriter>> entry : |
97 | 0 | occupiedWriters.entrySet()) { |
98 | 0 | freeWriters.get(entry.getKey()).addAll(entry.getValue()); |
99 | 0 | entry.getValue().clear(); |
100 | 0 | } |
101 | 0 | } |
102 | |
|
103 | |
public void closeAllWriters() { |
104 | 0 | final Queue<BlockOutputWriter> allWriters = new ConcurrentLinkedQueue<>(); |
105 | 0 | for (Queue<BlockOutputWriter> blockOutputWriters : freeWriters.values()) { |
106 | 0 | allWriters.addAll(blockOutputWriters); |
107 | 0 | } |
108 | 0 | if (allWriters.isEmpty()) { |
109 | 0 | return; |
110 | |
} |
111 | |
|
112 | 0 | CallableFactory<Void> callableFactory = new CallableFactory<Void>() { |
113 | |
@Override |
114 | |
public Callable<Void> newCallable(int callableId) { |
115 | 0 | return new Callable<Void>() { |
116 | |
@Override |
117 | |
public Void call() throws Exception { |
118 | 0 | BlockOutputWriter writer = allWriters.poll(); |
119 | 0 | while (writer != null) { |
120 | 0 | writer.close(); |
121 | 0 | writer = allWriters.poll(); |
122 | |
} |
123 | 0 | return null; |
124 | |
} |
125 | |
}; |
126 | |
} |
127 | |
}; |
128 | 0 | ProgressableUtils.getResultsWithNCallables(callableFactory, |
129 | 0 | Math.min(GiraphConstants.NUM_OUTPUT_THREADS.get(conf), |
130 | 0 | allWriters.size()), "close-writers-%d", progressable); |
131 | |
|
132 | 0 | for (BlockOutputDesc outputDesc : outputDescMap.values()) { |
133 | 0 | outputDesc.postWriting(); |
134 | 0 | } |
135 | 0 | } |
136 | |
} |