1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.util;
20
21
22 import java.util.Random;
23 import org.apache.hadoop.chukwa.ChunkImpl;
24 import org.apache.hadoop.chukwa.datacollection.*;
25 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
26 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
27
28 public class MaxRateSender extends Thread implements Adaptor {
29
30 public static final int BUFFER_SIZE = 60 * 1024;
31 public static final String ADAPTOR_NAME = "MaxRateSender";
32
33 private volatile boolean stopping = false;
34 private long offset;
35 private String type;
36 ChunkReceiver dest;
37
38 public String getCurrentStatus() {
39 return type;
40 }
41
42 public void start(String adaptorID, String type, long offset,
43 ChunkReceiver dest) throws AdaptorException {
44 this.setName("MaxRateSender adaptor");
45 this.offset = offset;
46 this.type = type;
47 this.dest = dest;
48 super.start();
49 }
50
51 @Override
52 public String parseArgs(String d, String s,AdaptorManager c) {
53 return s;
54 }
55
56 public void run() {
57 Random r = new Random();
58
59 try {
60 while (!stopping) {
61 byte[] data = new byte[BUFFER_SIZE];
62 r.nextBytes(data);
63 offset += data.length;
64 ChunkImpl evt = new ChunkImpl(type, "random data source", offset, data,
65 this);
66 dest.add(evt);
67
68 }
69 } catch (InterruptedException ie) {
70 }
71 }
72
73 public String toString() {
74 return ADAPTOR_NAME;
75 }
76
77
78 @Override
79 public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
80 stopping = true;
81 return offset;
82 }
83
84 @Override
85 public String getType() {
86 return type;
87 }
88
89 }