1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor;
19
20 import java.io.IOException;
21 import java.net.*;
22 import java.util.Arrays;
23 import java.util.HashMap;
24
25 import org.apache.hadoop.chukwa.*;
26 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
27 import org.apache.log4j.Logger;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public class SyslogAdaptor extends UDPAdaptor {
49
50 private final static Logger log = Logger.getLogger(SyslogAdaptor.class);
51 public enum FacilityType { KERN, USER, MAIL, DAEMON, AUTH, SYSLOG, LPR, NEWS, UUCP, CRON, AUTHPRIV, FTP, NTP, AUDIT, ALERT, CLOCK, LOCAL0, LOCAL1, LOCAL2, LOCAL3, LOCAL4, LOCAL5, LOCAL6, LOCAL7 }
52 public HashMap<Integer, String> facilityMap;
53 DatagramSocket ds;
54 volatile boolean running = true;
55 volatile long bytesReceived = 0;
56
57 public SyslogAdaptor() {
58 facilityMap = new HashMap<Integer, String>(FacilityType.values().length);
59 }
60
61 public void send(byte[] buf, DatagramPacket dp) throws InterruptedException, IOException {
62 StringBuilder source = new StringBuilder();
63 source.append(dp.getAddress());
64 String dataType = type;
65 byte[] trimmedBuf = Arrays.copyOf(buf, dp.getLength());
66 String rawPRI = new String(trimmedBuf, 1, 4);
67 int i = rawPRI.indexOf(">");
68 if (i <= 3 && i > -1) {
69 String priorityStr = rawPRI.substring(0,i);
70 int priority = 0;
71 int facility = 0;
72 try {
73 priority = Integer.parseInt(priorityStr);
74 facility = (priority >> 3) << 3;
75 facility = facility / 8;
76 dataType = facilityMap.get(facility);
77 } catch (NumberFormatException nfe) {
78 log.warn("Unsupported format detected by SyslogAdaptor:"+trimmedBuf);
79 }
80 }
81
82 bytesReceived += trimmedBuf.length;
83 Chunk c = new ChunkImpl(dataType, source.toString(), bytesReceived, trimmedBuf, SyslogAdaptor.this);
84 dest.add(c);
85 }
86
87 @Override
88 public String parseArgs(String s) {
89 portno = Integer.parseInt(s);
90 ChukwaConfiguration cc = new ChukwaConfiguration();
91 for(FacilityType e : FacilityType.values()) {
92 StringBuilder buffer = new StringBuilder();
93 buffer.append("syslog.adaptor.port.");
94 buffer.append(portno);
95 buffer.append(".facility.");
96 buffer.append(e.name());
97 String dataType = cc.get(buffer.toString(), e.name());
98 facilityMap.put(e.ordinal(), dataType);
99 }
100 return s;
101 }
102
103 }