1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22
23 import javax.jms.Message;
24 import javax.jms.JMSException;
25 import java.util.ArrayList;
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 public class JMSMessagePropertyTransformer implements JMSMessageTransformer {
47 protected Log log = LogFactory.getLog(getClass());
48
49 private static final String DEFAULT_DELIMITER = "\t";
50
51 ArrayList<String> propertyNames = null;
52 ArrayList<String> requiredPropertyNames = null;
53 String delimiter = DEFAULT_DELIMITER;
54
55 public String parseArgs(String args) {
56 if (args == null || args.length() == 0) {
57 log.error("propertyNames must be set for this transformer");
58 return null;
59 }
60
61 log.info("Initializing JMSMessagePropertyTransformer: args=" + args);
62
63 propertyNames = new ArrayList<String>();
64
65 String[] tokens = args.split(" ");
66 for (String propertyName : tokens[0].split(",")) {
67 propertyNames.add(propertyName);
68 }
69
70 for(int i = 1; i < tokens.length; i++) {
71 String token = tokens[i];
72
73 if ("-d".equals(token) && i <= tokens.length - 2) {
74 String value = tokens[++i];
75
76
77 while (i <= tokens.length - 2 && !tokens[i + 1].startsWith("-")) {
78 value = value + " " + tokens[++i];
79 }
80
81 delimiter = trimSingleQuotes(value);
82 }
83 else if ("-r".equals(token) && i <= tokens.length - 2) {
84
85 requiredPropertyNames = new ArrayList<String>();
86
87 String[] required = tokens[++i].split(",");
88 for (String r : required) {
89 requiredPropertyNames.add(r);
90 }
91 }
92 }
93
94 log.info("Initialized JMSMessagePropertyTransformer: delimiter='" +
95 delimiter + "', propertyNames=" + propertyNames +
96 ", requiredProperties=" +
97 (requiredPropertyNames == null ? "ALL" : requiredPropertyNames));
98 return args;
99 }
100
101
102
103
104
105
106
107
108
109
110 public byte[] transform(Message message) throws JMSException {
111
112 if (propertyNames == null || propertyNames.size() == 0) {
113 log.error("No message properties configured for this JMS transformer.");
114 return null;
115 }
116
117 int valuesFound = 0;
118 StringBuilder sb = new StringBuilder();
119 for (String propertyName : propertyNames) {
120 Object propertyValue = message.getObjectProperty(propertyName);
121 String value = transformValue(propertyName, propertyValue);
122
123
124 if (value == null && (requiredPropertyNames == null ||
125 requiredPropertyNames.contains(propertyName))) {
126 return null;
127 }
128
129 if (valuesFound > 0) {
130 sb.append(delimiter);
131 }
132
133 if (value != null) {
134 sb.append(value);
135 }
136
137 valuesFound++;
138 }
139
140 if (sb.length() == 0 || valuesFound != propertyNames.size()) {
141 return null;
142 }
143
144 return sb.toString().getBytes();
145 }
146
147
148
149
150
151
152
153
154
155
156 protected String transformValue(String propertyName, Object propertyValue) {
157
158 if (propertyValue == null) {
159 return null;
160 }
161 else if (propertyValue instanceof String) {
162 return (String)propertyValue;
163 }
164 else if (propertyValue instanceof Number) {
165 return propertyValue.toString();
166 }
167
168 return null;
169 }
170
171 private static String trimSingleQuotes(String value) {
172 if (value.length() == 0) {
173 return value;
174 }
175
176
177 if (value.charAt(0) == '\'') {
178 value = value.substring(1);
179 }
180 if (value.length() > 0 && value.charAt(value.length() - 1) == '\'') {
181 value = value.substring(0, value.length() - 1);
182 }
183
184 return value;
185 }
186
187 }