计算机教程

当前位置:3522.com > 计算机教程 > 在Hadoop的streaming中使用自定义的inputformat和outpu

在Hadoop的streaming中使用自定义的inputformat和outpu

来源:http://www.4sports-uk.com 作者:3522.com 时间:2019-11-05 01:10

Hadoop的streaming中有一个选项是指定输入输出格式化的:

基于旧的mapreduce的api的输入格式在hive中已有实现,在org.apache.Hadoop.hive.ql.io下,下面代码是根据其源码自己实现的新mapreduce api接口。上代码:RCFileInputFormat.java

  1. -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.  
  2. -outputformat TextOutputFormat(default)|JavaClassName  Optional.  
  1. import java.io.IOException;  
  2. import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;  
  3. import org.apache.hadoop.io.LongWritable;  
  4. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  5. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  6.   
  7. /** 
  8.  * RCFileInputFormat. 
  9.  * 
  10.  * @param <K> 
  11.  * @param <V> 
  12.  */  
  13. public class RCFileInputFormat<K extends LongWritable, V extends BytesRefArrayWritable>  
  14.     extends FileInputFormat<K, V>  {  
  15.   
  16.   public RCFileInputFormat() {  
  17.   
  18.   }  
  19.   
  20.   
  21.   
  22. @SuppressWarnings("unchecked")  
  23. @Override  
  24. public org.apache.hadoop.mapreduce.RecordReader<K, V> createRecordReader(  
  25.         org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1)  
  26.         throws IOException, InterruptedException {  
  27.      return new RCFileRecordReader();  
  28. }  
  29. }  

但是在0.14版本之后,hadoop不再支持带多个jar包文件,所以,如果要使用自己定义的Inputformat或者Outputformat,就得将对应的class文件加入到hadoop-streaming-1.0.1.jar中去,比如:

 

  1. jar uf ../../contrib/streaming/hadoop-streaming-1.0.1.jar org/apache/hadoop/streaming/*.class   
  1. import java.io.IOException;
  2. import org.apache.hadoop.conf.Configuration;  
  3. import org.apache.hadoop.fs.FileSystem;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.hive.ql.io.RCFile;  
  6. import org.apache.hadoop.hive.ql.io.RCFile.Reader;  
  7. import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;  
  8. import org.apache.hadoop.io.LongWritable;  
  9. import org.apache.hadoop.mapreduce.InputSplit;  
  10. import org.apache.hadoop.mapreduce.RecordReader;  
  11. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;</P><P>/** 
  13.  * RCFileRecordReader. 
  14.  *  
  15.  * @param <K> 
  16.  * @param <V> 
  17.  */  
  18. public class RCFileRecordReader<K extends LongWritable, V extends BytesRefArrayWritable>  
  19.   extends RecordReader<LongWritable, BytesRefArrayWritable> {</P><P> private Reader in;  
  20.  private long start;  
  21.  private long end;  
  22.  private boolean more = true;  
  23.  private LongWritable key = null;  
  24.  private BytesRefArrayWritable value = null;  
  25.  protected Configuration conf;</P><P> /** 
  26.   * Return the progress within the input split. 
  27.   *  
  28.   * @return 0.0 to 1.0 of the input byte range 
  29.   */  
  30.  public float getProgress() throws IOException {  
  31.   if (end == start) {  
  32.    return 0.0f;  
  33.   } else {  
  34.    return Math.min(1.0f, (in.getPosition() - start)  
  35.      / (float) (end - start));  
  36.   }  
  37.  }
  38.  public void close() throws IOException {  
  39.   in.close();  
  40.  }
  41.  @Override  
  42.  public LongWritable getCurrentKey() throws IOException,  
  43.    InterruptedException {
  44.   return key;  
  45.  }
  46.  @Override  
  47.  public BytesRefArrayWritable getCurrentValue() throws IOException,  
  48.    InterruptedException {
  49.   return value;  
  50.  }
  51.  @Override  
  52.  public void initialize(InputSplit split, TaskAttemptContext context)  
  53.    throws IOException, InterruptedException {  
  54.   FileSplit fileSplit = (FileSplit) split;  
  55.   conf = context.getConfiguration();  
  56.   Path path = fileSplit.getPath();  
  57.   FileSystem fs = path.getFileSystem(conf);  
  58.   this.in = new RCFile.Reader(fs, path, conf);  
  59.   this.end = fileSplit.getStart()   fileSplit.getLength();
  60.   if (fileSplit.getStart() > in.getPosition()) {  
  61.    in.sync(fileSplit.getStart()); // sync to start   
  62.   }
  63.   this.start = in.getPosition();  
  64.   more = start < end;  
  65.  }
  66.  @Override  
  67.  public boolean nextKeyValue() throws IOException, InterruptedException {  
  68.   if (!more) {  
  69.    return false;  
  70.   }  
  71.   if (key == null) {  
  72.    key = new LongWritable();  
  73.   }  
  74.   if (value == null) {  
  75.    value = new BytesRefArrayWritable();  
  76.   }  
  77.   more = in.next(key);  
  78.   if (!more) {  
  79.    return false;  
  80.   }  
  81.   long lastSeenSyncPos = in.lastSeenSyncPos();  
  82.   if (lastSeenSyncPos >= end) {  
  83.    more = false;  
  84.    return more;  
  85.   }  
  86.   in.getCurrentRow(value);  
  87.   return more;  
  88.  }  
  89. }  

然后在-inputformat后面就可以直接带类名了。

应用方式:

下面通过一个例子来说明下,实现Map的输入<key,value>,key为文件名,value为文档的整篇内容:

job.setInputFormatClass(RCFileInputFormat.class);

1.定义自己的InputFormat:

