1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21
22 import java.io.IOException;
23 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
24 import org.apache.hadoop.fs.FileStatus;
25 import org.apache.hadoop.fs.FileSystem;
26 import org.apache.hadoop.fs.FileUtil;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.util.Tool;
29 import org.apache.hadoop.util.ToolRunner;
30 import org.apache.log4j.Logger;
31 import org.apache.hadoop.chukwa.util.ExceptionUtil;
32
33 public class RecordMerger extends Thread {
34 static Logger log = Logger.getLogger(RecordMerger.class);
35 ChukwaConfiguration conf = null;
36 FileSystem fs = null;
37 String[] mergeArgs = null;
38 Tool tool = null;
39 boolean deleteRawData = false;
40
41 public RecordMerger(ChukwaConfiguration conf, FileSystem fs, Tool tool,
42 String[] mergeArgs, boolean deleteRawData) {
43 this.conf = conf;
44 this.fs = fs;
45 this.tool = tool;
46 this.mergeArgs = mergeArgs.clone();
47 this.deleteRawData = deleteRawData;
48 }
49
50 @Override
51 public void run() {
52 mergeRecords();
53 }
54
55 void mergeRecords() {
56 System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
57 int res;
58 try {
59 res = ToolRunner.run(conf, tool, mergeArgs);
60 System.out.println("MR exit status: " + res);
61 if (res == 0) {
62 writeRecordFile(mergeArgs[1] + "/part-00000", mergeArgs[2],
63 mergeArgs[3]);
64
65
66 if (deleteRawData) {
67 FileUtil.fullyDelete(fs, new Path(mergeArgs[0]));
68
69 Path hours = new Path(mergeArgs[2]);
70 FileStatus[] hoursOrMinutesFS = fs.listStatus(hours);
71 for (FileStatus hourOrMinuteFS : hoursOrMinutesFS) {
72 String dirName = hourOrMinuteFS.getPath().getName();
73
74 try {
75 Integer.parseInt(dirName);
76 FileUtil.fullyDelete(fs, new Path(mergeArgs[2] + "/" + dirName));
77 if (log.isDebugEnabled()) {
78 log.debug("Deleting Hour directory: " + mergeArgs[2] + "/"
79 + dirName);
80 }
81 } catch (NumberFormatException e) {
82
83
84
85 log.debug(ExceptionUtil.getStackTrace(e));
86 }
87 }
88 }
89
90
91 FileUtil.fullyDelete(fs, new Path(mergeArgs[3]));
92
93 FileUtil.fullyDelete(fs, new Path(mergeArgs[1]));
94 } else {
95 throw new RuntimeException("Error in M/R merge operation!");
96 }
97
98 } catch (Exception e) {
99 e.printStackTrace();
100 throw new RuntimeException("Error in M/R merge operation!", e);
101 }
102 }
103
104 void writeRecordFile(String input, String outputDir, String fileName)
105 throws IOException {
106 boolean done = false;
107 int count = 1;
108 Path recordFile = new Path(input);
109 do {
110 Path destDirPath = new Path(outputDir);
111 Path destFilePath = new Path(outputDir + "/" + fileName + "." + count
112 + ".evt");
113
114 if (!fs.exists(destDirPath)) {
115 fs.mkdirs(destDirPath);
116 log.info(">>>>>>>>>>>> create Dir" + destDirPath);
117 }
118
119 if (!fs.exists(destFilePath)) {
120 boolean res = fs.rename(recordFile, destFilePath);
121
122 if (res == false) {
123 log.info(">>>>>>>>>>>> Use standard copy rename failded");
124 FileUtil.copy(fs, recordFile, fs, destFilePath, false, false, conf);
125 }
126 done = true;
127 } else {
128 log.info("Start MoveToRepository main()");
129 }
130 count++;
131
132
133 if (count > 1000) {
134 throw new IOException("too many files in this directory: "
135 + destDirPath);
136 }
137 } while (!done);
138 }
139 }