View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection;
20  
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.util.Iterator;
25  
26  import org.apache.hadoop.conf.*;
27  import org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue;
28  import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
29  import org.apache.log4j.Logger;
30  
31  public class DataFactory {
32    static Logger log = Logger.getLogger(DataFactory.class);
33    static final int QUEUE_SIZE_KB = 10 * 1024;
34    static final String COLLECTORS_FILENAME = "collectors";
35    private static DataFactory dataFactory = null;
36    private ChunkQueue chunkQueue = null;
37  
38    private String defaultTags = "";
39    
40    static {
41      dataFactory = new DataFactory();
42    }
43  
44    private DataFactory() {
45    }
46  
47    public static DataFactory getInstance() {
48      return dataFactory;
49    }
50  
51    public synchronized ChunkQueue getEventQueue() {
52      if (chunkQueue == null) {
53        chunkQueue = new MemLimitQueue(QUEUE_SIZE_KB * 1024);
54      }
55      return chunkQueue;
56    }
57  
58    public String getDefaultTags() {
59      return defaultTags;
60    }
61    
62    public void addDefaultTag(String tag) {
63      this.defaultTags += " " + tag.trim();
64    }
65    
66    /**
67     * @return empty list if file does not exist
68     * @throws IOException on other error
69     */
70    public Iterator<String> getCollectorURLs(Configuration conf, String filename) throws IOException {
71      String chukwaHome = System.getenv("CHUKWA_HOME");
72      if (chukwaHome == null) {
73        chukwaHome = ".";
74      }
75  
76      if (!chukwaHome.endsWith("/")) {
77        chukwaHome = chukwaHome + File.separator;
78      }
79      log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
80  
81      String chukwaConf = System.getenv("CHUKWA_CONF_DIR");
82      if (chukwaConf == null) {
83        chukwaConf = chukwaHome + "conf" + File.separator;
84      }
85  
86      log.info("Config - System.getenv(\"chukwaConf\"): [" + chukwaConf + "]");
87  
88      log.info("setting up collectors file: " + chukwaConf + File.separator
89          + COLLECTORS_FILENAME);
90      File collectors = new File(chukwaConf + File.separator + filename);
91      try {
92        return new RetryListOfCollectors(collectors, conf);
93      } catch (java.io.IOException e) {
94        log.error("failed to read collectors file: ", e);
95        throw e;
96      }
97    }
98    public Iterator<String> getCollectorURLs(Configuration conf) throws IOException {
99      return getCollectorURLs(conf, "collectors");
100   }
101 
102 }