1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.agent;
20
21
22 import java.io.BufferedOutputStream;
23 import java.io.BufferedReader;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.InputStreamReader;
27 import java.io.PrintStream;
28 import java.net.*;
29 import java.util.Map;
30 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
31 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
32 import org.apache.log4j.Logger;
33 import org.apache.hadoop.chukwa.util.ExceptionUtil;
34
35
36
37
38
39
40
41
42
43
44 public class AgentControlSocketListener extends Thread {
45
46 static Logger log = Logger.getLogger(AgentControlSocketListener.class);
47
48 protected ChukwaAgent agent;
49 protected int portno;
50 protected ServerSocket s = null;
51 volatile boolean closing = false;
52 static final String VERSION = "0.4.0-dev";
53 public boolean ALLOW_REMOTE = true;
54 public static final String REMOTE_ACCESS_OPT = "chukwaAgent.control.remote";
55
56 private class ListenThread extends Thread {
57 Socket connection;
58
59 ListenThread(Socket conn) {
60 connection = conn;
61 try {
62 connection.setSoTimeout(60000);
63 } catch (SocketException e) {
64 log.warn("Error while settin soTimeout to 60000");
65 e.printStackTrace();
66 }
67 this.setName("listen thread for " + connection.getRemoteSocketAddress());
68 }
69
70 public void run() {
71 try {
72 InputStream in = connection.getInputStream();
73 BufferedReader br = new BufferedReader(new InputStreamReader(in));
74 PrintStream out = new PrintStream(new BufferedOutputStream(connection
75 .getOutputStream()));
76 String cmd = null;
77 while ((cmd = br.readLine()) != null) {
78 processCommand(cmd, out);
79 }
80 connection.close();
81 if (log.isDebugEnabled()) {
82 log.debug("control connection closed");
83 }
84 } catch (SocketException e) {
85 if (e.getMessage().equals("Socket Closed"))
86 log.info("control socket closed");
87 } catch (IOException e) {
88 log.warn("a control connection broke", e);
89 try {
90 connection.close();
91 } catch(Exception ex) {
92 log.debug(ExceptionUtil.getStackTrace(ex));
93 }
94 }
95 }
96
97
98
99
100
101
102
103
104 public void processCommand(String cmd, PrintStream out) throws IOException {
105 String[] words = cmd.split("\\s+");
106 if (log.isDebugEnabled()) {
107 log.debug("command from " + connection.getRemoteSocketAddress() + ":"
108 + cmd);
109 }
110
111 if (words[0].equalsIgnoreCase("help")) {
112 out.println("you're talking to the Chukwa agent. Commands available: ");
113 out.println("add [adaptorname] [args] [offset] -- start an adaptor");
114 out.println("shutdown [adaptornumber] -- graceful stop");
115 out.println("stop [adaptornumber] -- abrupt stop");
116 out.println("list -- list running adaptors");
117 out.println("close -- close this connection");
118 out.println("stopagent -- stop the whole agent process");
119 out.println("stopall -- stop all adaptors");
120 out.println("reloadCollectors -- reload the list of collectors");
121 out.println("help -- print this message");
122 out.println("\t Command names are case-blind.");
123 } else if (words[0].equalsIgnoreCase("close")) {
124 connection.close();
125 } else if (words[0].equalsIgnoreCase("add")) {
126 try {
127 String newID = agent.processAddCommandE(cmd);
128 if (newID != null)
129 out.println("OK add completed; new ID is " + newID);
130 else
131 out.println("failed to start adaptor...check logs for details");
132 } catch(AdaptorException e) {
133 out.println(e);
134 }
135 } else if (words[0].equalsIgnoreCase("shutdown")) {
136 if (words.length < 2) {
137 out.println("need to specify an adaptor to shut down, by number");
138 } else {
139 sanitizeAdaptorName(out, words);
140 long offset = agent.stopAdaptor(words[1], AdaptorShutdownPolicy.GRACEFULLY);
141 if (offset != -1)
142 out.println("OK adaptor " + words[1] + " stopping gracefully at "
143 + offset);
144 else
145 out.println("FAIL: perhaps adaptor " + words[1] + " does not exist");
146 }
147 } else if (words[0].equalsIgnoreCase("stop")) {
148 if (words.length < 2) {
149 out.println("need to specify an adaptor to shut down, by number");
150 } else {
151 sanitizeAdaptorName(out, words);
152 agent.stopAdaptor(words[1], AdaptorShutdownPolicy.HARD_STOP);
153 out.println("OK adaptor " + words[1] + " stopped");
154 }
155 } else if (words[0].equalsIgnoreCase("reloadCollectors")) {
156 agent.getConnector().reloadConfiguration();
157 out.println("OK reloadCollectors done");
158 } else if (words[0].equalsIgnoreCase("list")) {
159 java.util.Map<String, String> adaptorList = agent.getAdaptorList();
160
161 if (log.isDebugEnabled()) {
162 log.debug("number of adaptors: " + adaptorList.size());
163 }
164
165 for (Map.Entry<String, String> a: adaptorList.entrySet()) {
166 out.print(a.getKey());
167 out.print(") ");
168 out.print(" ");
169 out.println(a.getValue());
170 }
171 out.println("");
172
173 } else if (words[0].equalsIgnoreCase("stopagent")) {
174 out.println("stopping agent process.");
175 connection.close();
176 agent.shutdown(true);
177 } else if(words[0].equalsIgnoreCase("stopall")) {
178 int stopped = 0;
179 for(String id: agent.getAdaptorList().keySet()) {
180 agent.stopAdaptor(id, false);
181 stopped++;
182 }
183 out.println("stopped " + stopped + " adaptors");
184 } else if (words[0].equals("")) {
185 out.println(getStatusLine());
186 } else {
187 log.warn("unknown command " + words[0]);
188 out.println("unknown command " + words[0]);
189 out.println("say 'help' for a list of legal commands");
190 }
191 out.flush();
192 }
193
194 private void sanitizeAdaptorName(PrintStream out, String[] words) {
195 if(!words[1].startsWith("adaptor_")) {
196 words[1] = "adaptor_" + words[1];
197 out.println("adaptor names should start with adaptor_; "
198 +"assuming you meant"+ words[1] );
199 }
200 }
201
202 }
203
204
205
206
207
208
209 public AgentControlSocketListener(ChukwaAgent agent) {
210
211 this.setDaemon(false);
212 this.agent = agent;
213 this.portno = agent.getConfiguration().getInt("chukwaAgent.control.port",
214 9093);
215 this.ALLOW_REMOTE = agent.getConfiguration().getBoolean(REMOTE_ACCESS_OPT, ALLOW_REMOTE);
216 log.info("AgentControlSocketListerner ask for port: " + portno);
217 this.setName("control socket listener");
218 }
219
220
221
222
223
224
225 public void run() {
226 try {
227 if (!isBound())
228 tryToBind();
229 } catch (IOException e) {
230 return;
231 }
232
233 while (!closing) {
234 try {
235 Socket connection = s.accept();
236 if (log.isDebugEnabled()) {
237 log.debug("new connection from " + connection.getInetAddress());
238 }
239 ListenThread l = new ListenThread(connection);
240 l.setDaemon(true);
241 l.start();
242 } catch (IOException e) {
243 if (!closing)
244 log.warn("control socket error: ", e);
245 else {
246 log.warn("shutting down listen thread due to shutdown() call");
247 break;
248 }
249 }
250 }
251 }
252
253
254
255
256 public void shutdown() {
257 closing = true;
258 try {
259 if (s != null)
260 s.close();
261 s = null;
262 } catch (IOException e) {
263 log.debug(ExceptionUtil.getStackTrace(e));
264 }
265 }
266
267 public boolean isBound() {
268 return s != null && s.isBound();
269 }
270
271 public void tryToBind() throws IOException {
272 if(ALLOW_REMOTE)
273 s = new ServerSocket(portno);
274 else {
275 s = new ServerSocket();
276 s.bind(new InetSocketAddress(InetAddress.getByAddress(new byte[] {127,0,0,1}), portno));
277 }
278 s.setReuseAddress(true);
279 portno = s.getLocalPort();
280 if (s.isBound())
281 log.info("socket bound to " + s.getLocalPort());
282 else
283 log.info("socket isn't bound");
284 }
285
286 public int getPort() {
287 if (!s.isBound()) {
288 return -1;
289 } else {
290 return portno;
291 }
292 }
293
294
295
296 private static String localHostAddr;
297 static {
298 try {
299 localHostAddr = InetAddress.getLocalHost().getHostName();
300 } catch (UnknownHostException e) {
301 localHostAddr = "localhost";
302 }
303 }
304
305 public String getStatusLine() {
306 int adaptorCount = agent.adaptorCount();
307
308 return localHostAddr + ": Chukwa Agent running, version " + VERSION + ", with " + adaptorCount + " adaptors";
309 }
310 }