View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.util;
20  
21  
22  import java.util.Random;
23  import java.util.regex.*;
24  import org.apache.hadoop.chukwa.*;
25  import org.apache.hadoop.chukwa.datacollection.*;
26  import org.apache.hadoop.chukwa.datacollection.adaptor.*;
27  import org.apache.hadoop.conf.Configuration;
28  
29  /**
30   * Emits chunks at a roughly constant data rate. Chunks are in a very particular
31   * format: the output data is verifiable, but sufficiently non-deterministic
32   * that two different instances of this adaptor are very likely to have
33   * distinct outputs.
34   * 
35   * 
36   * Each chunk is full of random bytes; the randomness comes from 
37   * an instance of java.util.Random seeded with the offset xored
38   * with the time-of-generation. The time of generation is stored, big-endian,
39   * in the first eight bytes of each chunk.
40   */
41  public class ConstRateAdaptor extends AbstractAdaptor implements Runnable {
42  
43    private int SLEEP_VARIANCE = 200;
44    private int MIN_SLEEP = 300;
45  
46    private long offset;
47    private int bytesPerSec;
48  
49    Random timeCoin;
50    long seed;
51    
52    private volatile boolean stopping = false;
53  
54    public String getCurrentStatus() {
55      return type.trim() + " " + bytesPerSec + " " + seed;
56    }
57  
58    public void start(long offset) throws AdaptorException {
59  
60      this.offset = offset;
61      Configuration conf = control.getConfiguration();
62      MIN_SLEEP = conf.getInt("constAdaptor.minSleep", MIN_SLEEP);
63      SLEEP_VARIANCE = conf.getInt("constAdaptor.sleepVariance", SLEEP_VARIANCE);
64      
65      timeCoin = new Random(seed);
66      long o =0;
67      while(o < offset)
68        o += (int) ((timeCoin.nextInt(SLEEP_VARIANCE) + MIN_SLEEP) *
69            (long) bytesPerSec / 1000L) + 8;
70      new Thread(this).start(); // this is a Thread.start
71    }
72  
73    public String parseArgs(String bytesPerSecParam) {
74      try {
75        Matcher m = Pattern.compile("([0-9]+)(?:\\s+([0-9]+))?\\s*").matcher(bytesPerSecParam);
76        if(!m.matches())
77          return null;
78        bytesPerSec = Integer.parseInt(m.group(1));
79        String rate = m.group(2);
80        if(rate != null)
81          seed = Long.parseLong(m.group(2));
82        else
83          seed = System.currentTimeMillis();
84      } catch (NumberFormatException e) {
85        //("bad argument to const rate adaptor: ["  + bytesPerSecParam + "]");
86        return null;
87      }
88      return bytesPerSecParam;
89    }
90  
91    public void run() {
92      try {
93        while (!stopping) {
94          int MSToSleep = timeCoin.nextInt(SLEEP_VARIANCE) + MIN_SLEEP; 
95          int arraySize = (int) (MSToSleep * (long) bytesPerSec / 1000L) + 8;
96          ChunkImpl evt = nextChunk(arraySize );
97  
98          dest.add(evt);
99  
100         Thread.sleep(MSToSleep);
101       } // end while
102     } catch (InterruptedException ie) {
103     } // abort silently
104   }
105 
106   public ChunkImpl nextChunk(int arraySize) {
107     byte[] data = new byte[arraySize];
108     Random dataPattern = new Random(offset ^ seed);
109     long s = this.seed;
110     offset += data.length;
111     dataPattern.nextBytes(data);
112     for(int i=0; i < 8; ++i)  {
113       data[7-i] = (byte) (s & 0xFF);
114       s >>= 8;
115     }
116     ChunkImpl evt = new ChunkImpl(type, "random ("+ this.seed+")", offset, data,
117         this);
118     return evt;
119   }
120 
121   public String toString() {
122     return "const rate " + type;
123   }
124 
125   @Override
126   public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
127     stopping = true;
128     return offset;
129   }
130   
131   public static boolean checkChunk(Chunk chunk) {
132     byte[] data = chunk.getData();
133     byte[] correctData = new byte[data.length];
134     
135     long seed = 0;
136     for(int i=0; i < 8; ++i) 
137       seed = (seed << 8) | (0xFF & data[i] );
138 
139     seed ^= (chunk.getSeqID() - data.length);
140     Random dataPattern = new Random(seed);
141     dataPattern.nextBytes(correctData);
142     for(int i=8; i < data.length ; ++i) 
143       if(data [i] != correctData[i])
144         return false;
145      
146     return true;
147   }
148   
149   void test_init(String type) {
150     this.type = type;
151     seed = System.currentTimeMillis();
152   }
153 }