计算机教程

当前位置:3522.com > 计算机教程 > Hadoop 源码解析之-TextOutputFormat【3522.com】

Hadoop 源码解析之-TextOutputFormat【3522.com】

来源:http://www.4sports-uk.com 作者:3522.com 时间:2019-11-05 01:10

因为需要自定义实现输出文件的格式,现在来分析一下TextOutputFormat的源码;

由于Hadoop默认编码为UTF-8,并且将UTF-8进行了硬编码,所以我们在处理中文时需要重写OutputFormat类。方法为:

源码如下,注释会直接放在源码之中

1、新建类GBKFileOutputFormat,代码如下:
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.UnsupportedEncodingException; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.compress.CompressionCodec; 
import org.apache.hadoop.io.compress.GzipCodec; 
import org.apache.hadoop.mapreduce.OutputFormat; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.util.*; 
 
/** An {@link OutputFormat} that writes plain text files. */ 
public class GBKFileOutputFormat<K, V> extends FileOutputFormat<K, V> {//TextInputFormat是默认的输出文件格式 
  protected static class LineRecordWriter<K, V>//默认 
    extends RecordWriter<K, V> { 
    private static final String utf8 = "GBK";  //硬编码,将“UTF-8”改为“GBK” 
    private static final byte[] newline;//行结束符? 
    static { 
      try { 
        newline = "n".getBytes(utf8); 
      } catch (UnsupportedEncodingException uee) { 
        throw new IllegalArgumentException("can't find " utf8 " encoding"); 
      } 
    } 
 
    protected DataOutputStream out; 
    private final byte[] keyValueSeparator;//key和value的分隔符,默认的好像是Tab 
 
    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {//构造函数,初始化输出流及分隔符 
      this.out = out; 
      try { 
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8); 
      } catch (UnsupportedEncodingException uee) { 
        throw new IllegalArgumentException("can't find " utf8 " encoding"); 
      } 
    } 
 
    public LineRecordWriter(DataOutputStream out) {//默认的分隔符 
      this(out, "t"); 
    } 
 
    /**
    * Write the object to the byte stream, handling Text as a special输出流是byte格式的
    * case.
    * @param o the object to print是要输出的对象
    * @throws IOException if the write throws, we pass it on
    */ 
    private void writeObject(Object o) throws IOException {//应该是一行一行的写 key keyValueSeparator value n 
      if (o instanceof Text) {//如果o是Text的实例 
        Text to = (Text) o; 
        out.write(to.getBytes(), 0, to.getLength());//写出 
      } else { 
        out.write(o.toString().getBytes(utf8)); 
      } 
    } 
 
    public synchronized void write(K key, V value)//给写线程加锁,写是互斥行为 
      throws IOException { 
//下面是为了判断key和value是否为空值 
      boolean nullKey = key == null || key instanceof NullWritable;//这语句太牛了 
      boolean nullValue = value == null || value instanceof NullWritable; 
      if (nullKey && nullValue) {// 
        return; 
      } 
      if (!nullKey) { 
        writeObject(key); 
      } 
      if (!(nullKey || nullValue)) { 
        out.write(keyValueSeparator); 
      } 
      if (!nullValue) { 
        writeObject(value); 
      } 
      out.write(newline); 
    } 
 
