1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.dataloader;
19
20 import java.io.IOException;
21 import java.util.concurrent.CompletionService;
22 import java.util.concurrent.ExecutorCompletionService;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.TimeUnit;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.fs.FileStatus;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
32 import org.apache.hadoop.chukwa.util.ExceptionUtil;
33
34 public class MetricDataLoaderPool extends DataLoaderFactory {
35 private static Log log = LogFactory.getLog(MetricDataLoaderPool.class);
36
37 protected MetricDataLoader threads[] = null;
38 private static String DATA_LOADER_THREAD_LIMIT = "chukwa.data.loader.threads.limit";
39 private int size = 1;
40 private static CompletionService completion = null;
41 private static ExecutorService executor = null;
42
43 public MetricDataLoaderPool() {
44 }
45
46 public void load(ChukwaConfiguration conf, FileSystem fs, FileStatus[] fileList) throws IOException {
47
48 if(executor==null) {
49 try {
50 this.size = Integer.parseInt(conf.get(DATA_LOADER_THREAD_LIMIT));
51 } catch(Exception e) {
52 this.size = 1;
53 }
54 executor = Executors.newFixedThreadPool(size);
55 }
56 if(completion==null) {
57 completion = new ExecutorCompletionService(executor);
58 }
59 try {
60 for(int i=0;i<fileList.length;i++) {
61 String filename = fileList[i].getPath().toUri().toString();
62 log.info("Processing: "+filename);
63 completion.submit(new MetricDataLoader(conf, fs, filename));
64 }
65 for(int i=0;i<fileList.length;i++) {
66 completion.take().get();
67 }
68 } catch(Exception e) {
69 log.error(ExceptionUtil.getStackTrace(e));
70 throw new IOException();
71 }
72 }
73
74 public void shutdown() throws InterruptedException {
75 executor.shutdown();
76 executor.awaitTermination(30, TimeUnit.SECONDS);
77 executor.shutdownNow();
78 }
79 }