1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.util;
20
21
22 import java.util.Random;
23 import java.util.regex.*;
24 import org.apache.hadoop.chukwa.*;
25 import org.apache.hadoop.chukwa.datacollection.*;
26 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
27 import org.apache.hadoop.conf.Configuration;
28
29
30
31
32
33
34
35
36
37
38
39
40
41 public class ConstRateAdaptor extends AbstractAdaptor implements Runnable {
42
43 private int SLEEP_VARIANCE = 200;
44 private int MIN_SLEEP = 300;
45
46 private long offset;
47 private int bytesPerSec;
48
49 Random timeCoin;
50 long seed;
51
52 private volatile boolean stopping = false;
53
54 public String getCurrentStatus() {
55 return type.trim() + " " + bytesPerSec + " " + seed;
56 }
57
58 public void start(long offset) throws AdaptorException {
59
60 this.offset = offset;
61 Configuration conf = control.getConfiguration();
62 MIN_SLEEP = conf.getInt("constAdaptor.minSleep", MIN_SLEEP);
63 SLEEP_VARIANCE = conf.getInt("constAdaptor.sleepVariance", SLEEP_VARIANCE);
64
65 timeCoin = new Random(seed);
66 long o =0;
67 while(o < offset)
68 o += (int) ((timeCoin.nextInt(SLEEP_VARIANCE) + MIN_SLEEP) *
69 (long) bytesPerSec / 1000L) + 8;
70 new Thread(this).start();
71 }
72
73 public String parseArgs(String bytesPerSecParam) {
74 try {
75 Matcher m = Pattern.compile("([0-9]+)(?:\\s+([0-9]+))?\\s*").matcher(bytesPerSecParam);
76 if(!m.matches())
77 return null;
78 bytesPerSec = Integer.parseInt(m.group(1));
79 String rate = m.group(2);
80 if(rate != null)
81 seed = Long.parseLong(m.group(2));
82 else
83 seed = System.currentTimeMillis();
84 } catch (NumberFormatException e) {
85
86 return null;
87 }
88 return bytesPerSecParam;
89 }
90
91 public void run() {
92 try {
93 while (!stopping) {
94 int MSToSleep = timeCoin.nextInt(SLEEP_VARIANCE) + MIN_SLEEP;
95 int arraySize = (int) (MSToSleep * (long) bytesPerSec / 1000L) + 8;
96 ChunkImpl evt = nextChunk(arraySize );
97
98 dest.add(evt);
99
100 Thread.sleep(MSToSleep);
101 }
102 } catch (InterruptedException ie) {
103 }
104 }
105
106 public ChunkImpl nextChunk(int arraySize) {
107 byte[] data = new byte[arraySize];
108 Random dataPattern = new Random(offset ^ seed);
109 long s = this.seed;
110 offset += data.length;
111 dataPattern.nextBytes(data);
112 for(int i=0; i < 8; ++i) {
113 data[7-i] = (byte) (s & 0xFF);
114 s >>= 8;
115 }
116 ChunkImpl evt = new ChunkImpl(type, "random ("+ this.seed+")", offset, data,
117 this);
118 return evt;
119 }
120
121 public String toString() {
122 return "const rate " + type;
123 }
124
125 @Override
126 public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
127 stopping = true;
128 return offset;
129 }
130
131 public static boolean checkChunk(Chunk chunk) {
132 byte[] data = chunk.getData();
133 byte[] correctData = new byte[data.length];
134
135 long seed = 0;
136 for(int i=0; i < 8; ++i)
137 seed = (seed << 8) | (0xFF & data[i] );
138
139 seed ^= (chunk.getSeqID() - data.length);
140 Random dataPattern = new Random(seed);
141 dataPattern.nextBytes(correctData);
142 for(int i=8; i < data.length ; ++i)
143 if(data [i] != correctData[i])
144 return false;
145
146 return true;
147 }
148
149 void test_init(String type) {
150 this.type = type;
151 seed = System.currentTimeMillis();
152 }
153 }