欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

hadoopstreaming如何實(shí)現(xiàn)多路輸出擴(kuò)展

這篇文章主要介紹hadoop streaming如何實(shí)現(xiàn)多路輸出擴(kuò)展,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

成都創(chuàng)新互聯(lián)專業(yè)為企業(yè)提供冷水江網(wǎng)站建設(shè)、冷水江做網(wǎng)站、冷水江網(wǎng)站設(shè)計(jì)、冷水江網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計(jì)與制作、冷水江企業(yè)網(wǎng)站模板建站服務(wù),十余年冷水江做網(wǎng)站經(jīng)驗(yàn),不只是建網(wǎng)站,更提供有價(jià)值的思路和整體網(wǎng)絡(luò)服務(wù)。

PrefixMultipleOutputFormat 實(shí)現(xiàn)的功能點(diǎn)有兩個(gè)

  • 按照key的前綴輸入到不同的目錄

  • 刪除最終輸出結(jié)果中的tab

##使用方式### ####按照key 的 前綴輸出到不同目錄中

 $maserati_hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -libjars ./adts.jar 
    -D mapred.job.name=$name \  
    -D mapred.reduce.tasks=5 \  
     -inputformat org.apache.hadoop.mapred.TextInputFormat \  
    -outputformat com.sogou.adt.adts.PrefixMultipleOutputFormat \  
    -input $input \  
    -output $output \  
    -mapper ./m_mapper.sh \  
    -reducer ./m_reducer.sh \  
    -file m_mapper.sh \  
    -file m_reducer.sh

其中outputformat 指定的是 自己時(shí)間的類 -libjars ./adts.jar導(dǎo)入的是自己的jar包

###mapper 和 reduer.sh 
 ##m_maper.sh##
 #!/bin/bash
 awk -F " " '{                 
        for(i=1;i<=NF;i++)       
	print $i;    
  }' 

 ###m_reduer.sh###
 #!/bin/bash
 awk -F "\t" '{
if(NR%3==0)
	print "A#"$1;
if(NR%3==1)
	print "B#"$1;
if(NR%3==2)
	print "C#"$1;
    }'

這樣就可以將數(shù)字分別輸入到不同的路徑中了

####刪除行尾的tab 只需要加入com.sogou.adt.adts.ignoreseparator=true指定忽略行尾的tab 即可

    $maserati_hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -libjars ./adts.jar 
    -D mapred.job.name=$name \  
    -D mapred.reduce.tasks=5 \  
    -D com.sogou.adt.adts.ignoreseparator=true \
     -inputformat org.apache.hadoop.mapred.TextInputFormat \  
    -outputformat com.sogou.adt.adts.PrefixMultipleOutputFormat \  
    -input $input \  
    -output $output \  
    -mapper ./m_mapper.sh \  
    -reducer ./m_reducer.sh \  
    -file m_mapper.sh \  
    -file m_reducer.sh

###PrefixMultipleOutputFormat的實(shí)現(xiàn)方式 由于并不熟悉java語言,在大學(xué)學(xué)的那點(diǎn)java也早就還給老師了^v^ 搭建編譯環(huán)境費(fèi)了些時(shí)日,不過好在有個(gè)現(xiàn)成的eclipse java 環(huán)境 還有兩年前搭建好的hadoop環(huán)境(它稍微修復(fù)一點(diǎn)點(diǎn)就ok了, 能跑程序了, 真是萬幸)。

###我的環(huán)境

  • eclipse

  • jdk1.6.0

  • jar包

    • hadoop-common-2.6.0.jar

  • hadoop-mapreduce-client-core-2.6.0.jar

這個(gè)簡單介紹一下 編譯之前我還在擔(dān)心hadoop streaming 依賴的jar包哪里去找,用不用自己編譯(hadoop所有的源碼編譯讓人有點(diǎn)頭疼),后來發(fā)現(xiàn)jar 包都可以在 hadoop 運(yùn)行環(huán)境中找到,瞬間釋然了。

###源碼 這段代碼挺好理解的了一個(gè)LineRecordWriter類 (大部分都是從現(xiàn)有的TextOutputFormat 類中扒的 只是改動(dòng)一點(diǎn) 讀配置 關(guān)閉輸出tab) generateFileNameForKeyValue 實(shí)現(xiàn)了從前綴讀取并輸出到不同的目錄中,代碼一目了然

