在離線任務(wù)場景中,MapReduce訪問HBASE數(shù)據(jù),加快分析速度和擴(kuò)展分析能力。
從hbase中讀取數(shù)據(jù)(result)
創(chuàng)新互聯(lián)是一家專注于成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作與策劃設(shè)計(jì),靈寶網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設(shè)十余年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:靈寶等地區(qū)。靈寶做網(wǎng)站價(jià)格咨詢:18982081108
public class ReadHBaseDataMR {
private static final String ZK_KEY = "hbase.zookeeper.quorum";
private static final String ZK_VALUE = "hadoop01:2181,hadoop01:2182,hadoop03:2181";
private static Configuration conf;
static {
conf=HBaseConfiguration.create();
conf.set(ZK_KEY,ZK_VALUE);
//因?yàn)槭菑膆base中讀取到自己的hdfs集群中,所以這里需要加載hdfs的配置文件
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
}
//job
public static void main(String[] args) {
Job job = null;
try {
//這里使用hbase的 conf
job = Job.getInstance(conf);
job.setJarByClass(ReadHBaseDataMR.class);
//全表掃描
Scan scans=new Scan();
String tableName="user_info";
//設(shè)置MapReduce與hbase的整合
TableMapReduceUtil.initTableMapperJob(tableName,
scans,
ReadHBaseDataMR_Mapper.class,
Text.class,
NullWritable.class,
job,
false);
//設(shè)置ReducerTask 的個(gè)數(shù)為0
job.setNumReduceTasks(0);
//設(shè)置輸出搭配hdfs上的路徑
Path output=new Path("/output/hbase/hbaseToHDFS");
if(output.getFileSystem(conf).exists(output)) {
output.getFileSystem(conf).delete(output, true);
}
FileOutputFormat.setOutputPath(job, output);
//提交任務(wù)
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion?0:1);
} catch (Exception e) {
e.printStackTrace();
}
}
//Mapper
//使用TableMapper,去讀取hbase中的表的數(shù)據(jù)
private static class ReadHBaseDataMR_Mapper extends TableMapper<Text, NullWritable> {
Text mk = new Text();
NullWritable kv = NullWritable.get();
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
//默認(rèn)的按照每一個(gè)rowkey讀取
List<Cell> cells = value.listCells();
//這里以四個(gè)坐標(biāo)確定一行記錄,行鍵,列簇,列,時(shí)間戳
for(Cell cell:cells){
String row= Bytes.toString(CellUtil.cloneRow(cell)); //行鍵
String cf=Bytes.toString(CellUtil.cloneFamily(cell)); //列簇
String column=Bytes.toString(CellUtil.cloneQualifier(cell)); //列
String values=Bytes.toString(CellUtil.cloneValue(cell)); //值
long time=cell.getTimestamp(); //時(shí)間戳
mk.set(row+"\t"+cf+"\t"+column+"\t"+value+"\t"+time);
context.write(mk,kv);
}
}
}
}
寫入數(shù)據(jù)到hbase中(put)
public class HDFSToHbase {
private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
private static final String ZK_CONNECT_VALUE = "hadoop02:2181,hadoop03:2181,hadoop01:2181";
private static Configuration conf;
static {
conf=HBaseConfiguration.create();
conf.set(ZK_CONNECT_KEY,ZK_CONNECT_VALUE);
//因?yàn)槭菑膆base中讀取到自己的hdfs集群中,所以這里需要加載hdfs的配置文件
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
}
//job
public static void main(String[] args) {
try {
Job job = Job.getInstance(conf);
job.setJarByClass(HDFSToHbase.class);
job.setMapperClass(MyMapper.class);
//指定Map端的輸出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
/**
* 指定為nulL的表示使用默認(rèn)的
*/
String tableName="student";
//整合MapReduce reducer 到hbase
TableMapReduceUtil.initTableReducerJob(tableName,MyReducer.class,
job,null, null, null, null,
false );
//指定MapReducer的輸入路徑
Path input = new Path("/in/mingxing.txt");
FileInputFormat.addInputPath(job, input);
//提交任務(wù)
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
//Mapper
private static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
NullWritable mv = NullWritable.get();
//map端不做任何操作,直接將讀取的數(shù)據(jù)輸出到reduce端
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(value, mv);
}
}
//Reudcer,使用TableReducer的Reudcer
/**
* TableReducer<KEYIN, VALUEIN, KEYOUT>
* KEYIN:mapper輸出的key
* VALUEIN:mapper輸出的value
* KEYOUT:reduce輸出的key
* 默認(rèn)的有第四個(gè)參數(shù):Mutation,表示put/delete操作
*/
private static class MyReducer extends TableReducer<Text, NullWritable, NullWritable>{
//列簇
String family[] = { "basicinfo","extrainfo"};
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// zhangfenglun,M,20,13522334455,zfl@163.com,23521472 字段
for(NullWritable value:values){
String fields[]=key.toString().split(",");
//以名稱作為rowkey
Put put=new Put(fields[0].getBytes());
put.addColumn(fields[0].getBytes(),"sex".getBytes(),fields[1].getBytes());
put.addColumn(fields[0].getBytes(),"age".getBytes(),fields[2].getBytes());
put.addColumn(fields[1].getBytes(),"phone".getBytes(),fields[3].getBytes());
put.addColumn(fields[1].getBytes(),"email".getBytes(),fields[4].getBytes());
put.addColumn(fields[1].getBytes(),"qq".getBytes(),fields[5].getBytes());
context.write(value, put);
}
}
}
}
#使用sqoop從MySQL導(dǎo)入HBASE
sqoop import \
--connect jdbc:mysql://hadoop01:3306/test \ #MySQL的入口
--username hadoop \ #登錄MySQL的用戶名
--password root \ #登錄MySQL的密碼
--table book \ #插入的到MySQL的表
--hbase-table book \ #HBASE的表名
--column-family info \ #HBASE表中的列簇
--hbase-row-key bid \ #mysql中的哪一個(gè)列為rowkey
#ps:這里由于版本不兼容的問題,所以,這里的HBASE中插入的表必須提前創(chuàng)建,并且不能使用:--hbase-create-table \,這個(gè)語句
原理:Hive與HBASE利用兩者本身對(duì)外的API來實(shí)現(xiàn)整合,主要靠的是HBaseStorageHandler 進(jìn) 行通信,利用 HBaseStorageHandler,Hive 可以獲取到 Hive 表對(duì)應(yīng)的 HBase 表名,列簇以及 列,InputFormat 和 OutputFormat 類,創(chuàng)建和刪除 HBase 表等。
Hive 訪問 HBase 中表數(shù)據(jù),實(shí)質(zhì)上是通過 MapReduce 讀取 HBase 表數(shù)據(jù),其實(shí)現(xiàn)是在 MR 中,使用 HiveHBaseTableInputFormat 完成對(duì) HBase 表的切分,獲取 RecordReader 對(duì)象來讀 取數(shù)據(jù)。
對(duì)HBASE表的切分原則:一個(gè)region切分成一個(gè)split,即表中有多少個(gè)region,MapReduce就有多少個(gè)map task。
讀取HBASE表數(shù)據(jù)都是通過scanner,對(duì)表進(jìn)行全表掃描,如果有過濾條件,則轉(zhuǎn)化為filter,當(dāng)過濾條件為rowkey時(shí),則轉(zhuǎn)化為rowkey的過濾。
具體操作:
#指定 hbase 所使用的 zookeeper 集群的地址:默認(rèn)端口是 2181,可以不寫:
hive>set hbase.zookeeper.quorum=hadoop02:2181,hadoop03:2181,hadoop04:2181;
#指定 hbase 在 zookeeper 中使用的根目錄
hive>set zookeeper.znode.parent=/hbase;
#創(chuàng)建基于 HBase 表的 hive 表
hive>create external table mingxing(rowkey string, base_info map, extra_info map) row format delimited fields terminated by '\t'
>stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
>with serdeproperties ("hbase.columns.mapping" = ":key,base_info:,extra_info:")
>tblproperties("hbase.table.name"="mingxing","hbase.mapred.output.outputtable"="mingxing");
#ps:org.apache.hadoop.hive.hbase.HBaseStorageHandler:處理 hive 到 hbase 轉(zhuǎn)換關(guān)系的處理器
#ps:hbase.columns.mapping:定義 hbase 的列簇和列到 hive 的映射關(guān)系
#ps:hbase.table.name:hbase 表名
雖然hive整合了hbase,但是實(shí)際的數(shù)據(jù)還是存儲(chǔ)在hbase上,hive相應(yīng)的表目錄下對(duì)應(yīng)的文件為空,但是每次hbase中有數(shù)據(jù)添加時(shí),hive在執(zhí)行這張表查詢的時(shí)候,也會(huì)更新相應(yīng)的字段。
分享文章:hbase的典型場景
網(wǎng)站路徑:http://chinadenli.net/article24/ppigje.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供、ChatGPT、外貿(mào)建站、微信小程序、標(biāo)簽優(yōu)化、虛擬主機(jī)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)