일시 : 10.02

대상서버 : bigdata07, 08, 09, 10

JDK 설치

jdk version

파일 다운로드

jdk 파일 다운로드 및 설치 할 서버로 업로드.


설치 경로에 다운로드 받은 파일 압축 해제

  • /usr/lib/jvm

심볼릭 링크 설정

기존 자바의 심볼릭 링크 삭제

java 명령은 /usr/bin/java 또는 javac의 심볼릭 링크이다.

etc/alternateives 경로 아래에 있는 java 혹은 javac의 심볼릭 링크를 삭제 한다.

rm  /etc/alternatives/java
# rm  /etc/alternatives/javac

새로 다운로드 받은 버전의 경로로 링크 설정

ln -s /usr/lib/jvm/jdk1.8.0_181/bin/java /etc/alternatives/java
# ln -s /usr/lib/jvm/java-1.8.0-openjdk.x86_64/bin/javac /etc/alternatives/javac


/etc/profile 또는 ~/.bash_profile등에서 JAVA_HOME 설정.

vi etc/profile

파일 하단에 아래 내용 추가

export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_181
export PATH=$PATH:$JAVA_HOME/bin

scala 설치

설치 scala verseion : 2.12.7

scala 다운로드

다운로드사이트 ( 에서 파일 다운로드(rpm 또는 tar.gz)

rpm을 사용해 패키지 설치

rpm -qa scala          # scala 설치 여부 확인
rpm -ivh scala-2.12.7  # scala 설치


spark를 클러스터로 동작 시키려면 spark cluster의 자원을 관리 해주는 Cluster manager가 필요하다.

아래의 방법을 사용 할 수 있다.

  • Yarn - hadoop 과 함께 설치.
  • Mesos - 소스코드로 제공되어 운영환경에 맞게 빌드 해주어야 함. gcc 버전 4.8이상.
  • Kubernetes - centos7 이상 사용 가능.
  • Standalone mode

Spark release : 2.3.2

package type : pre-built for apache hadoop 2.7 and later

설치 파일 다운로드

spark 다운로드 페이지 ( 에서 사용할 버전에 맞는 스파크 선택

standalone 설치



설치 정보





설치 경로



마스터 서버에서 slave정보를 입력 해줘야함.

slave로 사용할 서버에 spark 설치 해야 한다.



마스터 서버에서 $SPARK_HOME/sbin/ 스크립트를 실행.

마스터를 실행하게 되면 데몬으로 동작하며 spark_master_url:8080에 웹서버가 구동되므로 페이지에 접근이 가능하면 정상적으로 구동된 것이다.

slave 구동

각 slave 노드에서 $SPARK_HOME/sbin/start-slave.sh스크립트를 사용해 worker를 구동 시켜야 한다.

$SPARK_HOME/sbin/ spark_master_url:spark_port

스파크 마스터가 7077에서 구동되므로 ./ 커맨드로 실행한다.

정상적으로 작동이 되면 master의 UI 페이지에서 확인이 가능하다.


spark-submit을 이용해 테스트

$ ./spark-submit \
     --master spark:// \
     --class org.apache.spark.examples.SparkPi \
     /usr/local/spark/examples/jars/spark-examples*.jar \

Flatmap → map 으로 메세지 개수를 조정 한뒤 아래와 같은 테스트를 진행 하였다.

1, Agent 에서 메세지당 KB 를 조정하여 AGENT 의 CPU 와  KSTREAM 의  전체 Throuput 을 관찰하는 것이 목적이다.

Partition 1 - Thread 1 - Sleep 0 기준 테스트

2.같은 Thruput 을 Partition3 = thread 3 option 에서 관찰

3.2와같은 환경에서 timesleep = 30ms 조건에서 관찰

4. sleep time 당 Throghput in(주황) 과 CPu 점유율(파랑)

분석서버 현황

Host name
추가 설치 안
bigdata15Yum Repospark

Kafka, Spark 처리 결과


input size : 1.4T 모든서버

conditions : watermark 60s, trigger 1s

Test time : 10am~1pm

result : 오답률 50%이상

오답 원인 : 1차 ETL 병목 현상 (3번 Kafka 서버 특정 파티션 consumer 지연)


input size : 1.4T 모든서버

Conditions : watermark 120s, trigger 2s , 3,4,5 번 서버 Thread 옵션 조정 (8,8,8 → 6,9,9)

Test time : 2pm~3pm

result : 오답률 1% 이하

* spark watermark (지연시간) 를 줄이고 오답률을 개선하려면,

KAFKA /SPARK 클러스터를 복합적으로 늘려서

KAFKA 파티션 , SPARK RDD 파티션, Spark core수 , 1차 ETL 병목 현상

등 복합적으로 개선이 되어야 합니다.

Log Data 분석 시간 결과


분석 서버 부하 결과(CPU, Mmemory, Disk I/O)

CPU, Memory, Disk I/O 사용률 - 요약

buffer cache flush 필요

Host name
Disk I/O
buffer cach flush

Hadoop은 크게 두가지 컴포넌트로 구성되는데 하나는 파일을 저장할 수 있는 분산 파일 시스템인 HDFS(Hadoop Distributed File System)과 분산 컴퓨팅 환경을 제공하는 YARN(Yet Another Resource Negotiator) 입니다. Hadoop 을 처음 접하시는 분들은 HDFS에 대해서는 쉽게 이해하지만 YARN에 대해서는 개념을 잡기 어려워하시는 분들이 있습니다. 그 이유는 Hadoop하면 MapReduce를 많이 떠 올리시는데 MapReduce 따로 YARN 따로 있기 때문에 오는 혼선이 아닌가 생각합니다.  이 글에서는 YARN에 대한 간단한 개념 설명과 YARN에서 MapReduce를 사용하기 환경 설정은 어떻게 해야 하는지에 대해 살펴 보겠습니다.

YARN 과 MapReduce

Hadoop이 처음 나왔을 때에는 지금처럼 여러개의 서브 프로젝트로 구성되지 않았습니다. Hadoop 이라는 하나의 프로젝트 내에 hdfs와 mapreduce가 자바의 패키지로만 분리되어 있었습니다. 이런 구성에서 Hadoop이 많은 곳에 사용되어지고 하나의 클러스터가 수천대 이상으로 구성되면서 다음과 같은 추가 요구사항이 발생하게 됩니다.

  • 하나의 Hadoop 클러스터는 다수의 서버로 구성되어 많은 컴퓨팅 리소스를 가지고 있는데 이것을 하나의 컴퓨팅 플랫폼(MapReduce) 에만 할당하는 것은 비효율적이다.
    • MapReduce 이외에 Spark, Strom 등과 같은 다른 컴퓨팅 플랫폼이 출현하였고 이들을 사용하고자 하는 요구사항도 늘어 났음
  • 여러 개의 컴퓨팅 플랫폼을 동시에 실행할 경우 각 서버의 리소스(주로 메모리)가 부족하여 정상적으로 수행되던 작업들이 다른 작업에 의해 문제가 발생하게 된다.
    • MapReduce 이외에 다른 클러스터를 구성할 경우 클러스터 전체의 사용 효율이 떨어지는 경우가 많음

이런 요구사항을 해결하기 위해 Hadoop 프로젝트에서는 YARN이라는 새로운 서브 프로젝트를 만들게 되었습니다.

전통적인 MapReduce 시스템 구성

YARN에 대해 설명하기 전에 먼저 MapReduce에 대해 먼저 설명 드리겠습니다. MapReduce는 개념적으로 보면 크게 다음 세 부분으로 분리할 수 있습니다.

  • MapReduce라는 프로그램 모델을 추상화시킨 라이브러리
    • Mapper, Reducer 등 클래스로 별도 main() 메소드로 제공되는 클래스는 없음
  • 분산된 서버에 사용자가 만든 MapReduce 프로그램을 실행시키는 Runtime 환경
    • JobTracker, TaskTracker, TaskRunner 등과 같이 분산된 서버에 클러스터 환경을 구성
    • 사용자의 MapReduce  프로그램을 이들 환경에서 실행시키고, 서버 장애 발생 시 수행되던 작업을 다른 서버에 재시작 시키는 등의 역할을 수행
    • MapReduce 프로그램을 위한 중간 단계 데이터의 이동(Shuffle), 이동된 데이터의 정렬(Sort) 등의 작업을 수행
  • MapReduce runtime 환경에 사용자가 만든 프로그램의 실행을 지원하는 라이브러리
    • JobClient, JobConf 등 클래스과 유틸리티 클래스
    • 사용자가 만든 main() 내에서 JobTracker로 작업을 실행하게 요청하거나 작업의 상태 정보 등을 확인하기 위한 유틸리티 클래스

이런 구성에서 MapReduce는 다음과 같은 절차로 프로그램이 실행되었습니다.

  1. MapReduce 클러스터 실행
    한번 실행된 클러스터는 특별한 문제가 없으면 재시작 되지 않으며 여러 MapReduce 작업이 실행되어 있는 클러스터를 이용
  2. 사용자가 Mapper, Reducer 클래스를 개발
  3. MapReduce 작업에서 사용하는 Mapper Reducer 클래스에 대한 지정(1번에서 만든), 입력, 출력 디렉토리 설정, 작업 관련 다양한 옵션을 지정하는 Driver 프로그램 개발(Driver 프로그램이 자바의 main() 메소드를 가지고 있음)
  4. 사용자가 만든 Driver 프로그램 실행
    Hadoop에서 제공해주는 명령어인 bin/mapred 명령어는 클래스패스 등을 설정해주는 부가 기능만 제공하며 실제 메인 클래스는 사용자가 만든 Driver 클래스
  5. Driver 프로그램 내에서는 Job 클래스를 이용하여 JobTracker에 작업 실행 요청
  6. JobTracker는 해당 작업의 환경 설정 정보와 입력 데이터의 HDFS 서버의 위치, TaskTracker의 상태 정보를 이용하여 작업 스케줄링
  7. TaskTracker는 자신에게 할당된 Task를 JobTracker에서 가져와 실행
  8. TaskTracker가 Task 실행 시 TaskTrasker의 쓰레드로 실행하지 않고 별도의 프로세스로 fork 시켜서 실행
    Fork 시에는 main() 메소드를 포함하고 있는 클래스가 있어야 하는데 이때 사용되는 클래스가 MapRunner, ReduceRunner로 별도의 프로세스로 실행된 후 입력 데이터를 읽어 사용자가 개발한 map(), reduce() 함수를 호출해준다(MapReduce가 계속 변화되었기 때문에 MapRunner, ReduceRunner라는 클래스명은 정확하지 않을 수 있습니다.).
  9. Reduce는 Map 결과를 받기 위해 각가의 TaskTrasker로 Map Task의 결과 정보를 요청하여 이 데이터를 로컬 파일로 저장
  10. Reduce Task는 로컬에 저장된 Map 결과를 정렬한 후 reduce() 함수 실행
  11. 각 Task 가 종료될 때 마다 TaskTrakcer는 다음 Task를 JobTraker로 부터 가져와서 실행
  12. JobTracker는 특정 서버(TaskTracker) 장애시 또는 특정 Task 장애시 재 스케줄링 수행

YARN 이전에는 MapReduce 클러스터 내에서 이 기능이 모두 수행되었습니다. 따라서 Hadoop 클러스터의 서버에서 컴퓨팅에 필요한 리소스 상태(각 서버의 상태)는 MapReduce의 Master 서버 역할을 수행하는 JobTracker에 의해 관리되었습니다. 이렇게 JobTracker에 의해 리소스가 관리되기 때문에 Hadoop이 설치된 클러스터 서버들의 리소스를 사용하고자 하는 다른 컴퓨팅 클러스터와는 연동하기 어려운 문제가 있었습니다.

이 문제를 해결하기 위해 나온 것이 YARN이며 YARN은 기존 MapReduce 중에서 클러스터의 리소스를 관리하는 부분만 가져와서 다른 서비스에서도 사용 가능하도로 구성한 시스템입니다. 따라서 YARN 시스템은 기존 MapReduce 보다는 기능적으로는 간단하지만 범용적인 분산 리소스 관리 시스템이 되어야 했기 때문에 MapReduce의 클러스터 관리 체계 보다 더 복잡하고 다양한 기능을 제공하고 있습니다.

YARN의 시스템 구성

이렇게 MapReduce의 클러스터 구성 기능이 YARN으로 재 정의하면서 MapReduce의 구현도 변경되어야 했습니다. MapReduce는 컴퓨팅을 위한 프로그램만 제공하고 클러스터의 리소스 관리, 장애 관리 등은 YARN에 의해 관리되도록 변경되었습니다.

YARN은 핵심 구성 요소는 ResourceManager와 NodeManager입니다.

  • ResourceManager
    • YARN 클러스터의 Master 서버로 하나 또는 이중화를 위해 두개의 서버에만 실행됨
    • 클러스터 전체의 리소스를 관리
    • YARN 클러스터의 리소스를 사용하고자 하는 다른 플랫롬으로부터 요청을 받아 리소스 할당(스케줄링)
  • NodeManager
    • YARN 클러스터의 Worker 서버로 ResourceManager를 제외한 모든 서버에 실행
    • 사용자가 요청한 프로그램을 실행하는 Container를 fork 시키고 Container를 모니터링
      Container 장애 상황 또는 Container가 요청한 리소스보다 많이 사용하고 있는지 감시(요청한 리소스보다 많이 사용하면 해당 Container를 kill 시킴)

이 두 종류의 서버는 외부로의 특별한 요청이 없으면 아무런 동작도 수행하지 않으며 NodeManager는 자신의 상태만 계속해서 ResourceManager로 보내고 ResourceManager 는 클러스터의 전체 리소스 상황에 대한 현황 관리만 수행합니다. 즉, YARN 자체만으로는 할 수 있는 것이 아무것도 없습니다. YARN을 사용하는 또 다른 무언가가 있어야 합니다. 이 또 다른 무언가에 해당하는 것이 MapReduce나 Spark 과 같은 분산 컴퓨팅 플랫폼이나 HBase 등과 같이 분산된 환경에서의 컴퓨팅 리소스(CPU, 메모리 등)가 필요한 클러스터 기반 시스템 들입니다.

YARN에서 MapReduce 동작

YARN의 실제 동작 방식을 이해하기 위해서는 MapReduce가 YARN에서 어떻게 동작하는지를 이해하는 것이 가장 좋은 방법입니다. MapReduce가 최초로 YARN을 사용하는 시스템이었으며, 같은 Hadoop 프로젝트내에 엮여있기 때문에 가장 YARN을 가장 잘 사용하는 시스템이라고 할 수 있습니다. 다음은 YARN 기반에서 MapReduce 프로그램이 어떻게 동작하는지에 대한 설명입니다.

  1. 기존 MapReduce 프로그램과 동일하게 Mapper, Reducer, Driver 클래스 작성
  2. Driver 클래스 실행
    main 프로그램이기 때문에 bin/mapred 명령어를 이용하여 실행
  3. Driver 클래스내에서 Job 클래스를 이용하여 MapReduce작업을 요청하는 것이 아니라 MapReduce 클러스터를 구성하는 작업을 먼저 수행
    기존 MapReduce는 한번 구성된 클러스터를 영구적으로 사용하지만 YARN 환경에서는 Job 별로 클러스터를 구성
  4. MapReduce 클러스터를 구성하기 위해서는 기존의 JobTracker 역할을 수행하는 Master가 필요 이를 위해 YARN의 ResourceManager에게 리소스 요청
    YARN에서는 각각의 컴퓨팅 클러스터를 Application이라고 하며 Application을 실행하기 위해 필요한 Master 서버를 ApplicationMaster 라고 한다. 따라서 기존의 JobTracker는 YARN 입장에서 보면 MapReduce 프로그램을 실행하기 위한 ApplicationMaster가 된다.
    MapReduce의 Driver 프로그램은 내부적으로 MapReduce 클러스터 구성을 위해 ApplicationMaster를 먼저 요청한다. ApplicationMaster는 보통 1개로 구성되기 때문에 1개만 요청한다.
  5. YARN의 ResourceManager는 요청받은 ApplicationMaster를 자신이 관리하는 클러스터(여러개의 NodeManager) 중 하나의 서버를 선택하여 ApplicationMaster(JobTracker 역할)를 실행하고 이 서버를 클라이언트에게 알려준다.
    ApplicationMaster는 각 컴퓨팅 플랫폼별로 다른데 MapReduce에 사용되는 ApplicationMaster는 MRAppMaster 이며  YARN 패키지가 아닌 MapReduce 패키지내에 존재한다.
  6. 클라이언트는 ResourceManager로 부터 받은 ApplicationMaster 서버에 MapReduce 작업 요청을 한다.
  7. MRAppMaster는 작업 요청을 받으면 사용자가 실행한 MapReduce 작업에 필요한 리소스를 다시 ResourceManager에게 요청한다.
  8. ResourceManager는 요청받은 리소스에 대해 NodeManager를 지정하고 Container를 실행한 후 Container 목록을 MRAppMaster에 준다.
  9. NodeManager에 의해 실행된 MapReduce Task를 위한 Container는 MRAppMaster와 통신을 하며 기존 방식의 JobTracker/TaskTracker에서 처리했던 방식과 유사한 방법으로 Task를 실행한다.
    이 과정에서 MRAppMaster의 재활용, Task를 위해 실행된 Container의 재활용 등은 MapReduce와 YARN의 버전업에 따라 처리 방식이 조금씩 다르다.

버전에 따라 조금 다르고 제가 잘못알고 있는 부분도 있겠지만 큰 틀에서는 위와 같은 방식으로 동작합니다.

기존 JobTracker/TaskTracker 방식에 비해 더 복잡해지고 관리해야할 포인트도 더 많아졌습니다. 이것은 다른 플랫폼에서도 서버의 리소스를 공유하기 위해서는 어쩔수 없는 방식이라 생각합니다.  구성을 잘 보면 크게 달라진 것은 없습니다. MapReduce 작업을 위한 클러스터가 항상 실행되어 있지 않기 때문에 작업을 실행하기 전에 클러스터를 구성하는 단계가 추가 되었을 뿐입니다.

YARN을 이용한 클러스터 자원 공유

이제 YARN의 구성과 MapReduce가 어떻게 YARN을 이용하는지 살펴 보았습니다.  이제 YARN을 만든 최초 요구사항인 서로 다른 컴퓨팅 플랫폼 간에 서버들의 리소스를 어떻게 공유하는지 감이 잡히시나요?

예를 들어 100대의 서버에 하나의 MapReduce 작업이 실행되고 있는 중에 Spark 작업 요청이 들어 왔습니다. YARN 이 없는 상황이라면 MapReduce 를 위해 서버 50대에 별도 클러스터를 구성하고 50대에 Spark 클러스터를 구성할 수 있습니다. 또는 100대 모두 MapReduce, Spark 클러스터를 구성할 수 있습니다. 두가지 경우 모두 문제가 될 수 있습니다.

  • 50대, 50대 설치한 경우
    특정 시점에 MapReduce 작업만 수행되고 Spark 작업이 수행되지 않으면 50대의 자원은 낭비
  • 100대에  MapReduce, Spark 모두 설치한 경우
    각 클러스터가 메모리를 모두 사용하도록 구성되면 두 작업이 동시에 수행되면 작업은 Fail
    각 서버의 메모리를 50%씩 나누어 쓰게 구성하면 MapReduce 작업만 수행되거나 Spark 작업만 수행되면 자원 낭비

YARN이 있고 YARN내에서 리소스를 균등하게 사용할 수 있도록 스케줄링을 관리하는 기능이 있다면 다음과 같은 시나리오가 가능합니다.

  • 100대에 YARN 만 설치
  • MapReduce 작업 실행 시점에 100대의 서버에서 각 서버의 모든 자원을 활용하여 작업 실행
  • 이 상태에서 Spark 작업을 실행하면 YARN은 50대의 서버에서 실행되던 MapReduce Task를 강제 종료시키고 Spark에 작업에 리소스를 할당

실제로는 이것보다는 조금 더 복잡한 방식으로 동작하지만 이해를 돕기 위해 시나리오를 만들어 봤습니다. 이렇게 동작하면 서로 다른 컴퓨팅 플랫폼에서 작업하는 작업들이 하나의 리소스 관리자에 의해 관리됨으로써 서버 리소스 낭비를 최소화할 수 있습니다. 이것이 YARN을 만든 목적이고 YARN의 최대 장점입니다.

이렇게 Resource를 유연하게 할당할 수 있도록 하는 기능이 YARN의 스케줄러인데 FIFO, Capacity, Fair 스케줄러를 제공하며 기본은 FIFO 입니다. FIFO를 사용할 경우 위와 같은 시나리오는 제한적으로 얻을 수 있습니다. 위와 같은 시나리오를 얻기 위해서는 Capacity, Fair를 사용하시면 됩니다.

YARN에서 MapReduce를 위한 환경 설정

YARN에 작업을 실행시키기 위한 환경 설정은 처음 접하는 사용자에게는 쉽지 않습니다. 그 이유는 위에서 설명한 작업의 처리 흐름에 대한 개념과 리소스의 할당에 대한 개념때문입니다. 환경 설정을 가장 쉽게 이해하는 방법 중 하나는 YARN에서 MapReduce의 환경 설정을 이해하는 것입니다.  MapReduce의 YARN 환경 설정은 다음과 같습니다.


Shuffle 관련 설정: YARN 기반인 경우 NodeManager에 Task만 실행되고 중지되어 버리기 때문에 MapTask의 결과가 ReduceTask로 전달되기 위해서는 ReduceTask로부터의 Map 결과 데이터 전송 요청에 응답해서 데이터를 전송하는 데몬이 필요합니다. YARN에서는 이런 종류의 데몬을 위해 각 Application(JOB)별로 각 NodeManager에 하나씩 지정한 프로그램을 실행하는 기능을 제공하는데, 이런 서비스를 auxiliary service 라고 부릅니다.

아래 설정은 기본 설정 값으로 되어 있기 때문에 별도 설정은 필요 없지만 이와 유사한 서비스가 필요한 경우 아래와 같이 설정할 수 있습니다.




각 NodeManager는 Container 할당에 사용할 메모리 사이즈를 지정해야 합니다. 여기서 설정된 값은 컴퓨팅을 위해 할당된 메모리 이상으로 설정하지 않습니다. 예를 들어 장비의 메모리가 16GB이면 16GB 모두 할당하지 않고 12GB ~ 14GB 정도 할당합니다. 이유는 OS 자체 사용하는 메모리와 DataNode 등과 같은 다른 프로세스에서 사용하는 메모리를 고려해야 하기 때문입니다.




다음은 하나의 container에 할당할 수 있는 메모리 min/max 값을 설정합니다. 작업 요청 시 여기서 설정된 크기보다 작거나 큰 Container 할당 요청을 하면 에러가 발생하게 됩니다.




YARN으로 실행 시 JobTracker가 없기 때문에 MR Job이 종료된 이후에도 작업 실행 정보와 로그 정보를 보기 위해서는 MR History Server를 실행해야 합니다. 이 서버를 실행한다고 해도 작업 목록이 보이지 않는데 다음과 같이 log aggregation 옵션을 true로 해야 합니다.





JobTracker, TaskTracker는 없어졌지만 JobClient가 사용하는 환경설정을 해야 하는데 기존과 동일하게 mapred-site.xml 파일에 설정합니다.
처음 테스트 환경에 설치했을 때 가장 많은 어려움을 겪는 설정인데 Application Master에 대한 Container의 메모리 설정입니다. 기본값이 1536으로 1.5GB입니다. 이 설정 값과 “yarn.scheduler.maximum-allocation-mb” 설정이 맞지 않으면 다음과 같은 에러 메시지가 나타납니다.

Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=512

이 메시지에서 requestedMemory의 의미는 MR Job을 위한 Application Master Container를 위한 메모리이고 maxMemory는 yarn.scheduler.maximum-allocation-mb 값입니다. 즉 설정 값은 반드시 yarn.scheduler.maximum-allocation-mb 값보다는 작아야 합니다. 1.5GB는 수백대 클러스터 규모에서는 다소 크기 때문에 저는 1GB 또는 그 이하로 할당을 합니다.




기본 설정 값에서 Map/Reduce Task는 각각 1GB의 Container를 요청합니다. 작업에 따라 이 값이 너무 클수도 있고, 작을 수도 있는데 다음 설정 값으로 변경할 수 있습니다.




테스트 환경에서는 메모리 크기가 크지 않기 때문에 메모리 사이즈 설정을 잘해야 하는데 가장 간단한 MapReduce를 실행하기 위해서는 Container 3개가 필요합니다. NodeManager의 yarn.nodemanager.resource.memory-mb 설정 값의 합이,, mapreduce.reduce.memory.mb의 합보다 커야 합니다.

메모리 설정을 정확하게 잘 한 경우에도 작업 실행 중에 다음과 같은 에러 메시지를 볼 수 있습니다.


Current usage: 700 MB of 1 GB physical memory used; 
2.2 GB of 2.1 GB virtual memory used. Killing container.

주로 여기서 멘붕을 많이 겪으시는데 이 메시지의 의미는 할당된 다음과 같습니다.

  • Container 는 1GB  까지 사용 가능한데 실제로는 700 MB 사용 있음. 여기는 문제가 안됨
  • YARN은 Physical 메모리 뿐만 아니라 Virtual Memory 사용도 체크하는데 Virtual Memory의 제한 값은 Container 메모리 사이즈의 2.1 배이다. 따라서 위 메시지는  1GB Container는 2.1GB  까지 사용 가능한데 2.2GB 사용했기 때문에 이를 초과해서 발생한 문제이다.

위 문제를 해결하기 위해서는 두가지 방법이 있습니다. 첫번째는 다음 옵션을 통해 virtual memory 체크를 disable 하거나 rate 값을 더 크게 할 수 있습니다.




virtual memory 설정은 disable 할 경우 어떤 문제가 발생하는지데 대해서는 아직 확인을 하지 못했습니다. 실제로 Virtual memory를 많이 사용한다고 해서 문제되는 것은 없다고 봅니다. Hadoop 개발자 커뮤니티 내에서도 default 값으로 disable 시키거나 ratio를 더 크게 하자는 주장도 있는 것으로 봐서 disable 해도 크게 문제될 것 같지는 않습니다. 다음 JIRA 이슈를 참고하세요.

두 번째 방법은  Map, Reduce Task의 Heap을 조절하는 방법입니다. Map, Reduce Task가 fork 될때, 이 두 옵션 값에 설정된 정보를 이용하여 JVM을 실행합니다. 이 설정에 -Xmx 값을, mapreduce.reduce.memory.mb 에 설정된 값의 70 ~ 80% 정도만 설정합니다. 이 설정은 ApplicationMaster를 위한 Container에도 동일합니다.




글을 마치며

지금까지 YARN이 왜 만들어 졌으며 MapReduce와의 관계, 간단한 동작 방식 등에 대해서 살펴 보았습니다. 최근에는 다양한 컴퓨팅 플랫폼이 등장하고 있으며 실제 서비스에도 하나 이상의 플랫폼을 사용하는 경우도 많이 있습니다. 이런 경우 현재까지는 YARN이 이들 컴퓨팅을 같은 서버에서 자원을 효과적으로 사용하는 요구사항에는 가장 앞서 있는 솔루션이라 할 수 있습니다. 국내에서 개발되는 오픈소스나 솔루션들도 YARN을 지원하는 솔루션이 늘어 났으면 하는 바램입니다.ㅁㅁ

'spark,kafka,hadoop ecosystems > yarn' 카테고리의 다른 글

yarn 이란  (0) 2018.11.20
Query Type
Supported Output ModesNotes
Queries with aggregationAggregation on event-time with watermarkAppend, Update, CompleteAppend mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in `withWatermark()` as by the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed). See the Late Data section for more details. 

Update mode uses watermark to drop old aggregation state. 

Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table.
Other aggregationsComplete, UpdateSince no watermark is defined (only defined in other category), old aggregation state is not dropped. 

Append mode is not supported as aggregates can update thus violating the semantics of this mode.
Queries with mapGroupsWithStateUpdate
Queries with flatMapGroupsWithStateAppend operation modeAppendAggregations are allowed after flatMapGroupsWithState.
Update operation modeUpdateAggregations not allowed after flatMapGroupsWithState.
Queries with joinsAppendUpdate and Complete mode not supported yet. See the support matrix in the Join Operations section for more details on what types of joins are supported.
Other queriesAppend, UpdateComplete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.

Output Sinks

There are a few types of built-in output sinks.

  • File sink - Stores the output to a directory.

        .format("parquet")        // can be "orc", "json", "csv", etc.

  • Kafka sink - Stores the output to one or more topics in Kafka.


  • Foreach sink - Runs arbitrary computation on the records in the output. See later in the section for more details.


  • Console sink(for debugging) - Prints the output to the console/stdout every time there is a trigger. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver’s memory after every trigger.


  • Memory sink (for debugging) - The output is stored in memory as an in-memory table. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver’s memory. Hence, use it with caution.


Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are meant for debugging purposes only. See the earlier section on fault-tolerance semantics. Here are the details of all the sinks in Spark.

Supported Output Modes
File SinkAppendpath: path to the output directory, must be specified. 

For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R). E.g. for "parquet" format options see DataFrameWriter.parquet()
Yes (exactly-once)Supports writes to partitioned tables. Partitioning by time may be useful.
Kafka SinkAppend, Update, CompleteSee the Kafka Integration GuideYes (at-least-once)More details in the Kafka Integration Guide
Foreach SinkAppend, Update, CompleteNoneDepends on ForeachWriter implementationMore details in the next section
Console SinkAppend, Update, CompletenumRows: Number of rows to print every trigger (default: 20) 
truncate: Whether to truncate the output if too long (default: true)
Memory SinkAppend, CompleteNoneNo. But in Complete Mode, restarted query will recreate the full table.Table name is the query name.

