1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.serial;
21
22 import gnu.io.SerialPort;
23 import gnu.io.SerialPortEvent;
24 import gnu.io.SerialPortEventListener;
25
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.OutputStream;
29 import java.util.TooManyListenersException;
30
31 import org.apache.mina.core.session.AbstractIoSession;
32 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
33 import org.apache.mina.core.service.DefaultTransportMetadata;
34 import org.apache.mina.util.ExceptionMonitor;
35 import org.apache.mina.core.buffer.IoBuffer;
36 import org.apache.mina.core.filterchain.IoFilterChain;
37 import org.apache.mina.core.service.IoHandler;
38 import org.apache.mina.core.service.IoProcessor;
39 import org.apache.mina.core.service.IoService;
40 import org.apache.mina.core.service.IoServiceListenerSupport;
41 import org.apache.mina.core.service.TransportMetadata;
42 import org.apache.mina.core.write.WriteRequest;
43
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47
48
49
50
51
52
53 class SerialSessionImpl extends AbstractIoSession implements
54 SerialSession, SerialPortEventListener {
55
56 static final TransportMetadata METADATA =
57 new DefaultTransportMetadata(
58 "rxtx", "serial", false, true, SerialAddress.class,
59 SerialSessionConfig.class, IoBuffer.class);
60
61 private final SerialSessionConfig config = new DefaultSerialSessionConfig();
62 private final IoProcessor<SerialSessionImpl> processor = new SerialIoProcessor();
63 private final IoHandler ioHandler;
64 private final IoFilterChain filterChain;
65 private final SerialConnector service;
66 private final IoServiceListenerSupport serviceListeners;
67 private final SerialAddress address;
68 private final SerialPort port;
69 private final Logger log;
70
71 private InputStream inputStream;
72 private OutputStream outputStream;
73
74 SerialSessionImpl(
75 SerialConnector service, IoServiceListenerSupport serviceListeners,
76 SerialAddress address, SerialPort port) {
77 this.service = service;
78 this.serviceListeners = serviceListeners;
79 ioHandler = service.getHandler();
80 filterChain = new DefaultIoFilterChain(this);
81 this.port = port;
82 this.address = address;
83
84 log = LoggerFactory.getLogger(SerialSessionImpl.class);
85 }
86
87 public SerialSessionConfig getConfig() {
88 return config;
89 }
90
91 public IoFilterChain getFilterChain() {
92 return filterChain;
93 }
94
95 public IoHandler getHandler() {
96 return ioHandler;
97 }
98
99 public TransportMetadata getTransportMetadata() {
100 return METADATA;
101 }
102
103 public SerialAddress getLocalAddress() {
104 return null;
105 }
106
107 public SerialAddress getRemoteAddress() {
108 return address;
109 }
110
111 @Override
112 public SerialAddress getServiceAddress() {
113 return (SerialAddress) super.getServiceAddress();
114 }
115
116 public IoService getService() {
117 return service;
118 }
119
120
121
122
123
124
125
126 void start() throws IOException, TooManyListenersException {
127 inputStream = port.getInputStream();
128 outputStream = port.getOutputStream();
129 ReadWorker w = new ReadWorker();
130 w.start();
131 port.addEventListener(this);
132 service.getIdleStatusChecker0().addSession(this);
133 try {
134 getService().getFilterChainBuilder().buildFilterChain(getFilterChain());
135 serviceListeners.fireSessionCreated(this);
136 } catch (Throwable e) {
137 getFilterChain().fireExceptionCaught(e);
138 processor.remove(this);
139 }
140 }
141
142 private final Object writeMonitor = new Object();
143 private WriteWorker writeWorker;
144
145 private class WriteWorker extends Thread {
146 @Override
147 public void run() {
148 while (isConnected() && !isClosing()) {
149 flushWrites();
150
151
152 synchronized (writeMonitor) {
153 try {
154 writeMonitor.wait();
155 } catch (InterruptedException e) {
156 log.error("InterruptedException", e);
157 }
158 }
159 }
160 }
161 }
162
163 private void flushWrites() {
164 for (; ;) {
165 WriteRequest req = getCurrentWriteRequest();
166 if (req == null) {
167 req = getWriteRequestQueue().poll(this);
168 if (req == null) {
169 break;
170 }
171 }
172
173 IoBuffer buf = (IoBuffer) req.getMessage();
174 if (buf.remaining() == 0) {
175 setCurrentWriteRequest(null);
176 buf.reset();
177
178 this.getFilterChain().fireMessageSent(req);
179 continue;
180 }
181
182 int writtenBytes = buf.remaining();
183 try {
184 outputStream.write(buf.array());
185 buf.position(buf.position() + writtenBytes);
186 } catch (IOException e) {
187 this.getFilterChain().fireExceptionCaught(e);
188 }
189 }
190 }
191
192 private final Object readReadyMonitor = new Object();
193
194 private class ReadWorker extends Thread {
195 @Override
196 public void run() {
197 while (isConnected() && !isClosing()) {
198 synchronized (readReadyMonitor) {
199 try {
200 readReadyMonitor.wait();
201 } catch (InterruptedException e) {
202 log.error("InterruptedException", e);
203 }
204 if (isClosing() || !isConnected()) {
205 break;
206 }
207 int dataSize;
208 try {
209 dataSize = inputStream.available();
210 byte[] data = new byte[dataSize];
211 int readBytes = inputStream.read(data);
212
213 if (readBytes > 0) {
214 IoBuffer buf = IoBuffer
215 .wrap(data, 0, readBytes);
216 buf.put(data, 0, readBytes);
217 buf.flip();
218 getFilterChain().fireMessageReceived(
219 buf);
220 }
221 } catch (IOException e) {
222 getFilterChain().fireExceptionCaught(
223 e);
224 }
225 }
226 }
227 }
228 }
229
230 public void serialEvent(SerialPortEvent evt) {
231 if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
232 synchronized (readReadyMonitor) {
233 readReadyMonitor.notifyAll();
234 }
235 }
236 }
237
238 @Override
239 public IoProcessor<SerialSessionImpl> getProcessor() {
240 return processor;
241 }
242
243 private class SerialIoProcessor implements IoProcessor<SerialSessionImpl> {
244 public void add(SerialSessionImpl session) {
245
246 }
247
248 public void flush(SerialSessionImpl session) {
249 if (writeWorker == null) {
250 writeWorker = new WriteWorker();
251 writeWorker.start();
252 } else {
253 synchronized (writeMonitor) {
254 writeMonitor.notifyAll();
255 }
256 }
257 }
258
259 public void remove(SerialSessionImpl session) {
260 try {
261 inputStream.close();
262 } catch (IOException e) {
263 ExceptionMonitor.getInstance().exceptionCaught(e);
264 }
265 try {
266 outputStream.close();
267 } catch (IOException e) {
268 ExceptionMonitor.getInstance().exceptionCaught(e);
269 }
270
271 port.close();
272 flush(session);
273 synchronized (readReadyMonitor) {
274 readReadyMonitor.notifyAll();
275 }
276
277 serviceListeners.fireSessionDestroyed(SerialSessionImpl.this);
278 }
279
280 public void updateTrafficMask(SerialSessionImpl session) {
281 throw new UnsupportedOperationException();
282 }
283
284 public void dispose() {
285
286 }
287
288 public boolean isDisposed() {
289 return false;
290 }
291
292 public boolean isDisposing() {
293 return false;
294 }
295 }
296 }