Source: https://www.cloudera.com/documentation/enterprise/5-8-x/topics/admin_spark_tuning.html

Spark has several kinds of transformations and actions, and their behavior differs by type:

-map, filter : narrow transformations

-coalesce : still narrow

-groupByKey, reduceByKey : wide transformations >> they shuffle a lot of data and introduce new stages.


As stages and shuffles accumulate, the job incurs high disk and network I/O and becomes expensive.

A stage boundary can also be triggered by things like the numPartitions argument.


There are several ways to tune Spark.


  1. Choosing optimal transformations
    a. map + reduceByKey   <<< aggregateByKey
    b. join <<< cogroup  (if the datasets are already grouped by key)
    In each pair the latter is faster.
  2. Reducing the shuffle itself
    Transformations such as reduceByKey are evaluated lazily; if the parent RDDs already use the same partitioner and the same number of partitions, the aggregation can happen without an extra shuffle.
    That is why the X passed as reduceByKey(func, numPartitions = X) matters. A minimal sketch of both points is shown below.
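    A minimal Scala sketch of both ideas, assuming a pair RDD events: RDD[(String, Int)]; the names and the partition count are illustrative, not taken from the original post:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.RDD

    def sketch(events: RDD[(String, Int)]): Unit = {
      // 1a. aggregateByKey folds each value into a (sum, count) accumulator in one
      //     pass, instead of map-ing to (v, 1) first and then calling reduceByKey.
      val avgPerKey = events
        .aggregateByKey((0L, 0L))(
          (acc, v) => (acc._1 + v, acc._2 + 1),       // fold a value into the accumulator
          (a, b)   => (a._1 + b._1, a._2 + b._2))     // merge two accumulators
        .mapValues { case (sum, n) => sum.toDouble / n }

      // 2. If both sides share the same partitioner and partition count,
      //    the join below needs no additional shuffle.
      val part   = new HashPartitioner(200)           // 200 is an arbitrary example
      val left   = events.reduceByKey(part, _ + _)
      val right  = avgPerKey.partitionBy(part)
      val joined = left.join(right)                   // co-partitioned: no extra shuffle
      joined.take(5).foreach(println)
    }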





  3. Reducing the shuffle usually improves performance, but not in the following cases:
    -when the input files are large and the number of partitions needs to grow (the shuffle grows as well)
    -when too much of the aggregation piles up in one place: treeReduce / treeAggregate add an extra level of shuffle, yet performance can still improve because the driver no longer merges every partial result itself. A hedged sketch is shown below.
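    For example, a sketch of treeAggregate, assuming an RDD[Double] named values (illustrative):

    import org.apache.spark.rdd.RDD

    // Partial sums are combined in a multi-level tree (depth 2 here), so the
    // driver does not have to merge the result of every partition by itself.
    def treeSum(values: RDD[Double]): Double =
      values.treeAggregate(0.0)(
        seqOp  = (acc, v) => acc + v,   // fold values within a partition
        combOp = (a, b)   => a + b,     // merge partial sums across partitions
        depth  = 2)                     // one extra aggregation round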

  4. Secondary sort
    -repartitionAndSortWithinPartitions >> repartition + sort
    The former is faster because it pushes the sorting down into the shuffle machinery instead of sorting afterwards. This kind of sorting is also used in joins. A sketch is shown below.
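    A minimal sketch, assuming records keyed by a (user, timestamp) composite key; the custom partitioner groups by user only, so each user's records arrive sorted by timestamp (names are illustrative):

    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.RDD

    // Partition on the first element of the composite key only.
    class UserPartitioner(partitions: Int) extends Partitioner {
      override def numPartitions: Int = partitions
      override def getPartition(key: Any): Int = key match {
        case (user: String, _) => (user.hashCode % partitions + partitions) % partitions
      }
    }

    def secondarySort(events: RDD[((String, Long), String)]): RDD[((String, Long), String)] =
      // Sorting happens inside the shuffle, which is cheaper than
      // repartition(...) followed by a separate sort.
      events.repartitionAndSortWithinPartitions(new UserPartitioner(100))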

  5. Memory tuning
    -a. Tune YARN first.
     In particular, make sure YARN has enough resources to hand out, via yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores.
    -b. Set --executor-cores and --executor-memory as options at submit time. Note that on top of the executor heap, YARN requests an overhead of max(executorMemory * 0.07, 384 MB) per container.
    -c. The application master behaves differently in client deploy mode and cluster deploy mode: in the former the AM only gets the default 1 GB and 1 core, while in the latter the driver runs inside the AM, so take care to size it separately with the --driver-* options.
    -d. The number of cores per executor should be 5 or fewer.
    -e. Tiny executors (a single core and barely enough memory) hurt performance. A hedged configuration sketch follows below.
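    As a hedged sketch, the same knobs expressed as Spark configuration in Scala (equivalent to the spark-submit flags; the sizes are illustrative, not recommendations from the original post):

    import org.apache.spark.sql.SparkSession

    // Roughly 4 cores and 8 GB of heap per executor; on YARN each container will
    // additionally request max(8g * 0.07, 384 MB) of memory overhead.
    val spark = SparkSession.builder
      .appName("tuning-sketch")
      .config("spark.executor.cores", "4")        // keep this at 5 or fewer
      .config("spark.executor.memory", "8g")
      .config("spark.executor.instances", "10")
      .config("spark.driver.memory", "4g")        // driver/AM size; normally set at submit time
      .getOrCreate()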

  6. Tuning the number of partitions
     An RDD normally inherits its parent's partition count, but the following operations differ:
    coalesce → fewer than its parent
    union → the sum of its parents' partition counts
    cartesian → the product of its parents' partition counts

    Operations such as join, cogroup, and groupByKey use a lot of memory; once the per-task memory is exceeded, spilling to disk and GC kick in and slow everything down, so keeping each partition to a reasonable size matters.
    repartition can be used to increase the partition count.
    As a heuristic, an optimal partition count of roughly (total number of cores * 3) is often quoted, but it is not exact; Spark's own guidance is to keep multiplying by 1.5 until performance stops improving. A small sketch is shown below.

    In general, having too many partitions is better than having too few.
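    A minimal sketch of inspecting and adjusting the partition count (the *3 multiplier is only the heuristic above, not a guarantee):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession

    def retune[T](spark: SparkSession, rdd: RDD[T]): RDD[T] = {
      val totalCores = spark.sparkContext.defaultParallelism   // roughly executors * cores per executor
      val target     = totalCores * 3                          // heuristic starting point
      println(s"current=${rdd.getNumPartitions}, target=$target")
      if (rdd.getNumPartitions < target) rdd.repartition(target)  // full shuffle
      else rdd.coalesce(target)                                   // narrow: merges partitions
    }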


7. Reduce the size of the data. This one is obvious.

8. Use SequenceFile as the output format, as in the sketch below.
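For example, a hedged sketch of writing a pair RDD as a SequenceFile (the path is illustrative):

import org.apache.spark.rdd.RDD

// SequenceFiles are a compact binary key-value format; this works on RDDs of
// pairs whose element types are convertible to Hadoop Writables (Int, String, ...).
def writeCounts(counts: RDD[(String, Int)]): Unit =
  counts.saveAsSequenceFile("hdfs:///tmp/word-counts-seq")   // illustrative path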


RDD Operations

Attached file: transformations_actions.xlsx

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
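A small sketch of the lazy evaluation and persistence behavior described above, assuming a SparkContext sc (for example in spark-shell); the input path is illustrative:

// Nothing is computed until the first action runs; persist keeps the mapped
// RDD in memory so the second action does not recompute it from the file.
val lines   = sc.textFile("hdfs:///tmp/input.txt")    // illustrative path
val lengths = lines.map(_.length).persist()           // still lazy, marked for caching
val total   = lengths.reduce(_ + _)                   // first action triggers the computation
val howMany = lengths.count()                         // reuses the cached partitions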

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Transformation: Meaning

map(func): Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func): Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed): Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset): Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset): Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]): Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
  Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
  Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks]): When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset): When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]): Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions): Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions): Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner): Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
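As a quick illustration of a few of these transformations, a hedged word-count style sketch, again assuming a SparkContext sc and an illustrative input path:

val words = sc.textFile("hdfs:///tmp/input.txt")      // illustrative path
  .flatMap(_.split("\\s+"))                           // 0..n output words per line
  .filter(_.nonEmpty)                                 // narrow
  .map(word => (word, 1))                             // narrow

val counts  = words.reduceByKey(_ + _)                // wide: triggers a shuffle
val byCount = counts.map(_.swap).sortByKey(ascending = false)
byCount.take(10).foreach(println)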





Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Action: Meaning

reduce(func): Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect(): Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count(): Return the number of elements in the dataset.
first(): Return the first element of the dataset (similar to take(1)).
take(n): Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]): Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]): Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path): Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) (Java and Scala): Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path) (Java and Scala): Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey(): Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func): Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
  Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

The Spark RDD API also exposes asynchronous versions of some actions, like foreachAsync for foreach, which immediately return a FutureAction to the caller instead of blocking on completion of the action. This can be used to manage or wait for the asynchronous execution of the action.
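A hedged sketch of the asynchronous variant mentioned above, assuming a pair RDD counts:

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.apache.spark.rdd.RDD

// foreachAsync returns a FutureAction immediately; the driver can keep working
// and only block (or register a callback) when it actually needs the result.
def fireAndWait(counts: RDD[(String, Int)]): Unit = {
  val action = counts.foreachAsync(pair => println(pair))
  // ... other driver-side work could go here ...
  Await.result(action, Duration.Inf)   // FutureAction is a scala.concurrent.Future
}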

Attached file: transformations_actions_datasetupdated0322.xlsx

Download it for easier viewing.

Dataset API setup:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("mapExample").master("local").getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
Each row below reads: return type | method | description | example | result | notes (RDD impact)
Dataset<Row> agg(Column expr, Column... exprs) Aggregates on the entire Dataset without groups. (An operation that collects the column values into a list?) import org.apache.spark.sql.functions._ val ds = Seq((1, 1, 2L), (1, 2, 3L), (1, 3, 4L), (2, 1, 5L), (2,2,5L), (4,4,5L), (5,5,5L), (1,2,3L), (9,9,9L)).toDS() ds.groupBy("_1").agg(collect_list("_2") as "list").show(false) +---+---------+ |_1 |list | +---+---------+ |1 |[1, 2, 3]| |2 |[1] | +---+---------+
Dataset<Row>agg(Column expr, scala.collection.Seq<Column> exprs)Aggregates on the entire Dataset without groups.ds.groupBy("_1").agg(count("*"), sum("_2").alias("summing"), sum("_3")).show(false)+---+--------+-------+-------+ |_1 |count(1)|sum(_2)|sum(_3)| +---+--------+-------+-------+ |1 |3 |6 |9 | |2 |1 |1 |5 | +---+--------+-------+-------+
Dataset<Row>agg(scala.collection.immutable.Map<String,String> exprs)(Scala-specific) Aggregates on the entire Dataset without groups.
Dataset<Row>agg(scala.Tuple2<String,String> aggExpr, scala.collection.Seq<scala.Tuple2<String,String>> aggExprs)(Scala-specific) Aggregates on the entire Dataset without groups.
Dataset<T>alias(String alias)Returns a new Dataset with an alias set.ds.groupBy("_1").agg(count("*"), sum("_2").alias("summing"), sum("_3")).show(false)'+---+--------+-------+-------+ |_1 |count(1)|summing|sum(_3)| +---+--------+-------+-------+ |1 |3 |6 |9 | |2 |1 |1 |5 | +---+--------+-------+-------+
Dataset<T>alias(scala.Symbol alias)
(Scala-specific) Returns a new Dataset with an alias set.
Columnapply(String colName)Selects column based on the column name and return it as a Column.ds.apply("_1").toString()
Dataset<T>as(String alias)Returns a new Dataset with an alias set.
Dataset<T>as(scala.Symbol alias)(Scala-specific) Returns a new Dataset with an alias set.
Dataset<T>cache()Persist this Dataset with the default storage level (MEMORY_AND_DISK).ds.cache()
Dataset<T>checkpoint()Eagerly checkpoint a Dataset and return the new Dataset.
Dataset<T>checkpoint(boolean eager)Returns a checkpointed version of this Dataset.
scala.reflect.ClassTag<T>classTag() 
Dataset<T>coalesce(int numPartitions)Returns a new Dataset that has exactly numPartitions partitions.
Columncol(String colName)Selects column based on the column name and return it as a Column.
Objectcollect()Returns an array that contains all of Rows in this Dataset.ds.select("_1").collect()
java.util.List<T>collectAsList()Returns a Java list that contains all of Rows in this Dataset.
String[]columns()Returns all column names as an array.
longcount()Returns the number of rows in the Dataset.
void createGlobalTempView(String viewName) Creates a global temporary view using the given name. A global temporary view is shared across all sessions and stays alive until the Spark application terminates. In my case it was stored on the master node's hard disk.
voidcreateOrReplaceTempView(String viewName)Creates a local temporary view using the given name.
voidcreateTempView(String viewName)Creates a local temporary view using the given name.
Dataset<Row>crossJoin(Dataset<?> right)Explicit cartesian join with another DataFrame.
RelationalGroupedDatasetcube(Column... cols)Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDataset cube(scala.collection.Seq<Column> cols) Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them. Demonstrates the cube behavior.
RelationalGroupedDatasetcube(String col1, scala.collection.Seq<String> cols)Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDatasetcube(String col1, String... cols)Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.
Dataset<Row> describe(scala.collection.Seq<String> cols) Computes statistics for numeric and string columns, including count, mean, stddev, min, and max. ds.describe().show() Quick summary statistics (count, mean, etc.).
Dataset<Row>describe(String... cols)Computes statistics for numeric and string columns, including count, mean, stddev, min, and max.
Dataset<T> distinct() Returns a new Dataset that contains only the unique rows from this Dataset. Removes duplicates (only rows that are completely identical).
Dataset<Row>drop(Column col)Returns a new Dataset with a column dropped.ds.drop("_2").show+---+---+ | _1| _3| +---+---+ | 1| 2| | 1| 3| | 1| 4| | 2| 5| +---+---+
Dataset<Row>drop(scala.collection.Seq<String> colNames)Returns a new Dataset with columns dropped.
Dataset<Row>drop(String... colNames)Returns a new Dataset with columns dropped.
Dataset<Row>drop(String colName)Returns a new Dataset with a column dropped.
Dataset<T>dropDuplicates()Returns a new Dataset that contains only the unique rows from this Dataset.
Dataset<T>dropDuplicates(scala.collection.Seq<String> colNames)(Scala-specific) Returns a new Dataset with duplicate rows removed, considering only the subset of columns.ds.dropDuplicates.show+---+---+ | _1| _3| +---+---+ | 1| 2| | 1| 3| | 1| 4| | 2| 5| +---+---+
Dataset<T>dropDuplicates(String[] colNames)Returns a new Dataset with duplicate rows removed, considering only the subset of columns.ds.dropDuplicates("_1").show+---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 1| 2| | 2| 1| 5| +---+---+---+
Dataset<T>dropDuplicates(String col1, scala.collection.Seq<String> cols)Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
Dataset<T>dropDuplicates(String col1, String... cols)Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
scala.Tuple2<String,String>[]dtypes()Returns all column names and their data types as an array.ds.dtypesArray[(String, String)] = Array((_1,IntegerType), (_2,IntegerType), (_3,LongType))
Dataset<T>except(Dataset<T> other)Returns a new Dataset containing rows in this Dataset but not in another Dataset.
voidexplain()Prints the physical plan to the console for debugging purposes.
voidexplain(boolean extended)Prints the plans (logical and physical) to the console for debugging purposes.
Dataset<T>filter(Column condition)Filters rows using the given condition.
Tfirst()Returns the first row.
voidforeach(ForeachFunction<T> func)(Java-specific) Runs func on each element of this Dataset.
voidforeach(scala.Function1<T,scala.runtime.BoxedUnit> f)Applies a function f to all rows.
voidforeachPartition(ForeachPartitionFunction<T> func)(Java-specific) Runs func on each partition of this Dataset.
voidforeachPartition(scala.Function1<scala.collection.Iterator<T>,scala.runtime.BoxedUnit> f)Applies a function f to each partition of this Dataset.
RelationalGroupedDatasetgroupBy(Column... cols)Groups the Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDatasetgroupBy(scala.collection.Seq<Column> cols)Groups the Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDatasetgroupBy(String col1, scala.collection.Seq<String> cols)Groups the Dataset using the specified columns, so that we can run aggregation on them.
RelationalGroupedDatasetgroupBy(String col1, String... cols)Groups the Dataset using the specified columns, so that we can run aggregation on them.
Thead()Returns the first row.
Objecthead(int n)Returns the first n rows.
String[] inputFiles() Returns a best-effort snapshot of the files that compose this Dataset. in.inputFiles.foreach(println) file:///D:/scala/test2/input.txt Lists the input files.
Dataset<T>intersect(Dataset<T> other)Returns a new Dataset containing rows only in both this Dataset and another Dataset.
boolean isLocal() Returns true if the collect and take methods can be run locally (without any Spark executors). ds.isLocal ds.cube(("_1"),("_2")).sum().isLocal true false Lets you check whether the computation can run locally.
boolean isStreaming() Returns true if this Dataset contains one or more sources that continuously return data as it arrives. For streaming Datasets; exact use unclear.
JavaRDD<T>javaRDD()Returns the content of the Dataset as a JavaRDD of Ts.
Dataset<Row>join(Dataset<?> right)Join with another DataFrame.
Dataset<Row> join(Dataset<?> right, Column joinExprs) Inner join with another DataFrame, using the given join expression. val ds = Seq((1, 1, "A"), (1, 2, "B"), (1, 3, "C"), (2, 1, "D")).toDS() val ds2 = Seq(("A", 1), ("B", 2), ("C", 3), ("D", 4)).toDS() ds.join(ds2, ds("_3") === ds2("_1")).show()
Dataset<Row>join(Dataset<?> right, Column joinExprs, String joinType)Join with another DataFrame, using the given join expression.//inner employees .join(departments, "DepartmentID") .show() //outer employees .join(departments, Seq("DepartmentID"), "left_outer") .show() //inner, outer, left_outer, right_outer, leftsemi //carsetian employees .join(departments) .show(10) //conditional (nonequal) orders .join(products, $"product" === $"name" && $"date" >= $"startDate" && $"date" <= $"endDate") .show()
Dataset<Row>join(Dataset<?> right, scala.collection.Seq<String> usingColumns)Inner equi-join with another DataFrame using the given columns.
Dataset<Row>join(Dataset<?> right, scala.collection.Seq<String> usingColumns, String joinType)Equi-join with another DataFrame using the given columns.
Dataset<Row>join(Dataset<?> right, String usingColumn)Inner equi-join with another DataFrame using the given column.
Dataset<T>limit(int n)Returns a new Dataset by taking the first n rows.
DataFrameNaFunctions na() Returns a DataFrameNaFunctions for working with missing data. df.show() // +-----+----+ // | name| age| // +-----+----+ // |alice| 35| // | bob|null| // | | 24| // +-----+----+ scala> df.na.fill(10.0,Seq("age")) // res4: org.apache.spark.sql.DataFrame = [name: string, age: int] // scala> df.na.fill(10.0,Seq("age")).show // +-----+---+ // | name|age| // +-----+---+ // |alice| 35| // | bob| 10| // | | 24| // +-----+---+ scala> df.na.replace("age", Map(35 -> 61, 24 -> 12)).show() // +-----+----+ // | name| age| // +-----+----+ // |alice| 61| // | bob|null| // | | 12| // +-----+----+ Supports filling and replacing values; is it safe to use in practice?
static Dataset<Row>ofRows(SparkSession sparkSession, org.apache.spark.sql.catalyst.plans.logical.LogicalPlan logicalPlan) 
Dataset<T>orderBy(Column... sortExprs)Returns a new Dataset sorted by the given expressions.OrderBy is just an alias for the sort function
Dataset<T>orderBy(scala.collection.Seq<Column> sortExprs)Returns a new Dataset sorted by the given expressions.OrderBy is just an alias for the sort function
Dataset<T>orderBy(String sortCol, scala.collection.Seq<String> sortCols)Returns a new Dataset sorted by the given expressions.OrderBy is just an alias for the sort function
Dataset<T>orderBy(String sortCol, String... sortCols)Returns a new Dataset sorted by the given expressions.OrderBy is just an alias for the sort function
Dataset<T>persist()Persist this Dataset with the default storage level (MEMORY_AND_DISK).
Dataset<T>persist(StorageLevel newLevel)Persist this Dataset with the given storage level.
voidprintSchema()Prints the schema to the console in a nice tree format.ds.printSchema()root |-- _1: integer (nullable = false) |-- _2: integer (nullable = false) |-- _3: long (nullable = false)
org.apache.spark.sql.execution.QueryExecution queryExecution() ds.queryExecution == Parsed Logical Plan == LocalRelation [_1#3, _2#4, _3#5L] == Analyzed Logical Plan == _1: int, _2: int, _3: bigint LocalRelation [_1#3, _2#4, _3#5L] == Optimized Logical Plan == LocalRelation [_1#3, _2#4, _3#5L] == Physical Plan == LocalTableScan [_1#3, _2#4, _3#5L] Exact use unclear.
Dataset<T>[]randomSplit(double[] weights)Randomly splits this Dataset with the provided weights.val ds = Seq((1, 1, 2L), (1, 2, 3L), (1, 3, 4L), (2, 1, 5L),(2,2,5L), (4,4,5L), (5,5,5L), (1,2,3L),(9,9,9L)).toDS() ds.randomSplit(Array(0.3,0.2,0.5),5).foreach(t=>t.show)+---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 1| 2| | 5| 5| 5| | 9| 9| 9| +---+---+---+ +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 3| 4| | 2| 1| 5| +---+---+---+ +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 2| 3| | 1| 2| 3| | 2| 2| 5| | 4| 4| 5| +---+---+---+
Dataset<T>[]randomSplit(double[] weights, long seed)Randomly splits this Dataset with the provided weights.
java.util.List<Dataset<T>>randomSplitAsList(double[] weights, long seed)Returns a Java list that contains randomly split Dataset with the provided weights.
RDD<T>rdd()Represents the content of the Dataset as an RDD of T.
Dataset<T> repartition(Column... partitionExprs) Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as number of partitions. See the RDD section above.
Dataset<T> repartition(int numPartitions) Returns a new Dataset that has exactly numPartitions partitions. See the RDD section above.
Dataset<T>repartition(int numPartitions, Column... partitionExprs)Returns a new Dataset partitioned by the given partitioning expressions into numPartitions.
Dataset<T>repartition(int numPartitions, scala.collection.Seq<Column> partitionExprs)Returns a new Dataset partitioned by the given partitioning expressions into numPartitions.
Dataset<T>repartition(scala.collection.Seq<Column> partitionExprs)Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as number of partitions.
RelationalGroupedDatasetrollup(Column... cols)Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.val ds = Seq((1, 1, 2L), (1, 2, 3L), (1, 3, 4L), (2, 1, 5L),(2,2,5L), (4,4,5L), (5,5,5L), (1,2,3L),(9,9,9L)).toDS() ds.show ds.rollup("_1").count.show ds.rollup("_1").agg("_2","_3").show ds.rollup("_1").agg(sum("_2")).show+---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 1| 2| | 1| 2| 3| | 1| 3| 4| | 2| 1| 5| | 2| 2| 5| | 4| 4| 5| | 5| 5| 5| | 1| 2| 3| | 9| 9| 9| +---+---+---+ +----+-----+ | _1|count| +----+-----+ | 1| 4| | 4| 1| |null| 9| | 2| 2| | 5| 1| | 9| 1| +----+-----+ +----+-------+ | _1|sum(_2)| +----+-------+ | 1| 8| | 4| 4| |null| 29| | 2| 3| | 5| 5| | 9| 9| +----+-------+
RelationalGroupedDatasetrollup(scala.collection.Seq<Column> cols)Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDatasetrollup(String col1, scala.collection.Seq<String> cols)Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDatasetrollup(String col1, String... cols)Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.
Dataset<T>sample(boolean withReplacement, double fraction)Returns a new Dataset by sampling a fraction of rows, using a random seed.
Dataset<T>sample(boolean withReplacement, double fraction, long seed)Returns a new Dataset by sampling a fraction of rows, using a user-supplied seed.
StructTypeschema()Returns the schema of this Dataset.
Dataset<Row>select(Column... cols)Selects a set of column based expressions.
Dataset<Row>select(scala.collection.Seq<Column> cols)Selects a set of column based expressions.
Dataset<Row>select(String col, scala.collection.Seq<String> cols)Selects a set of columns.
Dataset<Row>select(String col, String... cols)Selects a set of columns.
Dataset<Row>selectExpr(scala.collection.Seq<String> exprs)Selects a set of SQL expressions.
Dataset<Row>selectExpr(String... exprs)Selects a set of SQL expressions.
voidshow()Displays the top 20 rows of Dataset in a tabular form.
voidshow(boolean truncate)Displays the top 20 rows of Dataset in a tabular form.
voidshow(int numRows)Displays the Dataset in a tabular form.
voidshow(int numRows, boolean truncate)Displays the Dataset in a tabular form.
voidshow(int numRows, int truncate)Displays the Dataset in a tabular form.
Dataset<T>sort(Column... sortExprs)Returns a new Dataset sorted by the given expressions.ds.sort(($"_1".desc)).show+---+---+---+ | _1| _2| _3| +---+---+---+ | 9| 9| 9| | 5| 5| 5| | 4| 4| 5| | 2| 1| 5| | 2| 2| 5| | 1| 1| 2| | 1| 3| 4| | 1| 2| 3| | 1| 2| 3| +---+---+---+
Dataset<T>sort(scala.collection.Seq<Column> sortExprs)Returns a new Dataset sorted by the given expressions.
Dataset<T>sort(String sortCol, scala.collection.Seq<String> sortCols)Returns a new Dataset sorted by the specified column, all in ascending order.
Dataset<T>sort(String sortCol, String... sortCols)Returns a new Dataset sorted by the specified column, all in ascending order.
Dataset<T>sortWithinPartitions(Column... sortExprs)Returns a new Dataset with each partition sorted by the given expressions.
Dataset<T>sortWithinPartitions(scala.collection.Seq<Column> sortExprs)Returns a new Dataset with each partition sorted by the given expressions.
Dataset<T>sortWithinPartitions(String sortCol, scala.collection.Seq<String> sortCols)Returns a new Dataset with each partition sorted by the given expressions.
Dataset<T>sortWithinPartitions(String sortCol, String... sortCols)Returns a new Dataset with each partition sorted by the given expressions.
SparkSessionsparkSession() 
SQLContextsqlContext() 
DataFrameStatFunctions stat() Returns a DataFrameStatFunctions for working statistic functions support. Worth checking which statistic functions are available.
StorageLevelstorageLevel()Get the Dataset's current storage level, or StorageLevel.NONE if not persisted.
Objecttake(int n)Returns the first n rows in the Dataset.
java.util.List<T>takeAsList(int n)Returns the first n rows in the Dataset as a list.
Dataset<Row>toDF()Converts this strongly typed collection of data to generic Dataframe.
Dataset<Row>toDF(scala.collection.Seq<String> colNames)Converts this strongly typed collection of data to generic DataFrame with columns renamed.
Dataset<Row>toDF(String... colNames)Converts this strongly typed collection of data to generic DataFrame with columns renamed.
JavaRDD<T>toJavaRDD()Returns the content of the Dataset as a JavaRDD of Ts.
Dataset<String>toJSON()Returns the content of the Dataset as a Dataset of JSON strings.
java.util.Iterator<T>toLocalIterator()Return an iterator that contains all of Rows in this Dataset.
StringtoString() 
<U> Dataset<U>transform(scala.Function1<Dataset<T>,Dataset<U>> t)Concise syntax for chaining custom transformations.
Dataset<T>union(Dataset<T> other)Returns a new Dataset containing union of rows in this Dataset and another Dataset.
Dataset<T>unpersist()Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
Dataset<T>unpersist(boolean blocking)Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
Dataset<T>where(Column condition)Filters rows using the given condition.
Dataset<T>where(String conditionExpr)Filters rows using the given SQL expression.
Dataset<Row>withColumn(String colName, Column col)Returns a new Dataset by adding a column or replacing the existing column that has the same name.
Dataset<Row>withColumnRenamed(String existingName, String newName)Returns a new Dataset with a column renamed.
DataFrameWriter<T>write()Interface for saving the content of the non-streaming Dataset out into external storage.
DataStreamWriter<T> writeStream() (Experimental) Interface for saving the content of the streaming Dataset out into external storage.








  • spark.executor.memory defines the total amount of memory available to an executor.
  • spark.storage.memoryFraction (default ~60%) defines how much of that memory can be used to store persisted RDDs.
  • spark.shuffle.memoryFraction (default ~20%) defines how much memory is reserved for shuffle.
  • spark.storage.unrollFraction and spark.storage.safetyFraction (~30% of total memory) are used internally by Spark and should not be changed.
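As a sketch of where these fractions live, assuming the legacy (pre-unified) memory manager they belong to; the values are just the stated defaults restated, not tuning advice:

import org.apache.spark.SparkConf

// Legacy memory-fraction settings (Spark 1.x style; newer versions use the
// unified memory manager unless spark.memory.useLegacyMode is enabled).
val conf = new SparkConf()
  .set("spark.executor.memory", "8g")            // total executor heap (illustrative)
  .set("spark.storage.memoryFraction", "0.6")    // persisted RDD storage
  .set("spark.shuffle.memoryFraction", "0.2")    // shuffle buffers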



What is LRU?


Least recently used (LRU)


Discards the least recently used items first. This algorithm requires keeping track of what was used when, which is expensive if one wants to make sure the algorithm always discards the least recently used item. General implementations of this technique require keeping "age bits" for cache-lines and track the "Least Recently Used" cache-line based on age-bits. In such an implementation, every time a cache-line is used, the age of all other cache-lines changes. LRU is actually a family of caching algorithms with members including 2Q by Theodore Johnson and Dennis Shasha,[3] and LRU/K by Pat O'Neil, Betty O'Neil and Gerhard Weikum.[4]


The access sequence for the example (figure omitted) is A B C D E D F.

Once A, B, C, and D are installed in the blocks with sequence numbers (incremented by 1 for each new access), accessing E is a miss and it must be installed in one of the blocks. According to the LRU algorithm, since A has the lowest rank (A(0)), E replaces A.
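As an aside, a minimal LRU cache sketch in Scala, backed by java.util.LinkedHashMap in access order (the capacity of 4 mirrors the A-B-C-D example above):

import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

// Keeps entries in access order and evicts the least recently used one
// whenever the size exceeds the capacity.
class LruCache[K, V](capacity: Int)
    extends JLinkedHashMap[K, V](capacity, 0.75f, /* accessOrder = */ true) {
  override def removeEldestEntry(eldest: Entry[K, V]): Boolean = size() > capacity
}

val cache = new LruCache[String, Int](4)
Seq("A", "B", "C", "D").zipWithIndex.foreach { case (k, v) => cache.put(k, v) }
cache.get("D")       // touching D makes it most recently used
cache.put("E", 4)    // evicts A, the least recently used entry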

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), or replicate it across nodes. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:


Storage Level: Meaning

MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER (Java and Scala): Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER (Java and Scala): Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY: Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.


  • Difference between cache and persist: cache() always uses a fixed default storage level (MEMORY_AND_DISK for Datasets), whereas persist() lets you choose any of the storage levels above.
  • The default storage level differs between Dataset and RDD: RDD = MEMORY_ONLY, Dataset = MEMORY_AND_DISK.

※ If local disk space is insufficient, an out-of-space error can occur.
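A minimal persistence sketch, assuming an RDD events and a Dataset ds are already defined (illustrative names):

import org.apache.spark.storage.StorageLevel

// cache() uses the fixed default level; persist() lets you pick one explicitly.
events.persist(StorageLevel.MEMORY_ONLY_SER)   // serialized in memory, more compact
ds.cache()                                     // Dataset default: MEMORY_AND_DISK
events.count()                                 // the first action materializes the cache
events.unpersist()                             // drop the cached blocks when done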


Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

  • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.

  • If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)

  • Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.

  • Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.

