1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.writer.gora;
19
20 import java.nio.ByteBuffer;
21 import java.util.List;
22
23 import org.apache.gora.store.DataStore;
24 import org.apache.gora.store.DataStoreFactory;
25 import org.apache.gora.util.GoraException;
26 import org.apache.hadoop.chukwa.Chunk;
27 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
28 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
29 import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
30 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
31 import org.apache.hadoop.chukwa.datacollection.writer.solr.SolrWriter;
32 import org.apache.hadoop.chukwa.util.ExceptionUtil;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.log4j.Logger;
35
36
37
38
39
40
41
42 public class GoraWriter extends PipelineableWriter {
43
44 private static Logger log = Logger.getLogger(SolrWriter.class);
45
46 DataStore<String, ChukwaChunk> chunkStore;
47
48
49
50
51
52 public GoraWriter() throws WriterException {
53 log.debug("Initializing configuration for GoraWriter pipeline...");
54 init(ChukwaAgent.getStaticConfiguration());
55 }
56
57
58
59
60
61
62
63
64 @Override
65 public void init(Configuration c) throws WriterException {
66 try {
67 chunkStore = DataStoreFactory.getDataStore(String.class, ChukwaChunk.class, c);
68 } catch (GoraException e) {
69 log.error(ExceptionUtil.getStackTrace(e));
70 e.printStackTrace();
71 }
72 }
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 @Override
91 public void close() throws WriterException {
92 if (chunkStore != null) {
93 chunkStore.flush();
94 } else {
95 chunkStore.close();
96 }
97 log.debug("Gora datastore successfully closed.");
98 }
99
100 @Override
101 public CommitStatus add(List<Chunk> chunks) throws WriterException {
102 CommitStatus cStatus = ChukwaWriter.COMMIT_OK;
103 for(Chunk chunk : chunks) {
104 try {
105 ChukwaChunk chukwaChunk = ChukwaChunk.newBuilder().build();
106 chukwaChunk.setSource(chunk.getSource());
107 chukwaChunk.setDatatype(chunk.getDataType());
108 chukwaChunk.setSequenceID(chunk.getSeqID());
109 chukwaChunk.setName(chunk.getStreamName());
110 chukwaChunk.setTags(chunk.getTags());
111 chukwaChunk.setData(ByteBuffer.wrap(chunk.getData()));
112 } catch (Exception e) {
113 log.error(ExceptionUtil.getStackTrace(e));
114 throw new WriterException("Failed to store data to Solr Cloud.");
115 }
116 }
117 if (next != null) {
118 cStatus = next.add(chunks);
119 }
120 return cStatus;
121 }
122 }