package com.sogou.adt.adts;
  import java.io.DataOutputStream;   
  import java.io.IOException;   
  import java.io.UnsupportedEncodingException;
  import org.apache.hadoop.fs.FSDataOutputStream;    
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;    
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;    
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
 public class PrefixMultipleOutputFormat extends MultipleTextOutputFormat<Text, Text>  {
[@Override](https://my.oschina.net/u/1162528)
protected Text generateActualKey(Text key, Text value) {
	// TODO Auto-generated method stub
	return super.generateActualKey(key, value);
}

protected static class LineRecordWriter<K, V>
implements RecordWriter<K, V> {
private static final String utf8 = "UTF-8";
private static final byte[] newline;
static {
  try {
    newline = "\n".getBytes(utf8);
  } catch (UnsupportedEncodingException uee) {
    throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  }
}

protected DataOutputStream out;
private final byte[] keyValueSeparator;

public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
  this.out = out;
  try {
    this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
  } catch (UnsupportedEncodingException uee) {
    throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  }
}

public LineRecordWriter(DataOutputStream out) {
  this(out, "\t");
}

/**
 * Write the object to the byte stream, handling Text as a special
 * case.
 * [@param](https://my.oschina.net/u/2303379) o the object to print
 * [@throws](https://my.oschina.net/throws) IOException if the write throws, we pass it on
 */
private void writeObject(Object o) throws IOException {
  if (o instanceof Text) {
    Text to = (Text) o;
    out.write(to.getBytes(), 0, to.getLength());
  } else {
    out.write(o.toString().getBytes(utf8));
  }
}

public synchronized void write(K key, V value)
  throws IOException {

  boolean nullKey = key == null || key instanceof NullWritable;
  boolean nullValue = value == null || value instanceof NullWritable;
  if (nullKey && nullValue) {
    return;
  }
  if (!nullKey) {
    writeObject(key);
  }
  if (!(nullKey || nullValue)) {
    out.write(keyValueSeparator);
  }
  if (!nullValue) {
    writeObject(value);
  }
  out.write(newline);
}

public synchronized void close(Reporter reporter) throws IOException {
  out.close();
}
  }


[@Override](https://my.oschina.net/u/1162528)
protected RecordWriter<Text, Text> getBaseRecordWriter(FileSystem fs,
		JobConf job, String name, Progressable arg3) throws IOException {
	 boolean isCompressed = getCompressOutput(job);
	    String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", 
	                                       "\t");
	    
	    Boolean ignoreseparator = job.getBoolean("com.sogou.adt.adts.ignoreseparator", false);
	    if(ignoreseparator)
	    {
	    	keyValueSeparator="";
	    }
	    if (!isCompressed) {
	      Path file = FileOutputFormat.getTaskOutputPath(job, name);
	      fs = file.getFileSystem(job);
	      FSDataOutputStream fileOut = fs.create(file, arg3);
	      return new LineRecordWriter<Text, Text>(fileOut, keyValueSeparator);
	    } else {
	      Class<? extends CompressionCodec> codecClass =
	        getOutputCompressorClass(job, GzipCodec.class);
	      // create the named codec
	      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
	      // build the filename including the extension
	      Path file = 
	        FileOutputFormat.getTaskOutputPath(job, 
	                                           name + codec.getDefaultExtension());
	       fs = file.getFileSystem(job);
	      FSDataOutputStream fileOut = fs.create(file, arg3);
	      return new LineRecordWriter<Text, Text>(new DataOutputStream
	                                        (codec.createOutputStream(fileOut)),
	                                        keyValueSeparator);
	    }
}


[@Override](https://my.oschina.net/u/1162528)
protected String generateFileNameForKeyValue(Text key, Text value,
		String name) {
	 int keyLength = key.getLength();  
        String outputName = name;  
        
        if(keyLength < 2)
        	return outputName;
        
        Text sep = new Text();
        sep.append(key.getBytes(), 1, 1);
        
        if(sep.find("#") != -1)
        {  
        	Text newFlag = new Text();
            newFlag.append(key.getBytes(), 0, 1);
            String flag = newFlag.toString();
            //outputName = name+"-"+flag;
            outputName = flag+"/"+name+"-"+flag;
        	
            Text newValue = new Text();
            newValue.append(key.getBytes(), 2, keyLength-2);
			key.set(newValue);		        	            
        } 
        System.out.printf("[shishuai]System[key [%s]][value:[%s]] output[%s]\n",key.toString(),value.toString(),outputName);
        return outputName;  
}

}

以上是“hadoop streaming如何實(shí)現(xiàn)多路輸出擴(kuò)展”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

名稱欄目:hadoopstreaming如何實(shí)現(xiàn)多路輸出擴(kuò)展
網(wǎng)頁路徑:http://chinadenli.net/article40/joipeo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)頁設(shè)計(jì)公司響應(yīng)式網(wǎng)站、品牌網(wǎng)站制作網(wǎng)站設(shè)計(jì)、動(dòng)態(tài)網(wǎng)站企業(yè)網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都seo排名網(wǎng)站優(yōu)化
中文字幕在线区中文色| 欧美日韩免费观看视频| 国产日韩精品欧美综合区| 白白操白白在线免费观看| 日韩精品小视频在线观看| 日本少妇aa特黄大片| a久久天堂国产毛片精品| 麻豆国产精品一区二区三区| 亚洲一区二区久久观看| 亚洲熟女精品一区二区成人| 东北女人的逼操的舒服吗| 欧美三级不卡在线观线看| 欧美尤物在线观看西比尔| 国内外激情免费在线视频| 中文字幕日韩欧美理伦片| 久久精品亚洲情色欧美| 亚洲第一香蕉视频在线| 欧美日韩综合在线第一页| 日本人妻丰满熟妇久久| 国产精品日韩精品最新| 欧美精品亚洲精品日韩专区| 亚洲天堂精品1024| 国产成人精品在线播放| 激情五月激情婷婷丁香| 欧美特色特黄一级大黄片| 亚洲精品福利视频你懂的| 亚洲精品熟女国产多毛| 日韩中文字幕人妻精品| 欧美成人免费一级特黄| 亚洲国产成人爱av在线播放下载 | 国产精品欧美一区二区三区| 免费黄片视频美女一区| 日本高清视频在线播放| 在线播放欧美精品一区| 日本精品中文字幕人妻| 国产毛片av一区二区三区小说| 中文字幕在线五月婷婷| 激情五月天免费在线观看| 久久香蕉综合网精品视频| 国产亚洲欧美日韩国亚语| 国产精品美女午夜福利|