https://www.cloudera.com/documentation/enterprise/5-8-x/topics/cdh_ig_yarn_tuning.html

'spark,kafka,hadoop ecosystems > yarn' 카테고리의 다른 글

yarn 이란  (0) 2018.11.20
메타 데이터의 끝으로 건너뛰기
메타 데이터의 시작으로 이동
  1. Tensorflow 기본 개념

    1. Constant를 이용한 기본 계산


텐서플로우(TensorFlow)는 라이브러리 이름에서 알수있듯이 텐서(Tensor)를 흘려보내면서(Flow) 데이터를 처리하는 라이브러리이다. 여기서 텐서(Tensor)는 다차원 배열을 의미한다. 텐서의 예시는 아래와 같다.


3 # 랭크 0 텐서; shape [] 스칼라
[1., 2., 3.] # 랭크 1 텐서; shape [3] 벡터
[[1., 2., 3.], [4., 5., 6.]] # 랭크 2 텐서; shape [23] 행렬
[[[1., 2., 3.]], [[7., 8., 9.]]] # 랭크 3 텐서; shape [213]


랭크*Rank는 텐서의 차원(Dimension)을 의미한다. 만약 랭크가 0이라면 스칼라, 1이라면 벡터, 2라면 행렬, 3이상이면 텐서라고 부른다.이런 텐서는 계산 그래프 구조(Computational Graph)를 통해 노드에서 노드로 이동(Flow) 한다. 따라서 텐서플로우를 이용해서 프로그램을 작성할 때는 다음의 두 과정을 거쳐야 한다.

        1. 그래프 생성

        2. 그래프 실행

그래프 생성 단계에서는 연산 과정을 그래프 형태로 표현하게 된다. 컴퓨터공학에서 말하는 그래프는 노드(Node)와 엣지(Edge)로 이루어진 자료구조이다. 텐서플로우는 노드에 연산(Operator), 변수(Variable), 상수(Constant) 등을 정의하고, 노드들간의 연결인 엣지를 통해 실제 텐서를 주고받으면서 계산을 수행한다.이제 실제 코드를 통해 텐서플로우에서 어떻게 그래프를 생성하는지 살펴보자.먼저 텐서플로우 라이브러리를 Import한다.


1
import tensorflow as tf


그 다음에 상수 형태의 노드를 두개 정의하고, 파이썬의 print 함수를 이용해서 노드의 값을 출력해 보자.


