지구정복

[MapReduce] wordcount 직접 구현하기 본문

데이터 엔지니어링 정복/MapReduce

[MapReduce] wordcount 직접 구현하기

eeaarrtthh 2021. 5. 24. 17:13
728x90
반응형
SMALL

기본적으로 하둡에는 wordcount 맵리듀스가 있지만 이를 직접 구현해보자.

 

먼저 메이븐프로젝트에 코딩을하고 jar파일로 export 시킨 다음 하둡으로 해당 jar파일을 실행하여 wordcount기능을 수행할 것이다. 이 과정이 궁금하신 분들은 아래 게시글 참고~

https://earthconquest.tistory.com/236

 

이를위해 이클립스로 메이븐프로젝트를 만들고 매퍼를 정의한다.

-WordCountMapper.java

package com.exam.chap03;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		//필드 선언
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		
		//맵 메서드 정의
		public void map( LongWritable key, Text value, Context context ) 
				throws IOException, InterruptedException {
			
			StringTokenizer itr = new StringTokenizer( value.toString() );
			while( itr.hasMoreElements() ) {
				word.set( itr.nextToken() );
				context.write(word, one);
			}
	}
}

이때 필드값인 one은 무조건 IntWriteable(1)로 하고 final static인 이유는 맵 메서드로 출력하는 단어의 글자 수가 무조건 1이기 때문이다.

 

자바의 StringTokenizer를 이용해서 공백을 기준으로 입력값 문자열을 쪼갠다.

그리고 반복문을 돌면서 word에 각 문자들을 추가한다.

그리고 context객체의 write메서드로 매퍼의 출력 데이터에 word와 one을 추가한다.

이때 매퍼의 출력데이터의 키는 word이고, 값은 one이다.

 

 

이번에는 리듀서를 구현한다.

새로운 클래스를 만든다.

-WordCountReducer.java

package com.exam.chap03;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	private IntWritable result = new IntWritable();
	
	public void reduce( Text key, Iterable<IntWritable> values, Context context ) throws
		IOException, InterruptedException {
		
		int sum = 0;
		for( IntWritable val : values ) {
			sum += val.get();
		}
		
		result.set(sum);
		context.write(key, result);
	}
}

reduce 메소드의 매개변수로는 Text타입의 단어명, Iterable<IntWritable>타입의 단어의 수가 배열로 그리고 context정보이다.

여기서 해당 key인 단어의 수는 모두 더해져야 하므로 반복문을 통해 해당 단어의 수를 모두 더해주고 이를 다시 context에 key와 value형태로 추가해준다.

 

 

이제 맵과 리듀스를 실행시킬 메인 클래스인 드라이버 클래스를 구현한다.

-WordCount.java

package com.exam.chap03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		if( args.length != 2 ) {
			System.err.println( "Usage: WordCount <input> <output> ");
			System.exit(2);
		}
		
		//잡 생성
		Job job = new Job(conf, "WordCount" ); 
		
		//잡에 메인, 매퍼, 리듀서 클래스 선언
		job.setJarByClass( WordCount.class );
		job.setMapperClass( WordCountMapper.class );
		job.setReducerClass( WordCountReducer.class );
		
		//매퍼에 입력되는 입력데이터형식, 리듀서가 출력하는 출력데이터 형식 선언
		job.setInputFormatClass( TextInputFormat.class );
		job.setOutputFormatClass( TextOutputFormat.class );
		
		//매퍼와 리듀서가 출력하는 키와 값의 데이터 타입 정의
		job.setOutputKeyClass( Text.class );
		job.setOutputValueClass( IntWritable.class );
		
		//입출력 데이터를 어떤 경로로 받고 출력할 것인지 정의
		FileInputFormat.addInputPath( job, new Path(args[0] ) );
		FileOutputFormat.setOutputPath( job, new Path(args[1] ) );
		
		//잡 실행하기
		job.waitForCompletion(true);
		
	}
}

 

 

이제 이 메이븐 프로젝트를 jar파일로 빌드시킨다.

 

그리고 리눅스 서버에 아래와 같은 텍스트 파일을 만든다.

[hadoop01@hadoop01 ~]$ vi /home/hadoop01/eclipse/input.txt
read a book
write a book

그리고 이 파일을 HDFS에 업로드 시킨다.

[hadoop01@hadoop01 ~]$ hadoop fs -put /home/hadoop01/eclipse/input.txt input1.txt

 

그리고 jar파일을 실행시킨다.

[hadoop01@hadoop01 ~]$ hadoop jar /home/hadoop01/eclipse/hadoopEx01.jar com.exam.chap03.WordCount input1.txt wordcount_output

 

결과를 확인해본다.

[hadoop01@hadoop01 ~]$ hadoop fs -cat wordcount_output/part-r-00000
a	2
book	2
read	1
write	1

 

 

 

 

 

 

728x90
반응형
LIST
Comments