2.3.0 부터 지원된다.,

(IPC - 2.2.0 , EPC 2.3.0)

Support matrix for joins in streaming queries
Left InputRight InputJoin Type
StaticStaticAll typesSupported, since its not on streaming data even though it can be present in a streaming query
StreamStaticInnerSupported, not stateful
Left OuterSupported, not stateful
Right OuterNot supported
Full OuterNot supported
StaticStreamInnerSupported, not stateful
Left OuterNot supported
Right OuterSupported, not stateful
Full OuterNot supported
StreamStreamInnerSupported, optionally specify watermark on both sides + time constraints for state cleanup
Left OuterConditionally supported, must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup
Right OuterConditionally supported, must specify watermark on left + time constraints for correct results, optionally specify watermark on right for all state cleanup
Full OuterNot supported
import org.apache.spark.sql.functions.expr
val impressions = spark.readStream. ...
val clicks = spark.readStream. ...
// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime""2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime""3 hours")
// Join with event-time constraints
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"

정리하자면, STREAM - STREAM JOIN 에서 INNER 조인은 watermark 가 옵셔날이지만 의미적으로도 써주는것이 좋고 성능도개선된다.

OUTER 조인은 Watermark 가 필수인데 ,그이유는 마지막까지 조인할 대상을 찾고 기다리기 때문에,  만약 null 일 경우 delay time 은 최대가된다.

  • Joins can be cascaded, that is, you can do df1.join(df2, ...).join(df3, ...).join(df4, ....).

  • As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.

  • As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used.

    • Cannot use streaming aggregations before joins.

    • Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.

  1. deduplication

    / Without watermark using guid column
    // With watermark using guid and eventTime columns
      .withWatermark("eventTime""10 seconds")

    watermark - watermark 까지 (참고로 이전 date 는 중복제거가 안 될 수 있음)
    without - sate 되는대까지

  2. Arbitary State
    Since Spark 2.2, this can be done using the operation mapGroupsWithState and the more powerful operation flatMapGroupsWithState. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation (Scala/Java) and the examples (Scala/Java).
    위의 Operation 은 유저가원하는(임의의) state 를 만든다. 다만 그 State 가 어떻게 활용되는지는 별이야기가없다.

     3. Unsupported!

