2012년 2월 26일 일요일

맵리듀스 작동 방법

  1. 맵리듀스 잡 실행 상세 분석
  1. 잡 제출
  1. runJob()
  1. JobClient 인스턴스 생성
  2. submitJob() 호출
  3. 잡제출 후 2초에 한 번씩 진행 과정 조사 및 변경사항 보고
  1. submitJob()
  1. 잡 트래커에 새 잡 ID 요청
  2. 잡 출력 명세 확인 - 출력 디렉토리 명시 여부, 기존재 여부 등
  3. 잡에 대한 입력 스플릿 계산
  4. 에러 발생 시 MR프로그램에 전달
  5. 잡 JAR 파일, 설정 파일, 입력 스플릿 정보를 HDFS의 잡 ID가 이름인 디렉토리에 저장 (mapred.submit.replication 속성 - 잡 JAR 파일 복제 개수 설정)
  6. 잡 시작 준비 상태를 잡 트래커에 제출
  1. 잡 초기화
  1. 제출된 잡을 큐에 넣으면, 잡 스케줄러가 잡을 꺼내어 초기화 진행
  2. 초기화 과정 중 태스크 및 태스크의 상태와 태스크 진행 과정에 필요한 부가정보를 포함하는 객체를 생성
  3. 잡 스케줄러가 HDFS에서 계산된 입력 스플릿을 얻고 각 스플릿에 대하여 하나의 맵 태스크를 생성
  4. 생성할 리듀스 태스크 수 설정
  1. JobConf의 setNumReduceTasks()를 사용하여 mapred.reduce.tasks 속성에 기록
  1. 잡 스케줄러가 설정된 리듀스 수만큼 리듀스 태스크 생성
  1. 태스크 할당
  1. 태스크 트래커가 잡 트래커에게 주기적으로 보내는 하트비트의 반환으로써 태스크를 할당받음
  2. 잡 스케줄러는 잡 우선순위나 스케줄링 알고리즘에 따라 선택된 잡에서 태스크를 할당함
  3. 태스크 트래커는 다수의 맵/리듀스 태스크를 동시에 수행 가능하며, 일반적으로 맵 태스크 슬롯을 채운 뒤, 리듀스 태스크 슬롯을 채움
  4. 리듀스 태스크는 데이터의 지역성이 고려되지 않고 순차적으로 할당됨
  5. 맵 태스크는 각 태스크 트래커와 인접한 입력 스플릿을 갖도록 태스크가 할당됨
  1. 최적화 정도 - 태스크와 스플릿이 동일 노드에 존재할 때 > 동일 랙에 존재할 때 > 태스크가 다른 노드에서 스플릿을 조회할 때
  1. 태스크 실행
  1. 잡 JAR를 HDFS에서 태스크 트랙커의 지역 파일시스템으로 복사
  2. 분산 캐시로부터 필요한 파일을 지역 파일 시스템으로 복사
  3. 로컬에 잡 디렉토리를 생성하고 잡 JAR 해제
  4. 태스크러너 인스턴스를 생성하고 새 JVM을 실행하여 태스크를  수행
  5. 태스크 진행 과정을 부모에게 몇 초 간격으로 보고
  6. 스트리밍 또는 파이프를 이용하여 작성된 맵리듀스 외부 프로세스와 통신
  1. 진행 상황과 상태 갱신
  1. 태스크의 진행상황을 구성하는 요소
  1. 매퍼 또는 리듀서 내의 입력/출력 레코드 읽기
  2. 리포터의 setStatus() 호출 - 상태 설정
  3. 리포터의 ncrCounter() 호출 - 카운터 증가
  4. 리포터의 progress() 호출
  1. 태스크는 상태 변화를 태스크 트래커에게 보고
  2. 태스크 트래커는 하트비트를 통해 태스크들의 상태를 전송
  3. 잡 트래커는 태스크들의 상태를 전역적으로 결합하여 JobClient에게 보고
  1. 잡 완료
  1. 잡 트래커가 마지막 태스크의 완료 보고를 수신 후 ‘성공'으로 상태를 변경
  2. JobClient가 잡 완료를 확인하면 runJob()이 종료됨
  3. job.end.notification.url 속성 - HTTP로 통지
  4. 잡 트래커와 태스크 트래커의 상태 정보 정리 (중간 출력 삭제 등)
  1. 실패
  1. 태스크 실패
  1. 태스크 내 에러 발생
  1. 자식 JVM는 부모인 태스크 트래커에게 에러 보고
  2. 스트리밍 태스크의 경우 0을 반환하지 않으면 에러로 판단
  1. 자식 JVM의 갑작스러운 종료
  2. 행걸린 태스크
  1. 태스크 트래커가 mapred.task.timeout 속성에 설정된 시간 내에 태스크의 진행 상황을 보고 받지 못하면 실패로 인식
  1. 잡 트래커의 태스크 재스케줄링
  1. 실패한 태스크 트래커에게는 실패한 태스크를 재할당하지 않음
  2. 기본적으로 어떤 태스크가 4번 이상 실패하면, 전체 잡 실패 처리
  3. mapred.map.max.attempts, mapred.reduce.max.attemps에 최대 시도 회수 설정 (기본 - 4)
  4. mapred.max.map.failures.percent, mapred.max.reduce.failures.percent - 태스크 실패 허용 최대 비율 설정
  1. 태스크 트래커 실패
  1. 기본적으로 태스크 트래커가 10분 동안 하트비트를 전송하지 않으면 잡 트래커는 태스크 트래커 풀에서 해당 태스크 트래커에서 제외시킴
  2. 잡 트래커는 완료되지 않은 맵태스크에 대해서만 새 태스크 트래커에게 배치 시키 재실행
  3. 평균 태스크 실패율보다 높은 실패율을 보이는 태스크 트래커는 블랙리스트에 등록
  1. 잡트래커 실패
  1. 가장 심각한 실패유형이며 잡 전체가 실패
  2. 다수의 잡 트래커 실행시 주키퍼에 의해 잡 트래커들의 순위가 조정됨
  1. 잡 스케줄링
  1. 기본적으로 FIFO 스케줄러로 지정되어 있으나, 다중 사용자 스케줄러를 제공함
  2. 페어(fair) 스케줄러
  1. 모든 사용자가 클러스터를 시간적으로 공평하게 공유할 수 있도록 함
  2. 사용자는 잡 풀을 소유하게 되며, 풀에 제출된 잡 개수와 상관 없이 할당 태스크 슬롯 개수 관점에서 조정됨
  3. 스케줄러 설정하는 방법
  1. contrib/fairscheduler 디렉토리에서 JAR를 lib 디렉토리로 복사
  2. mapred.jobtracker.taskScheduler 속성의 값을 org.apache.hadoop.mapred.FairScheduler로 설정
  1. 커패시티(capacity) 스케줄러
  1. 사용자는 수용량이 할당된 큐를 소유함
  2. 잡은 우선순위와 FIFO 방식으로 선정됨
  1. 잡 우선순위 설정
  1. mapred.job.priority 속성, JobClient의 setJobPriority()
  2. VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
  1. 셔플과 정렬
  1. 맵 출력은 버퍼에 기록되며, 버퍼 내의 데이터를 스레드가 파티셔닝하고 파티션 별 정렬을 수행함
  2. 컴바이너 함수가 명시된 경우 메모리에서 이를 수행함
  3. 버퍼가 스필(spill) 한계에 도달하면 디스크에 기록하며, 마지막 출력 레코드가 기록되는 직후에 스필된 파일들은 하나의 출력 파일로 병합되고 정렬됨
  4. 맵 출력의 전송 데이터 량을 줄이기 위해 압축을 수행할 수 있음
  5. 잡 트래커에게 보고된 맵 태스크의 완료 상태가 리듀서 태스크에게 전달 되면, 리듀서는 HTTP를 이용하여 맵 출력을 가져감
  6. 리듀서는 맵 출력을 병렬 스레드 복사하는데, 맵 출력이 작은 경우에는 메모리에, 그렇지 않으면 디스크에 복사함
  7. 복사된 맵 출력은 정렬 순서가 유지되도록 병합되며, io.sort.factor에 설정된 병합계수 만큼 맵 출력을 나누어 라운드로 진행함
  8. 라운드 결과가 병합계수 만큼 준비가 되면 리듀스 함수로 보내져서 리듀스가 수행됨
  9. 리듀스 출력은 HDFS에 기록되며, 첫 번째 블록은 로컬 디스크에 기록됨
  10. 맵리듀스 성능 향상
  1. 셔플, JVM에 최대한의 메모리 크기 지정
  2. 맵리듀스 동작 시 최소한의 메모리 사용
  3. 맵 태스크의 최소한의 스필 파일 수 유지
  4. 속성 표 참조 (256P~257P)
  1. 태스크 실행
  1. 투기적 실행
  1. 최소 1분이 경과되고 평균 속도보다 느린 태스크를 감지하면, 또 다른 동일한 예비 태스크를 실행함
  2. 다른 모든 태스크들이 수행된 후 투기적 태스크가 실행됨
  3. 원래의 태스크 또는 투기적 태스크 중 하나가 완료되면 다른 태스크는 강제 종료됨
  4. 투기적 실행은 기본적으로 활성화되어 있으며 설정에서 비활성화 가능함
  1. mapred.map.tasks.speculative.execution (boolean)
  2. mapred.reduce.tasks.speculative.execution (boolean)
  1. 태스크 JVM 재사용
  1. 각 태스크에 대해 새 JVM을 시작하는 오버헤드를 줄일 수 있음
  2. 어떤 잡에 대하여 각 JVM이 수행할 최대 태스크 수 설정 가능
  1. mapred.job.reuse.jvm.num.tasks (기본 - 1)
  1. 비정상 레코드 생략
  1. 매퍼나 리듀서 코드 내에서 비정상 레코드의 예외 처리
  2. 생략모드를 이용하여 자동 스킵 가능
  1. SkipBadRecords 클래스를 맵, 리듀스 태스크에 대하여 각각 설정하여 생략모드를 활성화하도록 함
  2. 태스크가 두 번 실패하면 태스크 트래커에서 생략모드가 활성화됨
  3. 비정상 레코드는 잡 출력 디렉토리 내의  _logs/skip 에 저장됨
  1. 태스크 실행 환경
  1. 태스크는 자신의 환경 속성을 참조할 수 있음
  1. 잡 ID, 태스크 ID, 태스크 시행 ID, 잡 내의 태스크 ID 등
  1. 태스크 출력 충돌 문제 해결을 위하여 출력 임시 디렉토리 사용
  1. ${mapred.output.dir}/_temporary/${mapred.task.id}
  2. 태스크 완료 후 ${mapred.output.dir}로 복사
  1. 맵리듀스 프로그램 내에서 환경 속성을 참조할 수 있음

댓글 없음:

댓글 쓰기