hadoop中mapreduce如何自定义InputFormat

这篇文章主要介绍了hadoop中mapreduce如何自定义InputFormat,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

网站建设哪家好,找成都创新互联公司!专注于网页设计、网站建设、微信开发、小程序制作、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了石鼓免费建站欢迎大家使用!

    首先我们要先定义一个类继承FileInputFormat,并重写createRecordReader方法返回RecordReader,然后定义一个类继承RecordReader,createRecordReader方法返回也就是我们定义的RecordReader的子类的对象。

代码如下

public class TrackInputFormat extends FileInputFormat {

	@Override
	public RecordReader createRecordReader(InputSplit arg0,
			TaskAttemptContext arg1) throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return new TrackRecordReader();
		
	}
}

package input;


import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;


/**
 * Treats keys as offset in file and value as line.
 * 
 * @deprecated Use
 *             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}
 *             instead.
 */
public class TrackRecordReader extends RecordReader {
	Logger logger = Logger.getLogger(TrackRecordReader.class.getName());
	private CompressionCodecFactory compressionCodecs = null;
	private long start;
	private long pos;
	private long end;
	private NewLineReader in;
	private int maxLineLength;
	private LongWritable key = null;
	private Text value = null;
	// ----------------------
	// 行分隔符,即一条记录的分隔符
	private byte[] separator = "]@\n".getBytes();

	// --------------------

	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException {
		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		//mapreduce.input.linerecordreader.line.maxlength
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);
		start = split.getStart();
		end = start + split.getLength();
		final Path file = split.getPath();
		//logger.info("path========================="+file.toString());	
		compressionCodecs = new CompressionCodecFactory(job);
		final CompressionCodec codec = compressionCodecs.getCodec(file);

		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());
		boolean skipFirstLine = false;
		//logger.info("codec========================="+codec);	
		if (codec != null) {
			in = new NewLineReader(codec.createInputStream(fileIn), job);
			end = Long.MAX_VALUE;
		} else {
			if (start != 0) {
				skipFirstLine = true;
				this.start -= separator.length;//
				// --start;
				fileIn.seek(start);
			}
			in = new NewLineReader(fileIn, job);
		}
		if (skipFirstLine) { // skip first line and re-establish "start".
				start += in.readLine(new Text(), 0,
					(int) Math.min((long) Integer.MAX_VALUE, end - start));
		}
		this.pos = start;
		/*if (skipFirstLine) { 
			int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));
		    if(newSize > 0){
		    	start += newSize;
		    }
		}*/

	}

	public boolean nextKeyValue() throws IOException {
		if (key == null) {
			key = new LongWritable();
		}
		key.set(pos);
		if (value == null) {
			value = new Text();
		}
		int newSize = 0;
		while (pos < end) {
			newSize = in.readLine(value, maxLineLength,
					Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
							maxLineLength));
			if (newSize == 0) {
				break;
			}
			pos += newSize;
			if (newSize < maxLineLength) {
				break;
			}
		}
		if (newSize == 0) {
			//读取下一个buffer
			key = null;
			value = null;
			return false;
		} else {
			//读同一个buffer的下一个记录
			return true;
		}
	}

	@Override
	public LongWritable getCurrentKey() {
		return key;
	}

	@Override
	public Text getCurrentValue() {
		return value;
	}

	/**
	 * Get the progress within the split
	 */
	public float getProgress() {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - start) / (float) (end - start));
		}
	}

	public synchronized void close() throws IOException {
		if (in != null) {
			in.close();
		}
	}

	public class NewLineReader {
		private static final int DEFAULT_BUFFER_SIZE = 256 * 1024* 1024;
		private int bufferSize = DEFAULT_BUFFER_SIZE;
		private InputStream in;
		private byte[] buffer;
		private int bufferLength = 0;
		private int bufferPosn = 0;

		public NewLineReader(InputStream in) {
			this(in, DEFAULT_BUFFER_SIZE);
		}

		public NewLineReader(InputStream in, int bufferSize) {
			this.in = in;
			this.bufferSize = bufferSize;
			this.buffer = new byte[this.bufferSize];
		}

		public NewLineReader(InputStream in, Configuration conf)
				throws IOException {
			this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
		}

		public void close() throws IOException {
			in.close();
		}

		public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
				throws IOException {
			str.clear();
			Text record = new Text();
			int txtLength = 0;
			long bytesConsumed = 0L;
			boolean newline = false;
			int sepPosn = 0;
			do {
				// 已经读到buffer的末尾了,读下一个buffer
				if (this.bufferPosn >= this.bufferLength) {
					bufferPosn = 0;
					bufferLength = in.read(buffer);
					// 读到文件末尾了,则跳出,进行下一个文件的读取
					if (bufferLength <= 0) {
						break;
					}
				}
				int startPosn = this.bufferPosn;
				for (; bufferPosn < bufferLength; bufferPosn++) {
					// 处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)
					if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {
						sepPosn = 0;
					}
					// 遇到行分隔符的第一个字符
					if (buffer[bufferPosn] == separator[sepPosn]) {
						bufferPosn++;
						int i = 0;
						// 判断接下来的字符是否也是行分隔符中的字符
						for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {
							// buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半
							if (bufferPosn + i >= bufferLength) {
								bufferPosn += i - 1;
								break;
							}
							// 一旦其中有一个字符不相同,就判定为不是分隔符
							if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {
								sepPosn = 0;
								break;
							}
						}
						// 的确遇到了行分隔符
						if (sepPosn == separator.length) {
							bufferPosn += i;
							newline = true;
							sepPosn = 0;
							break;
						}
					}
				}
				int readLength = this.bufferPosn - startPosn;
				bytesConsumed += readLength;
				// 行分隔符不放入块中
				if (readLength > maxLineLength - txtLength) {
					readLength = maxLineLength - txtLength;
				}
				if (readLength > 0) {
					record.append(this.buffer, startPosn, readLength);
					txtLength += readLength;
					// 去掉记录的分隔符
					if (newline) {
						str.set(record.getBytes(), 0, record.getLength() - separator.length);
					}
				}
			} 
			while (!newline && (bytesConsumed < maxBytesToConsume));
			if (bytesConsumed > (long) Integer.MAX_VALUE) {
				throw new IOException("Too many bytes before newline: "
						+ bytesConsumed);
			}

			return (int) bytesConsumed;
		}

		public int readLine(Text str, int maxLineLength) throws IOException {
			return readLine(str, maxLineLength, Integer.MAX_VALUE);
		}

		public int readLine(Text str) throws IOException {
			return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
		}
	}
}

private byte[] separator = "]@\n".getBytes();

感谢你能够认真阅读完这篇文章,希望小编分享的“hadoop中mapreduce如何自定义InputFormat”这篇文章对大家有帮助,同时也希望大家多多支持创新互联,关注创新互联行业资讯频道,更多相关知识等着你来学习!


网站栏目:hadoop中mapreduce如何自定义InputFormat
当前路径:http://myzitong.com/article/jeesoe.html