1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.serial;
21
22 import gnu.io.SerialPort;
23 import gnu.io.SerialPortEvent;
24 import gnu.io.SerialPortEventListener;
25
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.OutputStream;
29 import java.util.TooManyListenersException;
30
31 import org.apache.mina.core.session.AbstractIoSession;
32 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
33 import org.apache.mina.core.service.DefaultTransportMetadata;
34 import org.apache.mina.util.ExceptionMonitor;
35 import org.apache.mina.core.buffer.IoBuffer;
36 import org.apache.mina.core.filterchain.IoFilterChain;
37 import org.apache.mina.core.service.IoHandler;
38 import org.apache.mina.core.service.IoProcessor;
39 import org.apache.mina.core.service.IoService;
40 import org.apache.mina.core.service.IoServiceListenerSupport;
41 import org.apache.mina.core.service.TransportMetadata;
42 import org.apache.mina.core.write.WriteRequest;
43
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47
48
49
50
51
52
53 class SerialSessionImpl extends AbstractIoSession implements
54 SerialSession, SerialPortEventListener {
55
56 static final TransportMetadata METADATA =
57 new DefaultTransportMetadata(
58 "rxtx", "serial", false, true, SerialAddress.class,
59 SerialSessionConfig.class, IoBuffer.class);
60
61 private final SerialSessionConfig config = new DefaultSerialSessionConfig();
62 private final IoProcessor<SerialSessionImpl> processor = new SerialIoProcessor();
63 private final IoHandler ioHandler;
64 private final IoFilterChain filterChain;
65 private final SerialConnector service;
66 private final IoServiceListenerSupport serviceListeners;
67 private final SerialAddress address;
68 private final SerialPort port;
69 private final Logger log;
70
71 private InputStream inputStream;
72 private OutputStream outputStream;
73
74 SerialSessionImpl(
75 SerialConnector service, IoServiceListenerSupport serviceListeners,
76 SerialAddress address, SerialPort port) {
77 this.service = service;
78 this.serviceListeners = serviceListeners;
79 ioHandler = service.getHandler();
80 filterChain = new DefaultIoFilterChain(this);
81 this.port = port;
82 this.address = address;
83
84 log = LoggerFactory.getLogger(SerialSessionImpl.class);
85 }
86
87 public SerialSessionConfig getConfig() {
88 return config;
89 }
90
91 public IoFilterChain getFilterChain() {
92 return filterChain;
93 }
94
95 public IoHandler getHandler() {
96 return ioHandler;
97 }
98
99 public TransportMetadata getTransportMetadata() {
100 return METADATA;
101 }
102
103 public SerialAddress getLocalAddress() {
104 return null;
105 }
106
107 public SerialAddress getRemoteAddress() {
108 return address;
109 }
110
111 @Override
112 public SerialAddress getServiceAddress() {
113 return (SerialAddress) super.getServiceAddress();
114 }
115
116 public IoService getService() {
117 return service;
118 }
119
120
121
122
123
124
125
126 void start() throws IOException, TooManyListenersException {
127 inputStream = port.getInputStream();
128 outputStream = port.getOutputStream();
129 ReadWorker w = new ReadWorker();
130 w.start();
131 port.addEventListener(this);
132 service.getIdleStatusChecker0().addSession(this);
133 try {
134 getService().getFilterChainBuilder().buildFilterChain(getFilterChain());
135 serviceListeners.fireSessionCreated(this);
136 } catch (Throwable e) {
137 getFilterChain().fireExceptionCaught(e);
138 processor.remove(this);
139 }
140 }
141
142 private final Object writeMonitor = new Object();
143 private WriteWorker writeWorker;
144
145 private class WriteWorker extends Thread {
146 @Override
147 public void run() {
148 while (isConnected() && !isClosing()) {
149 flushWrites();
150
151
152 synchronized (writeMonitor) {
153 try {
154 writeMonitor.wait();
155 } catch (InterruptedException e) {
156 log.error("InterruptedException", e);
157 }
158 }
159 }
160 }
161 }
162
163 private void flushWrites() {
164 for (; ;) {
165 WriteRequest req = getCurrentWriteRequest();
166 if (req == null) {
167 req = getWriteRequestQueue().poll(this);
168 if (req == null) {
169 break;
170 }
171 }
172
173 IoBuffer buf = (IoBuffer) req.getMessage();
174 if (buf.remaining() == 0) {
175 setCurrentWriteRequest(null);
176 buf.reset();
177
178 this.getFilterChain().fireMessageSent(req);
179 continue;
180 }
181
182 int writtenBytes = buf.remaining();
183 try {
184 outputStream.write(buf.array(), buf.position(), writtenBytes);
185 buf.position(buf.position() + writtenBytes);
186 req.getFuture().setWritten();
187 } catch (IOException e) {
188 this.getFilterChain().fireExceptionCaught(e);
189 }
190 }
191 }
192
193 private final Object readReadyMonitor = new Object();
194
195 private class ReadWorker extends Thread {
196 @Override
197 public void run() {
198 while (isConnected() && !isClosing()) {
199 synchronized (readReadyMonitor) {
200 try {
201 readReadyMonitor.wait();
202 } catch (InterruptedException e) {
203 log.error("InterruptedException", e);
204 }
205 if (isClosing() || !isConnected()) {
206 break;
207 }
208 int dataSize;
209 try {
210 dataSize = inputStream.available();
211 byte[] data = new byte[dataSize];
212 int readBytes = inputStream.read(data);
213
214 if (readBytes > 0) {
215 IoBuffer buf = IoBuffer
216 .wrap(data, 0, readBytes);
217 buf.put(data, 0, readBytes);
218 buf.flip();
219 getFilterChain().fireMessageReceived(
220 buf);
221 }
222 } catch (IOException e) {
223 getFilterChain().fireExceptionCaught(
224 e);
225 }
226 }
227 }
228 }
229 }
230
231 public void serialEvent(SerialPortEvent evt) {
232 if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
233 synchronized (readReadyMonitor) {
234 readReadyMonitor.notifyAll();
235 }
236 }
237 }
238
239 @Override
240 public IoProcessor<SerialSessionImpl> getProcessor() {
241 return processor;
242 }
243
244 private class SerialIoProcessor implements IoProcessor<SerialSessionImpl> {
245 public void add(SerialSessionImpl session) {
246
247 }
248
249 public void flush(SerialSessionImpl session) {
250 if (writeWorker == null) {
251 writeWorker = new WriteWorker();
252 writeWorker.start();
253 } else {
254 synchronized (writeMonitor) {
255 writeMonitor.notifyAll();
256 }
257 }
258 }
259
260 public void remove(SerialSessionImpl session) {
261 try {
262 inputStream.close();
263 } catch (IOException e) {
264 ExceptionMonitor.getInstance().exceptionCaught(e);
265 }
266 try {
267 outputStream.close();
268 } catch (IOException e) {
269 ExceptionMonitor.getInstance().exceptionCaught(e);
270 }
271
272 port.close();
273 flush(session);
274 synchronized (readReadyMonitor) {
275 readReadyMonitor.notifyAll();
276 }
277
278 serviceListeners.fireSessionDestroyed(SerialSessionImpl.this);
279 }
280
281 public void updateTrafficControl(SerialSessionImpl session) {
282 throw new UnsupportedOperationException();
283 }
284
285 public void dispose() {
286
287 }
288
289 public boolean isDisposed() {
290 return false;
291 }
292
293 public boolean isDisposing() {
294 return false;
295 }
296 }
297 }