1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.FileSystem;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.hbase.HBaseConfiguration;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.io.util.Dictionary;
31 import org.apache.hadoop.hbase.util.Bytes;
32 import org.apache.hadoop.io.WritableUtils;
33
34 import com.google.common.base.Preconditions;
35
36 import org.apache.hadoop.hbase.wal.WAL;
37 import org.apache.hadoop.hbase.wal.WALFactory;
38 import org.apache.hadoop.hbase.wal.WALProvider;
39
40
41
42
43
44 @InterfaceAudience.Private
45 public class Compressor {
46
47
48
49 public static void main(String[] args) throws IOException {
50 if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) {
51 printHelp();
52 System.exit(-1);
53 }
54
55 Path inputPath = new Path(args[0]);
56 Path outputPath = new Path(args[1]);
57
58 transformFile(inputPath, outputPath);
59 }
60
61 private static void printHelp() {
62 System.err.println("usage: Compressor <input> <output>");
63 System.err.println("If <input> WAL is compressed, <output> will be decompressed.");
64 System.err.println("If <input> WAL is uncompressed, <output> will be compressed.");
65 return;
66 }
67
68 private static void transformFile(Path input, Path output)
69 throws IOException {
70 Configuration conf = HBaseConfiguration.create();
71
72 FileSystem inFS = input.getFileSystem(conf);
73 FileSystem outFS = output.getFileSystem(conf);
74
75 WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf);
76 WALProvider.Writer out = null;
77
78 try {
79 if (!(in instanceof ReaderBase)) {
80 System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName());
81 return;
82 }
83 boolean compress = ((ReaderBase)in).hasCompression();
84 conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
85 out = WALFactory.createWALWriter(outFS, output, conf);
86
87 WAL.Entry e = null;
88 while ((e = in.next()) != null) out.append(e);
89 } finally {
90 in.close();
91 if (out != null) {
92 out.close();
93 out = null;
94 }
95 }
96 }
97
98
99
100
101
102
103
104
105 @Deprecated
106 static byte[] readCompressed(DataInput in, Dictionary dict)
107 throws IOException {
108 byte status = in.readByte();
109
110 if (status == Dictionary.NOT_IN_DICTIONARY) {
111 int length = WritableUtils.readVInt(in);
112
113 byte[] arr = new byte[length];
114 in.readFully(arr);
115 if (dict != null) dict.addEntry(arr, 0, length);
116 return arr;
117 } else {
118
119
120
121 short dictIdx = toShort(status, in.readByte());
122 byte[] entry = dict.getEntry(dictIdx);
123 if (entry == null) {
124 throw new IOException("Missing dictionary entry for index "
125 + dictIdx);
126 }
127 return entry;
128 }
129 }
130
131
132
133
134
135
136
137
138
139
140
141
142 @Deprecated
143 static int uncompressIntoArray(byte[] to, int offset, DataInput in,
144 Dictionary dict) throws IOException {
145 byte status = in.readByte();
146
147 if (status == Dictionary.NOT_IN_DICTIONARY) {
148
149
150 int length = WritableUtils.readVInt(in);
151 in.readFully(to, offset, length);
152 dict.addEntry(to, offset, length);
153 return length;
154 } else {
155
156
157 short dictIdx = toShort(status, in.readByte());
158 byte[] entry;
159 try {
160 entry = dict.getEntry(dictIdx);
161 } catch (Exception ex) {
162 throw new IOException("Unable to uncompress the log entry", ex);
163 }
164 if (entry == null) {
165 throw new IOException("Missing dictionary entry for index "
166 + dictIdx);
167 }
168
169 Bytes.putBytes(to, offset, entry, 0, entry.length);
170 return entry.length;
171 }
172 }
173
174
175
176
177
178
179
180
181 @Deprecated
182 static void writeCompressed(byte[] data, int offset, int length,
183 DataOutput out, Dictionary dict)
184 throws IOException {
185 short dictIdx = Dictionary.NOT_IN_DICTIONARY;
186 if (dict != null) {
187 dictIdx = dict.findEntry(data, offset, length);
188 }
189 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
190
191 out.writeByte(Dictionary.NOT_IN_DICTIONARY);
192 WritableUtils.writeVInt(out, length);
193 out.write(data, offset, length);
194 } else {
195 out.writeShort(dictIdx);
196 }
197 }
198
199 static short toShort(byte hi, byte lo) {
200 short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
201 Preconditions.checkArgument(s >= 0);
202 return s;
203 }
204 }