Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
AccumuloVertexOutputFormat |
|
| 1.4285714285714286;1.429 | ||||
AccumuloVertexOutputFormat$AccumuloVertexWriter |
|
| 1.4285714285714286;1.429 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | package org.apache.giraph.io.accumulo; | |
19 | ||
20 | import java.io.IOException; | |
21 | import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; | |
22 | import org.apache.accumulo.core.data.Mutation; | |
23 | import org.apache.giraph.io.VertexOutputFormat; | |
24 | import org.apache.giraph.io.VertexWriter; | |
25 | import org.apache.hadoop.io.Text; | |
26 | import org.apache.hadoop.io.Writable; | |
27 | import org.apache.hadoop.io.WritableComparable; | |
28 | import org.apache.hadoop.mapreduce.JobContext; | |
29 | import org.apache.hadoop.mapreduce.OutputCommitter; | |
30 | import org.apache.hadoop.mapreduce.RecordWriter; | |
31 | import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
32 | /** | |
33 | * | |
34 | * Class which wraps the AccumuloOutputFormat. It's designed | |
35 | * as an extension point to VertexOutputFormat subclasses who wish | |
36 | * to write vertices back to an Accumulo table. | |
37 | * | |
38 | * Works with | |
39 | * {@link AccumuloVertexInputFormat} | |
40 | * | |
41 | * | |
42 | * @param <I> vertex id type | |
43 | * @param <V> vertex value type | |
44 | * @param <E> edge type | |
45 | */ | |
46 | 0 | public abstract class AccumuloVertexOutputFormat< |
47 | I extends WritableComparable, | |
48 | V extends Writable, | |
49 | E extends Writable> | |
50 | extends VertexOutputFormat<I, V, E> { | |
51 | ||
52 | ||
53 | /** | |
54 | * Output table parameter | |
55 | */ | |
56 | public static final String OUTPUT_TABLE = "OUTPUT_TABLE"; | |
57 | ||
58 | /** | |
59 | * Accumulo delegate for table output | |
60 | */ | |
61 | 0 | protected AccumuloOutputFormat accumuloOutputFormat = |
62 | new AccumuloOutputFormat(); | |
63 | ||
64 | /** | |
65 | * | |
66 | * Main abstraction point for vertex writers to persist back | |
67 | * to Accumulo tables. | |
68 | * | |
69 | * @param <I> vertex id type | |
70 | * @param <V> vertex value type | |
71 | * @param <E> edge type | |
72 | */ | |
73 | public abstract static class AccumuloVertexWriter< | |
74 | I extends WritableComparable, | |
75 | V extends Writable, | |
76 | E extends Writable> | |
77 | extends VertexWriter<I, V, E> { | |
78 | ||
79 | /** | |
80 | * task attempt context. | |
81 | */ | |
82 | private TaskAttemptContext context; | |
83 | ||
84 | /** | |
85 | * Accumulo record writer | |
86 | */ | |
87 | private RecordWriter<Text, Mutation> recordWriter; | |
88 | ||
89 | /** | |
90 | * Constructor for use with subclasses | |
91 | * | |
92 | * @param recordWriter accumulo record writer | |
93 | */ | |
94 | 0 | public AccumuloVertexWriter(RecordWriter<Text, Mutation> recordWriter) { |
95 | 0 | this.recordWriter = recordWriter; |
96 | 0 | } |
97 | ||
98 | /** | |
99 | * initialize | |
100 | * | |
101 | * @param context Context used to write the vertices. | |
102 | * @throws IOException | |
103 | */ | |
104 | public void initialize(TaskAttemptContext context) throws IOException { | |
105 | 0 | this.context = context; |
106 | 0 | } |
107 | ||
108 | /** | |
109 | * close | |
110 | * | |
111 | * @param context the context of the task | |
112 | * @throws IOException | |
113 | * @throws InterruptedException | |
114 | */ | |
115 | public void close(TaskAttemptContext context) | |
116 | throws IOException, InterruptedException { | |
117 | 0 | recordWriter.close(context); |
118 | 0 | } |
119 | ||
120 | /** | |
121 | * Get the table record writer; | |
122 | * | |
123 | * @return Record writer to be used for writing. | |
124 | */ | |
125 | public RecordWriter<Text, Mutation> getRecordWriter() { | |
126 | 0 | return recordWriter; |
127 | } | |
128 | ||
129 | /** | |
130 | * Get the context. | |
131 | * | |
132 | * @return Context passed to initialize. | |
133 | */ | |
134 | public TaskAttemptContext getContext() { | |
135 | 0 | return context; |
136 | } | |
137 | ||
138 | } | |
139 | /** | |
140 | * | |
141 | * checkOutputSpecs | |
142 | * | |
143 | * @param context information about the job | |
144 | * @throws IOException | |
145 | * @throws InterruptedException | |
146 | */ | |
147 | @Override | |
148 | public void checkOutputSpecs(JobContext context) | |
149 | throws IOException, InterruptedException { | |
150 | try { | |
151 | 0 | accumuloOutputFormat.checkOutputSpecs(context); |
152 | 0 | } catch (IOException e) { |
153 | 0 | if (e.getMessage().contains("Output info has not been set")) { |
154 | 0 | throw new IOException(e.getMessage() + " Make sure you initialized" + |
155 | " AccumuloOutputFormat static setters " + | |
156 | "before passing the config to GiraphJob."); | |
157 | } | |
158 | 0 | } |
159 | 0 | } |
160 | ||
161 | /** | |
162 | * getOutputCommitter | |
163 | * | |
164 | * @param context the task context | |
165 | * @return OutputCommitter | |
166 | * @throws IOException | |
167 | * @throws InterruptedException | |
168 | */ | |
169 | @Override | |
170 | public OutputCommitter getOutputCommitter(TaskAttemptContext context) | |
171 | throws IOException, InterruptedException { | |
172 | 0 | return accumuloOutputFormat.getOutputCommitter(context); |
173 | } | |
174 | } |