Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
ByteValueVertex |
|
| 2.3333333333333335;2.333 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | package org.apache.giraph.graph; | |
19 | ||
20 | import org.apache.giraph.edge.Edge; | |
21 | import org.apache.giraph.utils.UnsafeByteArrayInputStream; | |
22 | import org.apache.giraph.utils.UnsafeByteArrayOutputStream; | |
23 | import org.apache.hadoop.io.Writable; | |
24 | import org.apache.hadoop.io.WritableComparable; | |
25 | ||
26 | import java.io.DataInput; | |
27 | import java.io.IOException; | |
28 | ||
29 | ||
30 | /** | |
31 | * Special version of vertex that holds the value in raw byte form to save | |
32 | * memory. | |
33 | * | |
34 | * @param <I> Vertex id | |
35 | * @param <V> Vertex data | |
36 | * @param <E> Edge data | |
37 | */ | |
38 | 0 | public class ByteValueVertex<I extends WritableComparable, |
39 | V extends Writable, E extends Writable> | |
40 | extends DefaultVertex<I, V, E> { | |
41 | ||
42 | /** Vertex value stored as raw bytes */ | |
43 | private byte[] valueBytes; | |
44 | /** Value as an cached object that is only valid during the vertex update */ | |
45 | 0 | private V cachedValue = null; |
46 | ||
47 | @Override | |
48 | public V getValue() { | |
49 | 0 | if (cachedValue != null) { |
50 | 0 | return cachedValue; // Return always same instance |
51 | } | |
52 | 0 | DataInput dis = new UnsafeByteArrayInputStream(valueBytes); |
53 | 0 | cachedValue = getConf().createVertexValue(); |
54 | try { | |
55 | 0 | cachedValue.readFields(dis); |
56 | 0 | } catch (IOException ioe) { |
57 | 0 | throw new RuntimeException("Could not deserialize vertex value", ioe); |
58 | 0 | } |
59 | // Forget the serialized data, because we have cached the object | |
60 | 0 | valueBytes = null; |
61 | 0 | return cachedValue; |
62 | } | |
63 | ||
64 | /** | |
65 | * Serializes the value to bytes, stored in field valueBytes | |
66 | * @param value new vertex value | |
67 | */ | |
68 | private void setSerializedValue(V value) { | |
69 | 0 | UnsafeByteArrayOutputStream bos = new UnsafeByteArrayOutputStream(); |
70 | try { | |
71 | 0 | value.write(bos); |
72 | 0 | bos.close(); |
73 | 0 | } catch (IOException ioe) { |
74 | 0 | throw new RuntimeException("Could not serialize vertex value", ioe); |
75 | 0 | } |
76 | 0 | this.valueBytes = bos.toByteArray(); |
77 | 0 | cachedValue = null; |
78 | 0 | } |
79 | ||
80 | @Override | |
81 | public void setValue(V value) { | |
82 | 0 | if (cachedValue != null) { |
83 | 0 | cachedValue = value; |
84 | } else { | |
85 | 0 | setSerializedValue(value); |
86 | } | |
87 | 0 | } |
88 | ||
89 | @Override | |
90 | public void initialize(I id, V value, Iterable<Edge<I, E>> edges) { | |
91 | // Set the parent's value to null, and instead use our own setter | |
92 | 0 | super.initialize(id, null, edges); |
93 | 0 | setValue(value); |
94 | 0 | } |
95 | ||
96 | @Override | |
97 | public void initialize(I id, V value) { | |
98 | 0 | super.initialize(id, null); |
99 | 0 | setValue(value); |
100 | 0 | } |
101 | ||
102 | @Override | |
103 | public void unwrapMutableEdges() { | |
104 | // This method is called always after compute(vertex), so | |
105 | // we use this to commit the vertex value. | |
106 | 0 | if (cachedValue != null) { |
107 | // This means the value has been requested from vertex | |
108 | // and possible mutated -- so we need to update the byte array | |
109 | 0 | setSerializedValue(cachedValue); |
110 | 0 | cachedValue = null; // Uncache the value |
111 | } | |
112 | 0 | super.unwrapMutableEdges(); |
113 | 0 | } |
114 | } |