View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.reducers.array;
19  
20  import java.util.ArrayList;
21  
22  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
23  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
24  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
25  import org.apache.giraph.block_app.framework.piece.global_comm.array.ArrayHandle;
26  import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
27  import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
28  import org.apache.giraph.function.Supplier;
29  import org.apache.giraph.function.primitive.Int2ObjFunction;
30  import org.apache.giraph.worker.WorkerBroadcastUsage;
31  
32  /**
33   * ArrayHandle implemented as an array of individual handles.
34   *
35   * @param <H> Handle type
36   */
37  public class ArrayOfHandles<H> implements ArrayHandle<H> {
38    protected final ArrayList<H> handles;
39  
40    public ArrayOfHandles(int count, Supplier<H> reduceHandleFactory) {
41      handles = new ArrayList<>();
42      for (int i = 0; i < count; i++) {
43        handles.add(reduceHandleFactory.get());
44      }
45    }
46  
47    public ArrayOfHandles(int count, Int2ObjFunction<H> reduceHandleFactory) {
48      handles = new ArrayList<>();
49      for (int i = 0; i < count; i++) {
50        handles.add(reduceHandleFactory.apply(i));
51      }
52    }
53  
54    @Override
55    public H get(int index) {
56      return handles.get(index);
57    }
58  
59    @Override
60    public int getStaticSize() {
61      return handles.size();
62    }
63  
64    /**
65     * ReducerArrayHandle implemented as an array of separate reducer handles.
66     *
67     * @param <S> Handle type
68     * @param <R> Reduce value type
69     */
70    public static class ArrayOfReducers<S, R>
71        extends ArrayOfHandles<ReducerHandle<S, R>>
72        implements ReducerArrayHandle<S, R> {
73  
74      public ArrayOfReducers(
75          int count, Supplier<ReducerHandle<S, R>> reduceHandleFactory) {
76        super(count, reduceHandleFactory);
77      }
78  
79      public ArrayOfReducers(
80          int count, Int2ObjFunction<ReducerHandle<S, R>> reduceHandleFactory) {
81        super(count, reduceHandleFactory);
82      }
83  
84      @Override
85      public int getReducedSize(BlockMasterApi master) {
86        return getStaticSize();
87      }
88  
89      @Override
90      public BroadcastArrayHandle<R> broadcastValue(final BlockMasterApi master) {
91        return new ArrayOfBroadcasts<>(
92            getStaticSize(),
93            new Int2ObjFunction<BroadcastHandle<R>>() {
94              @Override
95              public BroadcastHandle<R> apply(int index) {
96                return get(index).broadcastValue(master);
97              }
98            });
99      }
100   }
101 
102   /**
103    * BroadcastArrayHandle implemented as an array of separate broadcast handles.
104    *
105    * @param <T> Handle type
106    */
107   public static class ArrayOfBroadcasts<T>
108       extends ArrayOfHandles<BroadcastHandle<T>>
109       implements BroadcastArrayHandle<T> {
110 
111     public ArrayOfBroadcasts(
112         int count,
113         Int2ObjFunction<BroadcastHandle<T>> broadcastHandleFactory) {
114       super(count, broadcastHandleFactory);
115     }
116 
117     public ArrayOfBroadcasts(
118         int count,
119         Supplier<BroadcastHandle<T>> broadcastHandleFactory) {
120       super(count, broadcastHandleFactory);
121     }
122 
123     @Override
124     public int getBroadcastedSize(WorkerBroadcastUsage worker) {
125       return getStaticSize();
126     }
127   }
128 }