1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.serial;
21
22 import gnu.io.SerialPort;
23 import gnu.io.SerialPortEvent;
24 import gnu.io.SerialPortEventListener;
25
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.OutputStream;
29 import java.util.TooManyListenersException;
30
31 import org.apache.mina.core.session.AbstractIoSession;
32 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
33 import org.apache.mina.core.service.DefaultTransportMetadata;
34 import org.apache.mina.util.ExceptionMonitor;
35 import org.apache.mina.core.buffer.IoBuffer;
36 import org.apache.mina.core.filterchain.IoFilterChain;
37 import org.apache.mina.core.service.IoHandler;
38 import org.apache.mina.core.service.IoProcessor;
39 import org.apache.mina.core.service.IoService;
40 import org.apache.mina.core.service.IoServiceListenerSupport;
41 import org.apache.mina.core.service.TransportMetadata;
42 import org.apache.mina.core.write.WriteRequest;
43
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47
48
49
50
51
52 class SerialSessionImpl extends AbstractIoSession implements
53 SerialSession, SerialPortEventListener {
54
55 static final TransportMetadata METADATA =
56 new DefaultTransportMetadata(
57 "rxtx", "serial", false, true, SerialAddress.class,
58 SerialSessionConfig.class, IoBuffer.class);
59
60 private final SerialSessionConfig config = new DefaultSerialSessionConfig();
61 private final IoProcessor<SerialSessionImpl> processor = new SerialIoProcessor();
62 private final IoHandler ioHandler;
63 private final IoFilterChain filterChain;
64 private final SerialConnector service;
65 private final IoServiceListenerSupport serviceListeners;
66 private final SerialAddress address;
67 private final SerialPort port;
68 private final Logger log;
69
70 private InputStream inputStream;
71 private OutputStream outputStream;
72
73 SerialSessionImpl(
74 SerialConnector service, IoServiceListenerSupport serviceListeners,
75 SerialAddress address, SerialPort port) {
76 this.service = service;
77 this.serviceListeners = serviceListeners;
78 ioHandler = service.getHandler();
79 filterChain = new DefaultIoFilterChain(this);
80 this.port = port;
81 this.address = address;
82
83 log = LoggerFactory.getLogger(SerialSessionImpl.class);
84 }
85
86 public SerialSessionConfig getConfig() {
87 return config;
88 }
89
90 public IoFilterChain getFilterChain() {
91 return filterChain;
92 }
93
94 public IoHandler getHandler() {
95 return ioHandler;
96 }
97
98 public TransportMetadata getTransportMetadata() {
99 return METADATA;
100 }
101
102 public SerialAddress getLocalAddress() {
103 return null;
104 }
105
106 public SerialAddress getRemoteAddress() {
107 return address;
108 }
109
110 @Override
111 public SerialAddress getServiceAddress() {
112 return (SerialAddress) super.getServiceAddress();
113 }
114
115 public IoService getService() {
116 return service;
117 }
118
119
120
121
122
123
124
125 void start() throws IOException, TooManyListenersException {
126 inputStream = port.getInputStream();
127 outputStream = port.getOutputStream();
128 ReadWorker w = new ReadWorker();
129 w.start();
130 port.addEventListener(this);
131 service.getIdleStatusChecker0().addSession(this);
132 try {
133 getService().getFilterChainBuilder().buildFilterChain(getFilterChain());
134 serviceListeners.fireSessionCreated(this);
135 } catch (Throwable e) {
136 getFilterChain().fireExceptionCaught(e);
137 processor.remove(this);
138 }
139 }
140
141 private final Object writeMonitor = new Object();
142 private WriteWorker writeWorker;
143
144 private class WriteWorker extends Thread {
145 @Override
146 public void run() {
147 while (isConnected() && !isClosing()) {
148 flushWrites();
149
150
151 synchronized (writeMonitor) {
152 try {
153 writeMonitor.wait();
154 } catch (InterruptedException e) {
155 log.error("InterruptedException", e);
156 }
157 }
158 }
159 }
160 }
161
162 private void flushWrites() {
163 for (; ;) {
164 WriteRequest req = getCurrentWriteRequest();
165 if (req == null) {
166 req = getWriteRequestQueue().poll(this);
167 if (req == null) {
168 break;
169 }
170 }
171
172 IoBuffer buf = (IoBuffer) req.getMessage();
173 if (buf.remaining() == 0) {
174 setCurrentWriteRequest(null);
175 buf.reset();
176
177 this.getFilterChain().fireMessageSent(req);
178 continue;
179 }
180
181 int writtenBytes = buf.remaining();
182 try {
183 outputStream.write(buf.array(), buf.position(), writtenBytes);
184 buf.position(buf.position() + writtenBytes);
185 req.getFuture().setWritten();
186 } catch (IOException e) {
187 this.getFilterChain().fireExceptionCaught(e);
188 }
189 }
190 }
191
192 private final Object readReadyMonitor = new Object();
193
194 private class ReadWorker extends Thread {
195 @Override
196 public void run() {
197 while (isConnected() && !isClosing()) {
198 synchronized (readReadyMonitor) {
199 try {
200 readReadyMonitor.wait();
201 } catch (InterruptedException e) {
202 log.error("InterruptedException", e);
203 }
204 if (isClosing() || !isConnected()) {
205 break;
206 }
207 int dataSize;
208 try {
209 dataSize = inputStream.available();
210 byte[] data = new byte[dataSize];
211 int readBytes = inputStream.read(data);
212
213 if (readBytes > 0) {
214 IoBuffer buf = IoBuffer
215 .wrap(data, 0, readBytes);
216 buf.put(data, 0, readBytes);
217 buf.flip();
218 getFilterChain().fireMessageReceived(
219 buf);
220 }
221 } catch (IOException e) {
222 getFilterChain().fireExceptionCaught(
223 e);
224 }
225 }
226 }
227 }
228 }
229
230 public void serialEvent(SerialPortEvent evt) {
231 if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
232 synchronized (readReadyMonitor) {
233 readReadyMonitor.notifyAll();
234 }
235 }
236 }
237
238 @Override
239 public IoProcessor<SerialSessionImpl> getProcessor() {
240 return processor;
241 }
242
243 private class SerialIoProcessor implements IoProcessor<SerialSessionImpl> {
244 public void add(SerialSessionImpl session) {
245
246 }
247
248 public void flush(SerialSessionImpl session) {
249 if (writeWorker == null) {
250 writeWorker = new WriteWorker();
251 writeWorker.start();
252 } else {
253 synchronized (writeMonitor) {
254 writeMonitor.notifyAll();
255 }
256 }
257 }
258
259 public void remove(SerialSessionImpl session) {
260 try {
261 inputStream.close();
262 } catch (IOException e) {
263 ExceptionMonitor.getInstance().exceptionCaught(e);
264 }
265 try {
266 outputStream.close();
267 } catch (IOException e) {
268 ExceptionMonitor.getInstance().exceptionCaught(e);
269 }
270
271 port.close();
272 flush(session);
273 synchronized (readReadyMonitor) {
274 readReadyMonitor.notifyAll();
275 }
276
277 serviceListeners.fireSessionDestroyed(SerialSessionImpl.this);
278 }
279
280 public void updateTrafficControl(SerialSessionImpl session) {
281 throw new UnsupportedOperationException();
282 }
283
284 public void dispose() {
285
286 }
287
288 public boolean isDisposed() {
289 return false;
290 }
291
292 public boolean isDisposing() {
293 return false;
294 }
295 }
296 }