在向Hbase中寫入數(shù)據(jù)時,常見的寫入方法有使用HBase API,Mapreduce批量導入數(shù)據(jù),使用這些方式帶入數(shù)據(jù)時,一條數(shù)據(jù)寫入到HBase數(shù)據(jù)庫中的大致流程如圖。
10年積累的成都網(wǎng)站設計、網(wǎng)站建設經(jīng)驗,可以快速應對客戶對網(wǎng)站的新想法和需求。提供各種問題對應的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡服務。我雖然不認識你,你也不認識我。但先網(wǎng)站設計后付款的網(wǎng)站建設流程,更有南城免費網(wǎng)站建設讓你可以放心的選擇與我們合作。
數(shù)據(jù)發(fā)出后首先寫入到雨鞋日志W(wǎng)Al中,寫入到預寫日志中之后,隨后寫入到內(nèi)存MemStore中,最后在Flush到Hfile中。這樣寫數(shù)據(jù)的方式不會導致數(shù)據(jù)的丟失,并且道正數(shù)據(jù)的有序性,但是當遇到大量的數(shù)據(jù)寫入時,寫入的速度就難以保證。所以,介紹一種性能更高的寫入方式BulkLoad。
使用BulkLoad批量寫入數(shù)據(jù)主要分為兩部分:
一、使用HFileOutputFormat2通過自己編寫的MapReduce作業(yè)將HFile寫入到HDFS目錄,由于寫入到HBase中的數(shù)據(jù)是按照順序排序的,HFileOutputFormat2中的configureIncrementalLoad()可以完成所需的配置。
二、將Hfile從HDFS移動到HBase表中,大致過程如圖
實例代碼pom依賴:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>0.99.2</version>
</dependency>
package com.yangshou;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//讀取文件中的每一條數(shù)據(jù),以序號作為行鍵
String line = value.toString();
//將數(shù)據(jù)進行切分
//切分后數(shù)組中的元素分別為:序號,用戶id,商品id,用戶行為,商品分類,時間,地址
String[] str = line.split(" ");
String id = str[0];
String user_id = str[1];
String item_id = str[2];
String behavior = str[3];
String item_type = str[4];
String time = str[5];
String address = "156";
//拼接rowkey和put
ImmutableBytesWritable rowkry = new ImmutableBytesWritable(id.getBytes());
Put put = new Put(id.getBytes());
put.add("info".getBytes(),"user_id".getBytes(),user_id.getBytes());
put.add("info".getBytes(),"item_id".getBytes(),item_id.getBytes());
put.add("info".getBytes(),"behavior".getBytes(),behavior.getBytes());
put.add("info".getBytes(),"item_type".getBytes(),item_type.getBytes());
put.add("info".getBytes(),"time".getBytes(),time.getBytes());
put.add("info".getBytes(),"address".getBytes(),address.getBytes());
//將數(shù)據(jù)寫出
context.write(rowkry,put);
}
}
package com.yangshou;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class BulkLoadDriver {
public static void main(String[] args) throws Exception {
//獲取Hbase配置
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(TableName.valueOf("BulkLoadDemo"));
Admin admin = conn.getAdmin();
//設置job
Job job = Job.getInstance(conf,"BulkLoad");
job.setJarByClass(BulkLoadDriver.class);
job.setMapperClass(BulkLoadMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
//設置文件的輸入輸出路徑
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
FileInputFormat.setInputPaths(job,new Path("hdfs://hadoopalone:9000/tmp/000000_0"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://hadoopalone:9000/demo1"));
//將數(shù)據(jù)加載到Hbase表中
HFileOutputFormat2.configureIncrementalLoad(job,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo")));
if(job.waitForCompletion(true)){
LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
load.doBulkLoad(new Path("hdfs://hadoopalone:9000/demo1"),admin,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo")));
}
}
}
實例數(shù)據(jù)
44979 100640791 134060896 1 5271 2014-12-09 天津市
44980 100640791 96243605 1 13729 2014-12-02 新疆
在Hbase shell 中創(chuàng)建表
create 'BulkLoadDemo','info'
打包后執(zhí)行
```hadoop jar BulkLoadDemo-1.0-SNAPSHOT.jar com.yangshou.BulkLoadDriver
注意:在執(zhí)行hadoop jar之前應該先將Hbase中的相關(guān)包加載過來
export HADOOP_CLASSPATH=$HBASE_HOME/lib/*
文章題目:使用BulkLoad從HDFS批量導入數(shù)據(jù)到HBase
本文鏈接:http://chinadenli.net/article26/geoojg.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供商城網(wǎng)站、用戶體驗、網(wǎng)站導航、品牌網(wǎng)站建設、靜態(tài)網(wǎng)站、網(wǎng)站策劃
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)