1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.trace;
18
19 import java.io.IOException;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Random;
24
25 import org.apache.accumulo.trace.instrument.receivers.SendSpansViaThrift;
26 import org.apache.accumulo.fate.zookeeper.ZooReader;
27 import org.apache.log4j.Logger;
28 import org.apache.zookeeper.KeeperException;
29 import org.apache.zookeeper.WatchedEvent;
30 import org.apache.zookeeper.Watcher;
31
32
33 /**
34 * Find a Span collector via zookeeper and push spans there via Thrift RPC
35 *
36 */
37 public class ZooTraceClient extends SendSpansViaThrift implements Watcher {
38
39 private static final Logger log = Logger.getLogger(ZooTraceClient.class);
40
41 final ZooReader zoo;
42 final String path;
43 final Random random = new Random();
44 final List<String> hosts = new ArrayList<String>();
45
46 public ZooTraceClient(ZooReader zoo, String path, String host, String service, long millis) throws IOException, KeeperException, InterruptedException {
47 super(host, service, millis);
48 this.path = path;
49 this.zoo = zoo;
50 updateHosts(path, zoo.getChildren(path, this));
51 }
52
53 @Override
54 synchronized protected String getSpanKey(Map<String,String> data) {
55 if (hosts.size() > 0) {
56 return hosts.get(random.nextInt(hosts.size()));
57 }
58 return null;
59 }
60
61 @Override
62 public void process(WatchedEvent event) {
63 try {
64 updateHosts(path, zoo.getChildren(path, null));
65 } catch (Exception ex) {
66 log.error("unable to get destination hosts in zookeeper", ex);
67 }
68 }
69
70 synchronized private void updateHosts(String path, List<String> children) {
71 log.debug("Scanning trace hosts in zookeeper: " + path);
72 try {
73 List<String> hosts = new ArrayList<String>();
74 for (String child : children) {
75 byte[] data = zoo.getData(path + "/" + child, null);
76 hosts.add(new String(data));
77 }
78 this.hosts.clear();
79 this.hosts.addAll(hosts);
80 log.debug("Trace hosts: " + this.hosts);
81 } catch (Exception ex) {
82 log.error("unable to get destination hosts in zookeeper", ex);
83 }
84 }
85 }