1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21
22 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
23 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
24 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
25
26 import org.apache.hadoop.mapred.JobConf;
27 import org.apache.hadoop.mapred.Partitioner;
28 import org.apache.log4j.Logger;
29
30 public class ChukwaRecordPartitioner<K, V> implements
31 Partitioner<ChukwaRecordKey, ChukwaRecord> {
32 static Logger log = Logger.getLogger(ChukwaRecordPartitioner.class);
33
34 public void configure(JobConf arg0) {
35 }
36
37 public int getPartition(
38 org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey key,
39 org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord record,
40 int numReduceTasks) {
41 if (log.isDebugEnabled()) {
42
43 log
44 .debug("Partitioner key: ["
45 + key.getReduceType()
46 + "] - Reducer:"
47 + ((key.getReduceType().hashCode() & Integer.MAX_VALUE) % numReduceTasks));
48 }
49 String hashkey = key.getReduceType();
50 if(key.getKey().startsWith(CHUKWA_CONSTANT.INCLUDE_KEY_IN_PARTITIONER)) hashkey = key.getReduceType()+"#"+key.getKey();
51 return (hashkey.hashCode() & Integer.MAX_VALUE)
52 % numReduceTasks;
53 }
54
55 }