/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.examples.suspend;

import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.formats.CommandLine;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.LoggingEventHandler;
import org.apache.reef.wake.impl.ThreadPoolStage;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.remote.impl.TransportEvent;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.TransportFactory;

import javax.inject.Inject;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Locale;
import java.util.logging.Level;
044import java.util.logging.Logger; 045 046public final class Control { 047 048 private static final Logger LOG = Logger.getLogger(Control.class.getName()); 049 private final transient String command; 050 private final transient String taskId; 051 private final transient int port; 052 private final TransportFactory tpFactory; 053 054 @Inject 055 public Control(@Parameter(SuspendClientControl.Port.class) final int port, 056 @Parameter(TaskId.class) final String taskId, 057 @Parameter(Command.class) final String command, 058 final TransportFactory tpFactory) { 059 this.command = command.trim().toLowerCase(); 060 this.taskId = taskId; 061 this.port = port; 062 this.tpFactory = tpFactory; 063 } 064 065 private static Configuration getConfig(final String[] args) throws IOException, BindException { 066 final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); 067 new CommandLine(cb).processCommandLine(args, SuspendClientControl.Port.class, TaskId.class, Command.class); 068 return cb.build(); 069 } 070 071 public static void main(final String[] args) throws Exception { 072 final Configuration config = getConfig(args); 073 final Injector injector = Tang.Factory.getTang().newInjector(config); 074 final Control control = injector.getInstance(Control.class); 075 control.run(); 076 } 077 078 public void run() throws Exception { 079 080 LOG.log(Level.INFO, "command: {0} task: {1} port: {2}", 081 new Object[]{this.command, this.taskId, this.port}); 082 083 final ObjectSerializableCodec<String> codec = new ObjectSerializableCodec<>(); 084 085 final EStage<TransportEvent> stage = new ThreadPoolStage<>("suspend-control-client", 086 new LoggingEventHandler<TransportEvent>(), 1, new EventHandler<Throwable>() { 087 @Override 088 public void onNext(final Throwable throwable) { 089 throw new RuntimeException(throwable); 090 } 091 }); 092 093 try (final Transport transport = tpFactory.newInstance("localhost", 0, stage, stage, 1, 10000)) { 094 final Link link = 
transport.open(new InetSocketAddress("localhost", this.port), codec, null); 095 link.write(this.command + " " + this.taskId); 096 } 097 } 098 099 @NamedParameter(doc = "Task id", short_name = "task") 100 public static final class TaskId implements Name<String> { 101 } 102 103 @NamedParameter(doc = "Command: 'suspend' or 'resume'", short_name = "cmd") 104 public static final class Command implements Name<String> { 105 } 106}