這篇文章主要講解了“怎么用Spark讀取HBASE數(shù)據(jù)”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“怎么用Spark讀取HBASE數(shù)據(jù)”吧!

“專業(yè)、務(wù)實(shí)、高效、創(chuàng)新、把客戶的事當(dāng)成自己的事”是我們每一個(gè)人一直以來堅(jiān)持追求的企業(yè)文化。 成都創(chuàng)新互聯(lián)是您可以信賴的網(wǎng)站建設(shè)服務(wù)商、專業(yè)的互聯(lián)網(wǎng)服務(wù)提供商! 專注于成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)、軟件開發(fā)、設(shè)計(jì)服務(wù)業(yè)務(wù)。我們始終堅(jiān)持以客戶需求為導(dǎo)向,結(jié)合用戶體驗(yàn)與視覺傳達(dá),提供有針對性的項(xiàng)目解決方案,提供專業(yè)性的建議,創(chuàng)新互聯(lián)建站將不斷地超越自我,追逐市場,引領(lǐng)市場!
scala訪問HBASE通常2種方式,一種是使用SPARK方式讀取HBASE數(shù)據(jù)直接轉(zhuǎn)換成RDD, 一種采用和JAVA類似的方式,通過HTable操作HBASE,數(shù)據(jù)獲取之后再自己進(jìn)行處理。 這2種方式區(qū)別應(yīng)該是RDD是跑在多節(jié)點(diǎn)通過從HBASE獲取數(shù)據(jù),而采用HTable的方式,應(yīng)該是串行了,僅僅是HBASE層面是分布式而已。
1. 轉(zhuǎn)換為RDD
package com.isesol.spark
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.spark._
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
object hbasescan {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setMaster("local").setAppName("this is for spark SQL")
//conf.setSparkHome("d:\\spark_home")
val hbaseconf = HBaseConfiguration.create()
hbaseconf.set("hbase.zookeeper.quorum", "datanode01.isesol.com,datanode02.isesol.com,datanode03.isesol.com,datanode04.isesol.com,cmserver.isesol.com")
hbaseconf.set("hbase.zookeeper.property.clientPort", "2181")
hbaseconf.set("maxSessionTimeout", "6")
val sc = new SparkContext(conf)
try {
println("start to read from hbase")
val hbaseContext = new HBaseContext(sc, hbaseconf)
val scan = new Scan()
scan.setMaxVersions()
//scan.setRowPrefixFilter(Bytes.toBytes("i51530048-1007-9223370552914159518"))
scan.setCaching(100)
val filter = new SingleColumnValueFilter(Bytes.toBytes("cf"), Bytes.toBytes("age"), CompareOp.LESS, Bytes.toBytes("1"));
scan.setFilter(filter)
val hbaserdd = hbaseContext.hbaseRDD(TableName.valueOf("bank"), scan)
hbaserdd.cache()
println(hbaserdd.count())
} catch {
case ex: Exception => println("can not connect hbase")
}
}
}
2. 采用 HTable方式處理
val htable = new HTable(hbaseconf, "t_device_fault_statistics")
val scan1 = new Scan()
scan1.setCaching(3*1024*1024)
val scaner = htable.getScanner(scan1)
while(scaner.iterator().hasNext()){
val result = scaner.next()
if(result.eq(null)){
} else {
println(Bytes.toString(result.getRow) + "\t" + Bytes.toString(result.getValue("cf".getBytes, "fault_level2_name".getBytes)))
}
}
scaner.close()
htable.close()
感謝各位的閱讀,以上就是“怎么用Spark讀取HBASE數(shù)據(jù)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對怎么用Spark讀取HBASE數(shù)據(jù)這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!
名稱欄目:怎么用Spark讀取HBASE數(shù)據(jù)
文章轉(zhuǎn)載:http://chinadenli.net/article40/jhjcho.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設(shè)計(jì)公司、電子商務(wù)、外貿(mào)建站、網(wǎng)站排名、網(wǎng)站維護(hù)、品牌網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)