2012년 2월 26일 일요일

맵리듀스 기능

  1. 카운터
  1. 카운터는 잡의  품질통제, 또는 응용프로그램 수준 통계를 제공하므로 문제 진단을 하는데 유용하게 사용됨
  2. 내장 카운터
  1. 태스크 카운터는 태스크트래커와 잡트래커로 수집됨
  2. 태스크 카운터는 변경 수치가 아니라 누적 수치임
  3. 맵리듀스 프레임워크
  1. 맵 입력/출력 레코드, 맵 생략된 레코드, 맵 입력/출력 바이트, 컴바인 입력/출력  레코드, 리듀스 입력 그룹, 리듀스 입력/출력 레코드, 리듀스 생략된 그룹, 리듀스 생략된 레코드, 스필드 레코드
  1. 파일시스템
  1. 파일시스템 읽은 바이트, 파일시스템 바이트
  1. 잡 카운터
  1. 잡트래커가 관리하는 항목이므로 태스크로부터 전송되지 않음
  2. 실행도니 맵/리듀스 태스크, 실패한 맵/리듀스 태스크, 데이터-로컬 맵 태스크, 랩-로컬 맵 태스크, 다른 로컬 맵 태스크
  1. 310P 내장 카운터 표 참조
  1. 사용자 정의 자바 카운터
  1. 맵리듀스 코드 수준에서 카운터의 집합과 증가 방식을 정의할 수 있음
  2. 정적 카운터 그룹은 enum으로, 카운터는 enum의 field로 정의됨
  1. 리소스번들을 이용하여 카운터 출력 시 읽기 좋도록 할 수 있음
  2. 리소스번들파일명 -
{enum이정의된클래스명}_{enum타입명}[{_언어코드}].properties
  1. 리소스번들파일내용 -
