View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.protobuf;
21  
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  import java.util.UUID;
30  
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellScanner;
34  import org.apache.hadoop.hbase.CellUtil;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.io.SizedCellScanner;
37  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
38  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
39  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
40  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
41  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
42  import org.apache.hadoop.hbase.wal.WAL.Entry;
43  import org.apache.hadoop.hbase.wal.WALKey;
44  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
45  import org.apache.hadoop.hbase.util.ByteStringer;
46  import org.apache.hadoop.hbase.util.Pair;
47  
48  import com.google.protobuf.ServiceException;
49  
50  @InterfaceAudience.Private
51  public class ReplicationProtbufUtil {
52    /**
53     * A helper to replicate a list of WAL entries using admin protocol.
54     *
55     * @param admin
56     * @param entries
57     * @throws java.io.IOException
58     */
59    public static void replicateWALEntry(final AdminService.BlockingInterface admin,
60        final Entry[] entries) throws IOException {
61      Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
62        buildReplicateWALEntryRequest(entries, null);
63      PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
64      try {
65        admin.replicateWALEntry(controller, p.getFirst());
66      } catch (ServiceException se) {
67        throw ProtobufUtil.getRemoteException(se);
68      }
69    }
70  
71    /**
72     * Create a new ReplicateWALEntryRequest from a list of WAL entries
73     *
74     * @param entries the WAL entries to be replicated
75     * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
76     * found.
77     */
78    public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
79        buildReplicateWALEntryRequest(final Entry[] entries) {
80      return buildReplicateWALEntryRequest(entries, null);
81    }
82  
83    /**
84     * Create a new ReplicateWALEntryRequest from a list of WAL entries
85     *
86     * @param entries the WAL entries to be replicated
87     * @param encodedRegionName alternative region name to use if not null
88     * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
89     * found.
90     */
91    public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
92        buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName) {
93      // Accumulate all the Cells seen in here.
94      List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
95      int size = 0;
96      WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
97      AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
98      AdminProtos.ReplicateWALEntryRequest.Builder builder =
99        AdminProtos.ReplicateWALEntryRequest.newBuilder();
100     HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
101     for (Entry entry: entries) {
102       entryBuilder.clear();
103       // TODO: this duplicates a lot in WALKey#getBuilder
104       WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
105       WALKey key = entry.getKey();
106       keyBuilder.setEncodedRegionName(
107         ByteStringer.wrap(encodedRegionName == null
108             ? key.getEncodedRegionName()
109             : encodedRegionName));
110       keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
111       keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
112       keyBuilder.setWriteTime(key.getWriteTime());
113       if (key.getNonce() != HConstants.NO_NONCE) {
114         keyBuilder.setNonce(key.getNonce());
115       }
116       if (key.getNonceGroup() != HConstants.NO_NONCE) {
117         keyBuilder.setNonceGroup(key.getNonceGroup());
118       }
119       for(UUID clusterId : key.getClusterIds()) {
120         uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
121         uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
122         keyBuilder.addClusterIds(uuidBuilder.build());
123       }
124       if(key.getOrigLogSeqNum() > 0) {
125         keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
126       }
127       WALEdit edit = entry.getEdit();
128       NavigableMap<byte[], Integer> scopes = key.getScopes();
129       if (scopes != null && !scopes.isEmpty()) {
130         for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
131           scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey()));
132           WALProtos.ScopeType scopeType =
133               WALProtos.ScopeType.valueOf(scope.getValue().intValue());
134           scopeBuilder.setScopeType(scopeType);
135           keyBuilder.addScopes(scopeBuilder.build());
136         }
137       }
138       List<Cell> cells = edit.getCells();
139       // Add up the size.  It is used later serializing out the kvs.
140       for (Cell cell: cells) {
141         size += CellUtil.estimatedSerializedSizeOf(cell);
142       }
143       // Collect up the cells
144       allCells.add(cells);
145       // Write out how many cells associated with this entry.
146       entryBuilder.setAssociatedCellCount(cells.size());
147       builder.addEntry(entryBuilder.build());
148     }
149     return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
150       getCellScanner(allCells, size));
151   }
152 
153   /**
154    * @param cells
155    * @return <code>cells</code> packaged as a CellScanner
156    */
157   static CellScanner getCellScanner(final List<List<? extends Cell>> cells, final int size) {
158     return new SizedCellScanner() {
159       private final Iterator<List<? extends Cell>> entries = cells.iterator();
160       private Iterator<? extends Cell> currentIterator = null;
161       private Cell currentCell;
162 
163       @Override
164       public Cell current() {
165         return this.currentCell;
166       }
167 
168       @Override
169       public boolean advance() {
170         if (this.currentIterator == null) {
171           if (!this.entries.hasNext()) return false;
172           this.currentIterator = this.entries.next().iterator();
173         }
174         if (this.currentIterator.hasNext()) {
175           this.currentCell = this.currentIterator.next();
176           return true;
177         }
178         this.currentCell = null;
179         this.currentIterator = null;
180         return advance();
181       }
182 
183       @Override
184       public long heapSize() {
185         return size;
186       }
187     };
188   }
189 }