TelnetInputStream.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.commons.net.telnet;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;

final class TelnetInputStream extends BufferedInputStream implements Runnable {
    /** End of file has been reached */
    private static final int EOF = -1;

    /** Read would block */
    private static final int WOULD_BLOCK = -2;

    // TODO should these be private enums?
    static final int STATE_DATA = 0, STATE_IAC = 1, STATE_WILL = 2, STATE_WONT = 3, STATE_DO = 4, STATE_DONT = 5, STATE_SB = 6, STATE_SE = 7, STATE_CR = 8,
            STATE_IAC_SB = 9;

    private boolean hasReachedEOF; // @GuardedBy("queue")
    private volatile boolean isClosed;
    private boolean readIsWaiting;
    private int receiveState, queueHead, queueTail, bytesAvailable;
    private final int[] queue;
    private final TelnetClient client;
    private final Thread thread;
    private IOException ioException;

    /* TERMINAL-TYPE option (start) */
    private final int suboption[];
    private int suboptionCount;
    /* TERMINAL-TYPE option (end) */

    private volatile boolean threaded;

    TelnetInputStream(final InputStream input, final TelnetClient client) {
        this(input, client, true);
    }

    TelnetInputStream(final InputStream input, final TelnetClient client, final boolean readerThread) {
        super(input);
        this.client = client;
        this.receiveState = STATE_DATA;
        this.isClosed = true;
        this.hasReachedEOF = false;
        // Make it 2049, because when full, one slot will go unused, and we
        // want a 2048 byte buffer just to have a round number (base 2 that is)
        this.queue = new int[2049];
        this.queueHead = 0;
        this.queueTail = 0;
        this.suboption = new int[client.maxSubnegotiationLength];
        this.bytesAvailable = 0;
        this.ioException = null;
        this.readIsWaiting = false;
        this.threaded = false;
        if (readerThread) {
            this.thread = new Thread(this);
        } else {
            this.thread = null;
        }
    }

    @Override
    public int available() throws IOException {
        // Critical section because run() may change bytesAvailable
        synchronized (queue) {
            if (threaded) { // Must not call super.available when running threaded: NET-466
                return bytesAvailable;
            }
            return bytesAvailable + super.available();
        }
    }

    // Cannot be synchronized. Will cause deadlock if run() is blocked
    // in read because BufferedInputStream read() is synchronized.
    @Override
    public void close() throws IOException {
        // Completely disregard the fact thread may still be running.
        // We can't afford to block on this close by waiting for
        // thread to terminate because few if any JVM's will actually
        // interrupt a system read() from the interrupt() method.
        super.close();

        synchronized (queue) {
            hasReachedEOF = true;
            isClosed = true;

            if (thread != null && thread.isAlive()) {
                thread.interrupt();
            }

            queue.notifyAll();
        }

    }

    /** Returns false. Mark is not supported. */
    @Override
    public boolean markSupported() {
        return false;
    }

    // synchronized(client) critical sections are to protect against
    // TelnetOutputStream writing through the telnet client at same time
    // as a processDo/Will/etc. command invoked from TelnetInputStream
    // tries to write. Returns true if buffer was previously empty.
    private boolean processChar(final int ch) throws InterruptedException {
        // Critical section because we're altering bytesAvailable,
        // queueTail, and the contents of _queue.
        final boolean bufferWasEmpty;
        synchronized (queue) {
            bufferWasEmpty = bytesAvailable == 0;
            while (bytesAvailable >= queue.length - 1) {
                // The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
                // will consume some data soon!
                if (!threaded) {
                    // We've been asked to add another character to the queue, but it is already full and there's
                    // no other thread to drain it. This should not have happened!
                    throw new IllegalStateException("Queue is full! Cannot process another character.");
                }
                queue.notify();
                try {
                    queue.wait();
                } catch (final InterruptedException e) {
                    throw e;
                }
            }

            // Need to do this in case we're not full, but block on a read
            if (readIsWaiting && threaded) {
                queue.notify();
            }

            queue[queueTail] = ch;
            ++bytesAvailable;

            if (++queueTail >= queue.length) {
                queueTail = 0;
            }
        }
        return bufferWasEmpty;
    }

