HBase and MapReduce

    Apache MapReduce 是 提供的软件框架,用来进行大规模数据分析.MapReduce 已超出本文档范围,可通过如下文档学习https://hadoop.apache.org/docs/r2.6.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html. MapReduce version 2 (MR2)目前是的一部分.

    本章将讨论在 HBase 中使用 MapReduce 处理数据时需要进行的一些特定配置步骤;另外,还将讨论 HBase 与 MapReduce jobs 之间的交互以及存在的一些问题;最后将介绍 MapReduce 的替代 API:.

    and mapreduce

    与 MapReduce 一样,在 HBase 中也有 2 种 mapreduce API 包.org.apache.hadoop.hbase.mapred and org.apache.hadoop.hbase.mapreduce.前者使用旧式风格的 API,后者采用新的模式.相比于前者,后者更加灵活,你可以在旧式 API 中找到等价的.选择 API 时,请使用 MapReduce 部署时选择的包.如果不知道如何选择或者想从头再来,那就使用 org.apache.hadoop.hbase.mapreduce.在接下来的文章中,将使用 o.a.h.h.mapreduce 如果你使用的是 o.a.h.h.mapred 就自行替换.

    默认情况下,部署在 MapReduce 集群中的 MapReduce jobs 没有权限访问$HBASE_CONF_DIR路径下的 HBase 配置 或者 HBase classes.

    通过以下方式可以为 MapReduce jobs 配置权限.

    或者

    编辑 $HADOOP_HOME/conf/hadoop-env.sh 将 HBase 依赖添加到 HADOOP_CLASSPATH.

    以上配置均不推荐,因为它会让 Hadoop 安装 HBase 的依赖,并且需要重启 Hadoop 集群才能使用 HBase 中的数据.

    推荐的方式是 HBase 使用HADOOP_CLASSPATH or -libjars添加其依赖的 jar 包.

    从 HBase 0.90.x,HBase 添加依赖 jar 包到任务自身配置中. 依赖项只需要在本地CLASSPATH可用,然后被打包部署到 MapReduce 集群的 fat job jar 中.一种取巧的方式是传递全量的 HBase classpath(即 hbase,独立的 jars 还有配置)到 mapreduce job 运行器中令 hbase 工具从全量的 classpath 挑选依赖最终配置到 MapReduce job 的配置中(可以查看源码实现TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)).

    下面的例子是在表usertable上运行的 HBase 的 MapReduce 任务: 表行数统计任务RowCounter.设置在 MapReduce 上下文运行需要的 hbase jars 以及配置文件如 hbase-site.xml 到 HADOOP_CLASSPATH. 一定要确保使用了与你的系统相对应的 HBase Jar.替换以下命令中的 VERSION 字段为本地 HBASE 版本. 反引号(`)使 shell 执行子命令,将hbase classpath的输出设置为HADOOP_CLASSPATH. 这个例子需要在 Bash-compatible 执行.

    以上命令将启动一个运行在本地配置指定的 hbase 集群的 mapreduce 作业,用来统计表行数.这个集群也是 Hadoop 配置指定的.

    hbase-mapreduce.jar 核心是一个驱动,罗列了 HBASE 装载的一些基础的 MapReduce 任务.例如,假设你安装的是2.0.0-SNAPSHOT版本:

    1. $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
    2. ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.0-SNAPSHOT.jar
    3. An example program must be given as the first argument.
    4. Valid program names are:
    5. CellCounter: Count cells in HBase table.
    6. WALPlayer: Replay WAL files.
    7. completebulkload: Complete a bulk data load.
    8. copytable: Export a table from local cluster to peer cluster.
    9. export: Write table data to HDFS.
    10. exportsnapshot: Export the specific snapshot to a given FileSystem.
    11. import: Import data written by Export.
    12. importtsv: Import data in TSV format.
    13. rowcounter: Count rows in HBase table.
    14. verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.

    您可以使用上面列出的 MapReduce 任务的简写采用以下命令重新执行表行数统计任务(同样,假设安装的 HBASE 是2.0.0-SNAPSHOT版本):

    1. $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
    2. ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.0-SNAPSHOT.jar \
    3. rowcounter usertable

    您可能发现了hbase mapredcp工具的输出; 它列出了在 hbase 运行基础 mapreduce 作业所需的最小 jar 文件集合(不包括配置,如果希望 MapReduce 作业能准确找到目标集群,则可能需要添加些配置)。 一旦你开始做任何实质性的事情,你还需要添加额外依赖,这些依赖需在运行hbase mapredcp时通过传递系统属性-Dtmpjars来指定。

    对于那些没有打包依赖的 jobs 或者直接调用TableMapReduceUtil#addDependencyJars,则下面的命令格式就非常必要了:

    1. $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(${HBASE_HOME}/bin/hbase mapredcp | tr ':' ',') ...

    如果您是在 HBase 的构建地址而不是安装地址执行以上示例,您会遇到如下错误:

    1. java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper

    如果出现了以上问题,请参照以下命令修改,它从构建环境的 target/ 目录下使用 HBASE jars

    1. $ HADOOP_CLASSPATH=${HBASE_BUILD_HOME}/hbase-mapreduce/target/hbase-mapreduce-VERSION-SNAPSHOT.jar:`${HBASE_BUILD_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_BUILD_HOME}/hbase-mapreduce/target/hbase-mapreduce-VERSION-SNAPSHOT.jar rowcounter usertable

    为了满足新类加载器需要,hbase-protocol.jar必须包含在 Hadoop 的 环境变量下.可通过查阅解决 一些 classpath 错误的推荐解决方法.
    The following is included for historical purposes.
    >

    在 Hadoop 的 lib 目录里通过系统连接或者直接拷贝方式引入hbase-protocol.jar,可以系统范围内解决 classpath 问题.

    这也可以在每个作业启动的基础上实现,方法是在作业提交时将其(hbase-protocol.jar)包含在HADOOP_CLASSPATH环境变量中。启动时打包其依赖项,以下所有三个作业启动命令都满足此要求

    1. $ HADOOP_CLASSPATH=/path/to/hbase-protocol.jar:/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
    2. $ HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
    3. $ HADOOP_CLASSPATH=$(hbase classpath) hadoop jar MyJob.jar MyJobMainClass

    下面的命令对于那些不打包自己依赖的 Jar 文件很有必要:

    可以查阅 HBASE-10304进行更深入的讨论.

    49. MapReduce Scan Caching

    现在 TableMapReduceUtil 恢复在传入的 Scan 对象上设置扫描器缓存(在将结果返回到客户端之前缓存的行数)的选项。由于 HBase 0.95 中的错误([HBASE-11558]),此功能丢失 ( HBase 0.98.5 和 0.96.3。 选择扫描程序缓存的优先顺序如下:

    1.在扫描对象上设置的缓存设置。

    2.通过配置选项hbase.client.scanner.caching指定的缓存设置,可以在 hbase-site.xml 中手动设置,也可以通过辅助方法TableMapReduceUtil.setScannerCaching()设置。

    3.默认值HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING,设置为“100”。

    优化缓存设置是平衡客户端等待结果的时间与客户端需要接收的结果集数量。 如果缓存设置太大,客户端可能会等待很长时间,甚至可能超时。 如果设置太小,则需要将结果分多个部分返回。 如果您将扫描看作是铲子,那么更大的缓存设置相当于更大的铲子,而更小的缓存设置等价于为了填满桶而进行更多的铲动。

    上面提到的优先级列表允许您设置一个合理的默认值,也可以根据需要重写。

    有关Scan的更多细节,请参阅 API 文档。

    50.捆绑的 HBase MapReduce 作业

    HBase JAR 还可用作某些捆绑 MapReduce 作业的驱动程序。要了解捆绑的 MapReduce 作业,请运行以下命令。

    1. $ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-mapreduce-VERSION.jar
    2. An example program must be given as the first argument.
    3. Valid program names are:
    4. copytable: Export a table from local cluster to peer cluster
    5. completebulkload: Complete a bulk data load.
    6. export: Write table data to HDFS.
    7. import: Import data written by Export.
    8. importtsv: Import data in TSV format.
    9. rowcounter: Count rows in HBase table

    每个有效的程序名称都捆绑了 MapReduce 作业。 要运行其中一个作业,请根据以下示例构建命令.

    1. $ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-mapreduce-VERSION.jar rowcounter myTable

    HBase 可以被用作 MapReduce Job 的数据源 , 和数据接收器TableOutputFormat or .编写对 HBase 读写的 MapReduce jbos 时,建议使用子类 TableMapper 或者 . 有关基本用法请参阅不做任何处理的传递类 IdentityTableMapper 和 . 对于更复杂的例子, 请参阅 RowCounter 或者查看 org.apache.hadoop.hbase.mapreduce.TestTableMapReduce 的单元测试.

    如果运行使用 HBase 作为源或接收器的 MapReduce job,则需要在配置中指定源和接收的表和列名称。
    当您从 HBase 读取时,TableInputFormat从 HBase 请求分区列表并生成一个映射,该映射可以是“map-per-region”或“mapreduce.job.maps”映射,以较小者为准。如果您的 job 只有两个 maps,请将mapreduce.job.maps提升到大于分区数的数字。如果您每个节点正在运行 TaskTracer / NodeManager 和 RegionServer,则 Maps 将在相邻的 TaskTracker / NodeManager 上运行。写入 HBase 时,避免 Reduce 步骤并从 Map 中写回 HBase 可能是有意义的。当您的作业不需要 MapReduce 对 map 发出的数据进行排序和整理时,此方法有效。
    在插入时,对于 HBase’排序’,除非您需要,否则没有必要对您的 MapReduce 集群进行双重排序(以及对数据进行移动)。如果您不需要 Reduce,您的 map 可能会在作业结束时输出为了报告而处理的记录数,或者将 Reduced 数设置为零并使用 TableOutputFormat。如果在您的情况下运行 Reduce 步骤是有意义的,您通常应该使用多个 reducer,以便负载分布在 HBase 集群中。

    新的 HBase 分区程序[HRegionPartitioner]( 是合适的,并且您的上传在完成后不会大大改变现有区域的数量。否则请使用默认分区程序。

    52. 批量导入时直接写 HFiles

    如果要导入新表,则可以绕过 HBase API 并将内容直接写入文件系统,格式化为 HBase 数据文件(HFiles)。您的导入将运行得更快,可能会快一个数量级。有关此机制如何工作的更多信息,请参阅Bulk Loading

    53. 行数统计的例子

    包含 的 MapReduce 作业使用 TableInputFormat 并且 统计了指定表格的行数.请使用以下命令运行:

    1. $ ./bin/hadoop jar hbase-X.X.X.jar

    这将调用 HBase MapReduce 驱动程序类。请从提供的 jobs 中选择rowcounter。这将打印 rowcounter 使用建议到标准输出。指定表名,要计数的列和输出目录。如果您有类路径错误,请参阅 HBase, MapReduce, and the CLASSPATH.

    54.2. 自定义拆分器

    如果对自定义拆分器感兴趣,请参阅中的getSplits方法,它是 map 任务拆分逻辑所在.

    55. HBase MapReduce 示例

    55.1. HBase MapReduce 读示例

    以下是以只读方式使用 HBase 作为 MapReduce 源的示例。 具体来说,有一个 Mapper 实例但没有 Reducer,并且 Mapper 没有发出任何内容。 Job 将定义如下……

    1. Configuration config = HBaseConfiguration.create();
    2. Job job = new Job(config, "ExampleRead");
    3. job.setJarByClass(MyReadJob.class); // class that contains mapper
    4. Scan scan = new Scan();
    5. scan.setCacheBlocks(false); // don't set to true for MR jobs
    6. // set other scan attrs
    7. ...
    8. tableName, // input HBase table name
    9. scan, // Scan instance to control CF and attribute selection
    10. MyMapper.class, // mapper
    11. null, // mapper output key
    12. null, // mapper output value
    13. job);
    14. job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper
    15. boolean b = job.waitForCompletion(true);
    16. if (!b) {
    17. throw new IOException("error with job!");
    18. }

    …​并且 mapper 实例将继承 TableMapper…​

    1. public static class MyMapper extends TableMapper<Text, Text> {
    2. public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
    3. // process data for the row from the Result instance.
    4. }
    5. }

    以下 HBase 既作为源也作为 MapReduce 的接收器的示例。 此示例将简单地将数据从一个表复制到另一个表。

    1. Configuration config = HBaseConfiguration.create();
    2. Job job = new Job(config,"ExampleReadWrite");
    3. job.setJarByClass(MyReadWriteJob.class); // class that contains mapper
    4. Scan scan = new Scan();
    5. scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
    6. scan.setCacheBlocks(false); // don't set to true for MR jobs
    7. // set other scan attrs
    8. TableMapReduceUtil.initTableMapperJob(
    9. sourceTable, // input table
    10. scan, // Scan instance to control CF and attribute selection
    11. MyMapper.class, // mapper class
    12. null, // mapper output key
    13. null, // mapper output value
    14. job);
    15. TableMapReduceUtil.initTableReducerJob(
    16. targetTable, // output table
    17. null, // reducer class
    18. job);
    19. job.setNumReduceTasks(0);
    20. boolean b = job.waitForCompletion(true);
    21. if (!b) {
    22. throw new IOException("error with job!");
    23. }

    很有必要解释一下TableMapReduceUtil的作用是什么,尤其是 reducer. 被用作 outputFormat class ,一些参数已经进行了配置,例如TableOutputFormat.OUTPUT_TABLE,同时设置了 reducer 的 output key 为TableOutputFormat.OUTPUT_TABLE 并且 value 为Writable. 这些配置项可以由开发工程师在 job 和配置文件中进行设置,TableMapReduceUtil试图将这些工作进行简化.

    下面是一个 mapper 的例子,它将创建一个”Put” ,匹配输入的”Result “并输出.而这些工作正是 CopyTable 工具的作用.

    1. public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    2. public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    3. // this example is just copying the data from the source table...
    4. context.write(row, resultToPut(row,value));
    5. }
    6. private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
    7. Put put = new Put(key.get());
    8. for (KeyValue kv : result.raw()) {
    9. put.add(kv);
    10. }
    11. return put;
    12. }
    13. }

    这实际上并不是一个 reducer 过程, 所以由TableOutputFormat 负责将’Put’发送到目标表.
    这只是一个例子,开发人员可以选择不使用TableOutputFormat并自行链接到目标表.

    55.3. HBase MapReduce 多表输出的读写示例

    TODO: MultiTableOutputFormat 样例.

    55.4. HBase MapReduce 汇总到 HBase 示例

    以下示例使用 HBase 作为 MapReduce 源并接收汇总信息。此示例将计算表中某个 value 的不同实例的数量,并将这些汇总计数写入另一个表中。

    在示例中的 mapper 在一个 String 类型的 value 上进行汇总操作,并将 value作为 mapper 输出的 key,IntWritable表示实例计数器。

    1. public static class MyMapper extends TableMapper<Text, IntWritable> {
    2. public static final byte[] CF = "cf".getBytes();
    3. public static final byte[] ATTR1 = "attr1".getBytes();
    4. private final IntWritable ONE = new IntWritable(1);
    5. private Text text = new Text();
    6. public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    7. String val = new String(value.getValue(CF, ATTR1));
    8. text.set(val); // we can only emit Writables...
    9. context.write(text, ONE);
    10. }
    11. }

    在 reducer 中,计算“ones”(就像执行此操作的任何其他 MR 示例一样),然后发出“Put”。

    1. public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    2. public static final byte[] CF = "cf".getBytes();
    3. public static final byte[] COUNT = "count".getBytes();
    4. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    5. int i = 0;
    6. for (IntWritable val : values) {
    7. i += val.get();
    8. }
    9. Put put = new Put(Bytes.toBytes(key.toString()));
    10. put.add(CF, COUNT, Bytes.toBytes(i));
    11. context.write(null, put);
    12. }

    这与上面的汇总示例很相似,不同之处在于该汇总使用 HBase 作为 MapReduce 的数据源而使用 HDFS 作为接收器.这样的不同体现在 job 启动和 reduce 过程,而 mapper 过程没有区别.

    1. Configuration config = HBaseConfiguration.create();
    2. Job job = new Job(config,"ExampleSummaryToFile");
    3. job.setJarByClass(MySummaryFileJob.class); // class that contains mapper and reducer
    4. scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
    5. scan.setCacheBlocks(false); // don't set to true for MR jobs
    6. // set other scan attrs
    7. TableMapReduceUtil.initTableMapperJob(
    8. sourceTable, // input table
    9. scan, // Scan instance to control CF and attribute selection
    10. MyMapper.class, // mapper class
    11. Text.class, // mapper output key
    12. IntWritable.class, // mapper output value
    13. job);
    14. job.setReducerClass(MyReducer.class); // reducer class
    15. job.setNumReduceTasks(1); // at least one, adjust as required
    16. FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile")); // adjust directories as required
    17. boolean b = job.waitForCompletion(true);
    18. if (!b) {
    19. throw new IOException("error with job!");
    20. }

    如上所述,本示例的中的 mappper 与上例无异,至于 Reducer,则采用一个’通用’的而不是继承自 TableMapper 并且发出 Puts.

    1. public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    2. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    3. int i = 0;
    4. for (IntWritable val : values) {
    5. i += val.get();
    6. }
    7. context.write(key, new IntWritable(i));
    8. }
    9. }

    55.6 不使用 Reducer ,HBase MapReduce 汇总到 HBase

    如果使用 HBase 作为 Reducer,也可以在没有 Reducer 的情况下进行汇总.
    汇总任务要求 HBase 目标表存在.表方法incrementColumnValue将被用作值的原子增长.从性能角度看,为每个 map-task 中那些会值增长的值保留一个 Map,并且在 mapper 执行cleanup 方法时每个 key 更新一次,这可能是有意义的.但是,您的里程可能会根据要处理的行数和惟一键的不同而有所不同。

    最后,汇总结果在 HBase 中.

    55.7. HBase MapReduce 汇总到 RDBMS

    有时,为 RDBMS 生成摘要更合适。对于这些情况,可以通过自定义 reducer 直接生成 RDBMS 的摘要。 setup方法可以连接到 RDBMS(连接信息可以通过上下文中的自定义参数传递),清理方法可以关闭连接。

    一个 job 的 reducer 数量对汇总实现至关重要,您将必须将其设计到 reducer 中.具体来说,不管被设计成一个 reducer 还是多个 reducer,这没有对错之分,完全依赖于您的用例.指定给 job 的 reducer 越多,与 RDMS 建立的实时链接越多,这可以在一定程度上提高吞吐量.

    1. public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    2. private Connection c = null;
    3. public void setup(Context context) {
    4. // create DB connection...
    5. }
    6. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    7. // do summarization
    8. // in this example the keys are Text, but this is just an example
    9. }
    10. public void cleanup(Context context) {
    11. // close db connection
    12. }
    13. }

    最终,汇总结果写入到 RDMS 表中.

    56. 在 MapReduce 任务中访问其他 HBase 表

    尽管目前框架允许一个 HBase 表作为 MapReduce 作业的输入,但其他 HBase 表只可以通过作为查找表(lookup tables)才能访问,例如在 MapReduce 作业中通过 mapper 的 setup 方法创建 Table 实例.

    1. public class MyMapper extends TableMapper<Text, LongWritable> {
    2. private Table myOtherTable;
    3. public void setup(Context context) {
    4. // In here create a Connection to the cluster and save it or use the Connection
    5. // from the existing table
    6. myOtherTable = connection.getTable("myOtherTable");
    7. }
    8. public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    9. // process Result...
    10. // use 'myOtherTable' for lookups
    11. }

    通常建议关闭以 HBASE 为数据源的 MapReduce 作业的推测执行(speculative execution).关闭推测执行可以通过设置单个任务的属性,也可以设置整个集群.特对是对于执行时间较长的任务,推测执行(speculative execution)为其创建一个重复任务,将进行双重数据写入,这可能不是你想要的.
    查阅 spec.ex 获取更多信息.

    58. 级联(Cascading)

    [[Cascading]( MapReduce 的替代 API,本质上使用 MapReduce,但允许您以简化的方式编写 MapReduce 代码。

    1. // read data from the default filesystem
    2. // emits two fields: "offset" and "line"
    3. Tap source = new Hfs( new TextLine(), inputFileLhs );
    4. // store data in an HBase cluster
    5. // accepts fields "num", "lower", and "upper"
    6. // will automatically scope incoming fields to their proper familyname, "left" or "right"
    7. Fields keyFields = new Fields( "num" );
    8. String[] familyNames = {"left", "right"};
    9. Fields[] valueFields = new Fields[] {new Fields( "lower" ), new Fields( "upper" ) };
    10. Tap hBaseTap = new HBaseTap( "multitable", new HBaseScheme( keyFields, familyNames, valueFields ), SinkMode.REPLACE );
    11. // a simple pipe assembly to parse the input into fields
    12. // a real app would likely chain multiple Pipes together for more complex processing
    13. Pipe parsePipe = new Each( "insert", new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ), " " ) );
    14. // "plan" a cluster executable Flow
    15. // this connects the source Tap and hBaseTap (the sink Tap) to the parsePipe
    16. Flow parseFlow = new FlowConnector( properties ).connect( source, hBaseTap, parsePipe );
    17. // start the flow, and block until complete
    18. parseFlow.complete();
    19. // open an iterator on the HBase table we stuffed data into
    20. TupleEntryIterator iterator = parseFlow.openSink();
    21. while(iterator.hasNext())
    22. {
    23. // print out each tuple from HBase
    24. System.out.println( "iterator.next() = " + iterator.next() );
    25. }
    26. iterator.close();