1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.test;
20
21
22 import org.apache.hadoop.chukwa.Chunk;
23 import org.apache.hadoop.chukwa.datacollection.*;
24 import org.apache.hadoop.chukwa.datacollection.agent.*;
25 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
26 import java.util.*;
27
28
29
30
31
32 public class ConsoleOutConnector extends Thread implements Connector {
33
34 final ChukwaAgent agent;
35 volatile boolean shutdown;
36 final boolean silent;
37
38 public ConsoleOutConnector(ChukwaAgent a) {
39 this(a, false);
40 }
41
42 public ConsoleOutConnector(ChukwaAgent a, boolean silent) {
43 agent = a;
44 this.silent = silent;
45 }
46
47 public void run() {
48 try {
49 System.out.println("console connector started");
50 ChunkQueue eventQueue = DataFactory.getInstance().getEventQueue();
51 if (!silent)
52 System.out.println("-------------------");
53
54 while (!shutdown) {
55 List<Chunk> evts = new ArrayList<Chunk>();
56 eventQueue.collect(evts, 1);
57
58 for (Chunk e : evts) {
59 if (!silent) {
60 System.out.println("Console out connector got event at offset "
61 + e.getSeqID());
62 System.out.println("data type was " + e.getDataType());
63 if (e.getData().length > 1000)
64 System.out.println("data length was " + e.getData().length
65 + ", not printing");
66 else
67 System.out.println(new String(e.getData()));
68 }
69
70 agent.reportCommit(e.getInitiator(), e.getSeqID());
71
72 if (!silent)
73 System.out.println("-------------------");
74 }
75 }
76 } catch (InterruptedException e) {
77 }
78 }
79
80 public void shutdown() {
81 shutdown = true;
82 this.interrupt();
83 }
84
85 @Override
86 public void reloadConfiguration() {
87 System.out.println("reloadConfiguration");
88 }
89
90 }