Coverage Report - org.apache.commons.pipeline.stage.KeyWaitBufferStage
 
Classes in this File Line Coverage Branch Coverage Complexity
KeyWaitBufferStage
97%
29/30
90%
9/10
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  * 
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  * 
 12  
  * Unless required by applicable law or agreed to in writing,
 13  
  * software distributed under the License is distributed on an
 14  
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 15  
  * KIND, either express or implied.  See the License for the
 16  
  * specific language governing permissions and limitations
 17  
  * under the License.    
 18  
  */ 
 19  
 
 20  
 package org.apache.commons.pipeline.stage;
 21  
 
 22  
 import java.util.EventObject;
 23  
 import java.util.Map;
 24  
 import java.util.Queue;
 25  
 import java.util.Set;
 26  
 import java.util.TreeMap;
 27  
 import java.util.TreeSet;
 28  
 import org.apache.commons.pipeline.StageContext;
 29  
 import org.apache.commons.pipeline.StageEventListener;
 30  
 import org.apache.commons.pipeline.StageException;
 31  
 import org.apache.commons.pipeline.event.KeyAvailableEvent;
 32  
 import org.apache.commons.pipeline.util.KeyFactory;
 33  
 import org.apache.commons.pipeline.util.QueueFactory;
 34  
 
 35  
 /**
 36  
  *
 37  
  * @author kjn
 38  
  */
 39  
 public class KeyWaitBufferStage extends BaseStage implements StageEventListener {
 40  
     
 41  3
     private Set<Object> receivedKeys = new TreeSet<Object>();
 42  3
     private Map<Object,Queue<Object>> buffers = new TreeMap<Object,Queue<Object>>();
 43  
     
 44  
     /** Creates a new instance of KeyWaitBufferStage */
 45  3
     public KeyWaitBufferStage() {
 46  3
     }
 47  
     
 48  
     public void notify(EventObject ev) {
 49  161
         if (ev instanceof KeyAvailableEvent) {
 50  81
             KeyAvailableEvent e = (KeyAvailableEvent) ev;
 51  81
             synchronized(receivedKeys) {
 52  81
                 receivedKeys.add(e.getKey());
 53  81
             }
 54  
             
 55  
             //at this point, we know that no more objects will be added to
 56  
             //the pending queue for the key, so we can remove and empty it.
 57  81
             if (buffers.containsKey(e.getKey())) {
 58  63
                 for (Object obj : buffers.remove(e.getKey())) this.emit(obj);
 59  
             }
 60  
         }
 61  161
     }
 62  
     
 63  
     public void init(StageContext context) {
 64  3
         super.init(context);
 65  3
         context.registerListener(this);
 66  3
     }
 67  
     
 68  
     public void process(Object obj) throws StageException {
 69  81
         Object key = keyFactory.generateKey(obj);
 70  81
         synchronized(receivedKeys) {
 71  81
             if (!receivedKeys.contains(key)) {
 72  
                 //store the object in a pending queue.
 73  63
                 if (!buffers.containsKey(key)) buffers.put(key, queueFactory.createQueue());
 74  63
                 buffers.get(key).add(obj);
 75  63
                 return;
 76  
             }
 77  18
         }
 78  
         
 79  18
         this.emit(obj);
 80  18
     }
 81  
 
 82  
     /**
 83  
      * Holds value of property keyFactory.
 84  
      */
 85  
     private KeyFactory<Object,? extends Object> keyFactory;
 86  
 
 87  
     /**
 88  
      * Getter for property keyFactory.
 89  
      * @return Value of property keyFactory.
 90  
      */
 91  
     public KeyFactory<Object,? extends Object> getKeyFactory() {
 92  0
         return this.keyFactory;
 93  
     }
 94  
 
 95  
     /**
 96  
      * Setter for property keyFactory.
 97  
      * @param keyFactory New value of property keyFactory.
 98  
      */
 99  
     public void setKeyFactory(KeyFactory<Object,? extends Object> keyFactory) {
 100  2
         this.keyFactory = keyFactory;
 101  2
     }
 102  
 
 103  
     /**
 104  
      * Holds value of property queueFactory.
 105  
      */
 106  
     private QueueFactory<Object> queueFactory;
 107  
 
 108  
     /**
 109  
      * Getter for property queueFactory.
 110  
      * @return Value of property queueFactory.
 111  
      */
 112  
     public QueueFactory<Object> getQueueFactory() {
 113  1
         return this.queueFactory;
 114  
     }
 115  
 
 116  
     /**
 117  
      * Setter for property queueFactory.
 118  
      * @param queueFactory New value of property queueFactory.
 119  
      */
 120  
     public void setQueueFactory(QueueFactory<Object> queueFactory) {
 121  2
         this.queueFactory = queueFactory;
 122  2
     }
 123  
 }