1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.protobuf;
21
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.NavigableMap;
29 import java.util.UUID;
30
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellScanner;
34 import org.apache.hadoop.hbase.CellUtil;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.io.SizedCellScanner;
37 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
38 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
39 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
40 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
41 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
42 import org.apache.hadoop.hbase.wal.WAL.Entry;
43 import org.apache.hadoop.hbase.wal.WALKey;
44 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
45 import org.apache.hadoop.hbase.util.ByteStringer;
46 import org.apache.hadoop.hbase.util.Pair;
47
48 import com.google.protobuf.ServiceException;
49
50 @InterfaceAudience.Private
51 public class ReplicationProtbufUtil {
52
53
54
55
56
57
58
59 public static void replicateWALEntry(final AdminService.BlockingInterface admin,
60 final Entry[] entries) throws IOException {
61 Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
62 buildReplicateWALEntryRequest(entries, null);
63 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
64 try {
65 admin.replicateWALEntry(controller, p.getFirst());
66 } catch (ServiceException se) {
67 throw ProtobufUtil.getRemoteException(se);
68 }
69 }
70
71
72
73
74
75
76
77
78 public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
79 buildReplicateWALEntryRequest(final Entry[] entries) {
80 return buildReplicateWALEntryRequest(entries, null);
81 }
82
83
84
85
86
87
88
89
90
91 public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
92 buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName) {
93
94 List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
95 int size = 0;
96 WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
97 AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
98 AdminProtos.ReplicateWALEntryRequest.Builder builder =
99 AdminProtos.ReplicateWALEntryRequest.newBuilder();
100 HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
101 for (Entry entry: entries) {
102 entryBuilder.clear();
103
104 WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
105 WALKey key = entry.getKey();
106 keyBuilder.setEncodedRegionName(
107 ByteStringer.wrap(encodedRegionName == null
108 ? key.getEncodedRegionName()
109 : encodedRegionName));
110 keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
111 keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
112 keyBuilder.setWriteTime(key.getWriteTime());
113 if (key.getNonce() != HConstants.NO_NONCE) {
114 keyBuilder.setNonce(key.getNonce());
115 }
116 if (key.getNonceGroup() != HConstants.NO_NONCE) {
117 keyBuilder.setNonceGroup(key.getNonceGroup());
118 }
119 for(UUID clusterId : key.getClusterIds()) {
120 uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
121 uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
122 keyBuilder.addClusterIds(uuidBuilder.build());
123 }
124 if(key.getOrigLogSeqNum() > 0) {
125 keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
126 }
127 WALEdit edit = entry.getEdit();
128 NavigableMap<byte[], Integer> scopes = key.getScopes();
129 if (scopes != null && !scopes.isEmpty()) {
130 for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
131 scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey()));
132 WALProtos.ScopeType scopeType =
133 WALProtos.ScopeType.valueOf(scope.getValue().intValue());
134 scopeBuilder.setScopeType(scopeType);
135 keyBuilder.addScopes(scopeBuilder.build());
136 }
137 }
138 List<Cell> cells = edit.getCells();
139
140 for (Cell cell: cells) {
141 size += CellUtil.estimatedSerializedSizeOf(cell);
142 }
143
144 allCells.add(cells);
145
146 entryBuilder.setAssociatedCellCount(cells.size());
147 builder.addEntry(entryBuilder.build());
148 }
149 return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
150 getCellScanner(allCells, size));
151 }
152
153
154
155
156
157 static CellScanner getCellScanner(final List<List<? extends Cell>> cells, final int size) {
158 return new SizedCellScanner() {
159 private final Iterator<List<? extends Cell>> entries = cells.iterator();
160 private Iterator<? extends Cell> currentIterator = null;
161 private Cell currentCell;
162
163 @Override
164 public Cell current() {
165 return this.currentCell;
166 }
167
168 @Override
169 public boolean advance() {
170 if (this.currentIterator == null) {
171 if (!this.entries.hasNext()) return false;
172 this.currentIterator = this.entries.next().iterator();
173 }
174 if (this.currentIterator.hasNext()) {
175 this.currentCell = this.currentIterator.next();
176 return true;
177 }
178 this.currentCell = null;
179 this.currentIterator = null;
180 return advance();
181 }
182
183 @Override
184 public long heapSize() {
185 return size;
186 }
187 };
188 }
189 }