sangil55 2018. 11. 20. 19:45

Spark sql에서 사용자 정의 함수가 필요한 경우 `udf`를 이용하면 직접 함수를 정의하여 사용할 수 있다.

인자로 함수를 전달 받고 결과로 `UserDefinedFunction`을 돌려주며, 일반 컬럼 처럼 `select` 구문에서 사용이 가능


functions.scala 원본 펼치기


udf 사용 방법

UDF 등록
import org.apache.spark.sql.functions.udf
val ctof = (degreesCelcius: Double) => {(degreesCelcius * 9.0 5.0) + 32.0}
val ctofUdf = udf(ctof)

udf 함수에 작성한 함수를 인자로 전달하여 정의한다.

Spark SQL에서 UDF 사용
df.select('city, ctofUdf('avgLow) as "avgLowF", ctofUdf('avgHigh) as "avgHighF").show

spark session 이용

UDF 등록(spark session 이용)
val ctof = (degreesCelcius: Double) => {(degreesCelcius * 9.0 5.0) + 32.0}
spark.udf.register("CTOF", ctof)
//sqlContext.udf.register("CTOF", ctof)

spark 세션의 udf()메서드를 이용하는 경우 spark가 제공하는 sql 메서드를 이용해 문자열로 작성한 쿼리에서 새로 정의함 함수를 사용 할 수 있다.

Spark SQL에서 UDF 사용
sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show

실행 결과

temperature.json
{"city":"St. John's","avgHigh":8.7,"avgLow":0.6}
{"city":"Charlottetown","avgHigh":9.7,"avgLow":0.9}
{"city":"Halifax","avgHigh":11.0,"avgLow":1.6}
{"city":"Fredericton","avgHigh":11.2,"avgLow":-0.5}
{"city":"Quebec","avgHigh":9.0,"avgLow":-1.0}
{"city":"Montreal","avgHigh":11.1,"avgLow":1.4}
SQL 실행결과
+-------------+------------------+------------------+
|         city|           avgLowF|          avgHighF|
+-------------+------------------+------------------+
|   St. John's|             33.08|             47.66|
|Charlottetown|             33.62|             49.46|
|      Halifax|             34.88|              51.8|
|  Fredericton|              31.1|             52.16|
|       Quebec|              30.2|              48.2|
|     Montreal|             34.52|             51.98|
|       Ottawa|             33.98|51.620000000000005|
|      Toronto|              36.5|              54.5|
|     Winnipeg|             26.42|             46.94|
|       Regina|25.880000000000003|48.379999999999995|
|     Edmonton|             25.16|              47.3|
|      Calgary|             27.68|              50.9|
|    Vancouver|              43.7|             56.66|
|     Victoria|             41.54|57.379999999999995|
|   Whitehorse|             21.38|              40.1|
|  Yellowknife|              15.8|             31.64|
+-------------+------------------+------------------+

참고

UDF는 블랙박스로 동작하기 때문에 주의해서 사용해야 한다.(https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs-blackbox.html)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// no optimization as it is a more involved Scala function in filter
// 08/30 Asked on dev@spark mailing list for explanation
val cities6chars = cities.filter(_.name.length == 6).map(_.name.toUpperCase)
 
cities6chars.explain(true)
 
// or simpler when only concerned with PushedFilters attribute in Parquet
scala> cities6chars.queryExecution.optimizedPlan
res33: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#248]
+- MapElements <function1>, class City, [StructField(id,LongType,false), StructField(name,StringType,true)], obj#247: java.lang.String
   +- Filter <function1>.apply
      +- DeserializeToObject newInstance(class City), obj#246: City
         +- Relation[id#236L,name#237] parquet
 
// no optimization for Dataset[City]?!
// 08/30 Asked on dev@spark mailing list for explanation
val cities6chars = cities.filter(_.name == "Warsaw").map(_.name.toUpperCase)
 
cities6chars.explain(true)
 
// The filter predicate is pushed down fine for Dataset's Column-based query in where operator
scala> cities.where('name === "Warsaw").queryExecution.executedPlan
res29: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
   +- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>
 
// Let's define a UDF to do the filtering
val isWarsaw = udf { (s: String) => s == "Warsaw" }
 
// Use the UDF in where (replacing the Column-based query)
scala> cities.where(isWarsaw('name)).queryExecution.executedPlan
res33: org.apache.spark.sql.execution.SparkPlan =
*Filter UDF(name#129)
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,name:string>