1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.chukwa.util;
21
22 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
23 import org.apache.hadoop.chukwa.ChunkImpl;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.ChecksumException;
26 import org.apache.hadoop.fs.FSDataOutputStream;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.io.SequenceFile;
30 import org.apache.log4j.Logger;
31
32
33
34
35
36
37
38 public class CopySequenceFile {
39 static Logger log = Logger.getLogger(CopySequenceFile.class);
40 private static SequenceFile.Writer seqFileWriter = null;
41 private static SequenceFile.Reader seqFileReader = null;
42 private static FSDataOutputStream newOutputStr = null;
43
44 public static void createValidSequenceFile(Configuration conf,
45 String originalFileDir,
46 String originalFileName,
47 FileSystem localFs) {
48 try {
49 if (!originalFileDir.endsWith("/")) {
50 originalFileDir += "/";
51 }
52 String originalCompleteDir= originalFileDir + originalFileName;
53 Path originalPath= new Path(originalCompleteDir);
54 int extensionIndex= originalFileName.indexOf(".chukwa",0);
55
56 String recoverFileName=originalFileName.substring(0, extensionIndex)+".recover";
57 String recoverDir= originalFileDir + recoverFileName;
58 Path recoverPath= new Path(recoverDir);
59 String recoverDoneFileName=originalFileName.substring(0, extensionIndex)+".recoverDone";
60 String recoverDoneDir= originalFileDir + recoverDoneFileName;
61 Path recoverDonePath= new Path(recoverDoneDir);
62 String doneFileName=originalFileName.substring(0, extensionIndex)+".done";
63 String doneDir= originalFileDir + doneFileName;
64 Path donePath= new Path(doneDir);
65
66 ChukwaArchiveKey key = new ChukwaArchiveKey();
67 ChunkImpl evt = ChunkImpl.getBlankChunk();
68
69 newOutputStr = localFs.create(recoverPath);
70 seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
71 ChukwaArchiveKey.class, ChunkImpl.class,
72 SequenceFile.CompressionType.NONE, null);
73 seqFileReader = new SequenceFile.Reader(localFs, originalPath, conf);
74
75 System.out.println("key class name is " + seqFileReader.getKeyClassName());
76 System.out.println("value class name is " + seqFileReader.getValueClassName());
77 try {
78 while (seqFileReader.next(key, evt)) {
79 seqFileWriter.append(key, evt);
80 }
81 } catch (ChecksumException e) {
82 log.info("Encountered Bad Chunk while copying .chukwa file, continuing",e);
83 }
84 seqFileReader.close();
85 seqFileWriter.close();
86 newOutputStr.close();
87 try {
88 localFs.rename(recoverPath, recoverDonePath);
89 localFs.delete(originalPath,false);
90 localFs.rename(recoverDonePath, donePath);
91 } catch (Exception e) {
92 log.warn("Error occured while renaming .recoverDone to .recover or deleting .chukwa",e);
93 e.printStackTrace();
94 }
95
96 } catch(Exception e) {
97 log.warn("Error during .chukwa file recovery",e);
98 e.printStackTrace();
99 }
100 }
101 }