View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.serial;
21  
22  import gnu.io.SerialPort;
23  import gnu.io.SerialPortEvent;
24  import gnu.io.SerialPortEventListener;
25  import gnu.io.UnsupportedCommOperationException;
26  
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.io.OutputStream;
30  import java.util.TooManyListenersException;
31  
32  import org.apache.mina.core.buffer.IoBuffer;
33  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
34  import org.apache.mina.core.filterchain.IoFilterChain;
35  import org.apache.mina.core.service.DefaultTransportMetadata;
36  import org.apache.mina.core.service.IoProcessor;
37  import org.apache.mina.core.service.IoServiceListenerSupport;
38  import org.apache.mina.core.service.TransportMetadata;
39  import org.apache.mina.core.session.AbstractIoSession;
40  import org.apache.mina.core.write.WriteRequest;
41  import org.apache.mina.core.write.WriteRequestQueue;
42  import org.apache.mina.util.ExceptionMonitor;
43  import org.slf4j.Logger;
44  import org.slf4j.LoggerFactory;
45  
46  /**
47   * An imlpementation of {@link SerialSession}.
48   *
49   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
50   */
51  class SerialSessionImpl extends AbstractIoSession implements SerialSession, SerialPortEventListener {
52  
53      static final TransportMetadata METADATA = new DefaultTransportMetadata("rxtx", "serial", false, true,
54              SerialAddress.class, SerialSessionConfig.class, IoBuffer.class);
55  
56      private final IoProcessor<SerialSessionImpl> processor = new SerialIoProcessor();
57  
58      private final IoFilterChain filterChain;
59  
60      private final IoServiceListenerSupport serviceListeners;
61  
62      private final SerialAddress address;
63  
64      private final SerialPort port;
65  
66      private final Logger log;
67  
68      private InputStream inputStream;
69  
70      private OutputStream outputStream;
71  
72      SerialSessionImpl(SerialConnector service, IoServiceListenerSupport serviceListeners, SerialAddress address,
73              SerialPort port) {
74          super(service);
75          config = new DefaultSerialSessionConfig();
76          this.serviceListeners = serviceListeners;
77          filterChain = new DefaultIoFilterChain(this);
78          this.port = port;
79          this.address = address;
80  
81          log = LoggerFactory.getLogger(SerialSessionImpl.class);
82      }
83  
84      public SerialSessionConfig getConfig() {
85          return (SerialSessionConfig) config;
86      }
87  
88      public IoFilterChain getFilterChain() {
89          return filterChain;
90      }
91  
92      public TransportMetadata getTransportMetadata() {
93          return METADATA;
94      }
95  
96      public SerialAddress getLocalAddress() {
97          return null; // not applicable
98      }
99  
100     public SerialAddress getRemoteAddress() {
101         return address;
102     }
103 
104     @Override
105     public SerialAddress getServiceAddress() {
106         return (SerialAddress) super.getServiceAddress();
107     }
108 
109     public void setDTR(boolean dtr) {
110         port.setDTR(dtr);
111     }
112 
113     public boolean isDTR() {
114         return port.isDTR();
115     }
116 
117     public void setRTS(boolean rts) {
118         port.setRTS(rts);
119     }
120 
121     public boolean isRTS() {
122         return port.isRTS();
123     }
124 
125     /**
126      * start handling streams
127      *
128      * @throws IOException
129      * @throws TooManyListenersException
130      */
131     void start() throws IOException, TooManyListenersException {
132         inputStream = port.getInputStream();
133         outputStream = port.getOutputStream();
134         ReadWorker w = new ReadWorker();
135         w.start();
136         port.addEventListener(this);
137         ((SerialConnector) getService()).getIdleStatusChecker0().addSession(this);
138         try {
139             getService().getFilterChainBuilder().buildFilterChain(getFilterChain());
140             serviceListeners.fireSessionCreated(this);
141         } catch (Exception e) {
142             getFilterChain().fireExceptionCaught(e);
143             processor.remove(this);
144         }
145     }
146 
147     private final Object writeMonitor = new Object();
148 
149     private WriteWorker writeWorker;
150 
151     private class WriteWorker extends Thread {
152         @Override
153         public void run() {
154             synchronized (writeMonitor) {
155                 while (isConnected() && !isClosing()) {
156                     flushWrites();
157 
158                     // wait for more data
159                     try {
160                         writeMonitor.wait();
161                     } catch (InterruptedException e) {
162                         log.error("InterruptedException", e);
163                     }
164                 }
165             }
166         }
167     }
168 
169     private void flushWrites() {
170         for (;;) {
171             WriteRequest req = getCurrentWriteRequest();
172             if (req == null) {
173                 req = getWriteRequestQueue().poll(this);
174                 if (req == null) {
175                     break;
176                 }
177             }
178 
179             IoBuffer"../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer buf = (IoBuffer) req.getMessage();
180             if (buf.remaining() == 0) {
181                 setCurrentWriteRequest(null);
182                 buf.reset();
183                 this.getFilterChain().fireMessageSent(req);
184                 continue;
185             }
186 
187             int writtenBytes = buf.remaining();
188             try {
189                 outputStream.write(buf.array(), buf.position(), writtenBytes);
190                 buf.position(buf.position() + writtenBytes);
191 
192                 // increase written bytes
193                 increaseWrittenBytes(writtenBytes, System.currentTimeMillis());
194 
195                 setCurrentWriteRequest(null);
196                 buf.reset();
197 
198                 // fire the message sent event
199                 getFilterChain().fireMessageSent(req);
200             } catch (IOException e) {
201                 this.getFilterChain().fireExceptionCaught(e);
202             }
203 
204         }
205     }
206 
207     private final Object readReadyMonitor = new Object();
208 
209     private class ReadWorker extends Thread {
210         @Override
211         public void run() {
212             while (isConnected() && !isClosing()) {
213                 synchronized (readReadyMonitor) {
214                     try {
215                         readReadyMonitor.wait();
216                     } catch (InterruptedException e) {
217                         log.error("InterruptedException", e);
218                     }
219                     if (isClosing() || !isConnected()) {
220                         break;
221                     }
222                     int dataSize;
223                     try {
224                         dataSize = inputStream.available();
225                         byte[] data = new byte[dataSize];
226                         int readBytes = inputStream.read(data);
227 
228                         if (readBytes > 0) {
229                             IoBuffer buf = IoBuffer.wrap(data, 0, readBytes);
230                             buf.put(data, 0, readBytes);
231                             buf.flip();
232                             getFilterChain().fireMessageReceived(buf);
233                         }
234                     } catch (IOException e) {
235                         getFilterChain().fireExceptionCaught(e);
236                     }
237                 }
238             }
239         }
240     }
241 
242     public void serialEvent(SerialPortEvent evt) {
243         if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
244             synchronized (readReadyMonitor) {
245                 readReadyMonitor.notifyAll();
246             }
247         }
248     }
249 
250     @Override
251     public IoProcessor<SerialSessionImpl> getProcessor() {
252         return processor;
253     }
254 
255     private class SerialIoProcessor implements IoProcessor<SerialSessionImpl> {
256         public void add(SerialSessionImpl session) {
257             // It's already added when the session is constructed.
258         }
259 
260         /**
261          * {@inheritDoc}
262          */
263         public void write(SerialSessionImpl session, WriteRequest writeRequest) {
264             WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
265 
266             writeRequestQueue.offer(session, writeRequest);
267 
268             if (!session.isWriteSuspended()) {
269                 session.getProcessor().flush(session);
270             }
271         }
272 
273         /**
274          * {@inheritDoc}
275          */
276         public void flush(SerialSessionImpl session) {
277             if (writeWorker == null) {
278                 writeWorker = new WriteWorker();
279                 writeWorker.start();
280             } else {
281                 synchronized (writeMonitor) {
282                     writeMonitor.notifyAll();
283                 }
284             }
285         }
286 
287         public void remove(SerialSessionImpl session) {
288             try {
289                 inputStream.close();
290             } catch (IOException e) {
291                 ExceptionMonitor.getInstance().exceptionCaught(e);
292             }
293             try {
294                 outputStream.close();
295             } catch (IOException e) {
296                 ExceptionMonitor.getInstance().exceptionCaught(e);
297             }
298 
299             try { // Turn flow control off right before close to avoid deadlock
300                 port.setFlowControlMode(SerialPort.FLOWCONTROL_NONE);
301             } catch (UnsupportedCommOperationException e) {
302                 ExceptionMonitor.getInstance().exceptionCaught(e);
303             }
304 
305             port.close();
306             flush(session);
307             synchronized (readReadyMonitor) {
308                 readReadyMonitor.notifyAll();
309             }
310 
311             serviceListeners.fireSessionDestroyed(SerialSessionImpl.this);
312         }
313 
314         public void updateTrafficControl(SerialSessionImpl session) {
315             throw new UnsupportedOperationException();
316         }
317 
318         public void dispose() {
319             // Nothing to dispose
320         }
321 
322         public boolean isDisposed() {
323             return false;
324         }
325 
326         public boolean isDisposing() {
327             return false;
328         }
329     }
330 }