Programming Model

The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You express your streaming computation as a standard batch-like query, as if it were on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail.

> The key idea of Structured Streaming is a continuously appended table: you can express your streaming computation in the form of a normal batch query.

Basic Concepts

Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.

(Figure: Stream as a Table)

A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.

(Figure: Model)
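The trigger is configurable on the streaming query. A minimal sketch of setting a 1-second trigger interval (assumes Spark 2.2+; resultDf is a hypothetical streaming result DataFrame):

import org.apache.spark.sql.streaming.Trigger

// Re-run the incremental query every 1 second: each run picks up the rows
// newly appended to the Input Table and refreshes the Result Table.
val query = resultDf.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()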

The “Output” is defined as what gets written out to the external storage. The output can be defined in one of three modes:

  • Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

  • Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

  • Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.

Note that each mode is applicable on certain types of queries. This is discussed in detail later.

>> There are three output modes: Complete mode / Append mode / Update mode.

In Complete Mode, whenever an updated Result Table has been produced, the entire table is written out to the external storage.

In Append Mode, only the new rows are written to the external storage at each trigger.

In Update Mode, only the new or updated rows are written to the external storage at each trigger. The difference from Complete mode is that if the query contains no aggregation, it behaves exactly like Append Mode. (It is expected to be faster than Complete mode.)

To illustrate the use of this model, let’s understand it in the context of the Quick Example above. The first lines DataFrame is the input table, and the final wordCounts DataFrame is the result table. Note that the query on the streaming lines DataFrame to generate wordCounts is exactly the same as it would be on a static DataFrame. However, when this query is started, Spark will continuously check for new data from the socket connection. If there is new data, Spark will run an “incremental” query that combines the previous running counts with the new data to compute updated counts, as shown below.
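For reference, a minimal sketch of that quick example (assuming a text source on socket localhost:9999, e.g. fed by nc -lk 9999):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()
import spark.implicits._

// lines is the unbounded Input Table: one row per line arriving on the socket.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// The query is written exactly as it would be on a static DataFrame.
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()   // the Result Table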

>> In code, it is implemented as below.

val query = wordCounts.writeStream
  .outputMode("complete")   // or "append" / "update"
  .format("console")
  .start()

>> You write it just as you would ordinary SQL, and Spark continuously checks for new data (socket-based). If there is new data, Spark runs a query over the increment and combines it with the previously computed running counts. (Illustrated in the figure below.)

(Figure: Model)


Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. It only keeps around the minimal intermediate state data required to update the result.

>> Structured Streaming does not materialize the entire table. It reads the latest available data and incrementally updates the result.

The original source data is then discarded. It keeps only the smallest possible amount of data, called state data; this state data is exactly what is needed to update the result.

Basic Operations - Selection, Projection, Aggregation

Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are discussed later in this section.

>> Basic queries such as select are supported as shown below, though a few operations are not.

case class DeviceData(device: String, deviceType: String, signal: Double, time: String)   // time as string, matching the schema below
 
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data
 
// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs
ds.filter(_.signal > 10).map(_.device)         // using typed APIs
 
// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API
 
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API

Note: you can identify whether a DataFrame/Dataset has streaming data or not by using df.isStreaming.
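A quick check, as a sketch (df as defined above; the static file path is hypothetical):

println(df.isStreaming)                                 // true for the streaming DataFrame
println(spark.read.json("devices.json").isStreaming)    // false for a static DataFrame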


Unsupported Operations

Schema inference and partition of streaming DataFrames/Datasets

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting spark.sql.streaming.schemaInference to true.
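For such ad-hoc cases, the setting can be flipped at runtime; a one-line sketch (assumes a SparkSession named spark):

// Ad-hoc only: let Spark infer the schema of file-based streaming sources.
spark.conf.set("spark.sql.streaming.schemaInference", "true")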

Partition discovery does occur when subdirectories that are named /key=value/ are present and listing will automatically recurse into these directories. If these columns appear in the user provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add /data/year=2016/ when /data/year=2015/ was present, but it is invalid to change the partitioning column (i.e. by creating the directory /data/date=2016-04-17/).
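A sketch of what this looks like in practice (the directory layout, schema, and column names are illustrative assumptions):

import org.apache.spark.sql.types._

// Layout: /data/year=2015/..., /data/year=2016/...; new year=... directories
// may appear later, but the partitioning column itself must not change.
val schema = new StructType()
  .add("device", StringType)
  .add("signal", DoubleType)
  .add("year", IntegerType)   // filled in by Spark from the year=... path segment

val partitioned = spark.readStream
  .schema(schema)
  .json("/data")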

>> By default, Structured Streaming requires you to supply the input schema up front. (This is supported for CSV, but what about JSON? What about Kafka?)





Discretized Streams (DStreams)

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable, distributed dataset (see the Spark Programming Guide for more details). Each RDD in a DStream contains data from a certain interval, as shown in the following figure.

API using Datasets and DataFrames

Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as well as streaming, unbounded data. Similar to static Datasets/DataFrames, you can use the common entry point SparkSession (Scala/Java/Python/R docs) to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as static DataFrames/Datasets. If you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself with them using the DataFrame/Dataset Programming Guide.

To interpret: a DStream is the stream that Spark Streaming provides as its basic abstraction. It is either the input stream coming in from a source, or a data stream produced by a transformation.

Internally, a DStream is implemented as a continuous series of RDDs, Spark’s abstraction of a distributed dataset (the RDD).
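For contrast with the Structured Streaming code above, a classic DStream word count looks roughly like this sketch (assumes the spark-streaming module on the classpath and a socket source on localhost:9999):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("DStreamWordCount")
val ssc = new StreamingContext(conf, Seconds(1))   // each 1-second batch becomes one RDD

val lines = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.print()   // per-batch counts, not a running total across batches

ssc.start()
ssc.awaitTermination()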

 

From Spark 2.0 onwards, in Structured Streaming you can create streaming objects from a SparkSession over bounded or unbounded data. In Structured Streaming you can apply the same DataFrame/Dataset API operations (in practice, with a few exceptions).


Beyond that, with the old DStream API, reading from a source required a separate serializer (on top of the built-in String serializer) to turn JSON and the like into RDD objects, whereas in Spark Structured Streaming you create DataFrame objects directly through readStream and friends, as sketched below.
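A sketch of both cases (assumptions: a SparkSession named spark, the spark-sql-kafka-0-10 package on the classpath, a broker on localhost:9092, a topic named devices, and hypothetical paths/column names):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

// File (JSON) source: the schema must be supplied explicitly (see the
// schema-inference note above); no custom serializer is involved.
val jsonSchema = new StructType()
  .add("device", StringType)
  .add("signal", DoubleType)
val jsonDf = spark.readStream
  .schema(jsonSchema)
  .json("/path/to/json/dir")

// Kafka source: values arrive as bytes; cast to string and parse as JSON.
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "devices")
  .load()
  .select(from_json($"value".cast("string"), jsonSchema).as("data"))
  .select("data.*")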

Spark UDF

When you need a user-defined function in Spark SQL, you can define and use your own function with `udf`.

`udf` takes a function as its argument and returns a `UserDefinedFunction`, which can be used in a `select` clause just like an ordinary column.




How to use udf

Registering a UDF
import org.apache.spark.sql.functions.udf
val ctof = (degreesCelcius: Double) => { (degreesCelcius * 9.0 / 5.0) + 32.0 }   // Celsius to Fahrenheit
val ctofUdf = udf(ctof)

You define the UDF by passing your function to udf as an argument.

Using the UDF in Spark SQL
df.select('city, ctofUdf('avgLow) as "avgLowF", ctofUdf('avgHigh) as "avgHighF").show

Using the Spark session

Registering a UDF (via the Spark session)
val ctof = (degreesCelcius: Double) => { (degreesCelcius * 9.0 / 5.0) + 32.0 }   // Celsius to Fahrenheit
spark.udf.register("CTOF", ctof)
//sqlContext.udf.register("CTOF", ctof)

When you register it through the Spark session's udf() method, the newly defined function can be used inside queries written as strings and run with the sql method that spark provides.

Using the UDF in Spark SQL
sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show

Execution result

temperature.json
{"city":"St. John's","avgHigh":8.7,"avgLow":0.6}
{"city":"Charlottetown","avgHigh":9.7,"avgLow":0.9}
{"city":"Halifax","avgHigh":11.0,"avgLow":1.6}
{"city":"Fredericton","avgHigh":11.2,"avgLow":-0.5}
{"city":"Quebec","avgHigh":9.0,"avgLow":-1.0}
{"city":"Montreal","avgHigh":11.1,"avgLow":1.4}
SQL execution result
+-------------+------------------+------------------+
|         city|           avgLowF|          avgHighF|
+-------------+------------------+------------------+
|   St. John's|             33.08|             47.66|
|Charlottetown|             33.62|             49.46|
|      Halifax|             34.88|              51.8|
|  Fredericton|              31.1|             52.16|
|       Quebec|              30.2|              48.2|
|     Montreal|             34.52|             51.98|
|       Ottawa|             33.98|51.620000000000005|
|      Toronto|              36.5|              54.5|
|     Winnipeg|             26.42|             46.94|
|       Regina|25.880000000000003|48.379999999999995|
|     Edmonton|             25.16|              47.3|
|      Calgary|             27.68|              50.9|
|    Vancouver|              43.7|             56.66|
|     Victoria|             41.54|57.379999999999995|
|   Whitehorse|             21.38|              40.1|
|  Yellowknife|              15.8|             31.64|
+-------------+------------------+------------------+

Note

Because a UDF operates as a black box for the optimizer, it must be used with care (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs-blackbox.html). The transcript below illustrates the point: the same predicate written as a Column expression is pushed down to the Parquet scan (see PushedFilters), while wrapped in a UDF it is not.

// no optimization as it is a more involved Scala function in filter
// 08/30 Asked on dev@spark mailing list for explanation
val cities6chars = cities.filter(_.name.length == 6).map(_.name.toUpperCase)
 
cities6chars.explain(true)
 
// or simpler when only concerned with PushedFilters attribute in Parquet
scala> cities6chars.queryExecution.optimizedPlan
res33: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#248]
+- MapElements <function1>, class City, [StructField(id,LongType,false), StructField(name,StringType,true)], obj#247: java.lang.String
   +- Filter <function1>.apply
      +- DeserializeToObject newInstance(class City), obj#246: City
         +- Relation[id#236L,name#237] parquet
 
// no optimization for Dataset[City]?!
// 08/30 Asked on dev@spark mailing list for explanation
val cities6chars = cities.filter(_.name == "Warsaw").map(_.name.toUpperCase)
 
cities6chars.explain(true)
 
// The filter predicate is pushed down fine for Dataset's Column-based query in where operator
scala> cities.where('name === "Warsaw").queryExecution.executedPlan
res29: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
   +- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>
 
// Let's define a UDF to do the filtering
val isWarsaw = udf { (s: String) => s == "Warsaw" }
 
// Use the UDF in where (replacing the Column-based query)
scala> cities.where(isWarsaw('name)).queryExecution.executedPlan
res33: org.apache.spark.sql.execution.SparkPlan =
*Filter UDF(name#129)
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,name:string>