1
node1 = tf.constant(3.0, dtype=tf.float32) node2 = tf.constant(4.0# 암시적으로 tf.float32 타입으로 선언된다. print(node1, node2)


출력 결과는 다음과 같다.


Tensor("Const:0", shape=(), dtype=float32) Tensor("Const_1:0", shape=(), dtype=float32)


여기까지는 그래프 생성이 된 것 뿐이므로 실제 결과값을 얻으려면 run 단계가 필요하다. 


1
sess = tf.Session() print(sess.run([node1, node2]))


이제 아래와 같은 결과가 출력된다.


[3.04.0]



b. Placeholder 를 이용한 계산


이제 상수의 개념을 알았으면, 다음은 placeholder에 대해서 알아보자.

y = x * 2 를 그래프를 통해서 실행한다고 하자. 입력값으로는 1,2,3,4,5를 넣고, 출력은 2,4,6,8,10을 기대한다고 하자. 이렇게 여러 입력값을 그래프에서 넣는 경우는 머신러닝에서 y=W*x + b 와 같은 그래프가 있다고 할 때, x는 학습을 위한 데이타가 된다.

즉 지금 살펴보고자 하는 데이타 타입은 학습을 위한 학습용 데이타를 위한 데이타 타입이다.


y=x*2를 정의하면 내부적으로 다음과 같은 그래프가 된다.

그러면, x에는 값을 1,2,3,4,5를 넣어서 결과값을 그래프를 통해서 계산해 내야한다. 개념적으로 보면 다음과 같다.

이렇게 학습용 데이타를 담는 그릇을 플레이스홀더(placeholder)라고 한다.

플레이스홀더에 대해서 알아보면, 플레이스 홀더의 위의 그래프에서 x 즉 입력값을 저장하는 일종의 통(버킷)이다.

tf.placeholder(dtype,shape,name) 으로 정의된다.

플레이스 홀더 정의에 사용되는 변수들을 보면

  • dtype : 플레이스홀더에 저장되는 데이타형이다. tf.float32와 같이 실수,정수등의 데이타 타입을 정의한다.

  • shape : 행렬의 차원을 정의한다. shapre=[3,3]으로 정의해주면, 이 플레이스홀더는 3x3 행렬을 저장하게 된다.

  • name : name은 이 플레이스 홀더의 이름을 정의한다. name에 대해서는 나중에 좀 더 자세하게 설명하도록 하겠다.

그러면 이 x에 학습용 데이타를 어떻게 넣을 것인가? 이를 피딩(feeding)이라고 한다.

다음 예제를 보자


1
2
3
4
5
6
7
import tensorflow as tf
input_data = [1,2,3,4,5]
= tf.placeholder(dtype=tf.float32)
= * 2
sess = tf.Session()
result = sess.run(y,feed_dict={x:input_data})
print result

처음 input_data=[1,2,3,4,5]으로 정의하고

다음으로 x=tf.placeholder(dtype=tf.float32) 를 이용하여, x를 float32 데이타형을 가지는 플레이스 홀더로 정의하다. shape은 편의상 생략하였다.

그리고 y=x * 2 로 그래프를 정의하였다.


세션이 실행될때, x라는 통에 값을 하나씩 집어 넣는데, (앞에서도 말했듯이 이를 피딩이라고 한다.)

sess.run(y,feed_dict={x:input_data}) 와 같이 세션을 통해서 그래프를 실행할 때, feed_dict 변수를 이용해서 플레이스홀더 x에, input_data를 피드하면, 세션에 의해서 그래프가 실행되면서 x는 feed_dict에 의해서 정해진 피드 데이타 [1,2,3,4,5]를 하나씩 읽어서 실행한다.

c. 변수형(Variables)

마지막 데이타형은 변수형으로,

y=W*x+b 라는 학습용 가설이 있을때, x가 입력데이타 였다면, W와 b는 학습을 통해서 구해야 하는 값이 된다.  이를 변수(Variable)이라고 하는데, 변수형은 Variable 형의 객체로 생성이 된다.


  • tf.Variable.__init__(initial_value=None, trainable=True, collections=None, validate_shape=True, caching_device=None, name=None, variable_def=None, dtype=None, expected_shape=None, import_scope=None)


변수형에 값을 넣는 것은 다음과 같이 한다.


var = tf.Variable([1,2,3,4,5], dtype=tf.float32)

아래의 예제를 보자.

1
2
3
4
5
6
7
8
import tensorflow as tf
input_data = [1,2,3,4,5]
= tf.placeholder(dtype=tf.float32)
= tf.Variable([2],dtype=tf.float32)
= W*x
sess = tf.Session()
result = sess.run(y,feed_dict={x:input_data})
print result

우리가 기대하는 결과는 다음과 같다. y=W*x와 같은 그래프를 가지고, 


x는 [1,2,3,4,5] 값을 피딩하면서, 변수 W에 지정된 2를 곱해서 결과를 내기를 바란다.

그렇지만 코드를 실행해보면 다음과 같이 에러가 출력되는 것을 확인할 수 있다.


이유는 텐서플로우에서 변수형은 그래프를 실행하기 전에 초기화를 해줘야 그 값이 변수에 지정이 된다. 


세션을 초기화 하는 순간 변수 W에 그 값이 지정되는데, 초기화를 하는 방법은 다음과 같이 변수들을 global_variables_initializer() 를 이용해서 초기화 한후, 초기화된 결과를 세션에 전달해 줘야 한다.


init = tf.global_variables_initializer()

sess.run(init)


그러면 초기화를 추가한 코드를 보자

1
2
3
4
5
6
7
8
9
10
import tensorflow as tf
input_data = [1,2,3,4,5]
= tf.placeholder(dtype=tf.float32)
= tf.Variable([2],dtype=tf.float32)
= W*x
sess = tf.Session()
init = tf.global_variables_initializer()     // initialize!
sess.run(init)
result = sess.run(y,feed_dict={x:input_data})
print result

초기화를 수행한 후, 코드를 수행해보면 다음과 같이 우리가 기대했던 결과가 출력됨을 확인할 수 있다.




2. Tensorframes 개요

Tensorframes는 스칼라와 아파치 스파크를위한 실험적인 TensorFlow 바인딩이다. 
TensorFrames(TensorFlow on Spark Dataframes)를 사용하면 TensorFlow 프로그램을 사용하여 Apache Spark의 DataFrames를 조작 할 수 있다.

기본적인 Tensorflow의 계산 구조는 아래 그림과 같다.

placeholder 로 입력 값을 정의하고 graph 구조 안에서 연산을 수행한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
t1 = tf.placeholder(tf.float32)
t2 = tf.placeholder(tf.float32)
 
# t3: matrix multiplication (m1 x m3)
tp = tf.matmul(t1, t2)
 
 
# setup input matrices
m1 = [[3.2.1.0.]]
m2 = [[-5.], [-4.], [-3.], [-2.]]
 
# Execute the graph within a session
with tf.Session() as s:
     print(s.run([tp], feed_dict={t1:m1, t2:m2}))

그러면 결과값은 아래와 같이 출력된다.

[array([[-26.]], dtype=float32)]

Tensorframes를 사용하면 어떻게 될까?

위의 다이어그램에서 보는 것처럼 TensorFrames는 Spark DataFrames와 TensorFlow 사이에 Bridge 역할을 한다.

이를 통해 DataFrames를 가져 와서 TensorFlow 계산 그래프에 입력으로 적용 할 수 있다. TensorFrames를 사용하면 TensorFlow 계산 그래프 출력을 가져 와서 다시 DataFrames로 밀어 넣을 수 있으므로 다운 스트림 Spark 처리를 계속할 수 있다.

 간단한 예제로, 갖고 있는 데이터의 컬럼에 특정 값을 더하는 아래의 연산을 살펴보자.

1
2
3
4
5
6
7
8
# Import TensorFlow, TensorFrames, and Row
import tensorflow as tf
import tensorframes as tfs
from pyspark.sql import Row
 
# Create RDD of floats and convert into DataFrame `df`
rdd = [Row(x=float(x)) for in range(10)]
df = sqlContext.createDataFrame(rdd)

데이터는 RDD 기반의 dataframe으로 저장되어 있다. 데이터 형태를 보려면 우리가 익히 아는 spark 명령어를 쓰면 된다.

1
df.show()

 이제 tensorflow graph 연산을 해보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Run TensorFlow program executes:
#   The 'op' performs the addition (i.e. 'x' + '3')
#   Place the data back into a DataFrame
with tf.Graph().as_default() as g:
 
#   The placeholder that corresponds to column 'x'.
#   The shape of the placeholder is automatically
#   inferred from the DataFrame.
    = tfs.block(df, "x")
     
    # The output that adds 3 to x
    = tf.add(x, 3, name='z')
     
    # The resulting `df2` DataFrame
    df2 = tfs.map_blocks(z, df)
 
# Note that 'z' is the tensor output from the
# 'tf.add' operation
print z
 
## Output
Tensor("z:0", shape=(?,), dtype=float64)

간단하게 코드를 설명하면,

  • x는 tfs.block을 사용한다.이 블록은 DataFrame의 열 내용을 기반으로 블록 자리 표시자를 만든다
  • z는 TensorFlow add 메소드 (tf.add)의 출력 텐서이다.
  • df2는 df DataFrame에 z 텐서 블록 단위로 추가 열을 추가하는 새로운 DataFrame이다.

z는 텐서이므로, 우리가 원하는 데이터 결과값을 보려면 위에서와 같이 df.show() 명령을 쓰면 된다.


Query Type
Supported Output ModesNotes
Queries with aggregationAggregation on event-time with watermarkAppend, Update, CompleteAppend mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in `withWatermark()` as by the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed). See the Late Data section for more details. 

