View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.serial;
21  
22  import gnu.io.SerialPort;
23  import gnu.io.SerialPortEvent;
24  import gnu.io.SerialPortEventListener;
25  
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.OutputStream;
29  import java.util.TooManyListenersException;
30  
31  import org.apache.mina.core.session.AbstractIoSession;
32  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
33  import org.apache.mina.core.service.DefaultTransportMetadata;
34  import org.apache.mina.util.ExceptionMonitor;
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.filterchain.IoFilterChain;
37  import org.apache.mina.core.service.IoHandler;
38  import org.apache.mina.core.service.IoProcessor;
39  import org.apache.mina.core.service.IoService;
40  import org.apache.mina.core.service.IoServiceListenerSupport;
41  import org.apache.mina.core.service.TransportMetadata;
42  import org.apache.mina.core.write.WriteRequest;
43  
44  import org.slf4j.Logger;
45  import org.slf4j.LoggerFactory;
46  
47  /**
48   * An imlpementation of {@link SerialSession}.
49   *
50   * @author The Apache MINA Project (dev@mina.apache.org)
51   */
52  class SerialSessionImpl extends AbstractIoSession implements
53          SerialSession, SerialPortEventListener {
54  
55      static final TransportMetadata METADATA =
56          new DefaultTransportMetadata(
57                  "rxtx", "serial", false, true, SerialAddress.class,
58                  SerialSessionConfig.class, IoBuffer.class);
59  
60      private final SerialSessionConfig config = new DefaultSerialSessionConfig();
61      private final IoProcessor<SerialSessionImpl> processor = new SerialIoProcessor();
62      private final IoHandler ioHandler;
63      private final IoFilterChain filterChain;
64      private final SerialConnector service;
65      private final IoServiceListenerSupport serviceListeners;
66      private final SerialAddress address;
67      private final SerialPort port;
68      private final Logger log;
69  
70      private InputStream inputStream;
71      private OutputStream outputStream;
72  
73      SerialSessionImpl(
74              SerialConnector service, IoServiceListenerSupport serviceListeners,
75              SerialAddress address, SerialPort port) {
76          this.service = service;
77          this.serviceListeners = serviceListeners;
78          ioHandler = service.getHandler();
79          filterChain = new DefaultIoFilterChain(this);
80          this.port = port;
81          this.address = address;
82  
83          log = LoggerFactory.getLogger(SerialSessionImpl.class);
84      }
85  
86      public SerialSessionConfig getConfig() {
87          return config;
88      }
89  
90      public IoFilterChain getFilterChain() {
91          return filterChain;
92      }
93  
94      public IoHandler getHandler() {
95          return ioHandler;
96      }
97  
98      public TransportMetadata getTransportMetadata() {
99          return METADATA;
100     }
101 
102     public SerialAddress getLocalAddress() {
103         return null; // not applicable
104     }
105 
106     public SerialAddress getRemoteAddress() {
107         return address;
108     }
109 
110     @Override
111     public SerialAddress getServiceAddress() {
112         return (SerialAddress) super.getServiceAddress();
113     }
114 
115     public IoService getService() {
116         return service;
117     }
118 
119     /**
120      * start handling streams
121      *
122      * @throws IOException
123      * @throws TooManyListenersException
124      */
125     void start() throws IOException, TooManyListenersException {
126         inputStream = port.getInputStream();
127         outputStream = port.getOutputStream();
128         ReadWorker w = new ReadWorker();
129         w.start();
130         port.addEventListener(this);
131         service.getIdleStatusChecker0().addSession(this);
132         try {
133             getService().getFilterChainBuilder().buildFilterChain(getFilterChain());
134             serviceListeners.fireSessionCreated(this);
135         } catch (Throwable e) {
136             getFilterChain().fireExceptionCaught(e);
137             processor.remove(this);
138         }
139     }
140 
141     private final Object writeMonitor = new Object();
142     private WriteWorker writeWorker;
143 
144     private class WriteWorker extends Thread {
145         @Override
146         public void run() {
147             while (isConnected() && !isClosing()) {
148                 flushWrites();
149 
150                 // wait for more data
151                 synchronized (writeMonitor) {
152                     try {
153                         writeMonitor.wait();
154                     } catch (InterruptedException e) {
155                         log.error("InterruptedException", e);
156                     }
157                 }
158             }
159         }
160     }
161 
162     private void flushWrites() {
163         for (; ;) {
164             WriteRequest req = getCurrentWriteRequest();
165             if (req == null) {
166                 req = getWriteRequestQueue().poll(this);
167                 if (req == null) {
168                     break;
169                 }
170             }
171 
172             IoBuffer buf = (IoBuffer) req.getMessage();
173             if (buf.remaining() == 0) {
174                 setCurrentWriteRequest(null);
175                 buf.reset();
176 
177                 this.getFilterChain().fireMessageSent(req);
178                 continue;
179             }
180 
181             int writtenBytes = buf.remaining();
182             try {
183                 outputStream.write(buf.array(), buf.position(), writtenBytes);
184                 buf.position(buf.position() + writtenBytes);
185                 req.getFuture().setWritten();
186             } catch (IOException e) {
187                 this.getFilterChain().fireExceptionCaught(e);
188             }
189         }
190     }
191 
192     private final Object readReadyMonitor = new Object();
193 
194     private class ReadWorker extends Thread {
195         @Override
196         public void run() {
197             while (isConnected() && !isClosing()) {
198                 synchronized (readReadyMonitor) {
199                     try {
200                         readReadyMonitor.wait();
201                     } catch (InterruptedException e) {
202                         log.error("InterruptedException", e);
203                     }
204                     if (isClosing() || !isConnected()) {
205                         break;
206                     }
207                     int dataSize;
208                     try {
209                         dataSize = inputStream.available();
210                         byte[] data = new byte[dataSize];
211                         int readBytes = inputStream.read(data);
212 
213                         if (readBytes > 0) {
214                             IoBuffer buf = IoBuffer
215                                     .wrap(data, 0, readBytes);
216                             buf.put(data, 0, readBytes);
217                             buf.flip();
218                             getFilterChain().fireMessageReceived(
219                                     buf);
220                         }
221                     } catch (IOException e) {
222                         getFilterChain().fireExceptionCaught(
223                                 e);
224                     }
225                 }
226             }
227         }
228     }
229 
230     public void serialEvent(SerialPortEvent evt) {
231         if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
232             synchronized (readReadyMonitor) {
233                 readReadyMonitor.notifyAll();
234             }
235         }
236     }
237 
238     @Override
239     public IoProcessor<SerialSessionImpl> getProcessor() {
240         return processor;
241     }
242 
243     private class SerialIoProcessor implements IoProcessor<SerialSessionImpl> {
244         public void add(SerialSessionImpl session) {
245             // It's already added when the session is constructed.
246         }
247 
248         public void flush(SerialSessionImpl session) {
249             if (writeWorker == null) {
250                 writeWorker = new WriteWorker();
251                 writeWorker.start();
252             } else {
253                 synchronized (writeMonitor) {
254                     writeMonitor.notifyAll();
255                 }
256             }
257         }
258 
259         public void remove(SerialSessionImpl session) {
260             try {
261                 inputStream.close();
262             } catch (IOException e) {
263                 ExceptionMonitor.getInstance().exceptionCaught(e);
264             }
265             try {
266                 outputStream.close();
267             } catch (IOException e) {
268                 ExceptionMonitor.getInstance().exceptionCaught(e);
269             }
270 
271             port.close();
272             flush(session);
273             synchronized (readReadyMonitor) {
274                 readReadyMonitor.notifyAll();
275             }
276 
277             serviceListeners.fireSessionDestroyed(SerialSessionImpl.this);
278         }
279 
280         public void updateTrafficControl(SerialSessionImpl session) {
281             throw new UnsupportedOperationException();
282         }
283 
284         public void dispose() {
285             // Nothing to dispose
286         }
287 
288         public boolean isDisposed() {
289             return false;
290         }
291 
292         public boolean isDisposing() {
293             return false;
294         }
295     }
296 }