import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; public class IndexBuilder { private class MyMapper extends TableMapper<ImmutableBytesWritable, Put> { private Map<byte[], ImmutableBytesWritable> indexes = new HashMap<byte[], ImmutableBytesWritable>(); private String columnFamily; @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { Set<byte[]> keys = indexes.keySet(); for (byte[] k : keys) { ImmutableBytesWritable indexTableName = indexes.get(k); byte[] val = value.getValue(Bytes.toBytes(columnFamily), k); Put put = new Put(val);// 索引表的rowkey為原始表的值 put.add(Bytes.toBytes("f1"), Bytes.toBytes("id"), key.get());// 索引表的內(nèi)容為原始表的rowkey context.write(indexTableName, put); } } @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); String tableName = conf.get("tableName"); columnFamily = conf.get("columnFamily"); String[] qualifiers = conf.getStrings("qualifiers"); // indexes的key為列名,value為索引表名 for (String q : qualifiers) { indexes.put( Bytes.toBytes(q), new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + q))); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, 
InterruptedException { Configuration conf = HBaseConfiguration.create(); String[] otherargs = new GenericOptionsParser(conf, args) .getRemainingArgs();// 去除掉沒有用的命令行參數(shù) // 輸入?yún)?shù):表名,列族名,列名 if (otherargs.length < 3) { System.exit(-1); } String tableName = otherargs[0]; String columnFamily = otherargs[1]; conf.set("tableName", tableName); conf.set("columnFamily", columnFamily); String[] qualifiers = new String[otherargs.length - 2]; for (int i = 0; i < qualifiers.length; i++) { qualifiers[i] = otherargs[i + 2]; } conf.setStrings("qualifiers", qualifiers); Job job = new Job(conf, tableName); job.setJarByClass(IndexBuilder.class); job.setMapperClass(MyMapper.class); job.setNumReduceTasks(0); job.setInputFormatClass(TableInputFormat.class); // 可以輸出多張表 job.setOutputFormatClass(MultiTableOutputFormat.class); Scan scan = new Scan(); scan.setCaching(1000); TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class, ImmutableBytesWritable.class, Put.class, job); job.waitForCompletion(true); } }
當前題目:Mapreduce構建hbase二級索引
當前路徑:http://chinadenli.net/article38/jgcspp.html
成都網站建設公司_創新互聯,為您提供營銷型網站建設、定制網站、網站維護、網頁設計公司、移動網站建設、品牌網站建設
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