1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
|
20 | |
package org.apache.commons.pipeline.stage; |
21 | |
|
22 | |
import java.util.EventObject; |
23 | |
import java.util.Map; |
24 | |
import java.util.Queue; |
25 | |
import java.util.Set; |
26 | |
import java.util.TreeMap; |
27 | |
import java.util.TreeSet; |
28 | |
import org.apache.commons.pipeline.StageContext; |
29 | |
import org.apache.commons.pipeline.StageEventListener; |
30 | |
import org.apache.commons.pipeline.StageException; |
31 | |
import org.apache.commons.pipeline.event.KeyAvailableEvent; |
32 | |
import org.apache.commons.pipeline.util.KeyFactory; |
33 | |
import org.apache.commons.pipeline.util.QueueFactory; |
34 | |
|
35 | |
|
36 | |
|
37 | |
|
38 | |
|
39 | |
public class KeyWaitBufferStage extends BaseStage implements StageEventListener { |
40 | |
|
41 | 3 | private Set<Object> receivedKeys = new TreeSet<Object>(); |
42 | 3 | private Map<Object,Queue<Object>> buffers = new TreeMap<Object,Queue<Object>>(); |
43 | |
|
44 | |
|
45 | 3 | public KeyWaitBufferStage() { |
46 | 3 | } |
47 | |
|
48 | |
public void notify(EventObject ev) { |
49 | 161 | if (ev instanceof KeyAvailableEvent) { |
50 | 81 | KeyAvailableEvent e = (KeyAvailableEvent) ev; |
51 | 81 | synchronized(receivedKeys) { |
52 | 81 | receivedKeys.add(e.getKey()); |
53 | 81 | } |
54 | |
|
55 | |
|
56 | |
|
57 | 81 | if (buffers.containsKey(e.getKey())) { |
58 | 63 | for (Object obj : buffers.remove(e.getKey())) this.emit(obj); |
59 | |
} |
60 | |
} |
61 | 161 | } |
62 | |
|
63 | |
public void init(StageContext context) { |
64 | 3 | super.init(context); |
65 | 3 | context.registerListener(this); |
66 | 3 | } |
67 | |
|
68 | |
public void process(Object obj) throws StageException { |
69 | 81 | Object key = keyFactory.generateKey(obj); |
70 | 81 | synchronized(receivedKeys) { |
71 | 81 | if (!receivedKeys.contains(key)) { |
72 | |
|
73 | 63 | if (!buffers.containsKey(key)) buffers.put(key, queueFactory.createQueue()); |
74 | 63 | buffers.get(key).add(obj); |
75 | 63 | return; |
76 | |
} |
77 | 18 | } |
78 | |
|
79 | 18 | this.emit(obj); |
80 | 18 | } |
81 | |
|
82 | |
|
83 | |
|
84 | |
|
85 | |
private KeyFactory<Object,? extends Object> keyFactory; |
86 | |
|
87 | |
|
88 | |
|
89 | |
|
90 | |
|
91 | |
public KeyFactory<Object,? extends Object> getKeyFactory() { |
92 | 0 | return this.keyFactory; |
93 | |
} |
94 | |
|
95 | |
|
96 | |
|
97 | |
|
98 | |
|
99 | |
public void setKeyFactory(KeyFactory<Object,? extends Object> keyFactory) { |
100 | 2 | this.keyFactory = keyFactory; |
101 | 2 | } |
102 | |
|
103 | |
|
104 | |
|
105 | |
|
106 | |
private QueueFactory<Object> queueFactory; |
107 | |
|
108 | |
|
109 | |
|
110 | |
|
111 | |
|
112 | |
public QueueFactory<Object> getQueueFactory() { |
113 | 1 | return this.queueFactory; |
114 | |
} |
115 | |
|
116 | |
|
117 | |
|
118 | |
|
119 | |
|
120 | |
public void setQueueFactory(QueueFactory<Object> queueFactory) { |
121 | 2 | this.queueFactory = queueFactory; |
122 | 2 | } |
123 | |
} |