1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.reducers;
19
20 import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
21
22 import java.io.DataInput;
23 import java.io.DataOutput;
24 import java.io.IOException;
25 import java.util.PriorityQueue;
26
27
28
29
30
31
32 public class TopNReduce<S extends Comparable<S>>
33 extends KryoWrappedReduceOperation<S, PriorityQueue<S>> {
34 private int capacity;
35
36 public TopNReduce(int capacity) {
37 this.capacity = capacity;
38 }
39
40 public TopNReduce() { }
41
42 @Override
43 public PriorityQueue<S> createValue() {
44 return new PriorityQueue<S>();
45 }
46
47 @Override
48 public void reduce(PriorityQueue<S> heap, S value) {
49 if (capacity == 0) {
50 return;
51 }
52
53 if (heap.size() < capacity) {
54 heap.add(value);
55 } else {
56 S head = heap.peek();
57 if (head.compareTo(value) < 0) {
58 heap.poll();
59 heap.add(value);
60 }
61 }
62 }
63
64 @Override
65 public void reduceMerge(
66 PriorityQueue<S> reduceInto,
67 PriorityQueue<S> toReduce
68 ) {
69 for (S element : toReduce) {
70 reduce(reduceInto, element);
71 }
72 }
73
74 @Override
75 public void write(DataOutput out) throws IOException {
76 super.write(out);
77 out.writeInt(capacity);
78 }
79
80 @Override
81 public void readFields(DataInput in) throws IOException {
82 super.readFields(in);
83 capacity = in.readInt();
84 }
85 }