Unsupported Operations

There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.

  • Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.

  • Limit and take first N rows are not supported on streaming Datasets.

  • Distinct operations on streaming Datasets are not supported.

  • Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

  • Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

  • count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.

  • foreach() - Instead use ds.writeStream.foreach(...) (see next section).

  • show() - Instead use the console sink (see next section).

val query = df.writeStream.format("console").start()   // get the query object          // get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId       // get the unique id of this run of the query, which will be generated at every start/restart        // get the name of the auto-generated or user-specified name
query.explain()   // print detailed explanations of the query
query.stop()      // stop the query
query.awaitTermination()   // block until query is terminated, with stop() or with error
query.exception       // the exception if the query has been terminated with error
query.recentProgress  // an array of the most recent progress updates for this query
query.lastProgress    // the most recent progress update of this streaming query
val spark: SparkSession = ...    // get the list of currently active streaming queries
spark.streams.get(id)   // get a query object by its unique id
spark.streams.awaitAnyTermination()   // block until any one of them terminates

Window Operations on Event Time

Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let’s understand this with an illustration.

>>Aggeregation 과정은 Structured Streaming 에서 똑바르며 grouped aggregation 과 유사하다. 다만 grouped aggregation 에서는 특정컬럼을 기준으로 정리되는데 window - based aggregation 은 Event time 을기준으로 정리된다. 된다.

