spark-shell command 에서는 write 할때 문제가 되지 않는데,
spark-submit 에서 정의된 함수를 사용해서 write 하는 과정에서 not serializable 에러가 나오는 경우가 있다.
예를들면,
가령 같은 scala class 안에 function 을 정의하고 바로 사용 하게되면,
org.apache.spark.SparkException: Task not serializable when write 에러가 나는데,
def job(tpath : String) : Unit = {
object getgap {
def getgap(s1:String , s2 : String) : Long = {
if(s1.length<8 || s2.length<8) 0
val format = new java.text.SimpleDateFormat("HH:mm:ss.SSS")
val v1 = format.parse(s1)
val v2 = format.parse(s2)
(v2.getTime() - v1.getTime()) / 100
}
}
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("yarn").appName("message_summerization").getOrCreate()
import spark.implicits._
//val inputpath = "/user/sangil55/m_input"
val rd = spark.read.json(inputpath)
val df_timegap = df_semi2.map(row=>(row.getString(0), row.getString(1), row.getString(2), getgap(row.getString(1),row.getString(2)) ) )
.withColumnRenamed("_1", "jobid").withColumnRenamed("_2", "starttime").withColumnRenamed("_3", "endtime").withColumnRenamed("_4", "gap")
.sort("starttime")
이 getgap 함수를 object 로 빼주게되면 static class 로 인식하여
잘돌아간다.
spark 환경에서 같은클래스에서 선언된 함수는 따로 클래스화되지 않고, 임시저장되는데 이걸 바로 사용하게되면
mapper 에서 임시저장된 함수를 제대로 분산시키지 못하는듯하다.
이와 비슷한 에러가 자주나오니,
spark-submit 환경에서 function 으로 동작하는 놈들은 웬만하면 따로 뺴주는게 좋다.
'spark,kafka,hadoop ecosystems > apache spark' 카테고리의 다른 글
Spark Structed Streaming - 전체내용 번역 및 정리 (0) | 2018.08.14 |
---|---|
[Tip] Spark crontab 배치 script 에서 실행 안되는 문제 해결 (0) | 2018.07.17 |
2. spark user defined aggregation function 만들기 (with scala) (0) | 2018.05.10 |
1-2 spark applicaiton tuning 하기 (0) | 2018.04.19 |
1-1. Spark Transformation vs Action example code (0) | 2018.04.19 |