    @Override
    public int read() throws IOException {
        // Critical section because we're altering bytesAvailable,
        // queueHead, and the contents of _queue in addition to
        // testing value of hasReachedEOF.
        synchronized (queue) {

            while (true) {
                if (ioException != null) {
                    final IOException e;
                    e = ioException;
                    ioException = null;
                    throw e;
                }

                if (bytesAvailable == 0) {
                    // Return EOF if at end of file
                    if (hasReachedEOF) {
                        return EOF;
                    }

                    // Otherwise, we have to wait for queue to get something
                    if (threaded) {
                        queue.notify();
                        try {
                            readIsWaiting = true;
                            queue.wait();
                            readIsWaiting = false;
                        } catch (final InterruptedException e) {
                            throw new InterruptedIOException("Fatal thread interruption during read.");
                        }
                    } else {
                        // alreadyread = false;
                        readIsWaiting = true;
                        int ch;
                        boolean mayBlock = true; // block on the first read only

                        do {
                            try {
                                if ((ch = read(mayBlock)) < 0) { // must be EOF
                                    if (ch != WOULD_BLOCK) {
                                        return ch;
                                    }
                                }
                            } catch (final InterruptedIOException e) {
                                synchronized (queue) {
                                    ioException = e;
                                    queue.notifyAll();
                                    try {
                                        queue.wait(100);
                                    } catch (final InterruptedException interrupted) {
                                        // Ignored
                                    }
                                }
                                return EOF;
                            }

                            try {
                                if (ch != WOULD_BLOCK) {
                                    processChar(ch);
                                }
                            } catch (final InterruptedException e) {
                                if (isClosed) {
                                    return EOF;
                                }
                            }

                            // Reads should not block on subsequent iterations. Potentially, this could happen if the
                            // remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
                            mayBlock = false;

                        }
                        // Continue reading as long as there is data available and the queue is not full.
                        while (super.available() > 0 && bytesAvailable < queue.length - 1);

                        readIsWaiting = false;
                    }
                    continue;
                }
                final int ch;

                ch = queue[queueHead];

                if (++queueHead >= queue.length) {
                    queueHead = 0;
                }

                --bytesAvailable;

                // Need to explicitly notify() so available() works properly
                if (bytesAvailable == 0 && threaded) {
                    queue.notify();
                }

                return ch;
            }
        }
    }

    // synchronized(client) critical sections are to protect against
    // TelnetOutputStream writing through the telnet client at same time
    // as a processDo/Will/etc. command invoked from TelnetInputStream
    // tries to write.
    /**
     * Gets the next byte of data. IAC commands are processed internally and do not return data.
     *
     * @param mayBlock true if method is allowed to block
     * @return the next byte of data, or -1 (EOF) if end of stread reached, or -2 (WOULD_BLOCK) if mayBlock is false and there is no data available
     */
    private int read(final boolean mayBlock) throws IOException {
        int ch;

        while (true) {

            // If there is no more data AND we were told not to block,
            // just return WOULD_BLOCK (-2). (More efficient than exception.)
            if (!mayBlock && super.available() == 0) {
                return WOULD_BLOCK;
            }

            // Otherwise, exit only when we reach end of stream.
            if ((ch = super.read()) < 0) {
                return EOF;
            }

            ch &= 0xff;

            /* Code Section added for supporting AYT (start) */
            synchronized (client) {
                client.processAYTResponse();
            }
            /* Code Section added for supporting AYT (end) */

            /* Code Section added for supporting spystreams (start) */
            client.spyRead(ch);
            /* Code Section added for supporting spystreams (end) */

            switch (receiveState) {

            case STATE_CR:
                if (ch == '\0') {
                    // Strip null
                    continue;
                }
                // How do we handle newline after cr?
                // else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&

                // Handle as normal data by falling through to _STATE_DATA case

                //$FALL-THROUGH$
            case STATE_DATA:
                if (ch == TelnetCommand.IAC) {
                    receiveState = STATE_IAC;
                    continue;
                }

                if (ch == '\r') {
                    synchronized (client) {
                        if (client.requestedDont(TelnetOption.BINARY)) {
                            receiveState = STATE_CR;
                        } else {
                            receiveState = STATE_DATA;
                        }
                    }
                } else {
                    receiveState = STATE_DATA;
                }
                break;

            case STATE_IAC:
                switch (ch) {
                case TelnetCommand.WILL:
                    receiveState = STATE_WILL;
                    continue;
                case TelnetCommand.WONT:
                    receiveState = STATE_WONT;
                    continue;
                case TelnetCommand.DO:
                    receiveState = STATE_DO;
                    continue;
                case TelnetCommand.DONT:
                    receiveState = STATE_DONT;
                    continue;
                /* TERMINAL-TYPE option (start) */
                case TelnetCommand.SB:
                    suboptionCount = 0;
                    receiveState = STATE_SB;
                    continue;
                /* TERMINAL-TYPE option (end) */
                case TelnetCommand.IAC:
                    receiveState = STATE_DATA;
                    break; // exit to enclosing switch to return IAC from read
                case TelnetCommand.SE: // unexpected byte! ignore it (don't send it as a command)
                    receiveState = STATE_DATA;
                    continue;
                default:
                    receiveState = STATE_DATA;
                    client.processCommand(ch); // Notify the user
                    continue; // move on the next char
                }
                break; // exit and return from read
            case STATE_WILL:
                synchronized (client) {
                    client.processWill(ch);
                    client.flushOutputStream();
                }
                receiveState = STATE_DATA;
                continue;
            case STATE_WONT:
                synchronized (client) {
                    client.processWont(ch);
                    client.flushOutputStream();
                }
                receiveState = STATE_DATA;
                continue;
            case STATE_DO:
                synchronized (client) {
                    client.processDo(ch);
                    client.flushOutputStream();
                }
                receiveState = STATE_DATA;
                continue;
            case STATE_DONT:
                synchronized (client) {
                    client.processDont(ch);
                    client.flushOutputStream();
                }
                receiveState = STATE_DATA;
                continue;
            /* TERMINAL-TYPE option (start) */
            case STATE_SB:
                switch (ch) {
                case TelnetCommand.IAC:
                    receiveState = STATE_IAC_SB;
                    continue;
                default:
                    // store suboption char
                    if (suboptionCount < suboption.length) {
                        suboption[suboptionCount++] = ch;
                    }
                    break;
                }
                receiveState = STATE_SB;
                continue;
            case STATE_IAC_SB: // IAC received during SB phase
                switch (ch) {
                case TelnetCommand.SE:
                    synchronized (client) {
                        client.processSuboption(suboption, suboptionCount);
                        client.flushOutputStream();
                    }
                    receiveState = STATE_DATA;
                    continue;
                case TelnetCommand.IAC: // De-dup the duplicated IAC
                    if (suboptionCount < suboption.length) {
                        suboption[suboptionCount++] = ch;
                    }
                    break;
                default: // unexpected byte! ignore it
                    break;
                }
                receiveState = STATE_SB;
                continue;
            /* TERMINAL-TYPE option (end) */
            }

            break;
        }

        return ch;
    }

