spark,kafka,hadoop ecosystems/apache spark
* [tip] spark 에서 Write 에서 Task not serializable when write 에러 해결법
sangil55
2018. 6. 7. 15:46
spark-shell command 에서는 write 할때 문제가 되지 않는데,
spark-submit 에서 정의된 함수를 사용해서 write 하는 과정에서 not serializable 에러가 나오는 경우가 있다.
예를들면,
가령 같은 scala class 안에 function 을 정의하고 바로 사용 하게되면,
org.apache.spark.SparkException: Task not serializable when write 에러가 나는데,
def job(tpath : String) : Unit = {
object getgap {
def getgap(s1:String , s2 : String) : Long = {
if(s1.length<8 || s2.length<8) 0
val format = new java.text.SimpleDateFormat("HH:mm:ss.SSS")
val v1 = format.parse(s1)
val v2 = format.parse(s2)
(v2.getTime() - v1.getTime()) / 100
}
}
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("yarn").appName("message_summerization").getOrCreate()
import spark.implicits._
//val inputpath = "/user/sangil55/m_input"
val rd = spark.read.json(inputpath)
val df_timegap = df_semi2.map(row=>(row.getString(0), row.getString(1), row.getString(2), getgap(row.getString(1),row.getString(2)) ) )
.withColumnRenamed("_1", "jobid").withColumnRenamed("_2", "starttime").withColumnRenamed("_3", "endtime").withColumnRenamed("_4", "gap")
.sort("starttime")
이 getgap 함수를 object 로 빼주게되면 static class 로 인식하여
잘돌아간다.
spark 환경에서 같은클래스에서 선언된 함수는 따로 클래스화되지 않고, 임시저장되는데 이걸 바로 사용하게되면
mapper 에서 임시저장된 함수를 제대로 분산시키지 못하는듯하다.
이와 비슷한 에러가 자주나오니,
spark-submit 환경에서 function 으로 동작하는 놈들은 웬만하면 따로 뺴주는게 좋다.