    public synchronized 
    void close(TaskAttemptContext context) throws IOException { 
      out.close(); 
    } 
  } 
 
  public RecordWriter<K, V>    getRecordWriter(TaskAttemptContext job//获得writer实例 
                        ) throws IOException, InterruptedException { 
    Configuration conf = job.getConfiguration(); 
    boolean isCompressed = getCompressOutput(job);// 
    String keyValueSeparator= conf.get("mapred.textoutputformat.separator", 
                                      "t"); 
    CompressionCodec codec = null;//压缩格式 还是? 
    String extension = ""; 
    if (isCompressed) { 
      Class<? extends CompressionCodec> codecClass = 
        getOutputCompressorClass(job, GzipCodec.class); 
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); 
      extension = codec.getDefaultExtension(); 
    } 
    Path file = getDefaultWorkFile(job, extension);//这个是获取缺省的文件路径及名称,在FileOutput中有对其的实现 
    FileSystem fs = file.getFileSystem(conf); 
    if (!isCompressed) { 
      FSDataOutputStream fileOut = fs.create(file, false); 
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); 
    } else { 
      FSDataOutputStream fileOut = fs.create(file, false); 
      return new LineRecordWriter<K, V>(new DataOutputStream 
                                        (codec.createOutputStream(fileOut)), 
                                        keyValueSeparator); 
    } 
  } 

  1. package org.apache.Hadoop.mapreduce.lib.output;  
  2.   
  3. import java.io.DataOutputStream;  
  4. import java.io.IOException;  
  5. import java.io.UnsupportedEncodingException;  
  6.   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FileSystem;  
  9. import org.apache.hadoop.fs.Path;  
  10. import org.apache.hadoop.fs.FSDataOutputStream;  
  11.   
  12. import org.apache.hadoop.io.NullWritable;  
  13. import org.apache.hadoop.io.Text;  
  14. import org.apache.hadoop.io.compress.CompressionCodec;  
  15. import org.apache.hadoop.io.compress.GzipCodec;  
  16. import org.apache.hadoop.mapreduce.OutputFormat;  
  17. import org.apache.hadoop.mapreduce.RecordWriter;  
  18. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  19. import org.apache.hadoop.util.*;  
  20.   
  21. /** An {@link OutputFormat} that writes plain text files. */  
  22. public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {//TextInputFormat是默认的输出文件格式   
  23.   protected static class LineRecordWriter<K, V>//默认   
  24.     extends RecordWriter<K, V> {  
  25.     private static final String utf8 = "UTF-8";  
  26.     private static final byte[] newline;//行结束符?   
  27.     static {  
  28.       try {  
  29.         newline = "n".getBytes(utf8);  
  30.       } catch (UnsupportedEncodingException uee) {  
  31.         throw new IllegalArgumentException("can't find "   utf8   " encoding");  
  32.       }  
  33.     }  
  34.   
  35.     protected DataOutputStream out;  
  36.     private final byte[] keyValueSeparator;//key和value的分隔符,默认的好像是Tab   
  37.   
  38.     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {//构造函数,初始化输出流及分隔符    
  39.       this.out = out;  
  40.       try {  
  41.         this.keyValueSeparator = keyValueSeparator.getBytes(utf8);  
  42.       } catch (UnsupportedEncodingException uee) {  
  43.         throw new IllegalArgumentException("can't find "   utf8   " encoding");  
  44.       }  
  45.     }  
  46.   
  47.     public LineRecordWriter(DataOutputStream out) {//默认的分隔符   
  48.       this(out, "t");  
  49.     }  
  50.   
  51.     /** 
  52.      * Write the object to the byte stream, handling Text as a special输出流是byte格式的 
  53.      * case. 
  54.      * @param o the object to print是要输出的对象 
  55.      * @throws IOException if the write throws, we pass it on 
  56.      */  
  57.     private void writeObject(Object o) throws IOException {//应该是一行一行的写 key keyValueSeparator value n   
  58.       if (o instanceof Text) {//如果o是Text的实例   
  59.         Text to = (Text) o;  
  60.         out.write(to.getBytes(), 0, to.getLength());//写出   
  61.       } else {  
  62.         out.write(o.toString().getBytes(utf8));  
  63.       }  
  64.     }  
  65.   
  66.     public synchronized void write(K key, V value)//给写线程加锁,写是互斥行为   
  67.       throws IOException {  
  68. <span style="white-space:pre">    </span>//下面是为了判断key和value是否为空值   
  69.       boolean nullKey = key == null || key instanceof NullWritable;//这语句太牛了   
  70.       boolean nullValue = value == null || value instanceof NullWritable;  
  71.       if (nullKey && nullValue) {//   
  72.         return;  
  73.       }  
  74.       if (!nullKey) {  
  75.         writeObject(key);  
  76.       }  
  77.       if (!(nullKey || nullValue)) {  
  78.         out.write(keyValueSeparator);  
  79.       }  
  80.       if (!nullValue) {  
  81.         writeObject(value);  
  82.       }  
  83.       out.write(newline);  
  84.     }  
  85.   
  86.     public synchronized   
  87.     void close(TaskAttemptContext context) throws IOException {  
  88.       out.close();  
  89.     }  
  90.   }  
  91.   
  92.   public RecordWriter<K, V>    getRecordWriter(TaskAttemptContext job//获得writer实例   
  93.                          ) throws IOException, InterruptedException {  
  94.     Configuration conf = job.getConfiguration();  
  95.     boolean isCompressed = getCompressOutput(job);//   
  96.     String keyValueSeparator= conf.get("mapred.textoutputformat.separator",  
  97.                                        "t");  
  98.     CompressionCodec codec = null;//压缩格式 还是?   
  99.     String extension = "";  
  100.     if (isCompressed) {  
  101.       Class<? extends CompressionCodec> codecClass =   
  102.         getOutputCompressorClass(job, GzipCodec.class);  
  103.       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  
  104.       extension = codec.getDefaultExtension();  
  105.     }  
  106.     Path file = getDefaultWorkFile(job, extension);//这个是获取缺省的文件路径及名称,在FileOutput中有对其的实现   
  107.     FileSystem fs = file.getFileSystem(conf);  
  108.     if (!isCompressed) {  
  109.       FSDataOutputStream fileOut = fs.create(file, false);  
  110.       return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);  
  111.     } else {  
  112.       FSDataOutputStream fileOut = fs.create(file, false);  
  113.       return new LineRecordWriter<K, V>(new DataOutputStream  
  114.                                         (codec.createOutputStream(fileOut)),  
  115.                                         keyValueSeparator);  
  116.     }  
  117.   }  
  118. }  

    更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

该类是在源代码中TextOutputFormat类基础上进行修改的,在这需要注意的一点是继承的父类FileOutputFormat是位于org.apache.hadoop.mapreduce.lib.output包中的

本文由3522.com发布于计算机教程,转载请注明出处:Hadoop 源码解析之-TextOutputFormat【3522.com】

关键词: 3522.com

上一篇:解决mongdb扩展安装出错问题

下一篇:没有了