View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.IOException;
23  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
24  import org.apache.hadoop.fs.FileStatus;
25  import org.apache.hadoop.fs.FileSystem;
26  import org.apache.hadoop.fs.FileUtil;
27  import org.apache.hadoop.fs.Path;
28  import org.apache.hadoop.util.Tool;
29  import org.apache.hadoop.util.ToolRunner;
30  import org.apache.log4j.Logger;
31  import org.apache.hadoop.chukwa.util.ExceptionUtil;
32  
33  public class RecordMerger extends Thread {
34    static Logger log = Logger.getLogger(RecordMerger.class);
35    ChukwaConfiguration conf = null;
36    FileSystem fs = null;
37    String[] mergeArgs = null;
38    Tool tool = null;
39    boolean deleteRawData = false;
40  
41    public RecordMerger(ChukwaConfiguration conf, FileSystem fs, Tool tool,
42                        String[] mergeArgs, boolean deleteRawData) {
43      this.conf = conf;
44      this.fs = fs;
45      this.tool = tool;
46      this.mergeArgs = mergeArgs.clone();
47      this.deleteRawData = deleteRawData;
48    }
49  
50    @Override
51    public void run() {
52      mergeRecords();
53    }
54  
55    void mergeRecords() {
56      System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
57      int res;
58      try {
59        res = ToolRunner.run(conf, tool, mergeArgs);
60        System.out.println("MR exit status: " + res);
61        if (res == 0) {
62          writeRecordFile(mergeArgs[1] + "/part-00000", mergeArgs[2],
63              mergeArgs[3]);
64  
65          // delete input
66          if (deleteRawData) {
67            FileUtil.fullyDelete(fs, new Path(mergeArgs[0]));
68  
69            Path hours = new Path(mergeArgs[2]);
70            FileStatus[] hoursOrMinutesFS = fs.listStatus(hours);
71            for (FileStatus hourOrMinuteFS : hoursOrMinutesFS) {
72              String dirName = hourOrMinuteFS.getPath().getName();
73  
74              try {
75                Integer.parseInt(dirName);
76                FileUtil.fullyDelete(fs, new Path(mergeArgs[2] + "/" + dirName));
77                if (log.isDebugEnabled()) {
78                  log.debug("Deleting Hour directory: " + mergeArgs[2] + "/"
79                      + dirName);
80                }
81              } catch (NumberFormatException e) { /*
82                                                   * Not an Hour or Minutes
83                                                   * directory- Do nothing
84                                                   */
85                log.debug(ExceptionUtil.getStackTrace(e));
86              }
87            }
88          }
89  
90          // delete rolling tag
91          FileUtil.fullyDelete(fs, new Path(mergeArgs[3]));
92          // delete M/R temp directory
93          FileUtil.fullyDelete(fs, new Path(mergeArgs[1]));
94        } else {
95          throw new RuntimeException("Error in M/R merge operation!");
96        }
97  
98      } catch (Exception e) {
99        e.printStackTrace();
100       throw new RuntimeException("Error in M/R merge operation!", e);
101     }
102   }
103 
104   void writeRecordFile(String input, String outputDir, String fileName)
105       throws IOException {
106     boolean done = false;
107     int count = 1;
108     Path recordFile = new Path(input);
109     do {
110       Path destDirPath = new Path(outputDir);
111       Path destFilePath = new Path(outputDir + "/" + fileName + "." + count
112           + ".evt");
113 
114       if (!fs.exists(destDirPath)) {
115         fs.mkdirs(destDirPath);
116         log.info(">>>>>>>>>>>> create Dir" + destDirPath);
117       }
118 
119       if (!fs.exists(destFilePath)) {
120         boolean res = fs.rename(recordFile, destFilePath);
121 
122         if (res == false) {
123           log.info(">>>>>>>>>>>> Use standard copy rename failded");
124           FileUtil.copy(fs, recordFile, fs, destFilePath, false, false, conf);
125         }
126         done = true;
127       } else {
128         log.info("Start MoveToRepository main()");
129       }
130       count++;
131       // Just put a limit here
132       // TODO read from config
133       if (count > 1000) {
134         throw new IOException("too many files in this directory: "
135             + destDirPath);
136       }
137     } while (!done);
138   }
139 }