Chapter 10 HBase: The Hadoop Database
10.7 Bulk Loading Data into HBase
10.7.1 Bulk Loading Methods
A single record can be inserted into an HBase table with the HBase Shell put command or the Put class of the HBase API, but how should massive amounts of data be loaded in bulk? There are generally three approaches: the ImportTsv tool, writing a MapReduce program, and a Hive external table. This section covers the first two; Hive external tables are discussed in Chapter 11.
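For comparison, inserting and reading back a single row from the HBase Shell looks like the following (a minimal sketch; the table name, row key, column, and value here are only examples):

put 'mytable', 'row1', 'info:data1', '100'
get 'mytable', 'row1'

This is fine for a handful of rows, but issuing one put per record does not scale to millions of rows, which is what the bulk loading tools below address.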
10.7.2 ImportTsv
(1) Overview
ImportTsv is a MapReduce-based bulk loading tool that ships with HBase. With a single command it loads delimited text files from HDFS into an HBase table: by default it expects tab-separated values (TSV), but any other single-character separator can be specified, so a comma-separated text file, for example, loads just as well. This makes it very practical for importing large volumes of data; two common variations are sketched below.
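The following sketch uses example file and table names and the tool's standard -Dimporttsv.* properties: the first command loads a comma-separated file by overriding the separator, and the second pair shows the two-step variant that writes HFiles first and then bulk loads them instead of issuing individual Puts, which is usually faster for very large imports.

# Load a comma-separated file instead of the default tab-separated one
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
  -Dimporttsv.separator=, \
  -Dimporttsv.columns=HBASE_ROW_KEY,info:data1,info:data2,info:data3 \
  mydata input/mydata.csv

# Two-step variant: generate HFiles first, then bulk load them into the table
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
  -Dimporttsv.columns=HBASE_ROW_KEY,info:data1,info:data2,info:data3 \
  -Dimporttsv.bulk.output=/tmp/mydata_hfiles \
  mydata input/mydata.txt
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/mydata_hfiles mydata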
(2) Prepare the Data
Generate the test data with a small shell script:
[root@node1 ~]# cd data
[root@node1 data]# ls
books.txt cite75_99.txt ncdc.txt rs.txt temperature.txt word1.txt
[root@node1 data]# vi gen.sh
[root@node1 data]# cat gen.sh
#!/bin/sh
for i in {1..100000};do
echo -e $i'\t'$RANDOM'\t'$RANDOM'\t'$RANDOM
done;
[root@node1 data]# sh gen.sh > mydata.txt
[root@node1 data]# tail -5 mydata.txt
99996 6512 21537 7475
99997 9544 22444 1030
99998 18157 11864 16616
99999 28231 10187 21657
100000 21188 27568 14994
[root@node1 data]#
Upload the generated data to HDFS:
[root@node1 data]# hdfs dfs -mkdir -p input
[root@node1 data]# hdfs dfs -put mydata.txt input
[root@node1 data]# hdfs dfs -ls input
Found 1 items
-rw-r--r-- 3 root supergroup 2287354 2017-08-05 05:30 input/mydata.txt
[root@node1 data]#
(3) Create the Table
[root@node1 data]# hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hbase-1.2.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.2.6, rUnknown, Mon May 29 02:25:32 CDT 2017
hbase(main):001:0> create 'mydata','info'
0 row(s) in 1.8170 seconds
=> Hbase::Table - mydata
hbase(main):002:0> quit
[root@node1 data]#
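A newly created table has a single region, so all of the initial writes land on one region server. If that hot region becomes a bottleneck during the import, the table can optionally be pre-split at creation time. A minimal sketch (the split points below are arbitrary examples for these numeric-string keys):

create 'mydata', 'info', SPLITS => ['2', '4', '6', '8']

Pre-splitting spreads the write load of the import job across several region servers from the start.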
(4) Import the Data
Run the import command:
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,info:data1,info:data2,info:data3 mydata input/mydata.txt
[root@node1 ~]# hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
> -Dimporttsv.columns=HBASE_ROW_KEY,info:data1,info:data2,info:data3 mydata input/mydata.txt
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hbase-1.2.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2017-08-05 05:37:23,936 INFO [main] zookeeper.RecoverableZooKeeper: Process identifier=hconnection-0x292b08d6 connecting to ZooKeeper ensemble=node1:2181,node2:2181,node3:2181
2017-08-05 05:37:23,970 INFO [main] zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-08-05 05:37:23,970 INFO [main] zookeeper.ZooKeeper: Client environment:host.name=node1
2017-08-05 05:37:23,971 INFO [main] zookeeper.ZooKeeper: Client environment:java.version=1.8.0_112
2017-08-05 05:37:23,971 INFO [main] zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
2017-08-05 05:37:23,971 INFO [main] zookeeper.ZooKeeper: Client environment:java.home=/opt/jdk1.8.0_112/jre
2017-08-05 05:37:23,971 INFO [main] zookeeper.ZooKeeper: Client environment:java.class.path=/opt/hbase-1.2.6/conf:/opt/jdk1.8.0_112/lib/tools.jar:/opt/hbase-1.2.6:/opt/hbase-
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:java.library.path=/opt/hadoop-2.7.3/lib/native
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:os.name=Linux
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:os.arch=amd64
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:os.version=3.10.0-514.el7.x86_64
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:user.name=root
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:user.home=/root
2017-08-05 05:37:23,979 INFO [main] zookeeper.ZooKeeper: Client environment:user.dir=/root
2017-08-05 05:37:23,981 INFO [main] zookeeper.ZooKeeper: Initiating client connection, connectString=node1:2181,node2:2181,node3:2181 sessionTimeout=90000 watcher=hconnection-0x292b08d60x0, quorum=node1:2181,node2:2181,node3:2181, baseZNode=/hbase
2017-08-05 05:37:24,030 INFO [main-SendThread(node1:2181)] zookeeper.ClientCnxn: Opening socket connection to server node1/192.168.80.131:2181. Will not attempt to authenticate using SASL (unknown error)
2017-08-05 05:37:24,055 INFO [main-SendThread(node1:2181)] zookeeper.ClientCnxn: Socket connection established to node1/192.168.80.131:2181, initiating session
2017-08-05 05:37:24,103 INFO [main-SendThread(node1:2181)] zookeeper.ClientCnxn: Session establishment complete on server node1/192.168.80.131:2181, sessionid = 0x15dafce38380006, negotiated timeout = 40000
2017-08-05 05:37:26,136 INFO [main] Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2017-08-05 05:37:26,351 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x15dafce38380006
2017-08-05 05:37:26,372 INFO [main] zookeeper.ZooKeeper: Session: 0x15dafce38380006 closed
2017-08-05 05:37:26,372 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down
2017-08-05 05:37:26,977 INFO [main] Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2017-08-05 05:37:31,821 INFO [main] input.FileInputFormat: Total input paths to process : 1
2017-08-05 05:37:32,132 INFO [main] mapreduce.JobSubmitter: number of splits:1
2017-08-05 05:37:32,460 INFO [main] Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2017-08-05 05:37:33,154 INFO [main] mapreduce.JobSubmitter: Submitting tokens for job: job_1501893260326_0001
2017-08-05 05:37:34,742 INFO [main] impl.YarnClientImpl: Submitted application application_1501893260326_0001
2017-08-05 05:37:34,945 INFO [main] mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1501893260326_0001/
2017-08-05 05:37:34,946 INFO [main] mapreduce.Job: Running job: job_1501893260326_0001
2017-08-05 05:37:56,389 INFO [main] mapreduce.Job: Job job_1501893260326_0001 running in uber mode : false
2017-08-05 05:37:56,390 INFO [main] mapreduce.Job: map 0% reduce 0%
2017-08-05 05:38:17,452 INFO [main] mapreduce.Job: map 15% reduce 0%
2017-08-05 05:38:20,567 INFO [main] mapreduce.Job: map 46% reduce 0%
2017-08-05 05:38:23,686 INFO [main] mapreduce.Job: map 71% reduce 0%
2017-08-05 05:38:26,787 INFO [main] mapreduce.Job: map 100% reduce 0%
2017-08-05 05:38:27,826 INFO [main] mapreduce.Job: Job job_1501893260326_0001 completed successfully
2017-08-05 05:38:28,034 INFO [main] mapreduce.Job: Counters: 31
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=150773
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2287457
HDFS: Number of bytes written=0
HDFS: Number of read operations=2
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Job Counters
Launched map tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=27098
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=27098
Total vcore-seconds taken by all map tasks=27098
Total megabyte-seconds taken by all map tasks=27748352
Map-Reduce Framework
Map input records=100000
Map output records=100000
Input split bytes=103
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=946
CPU time spent (ms)=12890
Physical memory (bytes) snapshot=128389120
Virtual memory (bytes) snapshot=2108084224
Total committed heap usage (bytes)=30474240
ImportTsv
Bad Lines=0
File Input Format Counters
Bytes Read=2287354
File Output Format Counters
Bytes Written=0
[root@node1 ~]#
(5) Verify the Import
[root@node2 ~]# hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hbase-1.2.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.2.6, rUnknown, Mon May 29 02:25:32 CDT 2017
hbase(main):001:0> count 'mydata'
Current count: 1000, row: 10897
Current count: 2000, row: 11797
Current count: 3000, row: 12697
Current count: 4000, row: 13597
Current count: 5000, row: 14497
Current count: 6000, row: 15397
Current count: 7000, row: 16297
Current count: 8000, row: 17197
Current count: 9000, row: 18097
Current count: 10000, row: 18998
Current count: 11000, row: 19898
Current count: 12000, row: 20797
Current count: 13000, row: 21697
Current count: 14000, row: 22597
Current count: 15000, row: 23497
Current count: 16000, row: 24397
Current count: 17000, row: 25297
Current count: 18000, row: 26197
Current count: 19000, row: 27097
Current count: 20000, row: 27998
Current count: 21000, row: 28898
Current count: 22000, row: 29798
Current count: 23000, row: 30697
Current count: 24000, row: 31597
Current count: 25000, row: 32497
Current count: 26000, row: 33397
Current count: 27000, row: 34297
Current count: 28000, row: 35197
Current count: 29000, row: 36097
Current count: 30000, row: 36998
Current count: 31000, row: 37898
Current count: 32000, row: 38798
Current count: 33000, row: 39698
Current count: 34000, row: 40597
Current count: 35000, row: 41497
Current count: 36000, row: 42397
Current count: 37000, row: 43297
Current count: 38000, row: 44197
Current count: 39000, row: 45097
Current count: 40000, row: 45998
Current count: 41000, row: 46898
Current count: 42000, row: 47798
Current count: 43000, row: 48698
Current count: 44000, row: 49598
Current count: 45000, row: 50497
Current count: 46000, row: 51397
Current count: 47000, row: 52297
Current count: 48000, row: 53197
Current count: 49000, row: 54097
Current count: 50000, row: 54998
Current count: 51000, row: 55898
Current count: 52000, row: 56798
Current count: 53000, row: 57698
Current count: 54000, row: 58598
Current count: 55000, row: 59498
Current count: 56000, row: 60397
Current count: 57000, row: 61297
Current count: 58000, row: 62197
Current count: 59000, row: 63097
Current count: 60000, row: 63998
Current count: 61000, row: 64898
Current count: 62000, row: 65798
Current count: 63000, row: 66698
Current count: 64000, row: 67598
Current count: 65000, row: 68498
Current count: 66000, row: 69398
Current count: 67000, row: 70297
Current count: 68000, row: 71197
Current count: 69000, row: 72097
Current count: 70000, row: 72998
Current count: 71000, row: 73898
Current count: 72000, row: 74798
Current count: 73000, row: 75698
Current count: 74000, row: 76598
Current count: 75000, row: 77498
Current count: 76000, row: 78398
Current count: 77000, row: 79298
Current count: 78000, row: 80197
Current count: 79000, row: 81097
Current count: 80000, row: 81998
Current count: 81000, row: 82898
Current count: 82000, row: 83798
Current count: 83000, row: 84698
Current count: 84000, row: 85598
Current count: 85000, row: 86498
Current count: 86000, row: 87398
Current count: 87000, row: 88298
Current count: 88000, row: 89198
Current count: 89000, row: 90097
Current count: 90000, row: 90998
Current count: 91000, row: 91898
Current count: 92000, row: 92798
Current count: 93000, row: 93698
Current count: 94000, row: 94598
Current count: 95000, row: 95498
Current count: 96000, row: 96398
Current count: 97000, row: 97298
Current count: 98000, row: 98198
Current count: 99000, row: 99098
Current count: 100000, row: 99999
100000 row(s) in 14.6730 seconds
=> 100000
hbase(main):002:0> get 'mydata','1'
COLUMN CELL
info:data1 timestamp=1501925842893, value=14506
info:data2 timestamp=1501925842893, value=10037
info:data3 timestamp=1501925842893, value=10938
3 row(s) in 0.1180 seconds
hbase(main):003:0>
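Besides count and get, a scan with a row limit is another quick way to spot-check a few imported rows (a sketch; the LIMIT value is arbitrary):

scan 'mydata', {LIMIT => 3}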
10.7.3 Writing a MapReduce Program
When the input format is more complex than a simple delimited file, or the data needs to be transformed on the way in, a custom MapReduce job is the more flexible option. The program below reads the same mydata.txt from HDFS: the mapper passes each line through unchanged, and the reducer, a TableReducer, splits each line on tabs, builds a Put per line, and writes it to the mydata table through TableOutputFormat.
package hbaseDemo.dao;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class BatchImport {

    public static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pass every input line through unchanged; the reducer does the real work.
            context.write(key, value);
        }
    }

    public static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable> {
        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text text : values) {
                final String[] splited = text.toString().split("\t");
                final Put put = new Put(Bytes.toBytes(splited[0])); // first column is the row key
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("data1"), Bytes.toBytes(splited[1]));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("data2"), Bytes.toBytes(splited[2]));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("data3"), Bytes.toBytes(splited[3]));
                context.write(NullWritable.get(), put);
            }
        }
    }
    /**
     * Note: an earlier run kept failing with "failed on connection exception:
     * Connection refused: nb0:8020" because the NameNode was not running on
     * 192.168.1.160 but on 192.168.1.161 and 192.168.1.162.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        conf.set("hbase.rootdir", "hdfs://cetc32/hbase");
        // ZooKeeper quorum, given here directly as IP addresses
        conf.set("hbase.zookeeper.quorum", "192.168.1.160,192.168.1.161,192.168.1.162");
        // Target HBase table (create it first in the shell: create 'mydata','info')
        conf.set(TableOutputFormat.OUTPUT_TABLE, "mydata");
        // Increase the socket timeout so HBase does not give up on slow DataNodes
        conf.set("dfs.socket.timeout", "180000");
        //System.setProperty("HADOOP_USER_NAME", "root");
        // Set fs.defaultFS
        conf.set("fs.defaultFS", "hdfs://192.168.1.161:8020");
        // ResourceManager host
        conf.set("yarn.resourcemanager.hostname", "nb1");

        Job job = Job.getInstance(conf);
        job.setJobName("HBaseBatchImport");
        // Ship the jar that contains the mapper and reducer classes to the cluster
        job.setJarByClass(BatchImport.class);
        job.setMapperClass(BatchImportMapper.class);
        job.setReducerClass(BatchImportReducer.class);
        // Only the map output types need to be set; TableOutputFormat handles the reduce output
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        // No output path is set; the output format writes the Puts directly into HBase
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.setInputPaths(job, "hdfs://192.168.1.161:8020/user/root/input/mydata.txt");

        boolean flag = job.waitForCompletion(true);
        System.out.println(flag);
    }
}
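The listing above is plain Hadoop/HBase client code and is not tied to any particular build tool. As a rough sketch of how it might be run (the jar name hbase-batchimport.jar is only an assumption; the package and class names follow the listing), package the class into a jar, put the HBase client jars on the Hadoop classpath, and submit the job:

# make the HBase client jars visible to the MapReduce job, then submit it
export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar hbase-batchimport.jar hbaseDemo.dao.BatchImport

After the job finishes, the same count 'mydata' check from the previous section can be used to verify the load.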