Table of Contents
hbase.regionserver.handler.count
hfile.block.cache.size
hbase.regionserver.global.memstore.upperLimit
hbase.regionserver.global.memstore.lowerLimit
hbase.hstore.blockingStoreFiles
hbase.hregion.memstore.block.multiplier
hbase.regionserver.checksum.verify
Perhaps the most important factor in avoiding network issues that degrade Hadoop and HBase performance is the switching hardware that is used. Decisions made early in the scope of the project can cause major problems when you double or triple the size of your cluster (or more).
Important items to consider:
The single most important factor in this configuration is that the switching capacity of the hardware is capable of handling the traffic which can be generated by all systems connected to the switch. Some lower priced commodity hardware can have a slower switching capacity than could be utilized by a full switch.
Multiple switches are a potential pitfall in the architecture. The most common configuration of lower priced hardware is a simple 1Gbps uplink from one switch to another. This often-overlooked pinch point can easily become a bottleneck for cluster communication. MapReduce jobs that both read and write a lot of data are especially likely to saturate this uplink.
Mitigation of this issue is fairly simple and can be accomplished in multiple ways:
Multiple rack configurations carry the same potential issues as multiple switches, and can suffer performance degradation from two main areas:
If the switches in your rack have appropriate switching capacity to handle all the hosts at full speed, the next most likely issue will be caused by homing more of your cluster across racks. The easiest way to avoid issues when spanning multiple racks is to use port trunking to create a bonded uplink to other racks. The downside of this method, however, is the overhead of ports that could otherwise be used for hosts. For example, creating an 8Gbps port channel from rack A to rack B uses 8 of your 24 ports for communication between racks and gives you a poor ROI; using too few, however, can mean you're not getting the most out of your cluster.
Using 10GbE links between racks will greatly increase performance and, assuming your switches support a 10GbE uplink or allow for an expansion card, will let you save your ports for machines as opposed to uplinks.
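The port trade-off described above can be put into numbers with a small sketch. It computes the oversubscription ratio of a bonded uplink: worst-case host traffic divided by uplink capacity. The class and method names are illustrative, the 24-port and 1Gbps figures are taken from the example above, and the model assumes every non-uplink port hosts a machine running at full line rate.

```java
/** Back-of-the-envelope uplink sizing; names are illustrative only. */
final class UplinkSizing {

    /**
     * Ratio of worst-case host traffic to bonded uplink capacity.
     * Assumes all ports run at the same speed and every non-uplink port hosts a machine.
     */
    static double oversubscription(int totalPorts, int uplinkPorts, double portGbps) {
        if (uplinkPorts <= 0 || uplinkPorts >= totalPorts) {
            throw new IllegalArgumentException("uplinkPorts must be between 1 and totalPorts - 1");
        }
        int hostPorts = totalPorts - uplinkPorts;
        return (hostPorts * portGbps) / (uplinkPorts * portGbps);
    }

    public static void main(String[] args) {
        // 8 of 24 ports bonded: 16 hosts share an 8Gbps uplink
        System.out.println(oversubscription(24, 8, 1.0));  // 2.0
        // 2 of 24 ports bonded: 22 hosts share a 2Gbps uplink
        System.out.println(oversubscription(24, 2, 1.0));  // 11.0
    }
}
```

A ratio near 1 means the uplink can carry everything the hosts can generate, at the cost of ports; a high ratio keeps ports for machines but makes the uplink the likely bottleneck for cross-rack traffic.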
Are all the network interfaces functioning correctly? Are you sure? See the Troubleshooting Case Study in ???.
In his presentation, Avoiding Full GCs with MemStore-Local Allocation Buffers, Todd Lipcon describes two cases of stop-the-world garbage collections common in HBase, especially during loading: CMS failure modes and old generation heap fragmentation. To address the first, start the CMS earlier than default by adding -XX:CMSInitiatingOccupancyFraction and setting it down from defaults. Start at 60 or 70 percent (the lower you bring down the threshold, the more GCing is done and the more CPU is used). To address the second, the fragmentation issue, Todd added an experimental facility, MemStore-Local Allocation Buffers (MSLAB), that must be explicitly enabled in Apache HBase 0.90.x (it is on by default in Apache HBase 0.92.x). Set hbase.hregion.memstore.mslab.enabled to true in your Configuration. See the cited slides for background and detail[1].
Be aware that when MSLAB is enabled, each MemStore instance will occupy at least one MSLAB instance of memory. If you have thousands of regions, or lots of regions each with many column families, this allocation of MSLAB may be responsible for a good portion of your heap allocation and, in an extreme case, cause an OutOfMemoryError (OOME). In this case, disable MSLAB, lower the amount of memory it uses, or host fewer regions per server.
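A rough lower bound on the heap held by MSLAB follows directly from the statement above: one allocation buffer per MemStore, and one MemStore per column family per region. The sketch below multiplies these out. The 2MB chunk size is an assumption (it is believed to be the default of hbase.hregion.memstore.mslab.chunksize, but check your release), and the class and method names are illustrative.

```java
/** Rough MSLAB heap floor; an estimate under stated assumptions, not a measurement. */
final class MslabEstimate {

    /**
     * Lower bound on heap held by MSLAB: one chunk per MemStore,
     * one MemStore per (region, column family) pair.
     */
    static long minMslabBytes(long regions, int familiesPerRegion, long chunkBytes) {
        return Math.multiplyExact(Math.multiplyExact(regions, (long) familiesPerRegion), chunkBytes);
    }

    public static void main(String[] args) {
        long chunkBytes = 2L * 1024 * 1024;  // ASSUMED chunk size of 2MB
        long bytes = minMslabBytes(2000, 3, chunkBytes);
        // 2000 regions x 3 families x 2MB
        System.out.println(bytes / (1024 * 1024) + " MB");  // 12000 MB
    }
}
```

Under these assumptions, 2000 regions with three column families each would pin roughly 12GB before any data is written, which is the extreme case the paragraph above warns about.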
For more information about GC logs, see ???.
See ???.
The number of regions for an HBase table is driven by the ???. Also, see the architecture section on ???
For larger systems, managing compactions and splits may be something you want to consider.
hbase.regionserver.handler.count: See ???.
hfile.block.cache.size: See ???. A memory setting for the RegionServer process.
hbase.regionserver.global.memstore.upperLimit: See ???. This memory setting is often adjusted for the RegionServer process depending on needs.
hbase.regionserver.global.memstore.lowerLimit: See ???. This memory setting is often adjusted for the RegionServer process depending on needs.
hbase.hstore.blockingStoreFiles: See ???. If there is blocking in the RegionServer logs, increasing this can help.
hbase.hregion.memstore.block.multiplier: See ???. If there is enough RAM, increasing this can help.
hbase.regionserver.checksum.verify: Have HBase write the checksum into the datablock and save having to do the checksum seek whenever you read. See the release note on HBASE-5074 support checksums in HBase block cache.
See ??? for information on configuring ZooKeeper, and see the part about having a dedicated disk.
See ???.
See ???. See also Section 1.6.7.1, “However...” for compression caveats.
The regionsize can be set on a per-table basis via setMaxFileSize on HTableDescriptor in the event that certain tables require a different regionsize than the configured default.
See Section 1.4.1, “Number of Regions” for more information.
Bloom Filters can be enabled per-ColumnFamily.
Use HColumnDescriptor.setBloomFilterType(NONE | ROW | ROWCOL) to enable blooms per Column Family. Default = NONE for no bloom filters. If ROW, the hash of the row will be added to the bloom on each insert. If ROWCOL, the hash of the row + column family + column family qualifier will be added to the bloom on each key insert.
See HColumnDescriptor and Section 1.8.9, “Bloom Filters” for more information, or this answer on Quora, How are bloom filters used in HBase?.
The blocksize can be configured for each ColumnFamily in a table, and this defaults to 64k. Larger cell values require larger blocksizes. There is an inverse relationship between blocksize and the resulting StoreFile indexes (i.e., if the blocksize is doubled then the resulting indexes should be roughly halved).
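The inverse relationship can be checked with simple arithmetic. The sketch below approximates the number of block index entries as the StoreFile size divided by the blocksize. This is a simplification: real blocks are not exactly the configured size and compression changes on-disk sizes, so treat the result as an order-of-magnitude estimate. The class and method names are illustrative.

```java
/** Approximate block index sizing; a simplification, not HBase's actual index layout. */
final class BlockIndexEstimate {

    /** Roughly one index entry per data block: ceil(storeFileBytes / blockBytes). */
    static long approxIndexEntries(long storeFileBytes, long blockBytes) {
        if (storeFileBytes < 0 || blockBytes <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        return (storeFileBytes + blockBytes - 1) / blockBytes;
    }

    public static void main(String[] args) {
        long oneGiB = 1L << 30;
        System.out.println(approxIndexEntries(oneGiB, 64 * 1024));   // 16384
        System.out.println(approxIndexEntries(oneGiB, 128 * 1024));  // 8192
    }
}
```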
See HColumnDescriptor and ??? for more information.
ColumnFamilies can optionally be defined as in-memory. Data is still persisted to disk, just like any other ColumnFamily. In-memory blocks have the highest priority in the ???, but it is not a guarantee that the entire table will be in memory.
See HColumnDescriptor for more information.
Production systems should use compression with their ColumnFamily definitions. See ??? for more information.
Compression deflates data on disk. When it's in-memory (e.g., in the MemStore) or on the wire (e.g., transferring between RegionServer and Client) it's inflated. So while using ColumnFamily compression is a best practice, it's not going to completely eliminate the impact of over-sized Keys, over-sized ColumnFamily names, or over-sized Column names.
See ??? for schema design tips, and ??? for more information on how HBase stores data internally.
Use the bulk load tool if you can. See ???. Otherwise, pay attention to the below.
Tables in HBase are initially created with one region by default. For bulk imports, this means that all clients will write to the same region until it is large enough to split and become distributed across the cluster. A useful pattern to speed up the bulk import process is to pre-create empty regions. Be somewhat conservative in this, because too many regions can actually degrade performance.
There are two different approaches to pre-creating splits. The first approach is to rely on the default HBaseAdmin strategy (which is implemented in Bytes.split)...
byte[] startKey = ...;      // your lowest key
byte[] endKey = ...;        // your highest key
int numberOfRegions = ...;  // # of regions to create
admin.createTable(table, startKey, endKey, numberOfRegions);
And the other approach is to define the splits yourself...
byte[][] splits = ...;  // create your own splits
admin.createTable(table, splits);
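As one possible way to build the splits array for the second approach, the sketch below generates evenly spaced split points for row keys that start with a two-character lowercase hex prefix (for example, a hash prefix). That key layout is an assumption made for illustration, as are the class and method names; the right split points always depend on your actual keyspace.

```java
import java.nio.charset.StandardCharsets;

/** Split-point generator for an ASSUMED key layout: two-character lowercase hex prefix. */
final class HexPrefixSplits {

    /** Returns numRegions - 1 split keys, evenly spaced over the prefixes "00".."ff". */
    static byte[][] splits(int numRegions) {
        if (numRegions < 2 || numRegions > 256) {
            throw new IllegalArgumentException("numRegions must be between 2 and 256");
        }
        byte[][] result = new byte[numRegions - 1][];
        for (int i = 1; i < numRegions; i++) {
            int boundary = (i * 256) / numRegions;  // strictly increasing for numRegions <= 256
            result[i - 1] = String.format("%02x", boundary).getBytes(StandardCharsets.US_ASCII);
        }
        return result;
    }

    public static void main(String[] args) {
        for (byte[] split : splits(4)) {
            System.out.println(new String(split, StandardCharsets.US_ASCII));  // 40, 80, c0
        }
    }
}
```

The resulting byte[][] has the shape expected by the createTable call with explicit splits shown above.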
See ??? for issues related to understanding your keyspace and pre-creating regions.
The default behavior for Puts using the Write Ahead Log (WAL) is that HLog edits will be written immediately. If deferred log flush is used, WAL edits are kept in memory until the flush period. The benefit is aggregated and asynchronous HLog writes, but the potential downside is that if the RegionServer goes down the yet-to-be-flushed edits are lost. This is safer, however, than not using WAL at all with Puts.
Deferred log flush can be configured on tables via HTableDescriptor. The default value of hbase.regionserver.optionallogflushinterval is 1000ms.
When performing a lot of Puts, make sure that setAutoFlush is set to false on your HTable instance. Otherwise, the Puts will be sent one at a time to the RegionServer. Puts added via htable.put(Put) and htable.put(List<Put>) wind up in the same write buffer. If autoFlush = false, these messages are not sent until the write-buffer is filled. To explicitly flush the messages, call flushCommits. Calling close on the HTable instance will invoke flushCommits.
A frequently discussed option for increasing throughput on Puts is to call writeToWAL(false). Turning this off means that the RegionServer will not write the Put to the Write Ahead Log, only into the memstore. HOWEVER, the consequence is that if there is a RegionServer failure there will be data loss. If writeToWAL(false) is used, do so with extreme caution. You may find in actuality that it makes little difference if your load is well distributed across the cluster.
In general, it is best to use WAL for Puts, and where loading throughput is a concern to use bulk loading techniques instead.
In addition to using the writeBuffer, grouping Puts by RegionServer can reduce the number of client RPC calls per writeBuffer flush. There is a utility, HTableUtil, currently on TRUNK that does this; those still on 0.90.x or earlier can either copy it or implement their own version.
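The grouping idea itself is small. The sketch below is not HTableUtil's implementation; it is a generic illustration that buckets items by the server a caller-supplied locator function assigns to each one. In a real client the locator would resolve the RegionServer hosting each Put's row, and the items would be Puts; here both are stand-ins, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Generic bucketing by server; a stand-in for grouping Puts, not the HTableUtil code. */
final class GroupByServer {

    /** Buckets items by the server name that serverOf (a hypothetical locator) returns. */
    static <T> Map<String, List<T>> group(List<T> items, Function<? super T, String> serverOf) {
        Map<String, List<T>> buckets = new LinkedHashMap<>();
        for (T item : items) {
            buckets.computeIfAbsent(serverOf.apply(item), k -> new ArrayList<>()).add(item);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Stand-in locator: row keys before 'm' live on rs1, the rest on rs2.
        Function<String, String> locator = key -> key.charAt(0) < 'm' ? "rs1" : "rs2";
        List<String> rowKeys = Arrays.asList("apple", "zebra", "banana");
        System.out.println(group(rowKeys, locator));  // {rs1=[apple, banana], rs2=[zebra]}
    }
}
```

Each bucket can then be submitted as one batch, so a flush results in one call per server rather than calls interleaved across servers.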
When writing a lot of data to an HBase table from a MR job (e.g., with TableOutputFormat), and specifically where Puts are being emitted from the Mapper, skip the Reducer step. When a Reducer step is used, all of the output (Puts) from the Mapper will get spooled to disk, then sorted/shuffled to other Reducers that will most likely be off-node. It's far more efficient to just write directly to HBase.
For summary jobs where HBase is used as a source and a sink, writes will be coming from the Reducer step (e.g., summarize values then write out result). This is a different processing problem than the above case.
If all your data is being written to one region at a time, then re-read the section on processing timeseries data.
Also, if you are pre-splitting regions and all your data is still winding up in a single region even though your keys aren't monotonically increasing, confirm that your keyspace actually works with the split strategy. There are a variety of reasons that regions may appear "well split" but won't work with your data. Because the HBase client communicates directly with the RegionServers, the region that serves a given row key can be looked up via HTable.getRegionLocation.
See Section 1.7.2, “ Table Creation: Pre-Creating Regions ”, as well as Section 1.4, “HBase Configurations”
If HBase is used as an input source for a MapReduce job, for example, make sure that the input Scan instance to the MapReduce job has setCaching set to something greater than the default (which is 1). Using the default value means that the map-task will call back to the RegionServer for every record processed. Setting this value to 500, for example, will transfer 500 rows at a time to the client to be processed. There is a cost/benefit trade-off to a large cache value because it costs more memory on both the client and the RegionServer, so bigger isn't always better.
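The effect of the caching value on round trips is easy to estimate. The sketch below counts the calls needed to fetch a given number of rows at a given caching value. It is a simplified model: it ignores the final call that signals the end of the scan and any extra calls at region boundaries. The class and method names are illustrative.

```java
/** Simplified round-trip model for scanner caching; ignores region boundaries. */
final class ScanRoundTrips {

    /** Number of fetches needed to return `rows` rows, `caching` rows at a time. */
    static long nextCalls(long rows, int caching) {
        if (rows < 0 || caching < 1) {
            throw new IllegalArgumentException("rows must be >= 0 and caching >= 1");
        }
        return (rows + caching - 1) / caching;
    }

    public static void main(String[] args) {
        System.out.println(nextCalls(1_000_000, 1));    // 1000000
        System.out.println(nextCalls(1_000_000, 500));  // 2000
    }
}
```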
Scan settings in MapReduce jobs deserve special attention. Timeouts (e.g., UnknownScannerException) can result in Map tasks if processing a batch of records takes too long before the client goes back to the RegionServer for the next set of data. This problem can occur because there is non-trivial processing occurring per row. If you process rows quickly, set caching higher. If you process rows more slowly (e.g., lots of transformations per row, writes), then set caching lower.
Timeouts can also happen in a non-MapReduce use case (i.e., single threaded HBase client doing a Scan), but the processing that is often performed in MapReduce jobs tends to exacerbate this issue.
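One way to reason about an upper bound for the caching value is to budget the time spent on one batch against the scanner timeout on the RegionServer. The sketch below does that arithmetic. The 60000ms timeout used in the example is an assumption (check the scanner lease or timeout setting of your release), the per-row processing time is something you would measure yourself, and the safety factor and all names are hypothetical.

```java
/** Caching budget from per-row processing time; an estimate under stated assumptions. */
final class ScanCachingBudget {

    /**
     * Largest caching value such that processing one batch stays within
     * safetyFactor * leaseMillis. Always returns at least 1.
     */
    static int maxCaching(long leaseMillis, double perRowMillis, double safetyFactor) {
        if (leaseMillis <= 0 || perRowMillis <= 0 || safetyFactor <= 0 || safetyFactor > 1) {
            throw new IllegalArgumentException("invalid budget parameters");
        }
        double rows = Math.floor(leaseMillis * safetyFactor / perRowMillis);
        return (int) Math.max(1, Math.min(Integer.MAX_VALUE, rows));
    }

    public static void main(String[] args) {
        long assumedLeaseMillis = 60_000;  // ASSUMED scanner timeout
        System.out.println(maxCaching(assumedLeaseMillis, 50.0, 0.5));  // 600
        System.out.println(maxCaching(assumedLeaseMillis, 2.0, 0.5));   // 15000
    }
}
```

The two results mirror the guidance above: slow per-row processing calls for a lower caching value, fast processing allows a higher one, subject to the memory cost already mentioned.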
Whenever a Scan is used to process large numbers of rows (and especially when used as a MapReduce source), be aware of which attributes are selected. If scan.addFamily is called then all of the attributes in the specified ColumnFamily will be returned to the client. If only a small number of the available attributes are to be processed, then only those attributes should be specified in the input scan because attribute over-selection is a non-trivial performance penalty over large datasets.
When columns are selected explicitly with scan.addColumn, HBase will schedule seek operations to seek between the selected columns. When rows have few columns and each column has only a few versions this can be inefficient. A seek operation is generally slower if it does not seek at least past 5-10 columns/versions or 512-1024 bytes.
In order to opportunistically look ahead a few columns/versions to see if the next column/version can be found that way before a seek operation is scheduled, a new attribute Scan.HINT_LOOKAHEAD can be set on the Scan object. The following code instructs the RegionServer to attempt two iterations of next before a seek is scheduled:
Scan scan = new Scan();
scan.addColumn(...);
scan.setAttribute(Scan.HINT_LOOKAHEAD, Bytes.toBytes(2));
table.getScanner(scan);
For MapReduce jobs that use HBase tables as a source, if there is a pattern where the "slow" map tasks seem to have the same Input Split (i.e., the RegionServer serving the data), see the Troubleshooting Case Study in ???.
This isn't so much about improving performance but rather avoiding performance problems. If you forget to close ResultScanners you can cause problems on the RegionServers. Always have ResultScanner processing enclosed in try/catch blocks...
Scan scan = new Scan();
// set attrs...
ResultScanner rs = htable.getScanner(scan);
try {
  for (Result r = rs.next(); r != null; r = rs.next()) {
    // process result...
  }
} finally {
  rs.close();  // always close the ResultScanner!
}
htable.close();
Scan instances can be set to use the block cache in the RegionServer via the setCacheBlocks method. For input Scans to MapReduce jobs, this should be false. For frequently accessed rows, it is advisable to use the block cache.
When performing a table scan where only the row keys are needed (no families, qualifiers, values or timestamps), add a FilterList with a MUST_PASS_ALL operator to the scanner using setFilter. The filter list should include both a FirstKeyOnlyFilter and a KeyOnlyFilter. Using this filter combination will result in a worst case scenario of a RegionServer reading a single value from disk and minimal network traffic to the client for a single row.
When performing a high number of concurrent reads, monitor the data spread of the target tables. If the target table(s) have too few regions then the reads could likely be served from too few nodes.
See Section 1.7.2, “ Table Creation: Pre-Creating Regions ”, as well as Section 1.4, “HBase Configurations”
Enabling Bloom Filters can save you from having to go to disk and can help improve read latencies.
Bloom filters were developed over in HBase-1200 Add bloomfilters.[2][3]
See also Section 1.6.4, “Bloom Filters”.
Bloom filters add an entry to the StoreFile general FileInfo data structure and then two extra entries to the StoreFile metadata section.
FileInfo has a BLOOM_FILTER_TYPE entry which is set to NONE, ROW or ROWCOL.
BLOOM_FILTER_META holds Bloom Size, Hash Function used, etc. It is small in size and is cached on StoreFile.Reader load.
BLOOM_FILTER_DATA is the actual bloomfilter data. It is obtained on-demand and stored in the LRU cache, if it is enabled (it is enabled by default).
io.hfile.bloom.enabled in Configuration serves as the kill switch in case something goes wrong. Default = true.
io.hfile.bloom.error.rate = average false positive rate. Default = 1%. Decrease rate by ½ (e.g. to .5%) == +1 bit per bloom entry.
io.hfile.bloom.max.fold = guaranteed minimum fold rate. Most people should leave this alone. Default = 7, or can collapse to at least 1/128th of original size. See the Development Process section of the document BloomFilters in HBase for more on what this option means.
HBase tables are sometimes used as queues. In this case, special care must be taken to regularly perform major compactions on tables used in this manner. As is documented in ???, marking rows as deleted creates additional StoreFiles which then need to be processed on reads. Tombstones only get cleaned up with major compactions.
See also ??? and HBaseAdmin.majorCompact.
Be aware that htable.delete(Delete) doesn't use the writeBuffer. It will execute a RegionServer RPC with each invocation. For a large number of deletes, consider htable.delete(List).
Because HBase runs on ??? it is important to understand how it works and how it affects HBase.
The original use-case for HDFS was batch processing. As such, low-latency reads were historically not a priority. With the increased adoption of Apache HBase this is changing, and several improvements are already in development. See the Umbrella Jira Ticket for HDFS Improvements for HBase.
Since Hadoop 1.0.0 (also 0.22.1, 0.23.1, CDH3u3 and HDP 1.0) via HDFS-2246, it is possible for the DFSClient to take a "short circuit" and read directly from disk instead of going through the DataNode when the data is local. What this means for HBase is that the RegionServers can read directly off their machine's disks instead of having to open a socket to talk to the DataNode, the former being generally much faster[4]. Also see HBase, mail # dev - read short circuit thread for more discussion around short circuit reads.
To enable "short circuit" reads, you must set two configurations.
First, the hdfs-site.xml needs to be amended. Set the property dfs.block.local-path-access.user to be the only user that can use the shortcut. This has to be the user that started HBase. Then in hbase-site.xml, set dfs.client.read.shortcircuit to be true.
For optimal performance when short-circuit reads are enabled, it is recommended that HDFS checksums are disabled.
To maintain data integrity with HDFS checksums disabled, HBase can be configured to write its own checksums into its datablocks and verify against these. See Section 1.4.9, “hbase.regionserver.checksum.verify”.
The DataNodes need to be restarted in order to pick up the new configuration. Be aware that if a process started under another username than the one configured here also has the shortcircuit enabled, it will get an Exception regarding an unauthorized access but the data will still be read.
A fairly common question on the dist-list is why HBase isn't as performant as HDFS files in a batch context (e.g., as a MapReduce source or sink). The short answer is that HBase is doing a lot more than HDFS (e.g., reading the KeyValues, returning the most current row or specified timestamps, etc.), and as such HBase is 4-5 times slower than HDFS in this processing context. Not that there isn't room for improvement (and this gap will, over time, be reduced), but HDFS will always be faster in this use-case.
Performance questions are common on Amazon EC2 environments because it is a shared environment. You will not see the same throughput as a dedicated server. In terms of running tests on EC2, run them several times for the same reason (i.e., it's a shared environment and you don't know what else is happening on the server).
If you are running on EC2 and post performance questions on the dist-list, please state this fact up-front because EC2 issues are practically a separate class of performance issues.
For Performance and Troubleshooting Case Studies, see ???.
[1] The latest JVMs do better regarding fragmentation, so make sure you are running a recent release. Read down in the message, Identifying concurrent mode failures caused by fragmentation.
[2] For description of the development process -- why static blooms rather than dynamic -- and for an overview of the unique properties that pertain to blooms in HBase, as well as possible future directions, see the Development Process section of the document BloomFilters in HBase attached to HBase-1200.
[3] The bloom filters described here are actually version two of blooms in HBase. In versions up to 0.19.x, HBase had a dynamic bloom option based on work done by the European Commission One-Lab Project 034819. The core of the HBase bloom work was later pulled up into Hadoop to implement org.apache.hadoop.io.BloomMapFile. Version 1 of HBase blooms never worked that well. Version 2 is a rewrite from scratch though again it starts with the one-lab work.
[4] See JD's Performance Talk