1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.writer;
20
21
22 import java.io.IOException;
23 import java.util.List;
24
25 import org.apache.hadoop.chukwa.Chunk;
26 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.security.SecurityUtil;
29 import org.apache.log4j.Logger;
30
31
32
33
34
35
36
37 public class PipelineStageWriter implements ChukwaWriter {
38 Logger log = Logger.getLogger(PipelineStageWriter.class);
39
40 ChukwaWriter writer;
41
42 public PipelineStageWriter() throws WriterException {
43 Configuration conf = new ChukwaConfiguration();
44 init(conf);
45 }
46
47 public PipelineStageWriter(Configuration conf) throws WriterException {
48 init(conf);
49 }
50
51 @Override
52 public CommitStatus add(List<Chunk> chunks) throws WriterException {
53 return writer.add(chunks);
54 }
55
56 @Override
57 public void close() throws WriterException {
58 writer.close();
59 }
60
61 @Override
62 public void init(Configuration conf) throws WriterException {
63 if (conf.get("chukwa.pipeline") != null) {
64 String pipeline = conf.get("chukwa.pipeline");
65 try {
66 String[] classes = pipeline.split(",");
67 log.info("using pipelined writers, pipe length is " + classes.length);
68 PipelineableWriter lastWriter = null;
69 if (classes.length > 1) {
70 lastWriter = (PipelineableWriter) conf.getClassByName(classes[0])
71 .newInstance();
72 lastWriter.init(conf);
73 writer = lastWriter;
74 }
75
76 for (int i = 1; i < classes.length - 1; ++i) {
77 Class<?> stageClass = conf.getClassByName(classes[i]);
78 Object st = stageClass.newInstance();
79 if (!(st instanceof PipelineableWriter))
80 log.error("class " + classes[i]
81 + " in processing pipeline isn't a PipelineableWriter.");
82
83 PipelineableWriter stage = (PipelineableWriter) stageClass
84 .newInstance();
85 stage.init(conf);
86
87
88
89 lastWriter.setNextStage(stage);
90 lastWriter = stage;
91 }
92
93 for(int i=0; i<classes.length; i++) {
94 if(classes[i].contains("HBaseWriter")) {
95 loginToKerberos (conf);
96 }
97 }
98
99 Class<?> stageClass = conf.getClassByName(classes[classes.length - 1]);
100 Object st = stageClass.newInstance();
101
102 if (!(st instanceof ChukwaWriter)) {
103 log.error("class " + classes[classes.length - 1]
104 + " at end of processing pipeline isn't a ChukwaWriter");
105 throw new WriterException("bad pipeline");
106 } else {
107 ((ChukwaWriter)st).init(conf);
108 if (lastWriter != null)
109 lastWriter.setNextStage((ChukwaWriter) st);
110 else
111 writer = (ChukwaWriter) st;
112 }
113 return;
114 } catch (Exception e) {
115
116 log.error("failed to set up pipeline, defaulting to SeqFileWriter", e);
117
118 throw new WriterException("bad pipeline");
119 }
120 } else {
121 throw new WriterException("must set chukwa.pipeline");
122 }
123 }
124
125
126
127
128
129
130
131
132 private static void loginToKerberos (Configuration config) throws IOException {
133 String agentAuthType = config.get ("chukwaAgent.hadoop.authentication.type");
134 if (null != agentAuthType && "kerberos".equalsIgnoreCase (agentAuthType)) {
135 SecurityUtil.login (config, "chukwaAgent.hadoop.authentication.kerberos.keytab",
136 "chukwaAgent.hadoop.authentication.kerberos.principal");
137 }
138 }
139
140 }