Imagine our quick example is modified and the stream now contains lines along with the time when the line was generated. Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes. That is, word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping key (i.e. the word) and the window (can be calculated from the event-time).

The result tables would look something like the following.

Window Operations

`Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data. Since Spark 2.1, we have support for watermarking which allows the user to specify the threshold of late data, and allows the engine to accordingly clean up old state. These are explained later in more detail in the Window Operations section.

>> 기본적으로 늦은 data 들에 대해서 처리를 하지만, Spark 는 늦은데이타에서 업데이트를 지속하기위해서 old value 를 컨트롤한다. 뿐만아니라 cleaning up 도한다.

2.1 에서부터는 watermarking 을 제공하여 늦은데이타를 날릴 수 있는 Threshold 값을 조절 할 수 있다.

Now consider what happens if one of the events arrives late to the application. For example, say, a word generated at 12:04 (i.e. event time) could be received by the application at 12:11. The application should use the time 12:04 instead of 12:11 to update the older counts for the window 12:00 - 12:10. This occurs naturally in our window-based grouping – Structured Streaming can maintain the intermediate state for partial aggregates for a long period of time such that late data can update aggregates of old windows correctly, as illustrated below.z`Handling Late Data

However, to run this query for days, it’s necessary for the system to bound the amount of intermediate in-memory state it accumulates. This means the system needs to know when an old aggregate can be dropped from the in-memory state because the application is not going to receive late data for that aggregate any more. To enable this, in Spark 2.1, we have introduced watermarking, which lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly. You can define the watermark of a query by specifying the event time column and the threshold on how late the data is expected to be in terms of event time. For a specific window starting at time T, the engine will maintain state and allow late data to update the state until (max event time seen by the engine - late threshold > T). In other words, late data within the threshold will be aggregated, but data later than the threshold will start getting dropped (see later in the section for the exact guarantees). Let’s understand this with an example. We can easily define watermarking on the previous example using withWatermark() as shown below.