    /**
     * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the stream has been reached.
     *
     * @param buffer The byte array in which to store the data.
     * @return The number of bytes read. Returns -1 if the end of the message has been reached.
     * @throws IOException If an error occurs in reading the underlying stream.
     */
    @Override
    public int read(final byte buffer[]) throws IOException {
        return read(buffer, 0, buffer.length);
    }

    /**
     * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the message has been reached.
     * The characters are stored in the array starting from the given offset and up to the length specified.
     *
     * @param buffer The byte array in which to store the data.
     * @param offset The offset into the array at which to start storing data.
     * @param length The number of bytes to read.
     * @return The number of bytes read. Returns -1 if the end of the stream has been reached.
     * @throws IOException If an error occurs while reading the underlying stream.
     */
    @Override
    public int read(final byte buffer[], int offset, int length) throws IOException {
        int ch;
        final int off;

        if (length < 1) {
            return 0;
        }

        // Critical section because run() may change bytesAvailable
        synchronized (queue) {
            if (length > bytesAvailable) {
                length = bytesAvailable;
            }
        }

        if ((ch = read()) == EOF) {
            return EOF;
        }

        off = offset;

        do {
            buffer[offset++] = (byte) ch;
        } while (--length > 0 && (ch = read()) != EOF);

        // client._spyRead(buffer, off, offset - off);
        return offset - off;
    }

    @Override
    public void run() {
        int ch;

        try {
            _outerLoop: while (!isClosed) {
                try {
                    if ((ch = read(true)) < 0) {
                        break;
                    }
                } catch (final InterruptedIOException e) {
                    synchronized (queue) {
                        ioException = e;
                        queue.notifyAll();
                        try {
                            queue.wait(100);
                        } catch (final InterruptedException interrupted) {
                            if (isClosed) {
                                break _outerLoop;
                            }
                        }
                        continue;
                    }
                } catch (final RuntimeException re) {
                    // We treat any runtime exceptions as though the
                    // stream has been closed. We close the
                    // underlying stream just to be sure.
                    super.close();
                    // Breaking the loop has the effect of setting
                    // the state to closed at the end of the method.
                    break _outerLoop;
                }

                // Process new character
                boolean notify = false;
                try {
                    notify = processChar(ch);
                } catch (final InterruptedException e) {
                    if (isClosed) {
                        break _outerLoop;
                    }
                }

                // Notify input listener if buffer was previously empty
                if (notify) {
                    client.notifyInputListener();
                }
            }
        } catch (final IOException ioe) {
            synchronized (queue) {
                ioException = ioe;
            }
            client.notifyInputListener();
        }

        synchronized (queue) {
            isClosed = true; // Possibly redundant
            hasReachedEOF = true;
            queue.notify();
        }

        threaded = false;
    }

    void start() {
        if (thread == null) {
            return;
        }

        int priority;
        isClosed = false;
        // TODO remove this
        // Need to set a higher priority in case JVM does not use pre-emptive
        // threads. This should prevent scheduler induced deadlock (rather than
        // deadlock caused by a bug in this code).
        priority = Thread.currentThread().getPriority() + 1;
        if (priority > Thread.MAX_PRIORITY) {
            priority = Thread.MAX_PRIORITY;
        }
        thread.setPriority(priority);
        thread.setDaemon(true);
        thread.start();
        threaded = true; // tell _processChar that we are running threaded
    }
}