1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.serial;
21
22 import gnu.io.SerialPort;
23 import gnu.io.SerialPortEvent;
24 import gnu.io.SerialPortEventListener;
25 import gnu.io.UnsupportedCommOperationException;
26
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.io.OutputStream;
30 import java.util.TooManyListenersException;
31
32 import org.apache.mina.core.buffer.IoBuffer;
33 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
34 import org.apache.mina.core.filterchain.IoFilterChain;
35 import org.apache.mina.core.service.DefaultTransportMetadata;
36 import org.apache.mina.core.service.IoProcessor;
37 import org.apache.mina.core.service.IoServiceListenerSupport;
38 import org.apache.mina.core.service.TransportMetadata;
39 import org.apache.mina.core.session.AbstractIoSession;
40 import org.apache.mina.core.write.WriteRequest;
41 import org.apache.mina.core.write.WriteRequestQueue;
42 import org.apache.mina.util.ExceptionMonitor;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46
47
48
49
50
51 class SerialSessionImpl extends AbstractIoSession implements SerialSession, SerialPortEventListener {
52
53 static final TransportMetadata METADATA = new DefaultTransportMetadata("rxtx", "serial", false, true,
54 SerialAddress.class, SerialSessionConfig.class, IoBuffer.class);
55
56 private final IoProcessor<SerialSessionImpl> processor = new SerialIoProcessor();
57
58 private final IoFilterChain filterChain;
59
60 private final IoServiceListenerSupport serviceListeners;
61
62 private final SerialAddress address;
63
64 private final SerialPort port;
65
66 private final Logger log;
67
68 private InputStream inputStream;
69
70 private OutputStream outputStream;
71
72 SerialSessionImpl(SerialConnector service, IoServiceListenerSupport serviceListeners, SerialAddress address,
73 SerialPort port) {
74 super(service);
75 config = new DefaultSerialSessionConfig();
76 this.serviceListeners = serviceListeners;
77 filterChain = new DefaultIoFilterChain(this);
78 this.port = port;
79 this.address = address;
80
81 log = LoggerFactory.getLogger(SerialSessionImpl.class);
82 }
83
84 public SerialSessionConfig getConfig() {
85 return (SerialSessionConfig) config;
86 }
87
88 public IoFilterChain getFilterChain() {
89 return filterChain;
90 }
91
92 public TransportMetadata getTransportMetadata() {
93 return METADATA;
94 }
95
96 public SerialAddress getLocalAddress() {
97 return null;
98 }
99
100 public SerialAddress getRemoteAddress() {
101 return address;
102 }
103
104 @Override
105 public SerialAddress getServiceAddress() {
106 return (SerialAddress) super.getServiceAddress();
107 }
108
109 public void setDTR(boolean dtr) {
110 port.setDTR(dtr);
111 }
112
113 public boolean isDTR() {
114 return port.isDTR();
115 }
116
117 public void setRTS(boolean rts) {
118 port.setRTS(rts);
119 }
120
121 public boolean isRTS() {
122 return port.isRTS();
123 }
124
125
126
127
128
129
130
131 void start() throws IOException, TooManyListenersException {
132 inputStream = port.getInputStream();
133 outputStream = port.getOutputStream();
134 ReadWorker w = new ReadWorker();
135 w.start();
136 port.addEventListener(this);
137 ((SerialConnector) getService()).getIdleStatusChecker0().addSession(this);
138 try {
139 getService().getFilterChainBuilder().buildFilterChain(getFilterChain());
140 serviceListeners.fireSessionCreated(this);
141 } catch (Exception e) {
142 getFilterChain().fireExceptionCaught(e);
143 processor.remove(this);
144 }
145 }
146
147 private final Object writeMonitor = new Object();
148
149 private WriteWorker writeWorker;
150
151 private class WriteWorker extends Thread {
152 @Override
153 public void run() {
154 synchronized (writeMonitor) {
155 while (isConnected() && !isClosing()) {
156 flushWrites();
157
158
159 try {
160 writeMonitor.wait();
161 } catch (InterruptedException e) {
162 log.error("InterruptedException", e);
163 }
164 }
165 }
166 }
167 }
168
169 private void flushWrites() {
170 for (;;) {
171 WriteRequest req = getCurrentWriteRequest();
172 if (req == null) {
173 req = getWriteRequestQueue().poll(this);
174 if (req == null) {
175 break;
176 }
177 }
178
179 IoBuffer"../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer buf = (IoBuffer) req.getMessage();
180 if (buf.remaining() == 0) {
181 setCurrentWriteRequest(null);
182 buf.reset();
183 this.getFilterChain().fireMessageSent(req);
184 continue;
185 }
186
187 int writtenBytes = buf.remaining();
188 try {
189 outputStream.write(buf.array(), buf.position(), writtenBytes);
190 buf.position(buf.position() + writtenBytes);
191
192
193 increaseWrittenBytes(writtenBytes, System.currentTimeMillis());
194
195 setCurrentWriteRequest(null);
196 buf.reset();
197
198
199 getFilterChain().fireMessageSent(req);
200 } catch (IOException e) {
201 this.getFilterChain().fireExceptionCaught(e);
202 }
203
204 }
205 }
206
207 private final Object readReadyMonitor = new Object();
208
209 private class ReadWorker extends Thread {
210 @Override
211 public void run() {
212 while (isConnected() && !isClosing()) {
213 synchronized (readReadyMonitor) {
214 try {
215 readReadyMonitor.wait();
216 } catch (InterruptedException e) {
217 log.error("InterruptedException", e);
218 }
219 if (isClosing() || !isConnected()) {
220 break;
221 }
222 int dataSize;
223 try {
224 dataSize = inputStream.available();
225 byte[] data = new byte[dataSize];
226 int readBytes = inputStream.read(data);
227
228 if (readBytes > 0) {
229 IoBuffer buf = IoBuffer.wrap(data, 0, readBytes);
230 buf.put(data, 0, readBytes);
231 buf.flip();
232 getFilterChain().fireMessageReceived(buf);
233 }
234 } catch (IOException e) {
235 getFilterChain().fireExceptionCaught(e);
236 }
237 }
238 }
239 }
240 }
241
242 public void serialEvent(SerialPortEvent evt) {
243 if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
244 synchronized (readReadyMonitor) {
245 readReadyMonitor.notifyAll();
246 }
247 }
248 }
249
250 @Override
251 public IoProcessor<SerialSessionImpl> getProcessor() {
252 return processor;
253 }
254
255 private class SerialIoProcessor implements IoProcessor<SerialSessionImpl> {
256 public void add(SerialSessionImpl session) {
257
258 }
259
260
261
262
263 public void write(SerialSessionImpl session, WriteRequest writeRequest) {
264 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
265
266 writeRequestQueue.offer(session, writeRequest);
267
268 if (!session.isWriteSuspended()) {
269 session.getProcessor().flush(session);
270 }
271 }
272
273
274
275
276 public void flush(SerialSessionImpl session) {
277 if (writeWorker == null) {
278 writeWorker = new WriteWorker();
279 writeWorker.start();
280 } else {
281 synchronized (writeMonitor) {
282 writeMonitor.notifyAll();
283 }
284 }
285 }
286
287 public void remove(SerialSessionImpl session) {
288 try {
289 inputStream.close();
290 } catch (IOException e) {
291 ExceptionMonitor.getInstance().exceptionCaught(e);
292 }
293 try {
294 outputStream.close();
295 } catch (IOException e) {
296 ExceptionMonitor.getInstance().exceptionCaught(e);
297 }
298
299 try {
300 port.setFlowControlMode(SerialPort.FLOWCONTROL_NONE);
301 } catch (UnsupportedCommOperationException e) {
302 ExceptionMonitor.getInstance().exceptionCaught(e);
303 }
304
305 port.close();
306 flush(session);
307 synchronized (readReadyMonitor) {
308 readReadyMonitor.notifyAll();
309 }
310
311 serviceListeners.fireSessionDestroyed(SerialSessionImpl.this);
312 }
313
314 public void updateTrafficControl(SerialSessionImpl session) {
315 throw new UnsupportedOperationException();
316 }
317
318 public void dispose() {
319
320 }
321
322 public boolean isDisposed() {
323 return false;
324 }
325
326 public boolean isDisposing() {
327 return false;
328 }
329 }
330 }