>>하지만 이것을 일단위로 돌리려면, state data 를 memory 에 들어야하고, 블라블라~ water marking 써야하고 > 이것이 지속적인 성능을 향상

import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp""10 minutes")
        window($"timestamp""10 minutes""5 minutes"),

Watermarking in Update Mode

Watermarking in Append Mode

Similar to the Update Mode earlier, the engine maintains intermediate counts for each window. However, the partial counts are not updated to the Result Table and not written to sink. The engine waits for “10 mins” for late date to be counted, then drops intermediate state of a window < watermark, and appends the final counts to the Result Table/sink. For example, the final counts of window 12:00 - 12:10 is appended to the Result Table only after the watermark is updated to 12:11.

>> Append mode 의경우 watermark 의 유효기간까지 data 를 유지하고, 유효기간이 완료되면 추가된 row 들만 append 하게된다.

Conditions for watermarking to clean aggregation state

watermarking을 하기 위해선 아래조건이 만족 필요

  • Output mode must be Append or Update. Complete mode requires all aggregate data to be preserved, and hence cannot use watermarking to drop intermediate state. See the Output Modes section for detailed explanation of the semantics of each output mode.

  • The aggregation must have either the event-time column, or a window on the event-time column.

  • withWatermark must be called on the same column as the timestamp column used in the aggregate. For example, df.withWatermark("time", "1 min").groupBy("time2").count() is invalid in Append output mode, as watermark is defined on a different column from the aggregation column.

  • withWatermark must be called before the aggregation for the watermark details to be used. For example, df.groupBy("time").count().withWatermark("time", "1 min") is invalid in Append output mode.

Semantic Guarantees of Aggregation with Watermarking
  • A watermark delay (set with withWatermark) of “2 hours” guarantees that the engine will never drop any data that is less than 2 hours delayed. In other words, any data less than 2 hours behind (in terms of event-time) the latest data processed till then is guaranteed to be aggregated.

  • However, the guarantee is strict only in one direction. Data delayed by more than 2 hours is not guaranteed to be dropped; it may or may not get aggregated. More delayed is the data, less likely is the engine going to process it.

