View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.serial;
21  
22  import gnu.io.SerialPort;
23  import gnu.io.SerialPortEvent;
24  import gnu.io.SerialPortEventListener;
25  
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.OutputStream;
29  import java.util.TooManyListenersException;
30  
31  import org.apache.mina.core.session.AbstractIoSession;
32  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
33  import org.apache.mina.core.service.DefaultTransportMetadata;
34  import org.apache.mina.util.ExceptionMonitor;
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.filterchain.IoFilterChain;
37  import org.apache.mina.core.service.IoHandler;
38  import org.apache.mina.core.service.IoProcessor;
39  import org.apache.mina.core.service.IoService;
40  import org.apache.mina.core.service.IoServiceListenerSupport;
41  import org.apache.mina.core.service.TransportMetadata;
42  import org.apache.mina.core.write.WriteRequest;
43  
44  import org.slf4j.Logger;
45  import org.slf4j.LoggerFactory;
46  
47  /**
48   * An imlpementation of {@link SerialSession}.
49   *
50   * @author The Apache MINA Project (dev@mina.apache.org)
51   * @version $Rev: 751630 $, $Date: 2009-03-09 10:20:18 +0100 (Mon, 09 Mar 2009) $
52   */
53  class SerialSessionImpl extends AbstractIoSession implements
54          SerialSession, SerialPortEventListener {
55  
56      static final TransportMetadata METADATA =
57          new DefaultTransportMetadata(
58                  "rxtx", "serial", false, true, SerialAddress.class,
59                  SerialSessionConfig.class, IoBuffer.class);
60  
61      private final SerialSessionConfig config = new DefaultSerialSessionConfig();
62      private final IoProcessor<SerialSessionImpl> processor = new SerialIoProcessor();
63      private final IoHandler ioHandler;
64      private final IoFilterChain filterChain;
65      private final SerialConnector service;
66      private final IoServiceListenerSupport serviceListeners;
67      private final SerialAddress address;
68      private final SerialPort port;
69      private final Logger log;
70  
71      private InputStream inputStream;
72      private OutputStream outputStream;
73  
74      SerialSessionImpl(
75              SerialConnector service, IoServiceListenerSupport serviceListeners,
76              SerialAddress address, SerialPort port) {
77          this.service = service;
78          this.serviceListeners = serviceListeners;
79          ioHandler = service.getHandler();
80          filterChain = new DefaultIoFilterChain(this);
81          this.port = port;
82          this.address = address;
83  
84          log = LoggerFactory.getLogger(SerialSessionImpl.class);
85      }
86  
87      public SerialSessionConfig getConfig() {
88          return config;
89      }
90  
91      public IoFilterChain getFilterChain() {
92          return filterChain;
93      }
94  
95      public IoHandler getHandler() {
96          return ioHandler;
97      }
98  
99      public TransportMetadata getTransportMetadata() {
100         return METADATA;
101     }
102 
103     public SerialAddress getLocalAddress() {
104         return null; // not applicable
105     }
106 
107     public SerialAddress getRemoteAddress() {
108         return address;
109     }
110 
111     @Override
112     public SerialAddress getServiceAddress() {
113         return (SerialAddress) super.getServiceAddress();
114     }
115 
116     public IoService getService() {
117         return service;
118     }
119 
120     /**
121      * start handling streams
122      *
123      * @throws IOException
124      * @throws TooManyListenersException
125      */
126     void start() throws IOException, TooManyListenersException {
127         inputStream = port.getInputStream();
128         outputStream = port.getOutputStream();
129         ReadWorker w = new ReadWorker();
130         w.start();
131         port.addEventListener(this);
132         service.getIdleStatusChecker0().addSession(this);
133         try {
134         	getService().getFilterChainBuilder().buildFilterChain(getFilterChain());
135         	serviceListeners.fireSessionCreated(this);
136         } catch (Throwable e) {
137         	getFilterChain().fireExceptionCaught(e);
138         	processor.remove(this);
139         }
140     }
141 
142     private final Object writeMonitor = new Object();
143     private WriteWorker writeWorker;
144 
145     private class WriteWorker extends Thread {
146         @Override
147         public void run() {
148             while (isConnected() && !isClosing()) {
149                 flushWrites();
150 
151                 // wait for more data
152                 synchronized (writeMonitor) {
153                     try {
154                         writeMonitor.wait();
155                     } catch (InterruptedException e) {
156                         log.error("InterruptedException", e);
157                     }
158                 }
159             }
160         }
161     }
162 
163     private void flushWrites() {
164         for (; ;) {
165             WriteRequest req = getCurrentWriteRequest();
166             if (req == null) {
167                 req = getWriteRequestQueue().poll(this);
168                 if (req == null) {
169                     break;
170                 }
171             }
172 
173             IoBuffer buf = (IoBuffer) req.getMessage();
174             if (buf.remaining() == 0) {
175                 setCurrentWriteRequest(null);
176                 buf.reset();
177 
178                 this.getFilterChain().fireMessageSent(req);
179                 continue;
180             }
181 
182             int writtenBytes = buf.remaining();
183             try {
184             	outputStream.write(buf.array(), buf.position(), writtenBytes);
185                 buf.position(buf.position() + writtenBytes);
186                 req.getFuture().setWritten();
187             } catch (IOException e) {
188                 this.getFilterChain().fireExceptionCaught(e);
189             }
190         }
191     }
192 
193     private final Object readReadyMonitor = new Object();
194 
195     private class ReadWorker extends Thread {
196         @Override
197         public void run() {
198             while (isConnected() && !isClosing()) {
199                 synchronized (readReadyMonitor) {
200                     try {
201                         readReadyMonitor.wait();
202                     } catch (InterruptedException e) {
203                         log.error("InterruptedException", e);
204                     }
205                     if (isClosing() || !isConnected()) {
206                         break;
207                     }
208                     int dataSize;
209                     try {
210                         dataSize = inputStream.available();
211                         byte[] data = new byte[dataSize];
212                         int readBytes = inputStream.read(data);
213 
214                         if (readBytes > 0) {
215                             IoBuffer buf = IoBuffer
216                                     .wrap(data, 0, readBytes);
217                             buf.put(data, 0, readBytes);
218                             buf.flip();
219                             getFilterChain().fireMessageReceived(
220                                     buf);
221                         }
222                     } catch (IOException e) {
223                         getFilterChain().fireExceptionCaught(
224                                 e);
225                     }
226                 }
227             }
228         }
229     }
230 
231     public void serialEvent(SerialPortEvent evt) {
232         if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
233             synchronized (readReadyMonitor) {
234                 readReadyMonitor.notifyAll();
235             }
236         }
237     }
238 
239     @Override
240     public IoProcessor<SerialSessionImpl> getProcessor() {
241         return processor;
242     }
243 
244     private class SerialIoProcessor implements IoProcessor<SerialSessionImpl> {
245         public void add(SerialSessionImpl session) {
246             // It's already added when the session is constructed.
247         }
248 
249         public void flush(SerialSessionImpl session) {
250             if (writeWorker == null) {
251                 writeWorker = new WriteWorker();
252                 writeWorker.start();
253             } else {
254                 synchronized (writeMonitor) {
255                     writeMonitor.notifyAll();
256                 }
257             }
258         }
259 
260         public void remove(SerialSessionImpl session) {
261             try {
262                 inputStream.close();
263             } catch (IOException e) {
264                 ExceptionMonitor.getInstance().exceptionCaught(e);
265             }
266             try {
267                 outputStream.close();
268             } catch (IOException e) {
269                 ExceptionMonitor.getInstance().exceptionCaught(e);
270             }
271 
272             port.close();
273             flush(session);
274             synchronized (readReadyMonitor) {
275                 readReadyMonitor.notifyAll();
276             }
277 
278             serviceListeners.fireSessionDestroyed(SerialSessionImpl.this);
279         }
280 
281         public void updateTrafficControl(SerialSessionImpl session) {
282             throw new UnsupportedOperationException();
283         }
284 
285         public void dispose() {
286             // Nothing to dispose
287         }
288 
289         public boolean isDisposed() {
290             return false;
291         }
292 
293         public boolean isDisposing() {
294             return false;
295         }
296     }
297 }