/* * (c) Copyright 2009 Hewlett-Packard Development Company, LP * All rights reserved. * [See endLog of file] */ package riot.comms; import java.io.IOException ; import java.io.InputStream ; import java.io.OutputStream ; import java.net.Socket ; import java.nio.ByteBuffer ; import java.nio.channels.SocketChannel ; import org.openjena.atlas.io.BufferingWriter ; import org.openjena.atlas.lib.Sink ; import org.openjena.riot.tokens.Tokenizer ; import org.openjena.riot.tokens.TokenizerFactory ; import riot.comms.client.DirectChannel ; import riot.io.TokenInputStream ; import riot.io.TokenInputStreamBase ; import riot.io.TokenOutputStream ; import riot.io.TokenOutputStreamWriter ; /** A two-way flow of tuples */ public class TokenStreamEndpoint { /** Actively create */ public static TokenStreamEndpoint create(String hostname, int port) { DirectChannel directChannel = new DirectChannel(hostname, port) ; String label = hostname+":"+port ; return create(label,label, directChannel.getSocket()) ; } /** Passively create */ public static TokenStreamEndpoint create(SocketChannel channel) { Socket socket = channel.socket() ; try { InputStream input = socket.getInputStream() ; OutputStream output = socket.getOutputStream() ; return new TokenStreamEndpoint("input", "output", socket, input, output) ; } catch (IOException ex) { throw new CommsException(ex) ; } } public static TokenStreamEndpoint create(String labelInput, String labelOutput, InputStream input, OutputStream output) { return new TokenStreamEndpoint(labelInput, labelOutput, null, input, output) ; } public static TokenStreamEndpoint create(String labelInput, String labelOutput, Socket socket) { try { return new TokenStreamEndpoint(labelInput, labelOutput, socket, socket.getInputStream(), socket.getOutputStream()) ; } catch (IOException ex) { throw new CommsException(ex) ; } } private TokenInputStream tokenInput ; private TokenOutputStream tokenOutput ; private String labelInput ; private String labelOutput ; private Socket socket ; private TokenStreamEndpoint(String labelInput, String labelOutput, Socket socket, InputStream input, OutputStream output) { this.socket = socket ; this.labelInput = labelInput ; this.labelOutput = labelOutput ; // Build all the stack of classes so that a raw byte stream is treated as a tuple stream. Tokenizer stream = TokenizerFactory.makeTokenizerUTF8(input) ; this.tokenInput = new TokenInputStreamBase(labelInput, stream) ; // Output. Sink dest = new BufferingWriter.SinkOutputStream(output) ; BufferingWriter bw = new BufferingWriter(dest) ; this.tokenOutput = new TokenOutputStreamWriter(labelOutput, bw) ; } @Override public String toString() { return labelInput+" -> "+labelOutput ; } public boolean endOfInput() { return ! tokenInput.hasNext() ; } public TokenInputStream getInput() { return tokenInput ; } public TokenOutputStream getOutput() { return tokenOutput ; } public void close() { try { tokenInput.close() ; if ( socket != null ) socket.close() ; tokenOutput.close() ; } catch (Exception ex) {} } } /* * (c) Copyright 2009 Hewlett-Packard Development Company, LP * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */