1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.writer;
19
20
21 import java.io.*;
22 import java.util.List;
23 import org.apache.hadoop.chukwa.Chunk;
24 import org.apache.hadoop.chukwa.ChunkImpl;
25 import org.apache.hadoop.conf.Configuration;
26
27 public class InMemoryWriter implements ChukwaWriter {
28
29 ByteArrayOutputStream buf;
30
31 public void close() {
32 buf.reset();
33 }
34
35 public void init(Configuration conf) throws WriterException {
36 buf = new ByteArrayOutputStream();
37 }
38
39 public void add(Chunk data) throws WriterException {
40 DataOutputStream dos = new DataOutputStream(buf);
41 try {
42 data.write(dos);
43 } catch (IOException e) {
44 e.printStackTrace();
45 throw new WriterException(e);
46 }
47 synchronized (this) {
48 notify();
49 }
50 }
51
52 @Override
53 public CommitStatus add(List<Chunk> chunks) throws WriterException {
54 for (Chunk chunk : chunks) {
55 add(chunk);
56 }
57 return COMMIT_OK;
58 }
59
60 DataInputStream dis = null;
61
62
63
64
65
66
67
68
69
70 public Chunk readOutChunk(int bytes, int ms) throws IOException {
71
72 long readStartTime = System.currentTimeMillis();
73 try {
74 while (buf.size() < bytes) {
75 synchronized (this) {
76 long timeLeft = ms - System.currentTimeMillis() + readStartTime;
77 if (timeLeft > 0)
78 wait(timeLeft);
79 }
80 }
81 if (dis == null)
82 dis = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
83 return ChunkImpl.read(dis);
84 } catch (InterruptedException e) {
85 Thread.currentThread().interrupt();
86 return null;
87 }
88 }
89
90 }