spark-shell command 에서는 write  할때 문제가 되지 않는데,

spark-submit 에서 정의된 함수를 사용해서 write 하는 과정에서 not serializable 에러가 나오는 경우가 있다.


예를들면,


가령 같은 scala class 안에 function 을 정의하고 바로 사용 하게되면,


org.apache.spark.SparkException: Task not serializable when write  에러가 나는데, 


def job(tpath : String) : Unit = {

object getgap {
def getgap(s1:String , s2 : String) : Long = {
if(s1.length<8 || s2.length<8) 0
val format = new java.text.SimpleDateFormat("HH:mm:ss.SSS")
val v1 = format.parse(s1)
val v2 = format.parse(s2)
(v2.getTime() - v1.getTime()) / 100
}
}
  import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("yarn").appName("message_summerization").getOrCreate()
import spark.implicits._
//val inputpath = "/user/sangil55/m_input"
val rd = spark.read.json(inputpath)


val df_timegap = df_semi2.map(row=>(row.getString(0), row.getString(1), row.getString(2), getgap(row.getString(1),row.getString(2)) ) )
.withColumnRenamed("_1", "jobid").withColumnRenamed("_2", "starttime").withColumnRenamed("_3", "endtime").withColumnRenamed("_4", "gap")
.sort("starttime")

이 getgap 함수를 object 로 빼주게되면 static class 로 인식하여

잘돌아간다.


spark 환경에서 같은클래스에서 선언된 함수는 따로 클래스화되지 않고, 임시저장되는데 이걸 바로 사용하게되면

mapper 에서 임시저장된 함수를 제대로 분산시키지 못하는듯하다.


이와 비슷한 에러가 자주나오니,

spark-submit 환경에서 function 으로 동작하는 놈들은 웬만하면 따로 뺴주는게 좋다.




+ Recent posts