CouterGroupName={출력_카운터그룹명}
{enum필드1}.name={출력_카운터명1}
{enum필드2}.name={출력_카운터명2} ...
  1. 동적 카운터는 reporter.incrCounter(..) 호출 시 카운터그룹명과 카운터명을 인자로서 임의로 지정함
  2. 자바 코드에서 API로 카운터 조회
  1. 316P 예제코드 참조
  1. 사용자 정의 스트리밍 카운터
  1. 표준에러 스트림으로 정해진 포맷의 문자열을 전송
  1. 정렬
  1. 부분 정렬
  2. 전체 정렬
  1. 단일 파티션을 사용
  2. 순차적이면서 처리 시간이 균등하도록 분할된 파티션들을 만들어 최종적으로 각 출력을 전체적으로 정렬된 순서로 만드는 방식을 이용
  3. 균등한 파티션 집합 생성을 위해 키 샘플링을 통해 키 분포를 예측할 수 있음
  1. InputSampler.writePartitionFile(conf, InputSampler.RandomSampler<K,V>(균등확률, 샘플최대개수, 샘플링할 스플릿의 최대개수))
  2. 기타 샘플러 - SplitSampler, IntervalSampler
  1. 326P 소스코드 참조
  1. 보조 정렬
  1. 키를 원래의 키와 값의 조합인 복합키로 만듬
  2. 키 comparator는 복합키에 대하여 순서를 매김
  3. 복합키에 대한 파티셔너와 그룹핑comparator는 원래의 키에 대해서만 파티셔닝과 그룹핑을 수행함
  4. 330P 소스코드 참조
  5. 하둡 명령 옵션
  1. -D mapred.output.key.comparator.class= org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
  2. -D mapred.text.key.comparator.options=”-k1n -k2nr”
  3. -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
  1. 조인
  1. 맵-사이드 조인
  1. 데이터가 맵 함수에 도달하기 전에 조인을 수행하는 방식
  2. 조건
  1. 각 입력 데이터셋은 동일한 개수의 파티션으로 나누어져야 함
  2. 각 데이터셋은 동일한 키(조인 키)에 의해 정렬되어 있어야 함
  3. 특정 키의 모든 레코드는 같은 파티션에 있어야 함
  4. 리듀서 개수가 같고, 같은 키를 가지고, 결과 파일이 분할되지 않는 (HDFS블록보다 작거나, gzip으로 압축된 경우) 조건을 만족하는 잡 결과물 조인에도 적용 가능
  1. CompositeInputFormat 사용
  1. 입력 데이터와 조인 유형을 문법에 맞는 조인 표현식을 설정
  1. join 예제 - org.apache.hadoop.examples.Join 참조
  1. 리듀스-사이드 조인
  1. 맵-사이드 조인처럼 입력 데이터셋이 특정한 방식으로 구조화될 필요는 없음
  2. 매퍼가 각 레코드에 태그를 붙이고 맵의 출력키로서 조인키를 사용하게 되면 동일 키의 레코드들은 하나의 리듀서에 모이게 됨
  3. 다중 입력을 적용해야 함
  1. MultipleInput을 사용하여 분석, 태그를 붙이는 로직을 분리함
  1. 보조 정렬을 수행해야 함
  1.  다른 출처로부터의 레코드들의 순서는 보장되지 않음
  1. 사이드 데이터 분배
  1. 사이드 데이터는 주 데이터셋을 처리하기 위해 잡 과정에서 필요한 부가적인 읽기전용 데이터임
  2. 캐시를 이용하여 같은 잡의 동일한 태스크 트래커에서 연속적으로 실행되는 태스크끼리의 데이터공유가 가능함
  3. 잡 환경 설정 파일을 이용하는 방법
  1. JobConf의 setter메소드를 이용하여 잡 환경 설정에 저장하고, 매퍼나 리듀서에서 configure()메소드를 오버라이드해서 전달된 JobConf의 getter메소드를 사용함
  2. 임의의 객체를 메타데이터로 전달하기 위해서 직렬화를 사용해야 함
  3. DefaultStringfier
  1. 하둡이 제공하는 클래스로 객체를 직렬화해줌
  2. 수 KB이상의 데이터를 전송하면 하둡 데몬에 메모리 부하를 발생시킴
  1. 분산 캐시
  1. 파일을 복사해서 태스크가 실행되어 파일을 사용할 시점에 파일을 분배시켜 주는 서비스
  2. 네트워크 대역폭을 줄이기 위해 일반적으로 파일이 특정 노드에 잡 단위로 복사됨
  3. GenericOptionsParser를 사용하는 응용프로그램에 옵션을 지정하는 방식
  4. -files 옵션
  1. 분산 시킬 파일을 따옴표로 구분지은 URI형태로 지정함
  1. -archive 옵션
  1. jar, zip, tar, gzipped tar 파일을 태스크에 복사
  2. 태스크 노드에서 파일이 해제됨
  1. -libjars 옵션
  1. jar 파일을 매퍼와 리듀서 태스크의 클래스패스에 추가
  1. 동작 방식
  1. 잡 실행 후 HDFS로부터 잡트래커의 파일시스템으로 복사됨
  2. 태스크 실행 전 잡트래커의 파일시스템으로부터 태스크트래커의 로컬 디스크 또는 캐시로 복사됨
  3. 태스크트래커는 캐시 파일을 참조하는 레퍼런스 수가 0일 경우 해당 파일을 삭제할 수 있음
  4. 로컬에서 일정 캐시 크기를 넘어서면 파일이 삭제됨
(local.cache.size 속성에 설정 - 기본 10GB)
  1. 태스크트래커에서의 파일 위치 - ${mapred.local.dir}/taskTracker/archive
  1. DistributedCache API
  1. addCacheFile(), addCacheArchive - HDFS상의 파일 위치 알림
  2. getLocalCacheFiles(), getLocalCacheArchives() - 태스크 노드가 로컬에 있는 캐시파일 목록 조회
  1. 맵리듀스 라이브러리 클래스
  1. ChainMapper, ChainReducer
  1. 단일 매퍼에서 매러의 체인을 실행하고, 매퍼의 체인이 끝난 후 단일 리듀서가 실행됨
  2. 상징적인 표현 - M+RM*
  3. 다수의 맵리듀스 잡을 실행하는 것보다 대체로 디스크 I/O를 줄일 수 있음
  1. FieldSelectionMapReduce
  1. 입력키와 값으로부터 필드를 선택하여 출력으로 내보낼 수 있음
  1. IntSumReducer, LongSumReducer
  1. 정수 값을 합하여 모든 키에 대한 총계를 계산함
  1. InverseMapper
  1. 키와 값을 바꿈
  1. TokenCounterMapper
  1. 자바의 StringTokenizer를 이용하여 입력 값을 단어 단위로 쪼개어 각 단어와 카운트를 내보냄
  1. RegexMapper
  1. 입력값에서 정규표현식에 해당하는 부분과 카운트를 내보냄

