With the spark-shell, writing is not a problem, but with spark-submit there are cases where a write that uses a function you defined yourself throws a "not serializable" error.

For example, if you define a function inside the same Scala class and use it directly, the write fails with

org.apache.spark.SparkException: Task not serializable


def job(inputpath: String): Unit = {

  // Problematic layout: getgap lives inside the enclosing class,
  // so the map closure below drags the whole (non-serializable) class in with it.
  object getgap {
    def getgap(s1: String, s2: String): Long = {
      if (s1.length < 8 || s2.length < 8) return 0L
      val format = new java.text.SimpleDateFormat("HH:mm:ss.SSS")
      val v1 = format.parse(s1)
      val v2 = format.parse(s2)
      (v2.getTime() - v1.getTime()) / 100
    }
  }

  import org.apache.spark.sql.SparkSession
  val spark = SparkSession.builder().master("yarn").appName("message_summerization").getOrCreate()
  import spark.implicits._
  //val inputpath = "/user/sangil55/m_input"
  val rd = spark.read.json(inputpath)

  // df_semi2 is a Dataset[Row] derived from rd with (jobid, starttime, endtime) columns
  val df_timegap = df_semi2
    .map(row => (row.getString(0), row.getString(1), row.getString(2), getgap.getgap(row.getString(1), row.getString(2))))
    .withColumnRenamed("_1", "jobid").withColumnRenamed("_2", "starttime").withColumnRenamed("_3", "endtime").withColumnRenamed("_4", "gap")
    .sort("starttime")

If you pull this getgap function out into a separate (top-level) object, it is treated as a static class and runs fine.

In Spark, a function declared inside the same class is not compiled out into its own class; it stays attached to the enclosing instance, and when you use it directly, the mapper apparently cannot ship that function to the executors properly.


Errors like this come up often, so in a spark-submit setting it is best to pull anything that runs as a function out into a separate object whenever possible.
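For reference, here is a minimal sketch of the layout that works, reusing the getgap logic from above (the names GapUtil and Job and the output path are made up for illustration):

import org.apache.spark.sql.SparkSession

// Top-level object: compiled to a static-like holder, so the map closure
// only references the function and not the enclosing job class.
object GapUtil {
  def getgap(s1: String, s2: String): Long = {
    if (s1.length < 8 || s2.length < 8) return 0L
    val format = new java.text.SimpleDateFormat("HH:mm:ss.SSS")
    (format.parse(s2).getTime() - format.parse(s1).getTime()) / 100
  }
}

class Job {
  def job(inputpath: String): Unit = {
    val spark = SparkSession.builder().master("yarn").appName("message_summerization").getOrCreate()
    import spark.implicits._

    val rd = spark.read.json(inputpath)
    // assuming the first three columns are jobid / starttime / endtime as in the post;
    // calling GapUtil.getgap no longer drags the Job instance into the closure
    val df_timegap = rd.map(row => (row.getString(0), row.getString(1), row.getString(2),
        GapUtil.getgap(row.getString(1), row.getString(2))))
      .toDF("jobid", "starttime", "endtime", "gap")
      .sort("starttime")
    df_timegap.write.json(inputpath + "_out")   // illustrative output path
  }
}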




Unlike rdd.groupByKey, a Spark Dataset can do the reduce with groupBy, and the aggregation is then expressed with .agg.

However, the function that goes inside .agg cannot be a plain declaration; you have to implement a UDAF (user-defined aggregate function), and its behavior is somewhat tricky.



In the shell or in main, you run it like this:

val td = rd.select($"jobid".alias("jobid"), $"time".alias("starttime"), $"time".alias("endtime"), $"result".alias("report_result"), $"result".alias("error"))
//val td = rd.select($"jobid".alias("jobid"),$"time".alias("starttime"))

import CustomAgg.CustomAggregation._
val minagg = new minAggregation()
val maxagg = new maxAggregation()
val erragg = new errorAggregation()

val td2 = td.groupBy("jobid").agg(minagg(td("starttime")), maxagg(td("endtime")) , erragg(td("error"))) 


The UDAFs are written as below (an aggregation that finds the max value, one that finds the min, one that picks out errors, and so on).

update appears to handle merging within a partition, and merge the merging across partitions.

package CustomAgg

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import scala.collection.mutable.ArrayBuffer

object CustomAggregation
{

  class maxAggregation extends UserDefinedAggregateFunction {
    // Input Data Type Schema
    //def inputSchema: StructType = StructType(Array(StructField("col5", StringType)))

    // Intermediate Schema
    //def bufferSchema = StructType(Array( StructField("col5_collapsed", StringType)))

    def inputSchema = new StructType().add("x", StringType)
    def bufferSchema = new StructType().add("buff", ArrayType(StringType))

    // Returned Data Type
    def dataType: DataType = StringType

    // Self-explaining
    def deterministic = true

    // This function is called whenever key changes
    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, ArrayBuffer.empty[String]) // initialize array
    }

    // Iterate over each entry of a group
    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) {
        println(input.getString(0))
        println(buffer.getSeq[String](0).mkString)
        if (buffer.getSeq[String](0).mkString.isEmpty)
          buffer.update(0, Seq(input.getString(0)))
        else if (buffer.getSeq[String](0).mkString.compareTo(input.getString(0)) > 0)
          buffer.update(0, buffer.getSeq[String](0))
        else
          buffer.update(0, Seq(input.getString(0)))
        println(buffer.getSeq[String](0).mkString)
      }
    }

    // Merge two partial aggregates
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      if (buffer1.getSeq[String](0).mkString.compareTo(buffer2.getSeq[String](0).mkString) > 0)
        buffer1.update(0, buffer1.getSeq[String](0))
      else
        buffer1.update(0, buffer2.getSeq[String](0))
    }

    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getSeq[String](0).mkString(","))
  }

  class minAggregation extends UserDefinedAggregateFunction {
    // Input Data Type Schema
    //def inputSchema: StructType = StructType(Array(StructField("col5", StringType)))

    // Intermediate Schema
    //def bufferSchema = StructType(Array( StructField("col5_collapsed", StringType)))

    def inputSchema = new StructType().add("x", StringType)
    def bufferSchema = new StructType().add("buff", StringType)

    // Returned Data Type
    def dataType: DataType = StringType

    // Self-explaining
    def deterministic = true

    // This function is called whenever key changes
    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, "") // initialize buffer
    }

    // Iterate over each entry of a group
    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) {
        if (buffer.getString(0).isEmpty)
          buffer.update(0, input.getString(0))
        else if (buffer.getString(0).compareTo(input.getString(0)) < 0)
          buffer.update(0, buffer.getString(0))
        else
          buffer.update(0, input.getString(0))

        println("updated :" + buffer.getString(0))
      }
    }

    // Merge two partial aggregates
    // (keep buffer1 when buffer2 is still empty, otherwise an empty partial buffer would wipe the min)
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      if (buffer2.getString(0).isEmpty ||
          (buffer1.getString(0).compareTo(buffer2.getString(0)) < 0 && !buffer1.getString(0).isEmpty))
        buffer1.update(0, buffer1.getString(0))
      else
        buffer1.update(0, buffer2.getString(0))
    }

    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getString(0))
  }


  class errorAggregation extends UserDefinedAggregateFunction {
    // Input Data Type Schema
    //def inputSchema: StructType = StructType(Array(StructField("col5", StringType)))

    // Intermediate Schema
    //def bufferSchema = StructType(Array( StructField("col5_collapsed", StringType)))

    def inputSchema = new StructType().add("x", StringType)
    def bufferSchema = new StructType().add("buff", ArrayType(StringType))

    // Returned Data Type
    def dataType: DataType = StringType

    // Self-explaining
    def deterministic = true

    // This function is called whenever key changes
    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, Seq("")) // initialize array
    }

    // anything that is not Info / 0 / Success / empty counts as an error
    def checkerror(str: String): Boolean = {
      if (str.compareTo("Info") == 0) return false
      if (str.compareTo("0") == 0) return false
      if (str.compareTo("Success") == 0) return false
      if (str.compareTo("") == 0) return false
      true
    }

    // Iterate over each entry of a group
    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) {
        if (checkerror(input.getString(0)))
          buffer.update(0, Seq(input.getString(0)))
      }
    }

    // Merge two partial aggregates
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      println(buffer1.getSeq[String](0).toString())
      println(buffer2.getSeq[String](0).toString())
      buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
    }

    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getSeq[String](0).mkString(","))
  }


  class GeometricMean extends UserDefinedAggregateFunction {
    // This is the input fields for your aggregate function.
    override def inputSchema: org.apache.spark.sql.types.StructType =
      StructType(StructField("value", DoubleType) :: Nil)

    // This is the internal fields you keep for computing your aggregate.
    override def bufferSchema: StructType = StructType(
      StructField("count", LongType) ::
      StructField("product", DoubleType) :: Nil
    )

    // This is the output type of your aggregation function.
    override def dataType: DataType = DoubleType

    override def deterministic: Boolean = true

    // This is the initial value for your buffer schema.
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
      buffer(1) = 1.0
    }

    // This is how to update your buffer schema given an input.
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      buffer(0) = buffer.getAs[Long](0) + 1
      buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
    }

    // This is how to merge two objects with the bufferSchema type.
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
      buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
    }

    // This is where you output the final value, given the final value of your bufferSchema.
    override def evaluate(buffer: Row): Any = {
      math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
    }
  }

} 


https://www.cloudera.com/documentation/enterprise/5-8-x/topics/admin_spark_tuning.html

Spark has several kinds of transformations / actions, and they behave differently depending on the kind:

- map, filter: narrow transformations
- coalesce: still narrow
- groupByKey, reduceByKey: wide transformations >> a lot of shuffle happens and a new stage is created

As stages and shuffles pile up, they incur high disk & network I/O, so the job gets expensive.

A stage boundary can also be triggered by things like numPartitions.
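A quick way to see where a stage boundary appears is toDebugString. This is a small sketch with made-up data, only meant to show the narrow/wide split:

import org.apache.spark.sql.SparkSession

object StageDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("stage-demo").getOrCreate()
    val sc = spark.sparkContext

    val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), numSlices = 4)

    // narrow: map and filter stay in the same stage, no shuffle
    val pairs = words.map(w => (w, 1)).filter(_._1 != "c")

    // wide: reduceByKey repartitions by key, so a shuffle and a new stage appear
    val counts = pairs.reduceByKey(_ + _)

    // the indentation in the printed lineage marks the stage boundary (ShuffledRDD)
    println(counts.toDebugString)

    spark.stop()
  }
}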


There are several ways to tune Spark.


  1. Choosing optimal transformations
    a. map + reduceByKey  <<<  aggregateByKey
    b. join  <<<  cogroup  (if already grouped by key)
    The latter is faster in each case (see the aggregateByKey sketch after this list).
  2. Reducing the shuffle itself
    Transformations like reduceByKey are lazily evaluated; if the parent partition counts are set to be the same, the work can happen in a single stage.
    That is why the X that goes into reduceByKey(xxx, numPartitions = X) matters.





  3. Usually reducing the shuffle improves performance, but not in the following cases:
    - the input file is large and the number of partitions has to grow (so the shuffle grows too)
    - the reduce work is concentrated in one place; using treeReduce / treeAggregate adds shuffle but can still improve performance

  4. Secondary sort
    - repartitionAndSortWithinPartitions >> repartition + sort
    The former uses the shuffle machinery and is faster than the sort done by the latter. This sorting is also used in joins.

  5. Memory tuning
    a. Tune YARN: in particular, secure the cluster's resources via yarn.nodemanager.resource.memory-mb / yarn.nodemanager.resource.cpu-vcores.
    b. Set executor-cores and executor-memory as submit options; note that YARN additionally requests an overhead of max(executorMemory * 0.07, 384MB) on top of the executor heap.
    c. The AM runs in either client deploy mode or cluster deploy mode; the former allocates 1GB / 1 core on the client, and the latter has to be configured separately with the --driver options.
    d. The number of cores per executor is best kept at 5 or fewer.
    e. Tiny executors degrade performance.

  6. Tuning the number of partitions
    Usually the partition count follows the parent, but the following behave differently:
    coalesce → fewer
    union → sum of its parents' partition counts
    cartesian → product of its parents'

    Join / cogroup / groupByKey in particular use a lot of memory; once it is exceeded, disk I/O and GC kick in and throughput drops (so the size per partition matters).
    repartition can be used to increase the partition count.
    A heuristic says the optimal partition count is about total cores * 3, but that is not exact; Spark's own advice is to search by increasing the count 1.5x at a time.

    In general, having a lot of partitions is better than having too few.


  7. Reduce the data size. This one is obvious.

  8. Use SequenceFile as the output format.
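As referenced in point 1 above, here is a hedged sketch of the aggregateByKey variant, computing a per-key average (the data is invented and sc is assumed to be the SparkContext):

val scores = sc.parallelize(Seq(("a", 3.0), ("b", 5.0), ("a", 7.0)), 4)

// 1a. map + reduceByKey: materializes an extra (sum, count) tuple per record
val avg1 = scores.map { case (k, v) => (k, (v, 1L)) }
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
  .mapValues { case (s, c) => s / c }

// aggregateByKey: zero value + seqOp (within a partition) + combOp (across partitions);
// the explicit numPartitions = 2 is point 2 above, fixing the partition count up front
val avg2 = scores.aggregateByKey((0.0, 0L), numPartitions = 2)(
    (acc, v) => (acc._1 + v, acc._2 + 1L),
    (a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (s, c) => s / c }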

RDD Operations

The Excel file below contains the example code I ran and the results.

transformations_actions_datasetupdated0322.xlsx

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
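A small sketch of those two paragraphs (the file path is made up and sc is assumed to be the SparkContext): nothing runs at the map line, the first action triggers the job, and persist keeps the mapped data around for the second action:

val lines = sc.textFile("/tmp/sample.txt")      // lazy: nothing is read yet
val lengths = lines.map(_.length).persist()     // still lazy, but marked for caching

val total = lengths.reduce(_ + _)               // first action: reads the file, runs map, fills the cache
val count = lengths.count()                     // second action: served from the cached data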

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Transformation / Meaning

map(func): Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func): Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed): Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset): Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset): Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]): Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks]): When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset): When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]): Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions): Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions): Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner): Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
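For that last entry, a minimal sketch of how it is called (the data is invented and sc is assumed to be the SparkContext); it needs a pair RDD with an ordered key type and an explicit Partitioner:

import org.apache.spark.HashPartitioner

val events = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (1, "z")), 4)

// one shuffle that both repartitions by key and sorts each partition by key,
// instead of repartition(...) followed by a separate per-partition sort
val sorted = events.repartitionAndSortWithinPartitions(new HashPartitioner(2))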





Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Action / Meaning

reduce(func): Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect(): Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count(): Return the number of elements in the dataset.
first(): Return the first element of the dataset (similar to take(1)).
take(n): Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]): Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]): Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path): Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) (Java and Scala): Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path) (Java and Scala): Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey(): Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func): Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

The Spark RDD API also exposes asynchronous versions of some actions, like foreachAsync for foreach, which immediately return a FutureAction to the caller instead of blocking on completion of the action. This can be used to manage or wait for the asynchronous execution of the action.
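A small sketch of the asynchronous variant (data invented, sc assumed): foreachAsync returns a FutureAction that you can wait on or cancel instead of blocking the driver:

import scala.concurrent.Await
import scala.concurrent.duration._

val nums = sc.parallelize(1 to 1000, 8)

// returns immediately with a FutureAction[Unit] instead of blocking
val action = nums.foreachAsync(n => println(n))

// ... other driver-side work can run here ...

// FutureAction extends scala.concurrent.Future, so the usual waiting tools apply
Await.result(action, 10.minutes)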