public static class Map extends   Mapper<LongWritable, BytesRefArrayWritable, Text, NullWritable> {

ContentRecordReder.java

@Override

  1. package org.apache.hadoop.streaming;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. //import org.apache.commons.logging.Log;   
  6. //import org.apache.commons.logging.LogFactory;   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FSDataInputStream;  
  9. import org.apache.hadoop.fs.FileSystem;  
  10. import org.apache.hadoop.fs.Path;  
  11. import org.apache.hadoop.io.Text;  
  12. import org.apache.hadoop.io.compress.CompressionCodecFactory;  
  13. import org.apache.hadoop.mapred.FileSplit;  
  14. import org.apache.hadoop.mapred.RecordReader;  
  15.   
  16. import com.sun.org.apache.commons.logging.Log;  
  17. import com.sun.org.apache.commons.logging.LogFactory;  
  18.   
  19. public class ContentRecordReder implements RecordReader<Text,Text> {  
  20.     private static final Log LOG = LogFactory.getLog(ContentRecordReder.class.getName());    
  21.     private CompressionCodecFactory compressionCodecs = null;    
  22.     private long start;    
  23.     private long pos;    
  24.     private long end;    
  25.     private byte[] buffer;    
  26.     private String keyName;    
  27.     private FSDataInputStream fileIn;    
  28.         
  29.     public ContentRecordReder(Configuration job,FileSplit split) throws IOException{    
  30.         start = split.getStart(); //从中可以看出每个文件是作为一个split的     
  31.         end = split.getLength()   start;  
  32.         final Path path = split.getPath();  
  33.         keyName = path.toString();    
  34.         LOG.info("filename in hdfs is : "   keyName);    
  35.         System.out.println("filename in hdfs is : "   keyName);  
  36.         final FileSystem fs = path.getFileSystem(job);    
  37.         fileIn = fs.open(path);    
  38.         fileIn.seek(start);    
  39.         buffer = new byte[(int)(end - start)];    
  40.         this.pos = start;  
  41.   
  42.     }    
  43.     
  44.     public Text createKey() {    
  45.         return new Text();    
  46.     }    
  47.     
  48.     public Text createValue() {    
  49.         return new Text();    
  50.     }    
  51.     
  52.     public long getPos() throws IOException{    
  53.         return pos;    
  54.     }    
  55.     
  56.     public float getProgress() {    
  57.         if (start == end) {    
  58.             return 0.0f;    
  59.         } else {    
  60.             return Math.min(1.0f, (pos - start) / (float)(end - start));    
  61.         }    
  62.     }    
  63.     
  64.     public boolean next(Text key, Text value) throws IOException{    
  65.         while(pos < end) {    
  66.             key.set(keyName);    
  67.             value.clear();    
  68.             fileIn.readFully(pos,buffer);    
  69.             value.set(buffer);    
  70.             LOG.info("---内容: "   value.toString());    
  71.             System.out.println("---内容: "   value.toString());  
  72.             pos  = buffer.length;    
  73.             LOG.info("end is : "   end    " pos is : "   pos);    
  74.             return true;    
  75.         }    
  76.         return false;    
  77.     }    
  78.     
  79.     public void close() throws IOException{    
  80.         if(fileIn != null) {    
  81.             fileIn.close();    
  82.         }    
  83.             
  84.     }    
  85. }   

protected void map(LongWritable key, BytesRefArrayWritable value, Context context) throws IOException,   InterruptedException {

ContentInputFormat.java

String top = new String(value.get(32).getBytesCopy());

  1. package org.apache.hadoop.streaming;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.fs.FileSystem;  
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.io.compress.CompressionCodecFactory;  
  9. import org.apache.hadoop.mapred.FileSplit;  
  10. import org.apache.hadoop.mapred.JobConf;  
  11. import org.apache.hadoop.mapred.JobConfigurable;  
  12. import org.apache.hadoop.mapred.Reporter;  
  13. import org.apache.hadoop.mapred.InputSplit;  
  14. import org.apache.hadoop.mapred.RecordReader;  
  15. import org.apache.hadoop.mapred.FileInputFormat;  
  16.   
  17. public class ContentInputFormat extends FileInputFormat<Text,Text>{  
  18.     private long mySplitSize = 1024*1024;  
  19.     private CompressionCodecFactory compressionCodecs = null;    
  20.     public void configure(JobConf conf) {    
  21.         compressionCodecs = new CompressionCodecFactory(conf);    
  22.     }  
  23.       
  24.     /**  
  25.      * @brief isSplitable 不对文件进行切分,必须对文件整体进行处理  
  26.      *  
  27.      * @param fs  
  28.      * @param file  
  29.      *  
  30.      * @return false  
  31.      */    
  32.     protected boolean isSplitable(FileSystem fs, Path file) {    
  33.         return false;   
  34.     }    
  35.     
  36.     public RecordReader<Text,Text> getRecordReader(InputSplit genericSplit,    
  37.                             JobConf job, Reporter reporter) throws IOException{    
  38.         reporter.setStatus(genericSplit.toString());    
  39.         ContentRecordReder contentRecordReder = new ContentRecordReder(job,(FileSplit)genericSplit);  
  40.         return (RecordReader<Text, Text>) contentRecordReder;  
  41.     }  
  42.   

byte[] channel = value.get(12).getBytesCopy();

2.编译

图片 1

  1. javac -classpath ~/hadoop-1.0.1/hadoop-core-1.0.1.jar:~/hadoop-1.0.1/lib/*:./con  
  2. tent-record-reader.jar ./*.java -Xlint:deprecation

本文由3522.com发布于计算机教程,转载请注明出处:在Hadoop的streaming中使用自定义的inputformat和outpu

关键词: 3522.com

上一篇:ansible实践3-playbook条件判断

下一篇:没有了