Update mode uses watermark to drop old aggregation state. 

Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table.
Other aggregationsComplete, UpdateSince no watermark is defined (only defined in other category), old aggregation state is not dropped. 

Append mode is not supported as aggregates can update thus violating the semantics of this mode.
Queries with mapGroupsWithStateUpdate
Queries with flatMapGroupsWithStateAppend operation modeAppendAggregations are allowed after flatMapGroupsWithState.
Update operation modeUpdateAggregations not allowed after flatMapGroupsWithState.
Queries with joinsAppendUpdate and Complete mode not supported yet. See the support matrix in the Join Operations section for more details on what types of joins are supported.
Other queriesAppend, UpdateComplete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.


Output Sinks

There are a few types of built-in output sinks.

  • File sink - Stores the output to a directory.

    writeStream
        .format("parquet")        // can be "orc", "json", "csv", etc.
        .option("path""path/to/destination/dir")
        .start()


  • Kafka sink - Stores the output to one or more topics in Kafka.

    writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers""host1:port1,host2:port2")
        .option("topic""updates")
        .start()


  • Foreach sink - Runs arbitrary computation on the records in the output. See later in the section for more details.

    writeStream
        .foreach(...)
        .start()


  • Console sink(for debugging) - Prints the output to the console/stdout every time there is a trigger. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver’s memory after every trigger.

    writeStream
        .format("console")
        .start()


  • Memory sink (for debugging) - The output is stored in memory as an in-memory table. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver’s memory. Hence, use it with caution.

    writeStream
        .format("memory")
        .queryName("tableName")
        .start()


Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are meant for debugging purposes only. See the earlier section on fault-tolerance semantics. Here are the details of all the sinks in Spark.

Sink
Supported Output Modes
Options
Fault-tolerant
Notes
File SinkAppendpath: path to the output directory, must be specified. 

For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R). E.g. for "parquet" format options see DataFrameWriter.parquet()
Yes (exactly-once)Supports writes to partitioned tables. Partitioning by time may be useful.
Kafka SinkAppend, Update, CompleteSee the Kafka Integration GuideYes (at-least-once)More details in the Kafka Integration Guide
Foreach SinkAppend, Update, CompleteNoneDepends on ForeachWriter implementationMore details in the next section
Console SinkAppend, Update, CompletenumRows: Number of rows to print every trigger (default: 20) 
truncate: Whether to truncate the output if too long (default: true)
No
Memory SinkAppend, CompleteNoneNo. But in Complete Mode, restarted query will recreate the full table.Table name is the query name.


'spark,kafka,hadoop ecosystems > apache spark' 카테고리의 다른 글

spark 재설치  (0) 2018.11.21
etl 통테 결과  (0) 2018.11.21
Spark Struct Streaming - joins  (0) 2018.11.20
Spark Struct Streaming - other operations  (0) 2018.11.20
spark struct streaming - window operation  (0) 2018.11.20

+ Recent posts