맵리듀스 타입과 포맷

  1. 맵리듀스 타입
  1. map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K2, V2)
partition: (K2, V2) → 정수
reduce: (K2, list(V2)) → list(K3, V3)
  1. 리듀스의 입력 타입은 맵의 출력 타입과 같아야 함
  2. 컴바인의 타입은 리듀스의 타입과 같은 경우가 많음
  3. 파티션은 중간 키와 값(K2, V2)에 대하여 파티션 번호를 반환하며 키에 의해서 결정됨
  4. 267P 맵리듀스 타입 설정 표 참조
  5. 기본적인 맵리듀스 잡 - input/ouput만 설정하는 최소 설정과 동일한 명시적인 JobConf 구현
  1. conf.setInputFormat(TextInputFormat.class)
  2. conf.setNumMapTasks(1)
  1. 실제 맵 태스크 개수는 입력 데이터의 크기, 파일의 블록 크기로 결정됨
  1. conf.setMapperClass(IdentityMapper.class)
  1. 입력키와 값을 변경 없이 그대로 출력함
  2. 맵 입력키/출력키, 입력값/출력값이 서로 같은 타입으로 설정되어야 정상적으로 동작함
  1. conf.setMapRunnerClass(MapRunner.class)
  1. MapRunnable의 기본 구현체
  2. 각 레코드를 순차적으로 읽어서 매퍼의 map()메소드를 호출함
  1. conf.setMapOutputKeyClass(LongWritable.class)
  2. conf.setMapOutputValueClass(Text.class)
  3. conf.setPartitionerClass(HashPartitioner.class)
  1. 레코드의 키를 해시해서 해당 레코드가 어떤 파티션에 속하는지 결정함
  2. 각 파티션은 리듀스 태스크에 의해 분산 처리됨
  3. 잡 내에서 파티션 개수는 리듀스 태스크 개수와 같음
  4. return (key.hashCode() & Integer.MAX_VALUE) % numPartitions
  1. conf.setNumReduceTasks(1)
  1. 최적의 리듀서 개수는 클러스터 내의 리듀서 슬롯의 총 개수와 연관이 있음
  2. 리듀서수를 슬롯의 총 개수보다 약간 작게 설정하는 것이 좋음
  3. 슬롯의 총 개수 = 클러스터 노드의 개수 * mapred.tasktracker.reduce.tasks.maximum속성값
  1. conf.setReducerClass(IdentityReducer.class)
  1. 입력키와 값을 변경 없이 그대로 출력함
  1. conf.setOutputKeyClass(LongWritable.class)
  2. conf.setOutputValue(Text.class)
  3. conf.setOutputFormat(TextOutputFormat.class)
  1. 기본 스트리밍 잡
  1. 입력포맷이 TextInputFormat이면 스트리밍 출력의 키와 값은 모두 Text이므로 기본 IdentityMapper를 사용하면 안되고 다른 mapper를 제공해야 함
  2.  command 예시 :
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar
-input input/ncdc/sample.txt
-output output
-mapper /bin/cat
-inputformat org.apache.hadoop.mapred.TextInputFormat
(-partitioner org.apache.hadoop.mapred.lib.HashPartitioner
-numReduceTasks 1
-reducer org.apache.hadoop.mapred.lib.IdentityReducer
-outputformat org.apache.hadoop.mapred.TextOutputFormat)
  1. 스트리밍 키와 값
  1. 키와 값은 구분자로 분리함 (기본 - ‘\t’)
  2. 구분자로 분리되는 필드들을 더하여 키로 정할 수 있음
  3. 입력포맷과 출력포맷을 구분하여 설정함
  4. 275P 스트리밍 분리자 속성 표 참조
  1. 입력 포맷
  1. 스플릿과 레코드는 논리적 개념이며 InputSplit 인터페이스의 구현체로 표현됨
  2. 맵리듀스 프로그래밍에서는 InputSplit을 다루는 (입력 스플릿 생성 및 레코드로의 분할하는) InputFormat 인터페이스 구현체를 사용함
  3. 크기가 큰 스플릿을 우선으로 순차적으로 처리함으로써 잡 수행시간을 최소화하도록 함
  4. MultithreadedMapRunner
  1. MapRunnable의 다른 구현체
  2. mapred.map.multithreadedrunner.threads 속성의 개수만큼 동시에 매퍼 실행
  3. 각 레코드를 처리하는 데에 시간이 드는 매퍼에 대하여 적용하면 효과적임
  1. InputSplit
  1. long getLength()
  1. 바이트 길이 반환
  1. String[] getLocations()
  1. 호스트네임 문자열로 된 저장소 위치 반환
  1. InputFormat
  1. InputSplit[] getSplits(JobConf job, int numSplits)
  1. hint로서 원하는 맵태스크 개수 설정
  2. 입력 데이터 크기와 파일 블록 크기로 계산된 임의 개수 반환
  1. RecordReader<K,V> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
  1. 레코드의 Iterator 반환
  2. MapRunner 구현에서 reader로부터 map맵퍼의 입력을 얻음
  3. while(reader.next(key, value) { mapper.map(key, value, output, reporter); }
  1. FileInputFormat
  1. 입력파일 경로 지정 구현 (파일 지정, 파일 패턴 지정)
  1. addInputPath(s)(..), setInputPaths(..)
  2. mapred.input.dir 속성
  1. 특정 입력파일 배제 필터 구현
  1. setInputPathFilter(..)
  2. 필터 미설정시 기본 필터로 시스템 숨김 파일 배제 처리
  3. mapred.input.path.filter.class 속성
  1. 입력파일 스플릿 생성 구현
  1. 일반적으로 HDFS 블록보타 큰 파일을 블록 크기의 스플릿으로 분할
  2. 파일 스플릿의 최소(기본 - 1)/최대(기본 - Long.MAX_VALUE) 바이트 속성 설정 가능
  3. 최소크기 < 블록크기 < 최대크기
  1. 스플릿의 레코드 분할에 대한 자식 클래스 구현 강제
  1. CombineFileInputFormat
  1. HDFS 블록보다 작은 파일들을 처리할 때 하둡의 성능을 보완함
  2. 노드와 랙의 위치를 고려하여 블록들을 스플릿으로 묶음
  3. 추상 클래스이므로 하위 클래스에서 getRecordReader(..) 구현 필요
  4. SequenceFile을 이용하여 작은 파일을 더 큰 파일로 병합하여 저장하는 것을 권고함
  1. 스플릿 방지 기법
  1. 최소 스플릿 크기를 Long.MAX_VALUE로 설정
  2. FileInputFormat하위 클래스의 하위 클래스를 생성하여 isSplitable() 메소드를 false가 반환되도록 구현
  1. 매퍼의 파일 정보
  1. 매퍼는 잡 환경 설정 객체의 속성으로부터 스플릿에 대한 정보를 획득 가능
  2. 파일 스플릿 속성
  1. map.input.file - 처리될 입력 파일 경로
  2. map.input.start - 시작 지점 바이트 오프셋
  3. map.input.length - 바이트 길이
  1. 전체 파일을 하나의 레코드로 처리하는 방법
  1. 285P 예제 소스코드 참조
  2. FileInputFormat 하위 클래스 구현
  1. isSplitable() 메소드 오버라이드하여 false 반환
  2. key → NullWritable, value → BytesWritable로 타입 결정
  1. RecordReader 구현
  1. next(key, value) 메소드 내에서 연 파일을 바이트 배열에 넣고, ByteWritable 인스턴스에 바이트 배열을 할당하도록 구현
  1. TextInputFormat
  1. 키 → LongWritable - 파일 내에서 라인 시작 지점의 바이트 오프셋 주소
  2. 값 → Text - 라인의 내용(개행문자 제외)
  1. FileInputFormat이 정의하는 논리적인 레코드가 HDFS 블록에 정확히 들어맞지 않는 경우, 두 블록에 걸쳐진 레코드까지 포함하여 하나의 스플릿으로 간주함
  2. KeyValueTextInputFormat
  1. 키, 구분자, 값이 명시된 텍스트 파일이 입력인 경우 사용
  1. NLineInputFormat
  1. 매퍼가 고정된 개수의 입력 라인을 받고자 하는 경우(스플릿에 포함될 라인 수를 고정하는 경우) 사용
  2. mapred.line.input.format.linespermap 속성 - 입력 라인수 설정
  3. 시뮬레이션과 같이 작은 양의 입력을 취하고 많은 연산을 수행한 결과물을 내는 응용 프로그램에 적합
  1. 태스크 타임아웃 발생 방지를 위해 상태나 증가된 카운터를 주기적으로 보고해야 함
  1. StreamXmlRecordReader
  1. XML문서 내의 시작, 종료 태그에 대한 정규표현식 패턴을 지정하여 레코드로 나눌 수 있음
  2. stream.recordrerader.class 속성 -  org.apache.hadoop.streamin.StreamXmlRecordReader로 설정
  3. 입력포맷 - StreamInputFormat
  1. 바이너리 입력
  1. SequenceFileInputFormat
  1. 시퀀스파일을 맵리듀스 입력으로 사용하는 경우 사용
  2. 파일경로가 디렉토리인 경우 MapFile을 읽는 것으로 간주하여 MapFile을 읽어들일 수도 있음
  1. SequenceFileAsTextInputFormat
  1. 시퀀스파일의 키와 값을 Text객체로 변환시키는 경우 사용
  2. 시퀀스파일을 Streaming을 위한 입력 데이터로 만드는 경우 사용
  1. SequenceFileAsBinaryInputFormat
  1. 시퀀스파일의 키와 값을 임의의 바이너리 객체로 변환시키는 경우 사용
  2. 키와 값은 BytesWritable 객체로 캡슐화됨
  1. MultipleInputs
  1. 서로 다른 포맷의 입력을 처리하여 동일한 맵 출력물을 생성하고자 하는 경우 사용
  2. FileInputFormat.addInputPath()를 오버라이드/오버로드한 메소드 제공
  1. DBInputFormat
  1. RDB에서 JDBC로 데이터를 읽는 입력 포맷
  2. 이 포맷에 대해서 sharding기능이 없으므로 너무 많은 매퍼를 실행해서 대상 DB에 과부하를 주지 않도록 해야 함
  3. MultipleInputs를 이용하여 HDFS 상의 더 큰 데이터셋과 조인할 작은 데이터셋을 올리는 데에 적합함
  4. DBOutputFormat으로 잡의 결과물을 DB에 전송함
  5. 대안으로 SQOOP을 고려할 수 있음
  6. TableInputFormat/TableOutputFormat - HBase용 입출력 모델
  1. 출력 포맷
  1. TextOuputFormat
  1. 레코드를 텍스트의 라인으로 기록
  2. toString() - 키, 값을 문자열로 변환
  1. 바이너리 출력
  1. SequenceFileOutputFormat
  1. 시퀀스파일로 출력
  2. 추가 맵리듀스 잡 존재시 유용함
  1. SequenceFileAsBinaryOutputFormat
  1. 바이너리 포맷으로 키, 값을 시퀀스파일 컨테이너로 출력
  1. MapFileOutputFormat
  1. MapFiles로 출력
  2. 리듀서에서 키를 정렬하여 MapFile의 키를 순서대로 추가해야 함
  1. MultipleOutputFormat / MulipleOutputs
  1. 리듀서 당 다수의 출력파일을 생성하는 경우 사용
  2. MultipleOutputs이 더 많은 기능을 가지며 MultipleOutputFormat은 출력 디렉토리 구조와 파일 이름을 제어할 때 유용함
  3. MultipleOutputFormat
  1. 파일과 디렉토리 이름에 대한 완벽한 제어 가능
  2. 하위 클래스 - MultipleTextOutputFormat, MultipleSequenceFileOutputFormat
  3. protected String generateFileNameForKeyValue(key, value, name) - 하위 클래스에서 재정의하여 사용함
  1. MultipleOutputs
  1. 다른 출력파일에서는 다른 타입의 키와 값 사용 가능
  2. 같은 잡에서 맵과 리듀스에 의해 사용
  3. 레코드 별 다중 출력파일 생성
  4. 어떤 OutputFormat도 사용 가능
  1. LazyOutputFormat
  1. 출력할 결과가 있을 경우에만 출력파일이 생성되도록 함
  2. jobconf.setOutputFormatClass(..)에 지정

맵리듀스 작동 방법

  1. 맵리듀스 잡 실행 상세 분석
  1. 잡 제출
  1. runJob()
  1. JobClient 인스턴스 생성
  2. submitJob() 호출
  3. 잡제출 후 2초에 한 번씩 진행 과정 조사 및 변경사항 보고
  1. submitJob()
  1. 잡 트래커에 새 잡 ID 요청
  2. 잡 출력 명세 확인 - 출력 디렉토리 명시 여부, 기존재 여부 등
  3. 잡에 대한 입력 스플릿 계산
  4. 에러 발생 시 MR프로그램에 전달
  5. 잡 JAR 파일, 설정 파일, 입력 스플릿 정보를 HDFS의 잡 ID가 이름인 디렉토리에 저장 (mapred.submit.replication 속성 - 잡 JAR 파일 복제 개수 설정)
  6. 잡 시작 준비 상태를 잡 트래커에 제출
  1. 잡 초기화
  1. 제출된 잡을 큐에 넣으면, 잡 스케줄러가 잡을 꺼내어 초기화 진행
  2. 초기화 과정 중 태스크 및 태스크의 상태와 태스크 진행 과정에 필요한 부가정보를 포함하는 객체를 생성
  3. 잡 스케줄러가 HDFS에서 계산된 입력 스플릿을 얻고 각 스플릿에 대하여 하나의 맵 태스크를 생성
  4. 생성할 리듀스 태스크 수 설정
  1. JobConf의 setNumReduceTasks()를 사용하여 mapred.reduce.tasks 속성에 기록
  1. 잡 스케줄러가 설정된 리듀스 수만큼 리듀스 태스크 생성
  1. 태스크 할당
  1. 태스크 트래커가 잡 트래커에게 주기적으로 보내는 하트비트의 반환으로써 태스크를 할당받음
  2. 잡 스케줄러는 잡 우선순위나 스케줄링 알고리즘에 따라 선택된 잡에서 태스크를 할당함
  3. 태스크 트래커는 다수의 맵/리듀스 태스크를 동시에 수행 가능하며, 일반적으로 맵 태스크 슬롯을 채운 뒤, 리듀스 태스크 슬롯을 채움
  4. 리듀스 태스크는 데이터의 지역성이 고려되지 않고 순차적으로 할당됨
  5. 맵 태스크는 각 태스크 트래커와 인접한 입력 스플릿을 갖도록 태스크가 할당됨
  1. 최적화 정도 - 태스크와 스플릿이 동일 노드에 존재할 때 > 동일 랙에 존재할 때 > 태스크가 다른 노드에서 스플릿을 조회할 때
  1. 태스크 실행
  1. 잡 JAR를 HDFS에서 태스크 트랙커의 지역 파일시스템으로 복사
  2. 분산 캐시로부터 필요한 파일을 지역 파일 시스템으로 복사
  3. 로컬에 잡 디렉토리를 생성하고 잡 JAR 해제
  4. 태스크러너 인스턴스를 생성하고 새 JVM을 실행하여 태스크를  수행
  5. 태스크 진행 과정을 부모에게 몇 초 간격으로 보고
  6. 스트리밍 또는 파이프를 이용하여 작성된 맵리듀스 외부 프로세스와 통신
  1. 진행 상황과 상태 갱신
  1. 태스크의 진행상황을 구성하는 요소
  1. 매퍼 또는 리듀서 내의 입력/출력 레코드 읽기
  2. 리포터의 setStatus() 호출 - 상태 설정
  3. 리포터의 ncrCounter() 호출 - 카운터 증가
  4. 리포터의 progress() 호출
  1. 태스크는 상태 변화를 태스크 트래커에게 보고
  2. 태스크 트래커는 하트비트를 통해 태스크들의 상태를 전송
  3. 잡 트래커는 태스크들의 상태를 전역적으로 결합하여 JobClient에게 보고
  1. 잡 완료
  1. 잡 트래커가 마지막 태스크의 완료 보고를 수신 후 ‘성공'으로 상태를 변경
  2. JobClient가 잡 완료를 확인하면 runJob()이 종료됨
  3. job.end.notification.url 속성 - HTTP로 통지
  4. 잡 트래커와 태스크 트래커의 상태 정보 정리 (중간 출력 삭제 등)
  1. 실패
  1. 태스크 실패
  1. 태스크 내 에러 발생
  1. 자식 JVM는 부모인 태스크 트래커에게 에러 보고
  2. 스트리밍 태스크의 경우 0을 반환하지 않으면 에러로 판단
  1. 자식 JVM의 갑작스러운 종료
  2. 행걸린 태스크
  1. 태스크 트래커가 mapred.task.timeout 속성에 설정된 시간 내에 태스크의 진행 상황을 보고 받지 못하면 실패로 인식
  1. 잡 트래커의 태스크 재스케줄링
  1. 실패한 태스크 트래커에게는 실패한 태스크를 재할당하지 않음
  2. 기본적으로 어떤 태스크가 4번 이상 실패하면, 전체 잡 실패 처리
  3. mapred.map.max.attempts, mapred.reduce.max.attemps에 최대 시도 회수 설정 (기본 - 4)
  4. mapred.max.map.failures.percent, mapred.max.reduce.failures.percent - 태스크 실패 허용 최대 비율 설정
  1. 태스크 트래커 실패
  1. 기본적으로 태스크 트래커가 10분 동안 하트비트를 전송하지 않으면 잡 트래커는 태스크 트래커 풀에서 해당 태스크 트래커에서 제외시킴
  2. 잡 트래커는 완료되지 않은 맵태스크에 대해서만 새 태스크 트래커에게 배치 시키 재실행
  3. 평균 태스크 실패율보다 높은 실패율을 보이는 태스크 트래커는 블랙리스트에 등록
  1. 잡트래커 실패
  1. 가장 심각한 실패유형이며 잡 전체가 실패
  2. 다수의 잡 트래커 실행시 주키퍼에 의해 잡 트래커들의 순위가 조정됨
  1. 잡 스케줄링
  1. 기본적으로 FIFO 스케줄러로 지정되어 있으나, 다중 사용자 스케줄러를 제공함
  2. 페어(fair) 스케줄러
  1. 모든 사용자가 클러스터를 시간적으로 공평하게 공유할 수 있도록 함
  2. 사용자는 잡 풀을 소유하게 되며, 풀에 제출된 잡 개수와 상관 없이 할당 태스크 슬롯 개수 관점에서 조정됨
  3. 스케줄러 설정하는 방법
  1. contrib/fairscheduler 디렉토리에서 JAR를 lib 디렉토리로 복사
  2. mapred.jobtracker.taskScheduler 속성의 값을 org.apache.hadoop.mapred.FairScheduler로 설정
  1. 커패시티(capacity) 스케줄러
  1. 사용자는 수용량이 할당된 큐를 소유함
  2. 잡은 우선순위와 FIFO 방식으로 선정됨
  1. 잡 우선순위 설정
  1. mapred.job.priority 속성, JobClient의 setJobPriority()
  2. VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
  1. 셔플과 정렬
  1. 맵 출력은 버퍼에 기록되며, 버퍼 내의 데이터를 스레드가 파티셔닝하고 파티션 별 정렬을 수행함
  2. 컴바이너 함수가 명시된 경우 메모리에서 이를 수행함
  3. 버퍼가 스필(spill) 한계에 도달하면 디스크에 기록하며, 마지막 출력 레코드가 기록되는 직후에 스필된 파일들은 하나의 출력 파일로 병합되고 정렬됨
  4. 맵 출력의 전송 데이터 량을 줄이기 위해 압축을 수행할 수 있음
  5. 잡 트래커에게 보고된 맵 태스크의 완료 상태가 리듀서 태스크에게 전달 되면, 리듀서는 HTTP를 이용하여 맵 출력을 가져감
  6. 리듀서는 맵 출력을 병렬 스레드 복사하는데, 맵 출력이 작은 경우에는 메모리에, 그렇지 않으면 디스크에 복사함
  7. 복사된 맵 출력은 정렬 순서가 유지되도록 병합되며, io.sort.factor에 설정된 병합계수 만큼 맵 출력을 나누어 라운드로 진행함
  8. 라운드 결과가 병합계수 만큼 준비가 되면 리듀스 함수로 보내져서 리듀스가 수행됨
  9. 리듀스 출력은 HDFS에 기록되며, 첫 번째 블록은 로컬 디스크에 기록됨
  10. 맵리듀스 성능 향상
  1. 셔플, JVM에 최대한의 메모리 크기 지정
  2. 맵리듀스 동작 시 최소한의 메모리 사용
  3. 맵 태스크의 최소한의 스필 파일 수 유지
  4. 속성 표 참조 (256P~257P)
  1. 태스크 실행
  1. 투기적 실행
  1. 최소 1분이 경과되고 평균 속도보다 느린 태스크를 감지하면, 또 다른 동일한 예비 태스크를 실행함
  2. 다른 모든 태스크들이 수행된 후 투기적 태스크가 실행됨
  3. 원래의 태스크 또는 투기적 태스크 중 하나가 완료되면 다른 태스크는 강제 종료됨
  4. 투기적 실행은 기본적으로 활성화되어 있으며 설정에서 비활성화 가능함
  1. mapred.map.tasks.speculative.execution (boolean)
  2. mapred.reduce.tasks.speculative.execution (boolean)
  1. 태스크 JVM 재사용
  1. 각 태스크에 대해 새 JVM을 시작하는 오버헤드를 줄일 수 있음
  2. 어떤 잡에 대하여 각 JVM이 수행할 최대 태스크 수 설정 가능
  1. mapred.job.reuse.jvm.num.tasks (기본 - 1)
  1. 비정상 레코드 생략
  1. 매퍼나 리듀서 코드 내에서 비정상 레코드의 예외 처리
  2. 생략모드를 이용하여 자동 스킵 가능
  1. SkipBadRecords 클래스를 맵, 리듀스 태스크에 대하여 각각 설정하여 생략모드를 활성화하도록 함
  2. 태스크가 두 번 실패하면 태스크 트래커에서 생략모드가 활성화됨
  3. 비정상 레코드는 잡 출력 디렉토리 내의  _logs/skip 에 저장됨
  1. 태스크 실행 환경
  1. 태스크는 자신의 환경 속성을 참조할 수 있음
  1. 잡 ID, 태스크 ID, 태스크 시행 ID, 잡 내의 태스크 ID 등
  1. 태스크 출력 충돌 문제 해결을 위하여 출력 임시 디렉토리 사용
  1. ${mapred.output.dir}/_temporary/${mapred.task.id}
  2. 태스크 완료 후 ${mapred.output.dir}로 복사
  1. 맵리듀스 프로그램 내에서 환경 속성을 참조할 수 있음