1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.proxy.event;
21
22 import java.util.LinkedList;
23 import java.util.Queue;
24
25 import org.apache.mina.proxy.handlers.socks.SocksProxyRequest;
26 import org.apache.mina.proxy.session.ProxyIoSession;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30
31
32
33
34
35
36
37 public class IoSessionEventQueue {
38 private static final Logger LOGGER = LoggerFactory.getLogger(IoSessionEventQueue.class);
39
40
41
42
43 private ProxyIoSession proxyIoSession;
44
45
46
47
48 private Queue<IoSessionEvent> sessionEventsQueue = new LinkedList<>();
49
50
51
52
53
54
55 public IoSessionEventQueue(ProxyIoSession proxyIoSession) {
56 this.proxyIoSession = proxyIoSession;
57 }
58
59
60
61
62 private void discardSessionQueueEvents() {
63 synchronized (sessionEventsQueue) {
64
65 sessionEventsQueue.clear();
66
67 if (LOGGER.isDebugEnabled()) {
68 LOGGER.debug("Event queue CLEARED");
69 }
70 }
71 }
72
73
74
75
76
77
78
79
80
81
82
83
84 public void enqueueEventIfNecessary(final IoSessionEvent evt) {
85 if (LOGGER.isDebugEnabled()) {
86 LOGGER.debug("??? >> Enqueue {}", evt);
87 }
88
89 if (proxyIoSession.getRequest() instanceof SocksProxyRequest) {
90
91 evt.deliverEvent();
92
93 return;
94 }
95
96 if (proxyIoSession.getHandler().isHandshakeComplete()) {
97 evt.deliverEvent();
98 } else {
99 if (evt.getType() == IoSessionEventType.CLOSED) {
100 if (proxyIoSession.isAuthenticationFailed()) {
101 proxyIoSession.getConnector().cancelConnectFuture();
102 discardSessionQueueEvents();
103 evt.deliverEvent();
104 } else {
105 discardSessionQueueEvents();
106 }
107 } else if (evt.getType() == IoSessionEventType.OPENED) {
108
109
110 enqueueSessionEvent(evt);
111 evt.deliverEvent();
112 } else {
113 enqueueSessionEvent(evt);
114 }
115 }
116 }
117
118
119
120
121
122
123
124
125 public void flushPendingSessionEvents() throws Exception {
126 synchronized (sessionEventsQueue) {
127 IoSessionEvent evt;
128
129 while ((evt = sessionEventsQueue.poll()) != null) {
130 if (LOGGER.isDebugEnabled()) {
131 LOGGER.debug(" Flushing buffered event: {}", evt);
132 }
133
134 evt.deliverEvent();
135 }
136 }
137 }
138
139
140
141
142
143
144 private void enqueueSessionEvent(final IoSessionEvent evt) {
145 synchronized (sessionEventsQueue) {
146 if (LOGGER.isDebugEnabled()) {
147 LOGGER.debug("Enqueuing event: {}", evt);
148 }
149
150 sessionEventsQueue.offer(evt);
151 }
152 }
153 }