1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.sender;
20
21
22 import java.io.*;
23 import java.util.*;
24 import org.apache.hadoop.conf.Configuration;
25
26
27
28
29
30
31
32
33
34
35
36 public class RetryListOfCollectors implements Iterator<String>, Cloneable {
37
38 int maxRetryRateMs;
39 List<String> collectors;
40 long lastLookAtFirstNode;
41 int nextCollector = 0;
42 private String portNo;
43 Configuration conf;
44 public static final String RETRY_RATE_OPT = "chukwaAgent.connector.retryRate";
45
46 public RetryListOfCollectors(File collectorFile, Configuration conf)
47 throws IOException {
48 this(conf);
49 try {
50 BufferedReader br = new BufferedReader(new FileReader(collectorFile));
51 String line, parsedline;
52 while ((line = br.readLine()) != null) {
53 parsedline = canonicalizeLine(line);
54 collectors.add(parsedline);
55 }
56
57 br.close();
58 } catch (FileNotFoundException e) {
59 System.err.println("Error in RetryListOfCollectors() opening file"
60 + collectorFile.getCanonicalPath() + ", double check that you have"
61 + "set the CHUKWA_CONF_DIR environment variable. Also, ensure file"
62 + " exists and is in classpath");
63 throw e;
64 } catch (IOException e) {
65 System.err
66 .println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
67 throw e;
68 }
69 shuffleList();
70 }
71
72 private String canonicalizeLine(String line) {
73 String parsedline;
74 if (!line.contains("://")) {
75
76 if (line.matches(".*:\\d+.*")) {
77 parsedline = "http://" + line+"/";
78 } else {
79 parsedline = "http://" + line + ":" + portNo;
80 }
81 } else {
82 if (line.matches(".*:\\d+.*")) {
83 parsedline = line;
84 } else {
85 parsedline = line + ":" + portNo;
86 }
87 }
88 if(!parsedline.matches(".*\\w/.*"))
89 parsedline = parsedline+"/";
90 return parsedline;
91 }
92
93
94
95
96
97
98
99 public RetryListOfCollectors(final List<String> collectors, Configuration conf) {
100 this(conf);
101 this.collectors.addAll(collectors);
102
103 }
104
105 public RetryListOfCollectors(Configuration conf) {
106 collectors = new ArrayList<String>();
107 this.conf = conf;
108 portNo = conf.get("chukwaCollector.http.port", "8080");
109 maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
110 lastLookAtFirstNode = 0;
111 }
112
113
114
115 public void shuffleList() {
116 ArrayList<String> newList = new ArrayList<String>();
117 Random r = new java.util.Random();
118 while (!collectors.isEmpty()) {
119 int toRemove = r.nextInt(collectors.size());
120 String next = collectors.remove(toRemove);
121 newList.add(next);
122 }
123 collectors = newList;
124 }
125
126 public boolean hasNext() {
127 return collectors.size() > 0
128 && ((nextCollector != 0) || (System.currentTimeMillis()
129 - lastLookAtFirstNode > maxRetryRateMs));
130 }
131
132 public String next() {
133 if (hasNext()) {
134 int currCollector = nextCollector;
135 nextCollector = (nextCollector + 1) % collectors.size();
136 if (currCollector == 0)
137 lastLookAtFirstNode = System.currentTimeMillis();
138 return collectors.get(currCollector);
139 } else
140 return null;
141 }
142
143 public void add(String collector) {
144 collectors.add(collector);
145 }
146
147 public void remove() {
148 throw new UnsupportedOperationException();
149
150
151
152 }
153
154
155
156
157
158 int total() {
159 return collectors.size();
160 }
161
162 public RetryListOfCollectors clone() {
163 try {
164 RetryListOfCollectors clone = (RetryListOfCollectors) super.clone();
165 return clone;
166 } catch(CloneNotSupportedException e) {
167 return null;
168 }
169 }
170
171 }