RDD Operations


RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
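A minimal sketch of this laziness (assuming an existing SparkContext sc and a text file data.txt):

val lines = sc.textFile("data.txt")            // nothing is read yet
val lineLengths = lines.map(_.length)          // transformation: still lazy
val totalLength = lineLengths.reduce(_ + _)    // action: triggers reading and computation, returns an Int to the driver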

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
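Continuing the same sketch, persisting lineLengths before reusing it (an illustration only, not required code):

lineLengths.persist()        // or lineLengths.cache(); kept on the executors after the first action
lineLengths.reduce(_ + _)    // computes and caches the partitions
lineLengths.count()          // reuses the cached partitions instead of re-reading the file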

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Transformation / Meaning

map(func): Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func): Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed): Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset): Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset): Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]): Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks]): When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset): When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]): Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions): Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions): Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner): Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.





Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Action / Meaning

reduce(func): Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect(): Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count(): Return the number of elements in the dataset.
first(): Return the first element of the dataset (similar to take(1)).
take(n): Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]): Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]): Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path): Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) (Java and Scala): Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path) (Java and Scala): Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey(): Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func): Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
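As a hedged illustration of the Accumulator pattern mentioned in the foreach note (assuming a SparkContext sc and a log file path; the file name is a placeholder):

val errorCount = sc.longAccumulator("errors")
sc.textFile("logs.txt").foreach { line =>
  if (line.contains("ERROR")) errorCount.add(1)   // side effects through an accumulator are well-defined
}
println(errorCount.value)                          // read the aggregated value back on the driver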

The Spark RDD API also exposes asynchronous versions of some actions, like foreachAsync for foreach, which immediately return a FutureAction to the caller instead of blocking on completion of the action. This can be used to manage or wait for the asynchronous execution of the action.
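A small sketch of the asynchronous variant (assuming an existing RDD named rdd; foreachAsync returns a FutureAction, which is a scala.concurrent.Future):

import scala.concurrent.Await
import scala.concurrent.duration._

val action = rdd.foreachAsync(x => println(x))   // returns immediately with a FutureAction
// ... other driver-side work can run here ...
Await.result(action, 10.minutes)                 // block only when completion is actually needed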

Attached file: transformations_actions_datasetupdated0322.xlsx

Downloading the file makes the tables below easier to read.

Dataset API

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("mapExample").master("local").getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
Return type / Method / Description / Example / Result / Notes (RDD impact)
Dataset<Row>agg(Column expr, Column... exprs)Aggregates on the entire Dataset without groups. (Here used with groupBy to collect the column values into a list per key.)import org.apache.spark.sql.functions._ val ds = Seq((1, 1, 2L), (1, 2, 3L), (1, 3, 4L), (2, 1, 5L), (2, 2, 5L), (4, 4, 5L), (5, 5, 5L), (1, 2, 3L), (9, 9, 9L)).toDS() ds.groupBy("_1").agg(collect_list("_2") as "list").show(false)+---+---------+ |_1 |list | +---+---------+ |1 |[1, 2, 3]| |2 |[1] | +---+---------+
Dataset<Row>agg(Column expr, scala.collection.Seq<Column> exprs)Aggregates on the entire Dataset without groups.ds.groupBy("_1").agg(count("*"), sum("_2").alias("summing"), sum("_3")).show(false)+---+--------+-------+-------+ |_1 |count(1)|sum(_2)|sum(_3)| +---+--------+-------+-------+ |1 |3 |6 |9 | |2 |1 |1 |5 | +---+--------+-------+-------+
Dataset<Row>agg(scala.collection.immutable.Map<String,String> exprs)(Scala-specific) Aggregates on the entire Dataset without groups.
Dataset<Row>agg(scala.Tuple2<String,String> aggExpr, scala.collection.Seq<scala.Tuple2<String,String>> aggExprs)(Scala-specific) Aggregates on the entire Dataset without groups.
Dataset<T>alias(String alias)Returns a new Dataset with an alias set.ds.groupBy("_1").agg(count("*"), sum("_2").alias("summing"), sum("_3")).show(false)'+---+--------+-------+-------+ |_1 |count(1)|summing|sum(_3)| +---+--------+-------+-------+ |1 |3 |6 |9 | |2 |1 |1 |5 | +---+--------+-------+-------+
Dataset<T>alias(scala.Symbol alias)
(Scala-specific) Returns a new Dataset with an alias set.
Columnapply(String colName)Selects column based on the column name and return it as a Column.ds.apply("_1").toString()
Dataset<T>as(String alias)Returns a new Dataset with an alias set.
Dataset<T>as(scala.Symbol alias)(Scala-specific) Returns a new Dataset with an alias set.
Dataset<T>cache()Persist this Dataset with the default storage level (MEMORY_AND_DISK).ds.cache()
Dataset<T>checkpoint()Eagerly checkpoint a Dataset and return the new Dataset.
Dataset<T>checkpoint(boolean eager)Returns a checkpointed version of this Dataset.
scala.reflect.ClassTag<T>classTag() 
Dataset<T>coalesce(int numPartitions)Returns a new Dataset that has exactly numPartitions partitions.
Columncol(String colName)Selects column based on the column name and return it as a Column.
Objectcollect()Returns an array that contains all of Rows in this Dataset.ds.select("_1").collect()
java.util.List<T>collectAsList()Returns a Java list that contains all of Rows in this Dataset.
String[]columns()Returns all column names as an array.
longcount()Returns the number of rows in the Dataset.
voidcreateGlobalTempView(String viewName)Creates a global temporary view using the given name. A global temporary view is shared across all sessions and stays alive until the Spark application terminates; in my case it was stored on the master node's hard disk.
voidcreateOrReplaceTempView(String viewName)Creates a local temporary view using the given name.
voidcreateTempView(String viewName)Creates a local temporary view using the given name.
Dataset<Row>crossJoin(Dataset<?> right)Explicit cartesian join with another DataFrame.
RelationalGroupedDatasetcube(Column... cols)Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDatasetcube(scala.collection.Seq<Column> cols)Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them. Demonstrates cube behavior.
RelationalGroupedDatasetcube(String col1, scala.collection.Seq<String> cols)Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDatasetcube(String col1, String... cols)Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.
Dataset<Row>describe(scala.collection.Seq<String> cols)Computes statistics for numeric and string columns, including count, mean, stddev, min, and max.ds.describe().show() Simple summary statistics (mean, etc.).
Dataset<Row>describe(String... cols)Computes statistics for numeric and string columns, including count, mean, stddev, min, and max.
Dataset<T>distinct()Returns a new Dataset that contains only the unique rows from this Dataset. Removes duplicates (only rows that match completely).
Dataset<Row>drop(Column col)Returns a new Dataset with a column dropped.ds.drop("_2").show+---+---+ | _1| _3| +---+---+ | 1| 2| | 1| 3| | 1| 4| | 2| 5| +---+---+
Dataset<Row>drop(scala.collection.Seq<String> colNames)Returns a new Dataset with columns dropped.
Dataset<Row>drop(String... colNames)Returns a new Dataset with columns dropped.
Dataset<Row>drop(String colName)Returns a new Dataset with a column dropped.
Dataset<T>dropDuplicates()Returns a new Dataset that contains only the unique rows from this Dataset.
Dataset<T>dropDuplicates(scala.collection.Seq<String> colNames)(Scala-specific) Returns a new Dataset with duplicate rows removed, considering only the subset of columns.ds.dropDuplicates.show+---+---+ | _1| _3| +---+---+ | 1| 2| | 1| 3| | 1| 4| | 2| 5| +---+---+
Dataset<T>dropDuplicates(String[] colNames)Returns a new Dataset with duplicate rows removed, considering only the subset of columns.ds.dropDuplicates("_1").show+---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 1| 2| | 2| 1| 5| +---+---+---+
Dataset<T>dropDuplicates(String col1, scala.collection.Seq<String> cols)Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
Dataset<T>dropDuplicates(String col1, String... cols)Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
scala.Tuple2<String,String>[]dtypes()Returns all column names and their data types as an array.ds.dtypesArray[(String, String)] = Array((_1,IntegerType), (_2,IntegerType), (_3,LongType))
Dataset<T>except(Dataset<T> other)Returns a new Dataset containing rows in this Dataset but not in another Dataset.
voidexplain()Prints the physical plan to the console for debugging purposes.
voidexplain(boolean extended)Prints the plans (logical and physical) to the console for debugging purposes.
Dataset<T>filter(Column condition)Filters rows using the given condition.
Tfirst()Returns the first row.
voidforeach(ForeachFunction<T> func)(Java-specific) Runs func on each element of this Dataset.
voidforeach(scala.Function1<T,scala.runtime.BoxedUnit> f)Applies a function f to all rows.
voidforeachPartition(ForeachPartitionFunction<T> func)(Java-specific) Runs func on each partition of this Dataset.
voidforeachPartition(scala.Function1<scala.collection.Iterator<T>,scala.runtime.BoxedUnit> f)Applies a function f to each partition of this Dataset.
RelationalGroupedDatasetgroupBy(Column... cols)Groups the Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDatasetgroupBy(scala.collection.Seq<Column> cols)Groups the Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDatasetgroupBy(String col1, scala.collection.Seq<String> cols)Groups the Dataset using the specified columns, so that we can run aggregation on them.
RelationalGroupedDatasetgroupBy(String col1, String... cols)Groups the Dataset using the specified columns, so that we can run aggregation on them.
Thead()Returns the first row.
Objecthead(int n)Returns the first n rows.
String[]inputFiles()Returns a best-effort snapshot of the files that compose this Dataset.in.inputFiles.foreach(println)file:///D:/scala/test2/input.txt Lists the input files.
Dataset<T>intersect(Dataset<T> other)Returns a new Dataset containing rows only in both this Dataset and another Dataset.
booleanisLocal()Returns true if the collect and take methods can be run locally (without any Spark executors).ds.isLocal ds.cube(("_1"),("_2")).sum().isLocaltrue false Lets you check whether it runs locally or not.
booleanisStreaming()Returns true if this Dataset contains one or more sources that continuously return data as it arrives. For streaming; exact use still unclear.
JavaRDD<T>javaRDD()Returns the content of the Dataset as a JavaRDD of Ts.
Dataset<Row>join(Dataset<?> right)Join with another DataFrame.
Dataset<Row>join(Dataset<?> right, Column joinExprs)Inner join with another DataFrame, using the given join expression.val ds = Seq((1, 1, "A"), (1, 2, "B"), (1, 3, "C"), (2, 1, "D")).toDS() val ds2 = Seq(("A", 1), ("B", 2), ("C", 3), ("D", 4)).toDS()
Dataset<Row>join(Dataset<?> right, Column joinExprs, String joinType)Join with another DataFrame, using the given join expression.//inner employees .join(departments, "DepartmentID") .show() //outer employees .join(departments, Seq("DepartmentID"), "left_outer") .show() //inner, outer, left_outer, right_outer, leftsemi //carsetian employees .join(departments) .show(10) //conditional (nonequal) orders .join(products, $"product" === $"name" && $"date" >= $"startDate" && $"date" <= $"endDate") .show()
Dataset<Row>join(Dataset<?> right, scala.collection.Seq<String> usingColumns)Inner equi-join with another DataFrame using the given columns.
Dataset<Row>join(Dataset<?> right, scala.collection.Seq<String> usingColumns, String joinType)Equi-join with another DataFrame using the given columns.
Dataset<Row>join(Dataset<?> right, String usingColumn)Inner equi-join with another DataFrame using the given column.
Dataset<T>limit(int n)Returns a new Dataset by taking the first n rows.
DataFrameNaFunctionsna()Returns a DataFrameNaFunctions for working with missing data.df.show() // +-----+----+ // | name| age| // +-----+----+ // |alice| 35| // | bob|null| // | | 24| // +-----+----+ scala> df.na.fill(10.0,Seq("age")) // res4: org.apache.spark.sql.DataFrame = [name: string, age: int] // scala> df.na.fill(10.0,Seq("age")).show // +-----+---+ // | name|age| // +-----+---+ // |alice| 35| // | bob| 10| // | | 24| // +-----+---+ scala> df.na.replace("age", Map(35 -> 61, 24 -> 12)).show() // +-----+----+ // | name| age| // +-----+----+ // |alice| 61| // | bob|null| // | | 12| // +-----+----+ Allows filling and replacing values; is it OK to use this for updates/replacements?
static Dataset<Row>ofRows(SparkSession sparkSession, org.apache.spark.sql.catalyst.plans.logical.LogicalPlan logicalPlan) 
Dataset<T>orderBy(Column... sortExprs)Returns a new Dataset sorted by the given expressions.OrderBy is just an alias for the sort function
Dataset<T>orderBy(scala.collection.Seq<Column> sortExprs)Returns a new Dataset sorted by the given expressions.OrderBy is just an alias for the sort function
Dataset<T>orderBy(String sortCol, scala.collection.Seq<String> sortCols)Returns a new Dataset sorted by the given expressions.OrderBy is just an alias for the sort function
Dataset<T>orderBy(String sortCol, String... sortCols)Returns a new Dataset sorted by the given expressions.OrderBy is just an alias for the sort function
Dataset<T>persist()Persist this Dataset with the default storage level (MEMORY_AND_DISK).
Dataset<T>persist(StorageLevel newLevel)Persist this Dataset with the given storage level.
voidprintSchema()Prints the schema to the console in a nice tree format.ds.printSchema()root |-- _1: integer (nullable = false) |-- _2: integer (nullable = false) |-- _3: long (nullable = false)
org.apache.spark.sql.execution.QueryExecutionqueryExecution() ds.queryExecution== Parsed Logical Plan == LocalRelation [_1#3, _2#4, _3#5L] == Analyzed Logical Plan == _1: int, _2: int, _3: bigint LocalRelation [_1#3, _2#4, _3#5L] == Optimized Logical Plan == LocalRelation [_1#3, _2#4, _3#5L] == Physical Plan == LocalTableScan [_1#3, _2#4, _3#5L] Not sure what this is used for.
Dataset<T>[]randomSplit(double[] weights)Randomly splits this Dataset with the provided weights.val ds = Seq((1, 1, 2L), (1, 2, 3L), (1, 3, 4L), (2, 1, 5L),(2,2,5L), (4,4,5L), (5,5,5L), (1,2,3L),(9,9,9L)).toDS() ds.randomSplit(Array(0.3,0.2,0.5),5).foreach(t=>t.show)+---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 1| 2| | 5| 5| 5| | 9| 9| 9| +---+---+---+ +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 3| 4| | 2| 1| 5| +---+---+---+ +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 2| 3| | 1| 2| 3| | 2| 2| 5| | 4| 4| 5| +---+---+---+
Dataset<T>[]randomSplit(double[] weights, long seed)Randomly splits this Dataset with the provided weights.
java.util.List<Dataset<T>>randomSplitAsList(double[] weights, long seed)Returns a Java list that contains randomly split Dataset with the provided weights.
RDD<T>rdd()Represents the content of the Dataset as an RDD of T.
Dataset<T>repartition(Column... partitionExprs)Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as number of partitions. Described in the RDD section above.
Dataset<T>repartition(int numPartitions)Returns a new Dataset that has exactly numPartitions partitions. Described in the RDD section above.
Dataset<T>repartition(int numPartitions, Column... partitionExprs)Returns a new Dataset partitioned by the given partitioning expressions into numPartitions.
Dataset<T>repartition(int numPartitions, scala.collection.Seq<Column> partitionExprs)Returns a new Dataset partitioned by the given partitioning expressions into numPartitions.
Dataset<T>repartition(scala.collection.Seq<Column> partitionExprs)Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as number of partitions.
RelationalGroupedDatasetrollup(Column... cols)Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.val ds = Seq((1, 1, 2L), (1, 2, 3L), (1, 3, 4L), (2, 1, 5L),(2,2,5L), (4,4,5L), (5,5,5L), (1,2,3L),(9,9,9L)).toDS() ds.show ds.rollup("_1").count.show ds.rollup("_1").agg("_2","_3").show ds.rollup("_1").agg(sum("_2")).show+---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 1| 2| | 1| 2| 3| | 1| 3| 4| | 2| 1| 5| | 2| 2| 5| | 4| 4| 5| | 5| 5| 5| | 1| 2| 3| | 9| 9| 9| +---+---+---+ +----+-----+ | _1|count| +----+-----+ | 1| 4| | 4| 1| |null| 9| | 2| 2| | 5| 1| | 9| 1| +----+-----+ +----+-------+ | _1|sum(_2)| +----+-------+ | 1| 8| | 4| 4| |null| 29| | 2| 3| | 5| 5| | 9| 9| +----+-------+
RelationalGroupedDatasetrollup(scala.collection.Seq<Column> cols)Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDatasetrollup(String col1, scala.collection.Seq<String> cols)Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDatasetrollup(String col1, String... cols)Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.
Dataset<T>sample(boolean withReplacement, double fraction)Returns a new Dataset by sampling a fraction of rows, using a random seed.
Dataset<T>sample(boolean withReplacement, double fraction, long seed)Returns a new Dataset by sampling a fraction of rows, using a user-supplied seed.
StructTypeschema()Returns the schema of this Dataset.
Dataset<Row>select(Column... cols)Selects a set of column based expressions.
Dataset<Row>select(scala.collection.Seq<Column> cols)Selects a set of column based expressions.
Dataset<Row>select(String col, scala.collection.Seq<String> cols)Selects a set of columns.
Dataset<Row>select(String col, String... cols)Selects a set of columns.
Dataset<Row>selectExpr(scala.collection.Seq<String> exprs)Selects a set of SQL expressions.
Dataset<Row>selectExpr(String... exprs)Selects a set of SQL expressions.
voidshow()Displays the top 20 rows of Dataset in a tabular form.
voidshow(boolean truncate)Displays the top 20 rows of Dataset in a tabular form.
voidshow(int numRows)Displays the Dataset in a tabular form.
voidshow(int numRows, boolean truncate)Displays the Dataset in a tabular form.
voidshow(int numRows, int truncate)Displays the Dataset in a tabular form.
Dataset<T>sort(Column... sortExprs)Returns a new Dataset sorted by the given expressions.ds.sort(($"_1".desc)).show+---+---+---+ | _1| _2| _3| +---+---+---+ | 9| 9| 9| | 5| 5| 5| | 4| 4| 5| | 2| 1| 5| | 2| 2| 5| | 1| 1| 2| | 1| 3| 4| | 1| 2| 3| | 1| 2| 3| +---+---+---+
Dataset<T>sort(scala.collection.Seq<Column> sortExprs)Returns a new Dataset sorted by the given expressions.
Dataset<T>sort(String sortCol, scala.collection.Seq<String> sortCols)Returns a new Dataset sorted by the specified column, all in ascending order.
Dataset<T>sort(String sortCol, String... sortCols)Returns a new Dataset sorted by the specified column, all in ascending order.
Dataset<T>sortWithinPartitions(Column... sortExprs)Returns a new Dataset with each partition sorted by the given expressions.
Dataset<T>sortWithinPartitions(scala.collection.Seq<Column> sortExprs)Returns a new Dataset with each partition sorted by the given expressions.
Dataset<T>sortWithinPartitions(String sortCol, scala.collection.Seq<String> sortCols)Returns a new Dataset with each partition sorted by the given expressions.
Dataset<T>sortWithinPartitions(String sortCol, String... sortCols)Returns a new Dataset with each partition sorted by the given expressions.
SparkSessionsparkSession() 
SQLContextsqlContext() 
DataFrameStatFunctionsstat()Returns a DataFrameStatFunctions for working statistic functions support. ?? What statistics does it provide?
StorageLevelstorageLevel()Get the Dataset's current storage level, or StorageLevel.NONE if not persisted.
Objecttake(int n)Returns the first n rows in the Dataset.
java.util.List<T>takeAsList(int n)Returns the first n rows in the Dataset as a list.
Dataset<Row>toDF()Converts this strongly typed collection of data to generic DataFrame.
Dataset<Row>toDF(scala.collection.Seq<String> colNames)Converts this strongly typed collection of data to generic DataFrame with columns renamed.
Dataset<Row>toDF(String... colNames)Converts this strongly typed collection of data to generic DataFrame with columns renamed.
JavaRDD<T>toJavaRDD()Returns the content of the Dataset as a JavaRDD of Ts.
Dataset<String>toJSON()Returns the content of the Dataset as a Dataset of JSON strings.
java.util.Iterator<T>toLocalIterator()Return an iterator that contains all of Rows in this Dataset.
StringtoString() 
<U> Dataset<U>transform(scala.Function1<Dataset<T>,Dataset<U>> t)Concise syntax for chaining custom transformations.
Dataset<T>union(Dataset<T> other)Returns a new Dataset containing union of rows in this Dataset and another Dataset.
Dataset<T>unpersist()Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
Dataset<T>unpersist(boolean blocking)Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
Dataset<T>where(Column condition)Filters rows using the given condition.
Dataset<T>where(String conditionExpr)Filters rows using the given SQL expression.
Dataset<Row>withColumn(String colName, Column col)Returns a new Dataset by adding a column or replacing the existing column that has the same name.
Dataset<Row>withColumnRenamed(String existingName, String newName)Returns a new Dataset with a column renamed.
DataFrameWriter<T>write()Interface for saving the content of the non-streaming Dataset out into external storage.
DataStreamWriter<T>ExperimentalwriteStream()Interface for saving the content of the streaming Dataset out into external storage.








  • spark.executor.memory defines the total amount of memory available to an executor.
  • spark.storage.memoryFraction (default ~60%) defines the amount of memory available for storing persisted RDDs.
  • spark.shuffle.memoryFraction (default ~20%) defines the amount of memory reserved for shuffle.
  • spark.storage.unrollFraction and spark.storage.safetyFraction (~30% of total memory): these values are used internally by Spark and should not be changed. (A configuration sketch follows below.)
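A minimal configuration sketch for the settings above. Note these fraction settings belong to the legacy (pre-1.6) memory model; on newer Spark versions they only take effect when spark.memory.useLegacyMode is enabled, so treat this as an assumption to verify against your Spark version:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

val conf = new SparkConf()
  .setAppName("memoryTuningExample")
  .set("spark.executor.memory", "4g")            // total memory per executor
  .set("spark.storage.memoryFraction", "0.6")    // share for persisted RDDs (legacy setting)
  .set("spark.shuffle.memoryFraction", "0.2")    // share reserved for shuffle (legacy setting)

val sc = new SparkContext(conf)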



What is LRU?


Least recently used (LRU)


Discards the least recently used items first. This algorithm requires keeping track of what was used when, which is expensive if one wants to make sure the algorithm always discards the least recently used item. General implementations of this technique require keeping "age bits" for cache-lines and track the "Least Recently Used" cache-line based on age-bits. In such an implementation, every time a cache-line is used, the age of all other cache-lines changes. LRU is actually a family of caching algorithms with members including 2Q by Theodore Johnson and Dennis Shasha,[3] and LRU/K by Pat O'Neil, Betty O'Neil and Gerhard Weikum.[4]


The access sequence for the below example is A B C D E D F.


In the above example once A B C D gets installed in the blocks with sequence numbers (Increment 1 for each new Access) and when E is accessed, it is a miss and it needs to be installed in one of the blocks. According to the LRU Algorithm, since A has the lowest Rank(A(0)), E will replace A.

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), or replicate it across nodes. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is
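As a small sketch of setting an explicit storage level (assuming a SparkContext sc; the input path is a placeholder):

import org.apache.spark.storage.StorageLevel

val parsed = sc.textFile("hdfs:///data/input.txt").map(_.split(","))
parsed.persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized in memory, spilling to disk when needed
// parsed.cache() would be shorthand for persist(StorageLevel.MEMORY_ONLY) on an RDD
parsed.count()                                     // the first action materializes and caches the RDD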


Storage Level / Meaning

MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER (Java and Scala): Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER (Java and Scala): Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY: Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.


  • Difference between cache and persist: cache() is fixed to the default storage level (MEMORY_AND_DISK for a Dataset), whereas persist() lets you choose one of the storage levels above.
  • The default storage level differs between RDD and Dataset: RDD = MEMORY_ONLY, Dataset = MEMORY_AND_DISK.

※ If local disk space is insufficient, an out-of-space error can occur.


Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

  • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.

  • If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)

  • Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.

  • Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.


An SQL join clause combines records from two or more tables. This operation is very common in data processing and understanding of what happens under the hood is important. There are several common join types: INNER, LEFT OUTER, RIGHT OUTER, FULL OUTER, and CROSS or CARTESIAN.

(Figure: join type diagrams)

A join that uses the same table is a self-join. If an operation uses the equality operator, it is an equi-join; otherwise, it is a non-equi-join.

This article covers different join types in Apache Spark as well as examples of slowly changed dimensions (SCD) and joins on non-unique columns.

Sample data

All subsequent explanations on join types in this article make use of the following two tables, taken from the Wikipedia article on joins. The rows in these tables serve to illustrate the effect of different types of joins and join-predicates.

The Employees table has a nullable column. To express this in statically typed Scala, one needs to use the Option type.

val employees = sc.parallelize(Array[(String, Option[Int])](
  ("Rafferty", Some(31)), ("Jones", Some(33)), ("Heisenberg", Some(33)), ("Robinson", Some(34)), ("Smith", Some(34)), ("Williams", null)
)).toDF("LastName", "DepartmentID")
 
employees.show()
 
+----------+------------+
|  LastName|DepartmentID|
+----------+------------+
|  Rafferty|          31|
|     Jones|          33|
|Heisenberg|          33|
|  Robinson|          34|
|     Smith|          34|
|  Williams|        null|
+----------+------------+
val departments = sc.parallelize(Array(
  (31, "Sales"), (33, "Engineering"), (34, "Clerical"),
  (35, "Marketing")
)).toDF("DepartmentID", "DepartmentName")
 
departments.show()
 
+------------+--------------+
|DepartmentID|DepartmentName|
+------------+--------------+
|          31|         Sales|
|          33|   Engineering|
|          34|      Clerical|
|          35|     Marketing|
+------------+--------------+

The Departments table does not have nullable columns, so the type specification can be omitted.


Inner join

Following SQL code

SELECT * FROM employee INNER JOIN department ON employee.DepartmentID = department.DepartmentID;

could be written in Spark as

employees
  .join(departments, "DepartmentID")
  .show()
 
+------------+----------+--------------+
|DepartmentID|  LastName|DepartmentName|
+------------+----------+--------------+
|          31|  Rafferty|         Sales|
|          33|     Jones|   Engineering|
|          33|Heisenberg|   Engineering|
|          34|  Robinson|      Clerical|
|          34|     Smith|      Clerical|
+------------+----------+--------------+



Beautiful, isn't it? Spark automatically removes the duplicated “DepartmentID” column, so column names are unique and one does not need a table prefix to address them.

Left outer join

A left outer join is a very common operation, especially if there are nulls or gaps in the data. Note that the column name should be wrapped in a Scala Seq if the join type is specified.

employees
  .join(departments, Seq("DepartmentID"), "left_outer")
  .show()
 
+------------+----------+--------------+
|DepartmentID|  LastName|DepartmentName|
+------------+----------+--------------+
|          31|  Rafferty|         Sales|
|          33|     Jones|   Engineering|
|          33|Heisenberg|   Engineering|
|          34|  Robinson|      Clerical|
|          34|     Smith|      Clerical|
|        null|  Williams|          null|
+------------+----------+--------------+


Other join types

Spark allows the following join types: inner, outer, left_outer, right_outer, leftsemi. The interface is the same as for the left outer join in the example above.

For a cartesian join, the column specification should be omitted:

employees
  .join(departments)
  .show(10)
 
+----------+------------+------------+--------------+
|  LastName|DepartmentID|DepartmentID|DepartmentName|
+----------+------------+------------+--------------+
|  Rafferty|          31|          31|         Sales|
|  Rafferty|          31|          33|   Engineering|
|  Rafferty|          31|          34|      Clerical|
|  Rafferty|          31|          35|     Marketing|
|     Jones|          33|          31|         Sales|
|     Jones|          33|          33|   Engineering|
|     Jones|          33|          34|      Clerical|
|     Jones|          33|          35|     Marketing|
|Heisenberg|          33|          31|         Sales|
|Heisenberg|          33|          33|   Engineering|
+----------+------------+------------+--------------+
only showing top 10 rows



Warning: do not use cartesian join with big tables in production.

Join expression, slowly changing dimensions and non-equi join

Spark allows us to specify a join expression instead of a sequence of columns. In general, an expression specification is less readable, so why do we need such flexibility? The reason is the non-equi join.

One application of it is slowly changing dimensions. Assume there is a table with product prices over time:

val products = sc.parallelize(Array(
  ("steak""1990-01-01""2000-01-01"150),
  ("steak""2000-01-02""2020-01-01"180),
  ("fish""1990-01-01""2020-01-01"100)
)).toDF("name""startDate""endDate""price")
 
products.show()
 
+-----+----------+----------+-----+
| name| startDate|   endDate|price|
+-----+----------+----------+-----+
|steak|1990-01-01|2000-01-01|  150|
|steak|2000-01-02|2020-01-01|  180|
| fish|1990-01-01|2020-01-01|  100|
+-----+----------+----------+-----+


There are only two products, steak and fish; the price of steak has been changed once. Another table consists of product orders by day:

val orders = sc.parallelize(Array(
  ("1995-01-01""steak"),
  ("2000-01-01""fish"),
  ("2005-01-01""steak")
)).toDF("date""product")
 
orders.show()
 
+----------+-------+
|      date|product|
+----------+-------+
|1995-01-01|  steak|
|2000-01-01|   fish|
|2005-01-01|  steak|
+----------+-------+


Our goal is to assign the actual price to every record in the orders table. This is not obvious to do using only equality operators; however, a Spark join expression allows us to achieve the result in an elegant way:

orders
  .join(products, $"product" === $"name" && $"date" >= $"startDate" && $"date" <= $"endDate")
  .show()
 
+----------+-------+-----+----------+----------+-----+
|      date|product| name| startDate|   endDate|price|
+----------+-------+-----+----------+----------+-----+
|2000-01-01|   fish| fish|1990-01-01|2020-01-01|  100|
|1995-01-01|  steak|steak|1990-01-01|2000-01-01|  150|
|2005-01-01|  steak|steak|2000-01-02|2020-01-01|  180|
+----------+-------+-----+----------+----------+-----+

This technique is very useful, yet not that common. It could save a lot of time for those who write as well as for those who read the code.

Inner join using non primary keys

The last part of this article is about joins on non-unique columns and the common mistakes related to them. The join (intersection) diagrams at the beginning of this article stick in our heads: because of the visual comparison with set intersection, we assume that the result table after an inner join should be smaller than any of the source tables. This is correct only for joins on unique columns and wrong if the columns in both tables are not unique. Consider the following DataFrame with duplicated records and its self-join:

val df = sc.parallelize(Array(
  (0), (1), (1)
)).toDF("c1")
 
df.show()
df.join(df, "c1").show()
 
// Original DataFrame
+---+
| c1|
+---+
|  0|
|  1|
|  1|
+---+
 
// Self-joined DataFrame
+---+
| c1|
+---+
|  0|
|  1|
|  1|
|  1|
|  1|
+---+


Note that the size of the result DataFrame is bigger than the source size. It can be as big as n², where n is the size of the source.

Conclusion

This article covered the different join type implementations in Apache Spark, including join expressions and joins on non-unique keys.

Apache Spark allows developers to write code in a way that is easier to understand, which improves code quality and maintainability.


Total input: 300,000 lines × 66 rows = about 19.8 million rows ≈ 5 GB

Output: ranks, about 19.8 million rows ≈ 549 MB



Environment / Task / Condition 1 / Condition 2 / Condition 3 / Elapsed time (min) / Output files (rows)

Hadoop / PageRank: page rank iteration 5, yarn 2 GB, 12 containers, map split into 67 files (80 MB each), total 46.5 min
  setting: 0
  itr1 (setting + itr1): 5.154917
  itr2: 9.771217
  itr3: 7.103517
  itr4: 9.663967
  itr5: 14.85307

Spark / PageRank: page rank iteration 5, yarn 3 GB, 5 executors with 2 cores each, spark.read with 67 files (80 MB each), total 13.6 min
  setting (includes read time and mapPartition.cache()): 0.68
  itr1: 2.583
  itr2: 2.61
  itr3: 2.48
  itr4: 2.61
  itr5: 2.6



Environment / Task / Condition 1 / Condition 2 / Condition 3 / Elapsed time / Output (rows)

Hadoop ETL, local mode:
  title parsing: local mode, 1,033,586 ms ≈ 17 min, 17,773,690 rows
  find links for (id, Title) : (id, Title): local mode, id matching, memory exceeded
  find links for (Title) : (Title): local mode, only title, 2,594,021 ms ≈ 43 min, 16,861,907 rows / 4.4 GB

Hadoop ETL, cluster mode:
  title parsing: cluster mode, 447,268 ms ≈ 7 min, 16,861,907 rows
  find links for (id, Title) : (id, Title): cluster mode, id matching, 819,340 ms ≈ 13 min
  find links for (Title) : (Title): cluster mode, only title, memory = 1024, 23 containers, map/reduce vcores = 2, map/reduces = 5, 13.45 min, 16,861,907 rows / 4.4 GB
  find links for (Title) : (Title): cluster mode, only title, memory = 2048, 12 containers, 13.27 min, 16,861,907 rows / 4.4 GB

Spark ETL, cluster mode (1-3):
  title parsing: cluster mode, 7 min
  find links for (Title) : (Title): executors = 5, executor cores = 2, memory 3 GB, 10 min 15 sec














Hadoop / PageRank, iteration 3, cluster mode, titles:
  itr1: 7 min 20 sec
  itr2: could not be measured

Spark / PageRank, iteration 5, cluster mode:
  dataset without persist: 10 min
  dataset with persist, no repartition: 14 min
  dataset with persist, repartition 10: 11 min
  dataset with persist, ...

* During the Spark tests, errors such as "unusable node" come up; when local disk usage gets too high, the task manager (YARN) temporarily kills the node and then restores it. If this keeps repeating, the job sometimes fails to produce correct results.

* To address this, an extra 100 GB disk was allocated to each cluster node, and the temp file path was changed to point there.

* With Hadoop, shuffle output pours into the local temp path; if the account lacks write permission on the temp folder, errors occur.

* If a Hadoop job is stopped mid-run, logs and intermediate shuffle files may pile up in the temp path (usually /tmp). Those files need to be cleaned up periodically.

* For PageRank on Spark, we need to check how much local disk gets consumed as the iterations progress, and a mitigation is needed as well.

* With Hadoop, pay attention to config settings such as conf.set("mapreduce.textoutputformat.separator", "::"); (the same goes for YARN). See Hadoop map-reduce configuration.

* A currently running process can be found with yarn application -list and killed with -kill.



Hadoop execution times (based on logs)


https://stackoverflow.com/questions/34175386/precise-definitions-of-map-shuffle-merge-and-reduce-times-in-hadoop




Step / Time / Duration / Notes
app start: 11:07:03
set-up: 11:07:16 / 0:00:13
first map started: 11:07:18 / 0:00:15 / 472 tasks running concurrently
first reducer started: 11:07:45 / 0:00:42 / about 27 seconds for the first task to move from map to reduce
map end time: 11:14:19 / 0:07:16 / time the last map finished (reduce = 32%)
reducer end time: 11:14:37 / 0:07:34 / the remaining 68% of reducing completed in 18 seconds


Check the default zone

firewall-cmd --get-default-zone

Add a port

Use the --add-port=<portid>[-<portid>]/<protocol> option to add a port:

firewall-cmd --permanent --zone=public --add-port=8080/tcp

Reload the configuration

firewall-cmd --reload


Source: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes


Programming Model

The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You will express your streaming computation as standard batch-like query as on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail.

> The idea behind Structured Streaming is a continuously appended table: you can express your streaming computation as an ordinary batch-style query.

Basic Concepts

Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.

Stream as a Table

A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.

Model

The “Output” is defined as what gets written out to the external storage. The output can be defined in a different mode:

  • Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

  • Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

  • Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.

Note that each mode is applicable on certain types of queries. This is discussed in detail later.

>> There are three output modes: Complete, Append, and Update.

Complete Mode: whenever the Result Table is updated, the entire updated table is written to the external storage.

Append Mode: only the new rows are written to the external storage when the trigger fires.

Update Mode: only new or updated rows are written to the external storage when the trigger fires. The difference from Complete mode is that, if the query contains no aggregations, it behaves exactly like Append mode (and is expected to be faster than Complete mode).

To illustrate the use of this model, let’s understand the model in context of the Quick Example above. The first lines DataFrame is the input table, and the final wordCounts DataFrame is the result table. Note that the query on streaming lines DataFrame to generate wordCounts is exactly the same as it would be a static DataFrame. However, when this query is started, Spark will continuously check for new data from the socket connection. If there is new data, Spark will run an “incremental” query that combines the previous running counts with the new data to compute updated counts, as shown below.

>> In code, it looks like the following.

val query = wordCounts.writeStream
  .outputMode("complete")   // or .outputMode("append") / .outputMode("update")

>> You write it just like ordinary SQL, and Spark continuously checks for new data (socket-based here). When new data arrives, Spark runs a query over the increment and combines it with the previously computed running counts (illustrated in the figure below).

Model


Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. It only keeps around the minimal intermediate state data as required to update the result 

>> Note that Structured Streaming does not materialize the entire table. It reads the latest available data and incrementally updates the result, then discards the original source data. It keeps only the minimal data possible, called state data, which is what is used to update the result.

Basic Operations - Selection, Projection, Aggregation

Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are discussed later in this section.

>> Basic queries such as select are supported as shown below, but a few operations are not.

case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
 
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data
 
// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs
ds.filter(_.signal > 10).map(_.device)         // using typed APIs
 
// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API
 
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
Note, you can identify whether a DataFrame/Dataset has streaming data or not by using df.isStreaming.


Unsupported Operations

Schema inference and partition of streaming DataFrames/Datasets

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting spark.sql.streaming.schemaInference to true.

Partition discovery does occur when subdirectories that are named /key=value/ are present and listing will automatically recurse into these directories. If these columns appear in the user provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add /data/year=2016/ when /data/year=2015/ was present, but it is invalid to change the partitioning column (i.e. by creating the directory /data/date=2016-04-17/).

>> By default, Structured Streaming requires you to provide the input schema for file-based sources. (This is supported for CSV; what about JSON? What about Kafka?)
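A minimal sketch of supplying a schema to a streaming file (CSV) source; the schema fields, separator, and directory path are placeholders:

import org.apache.spark.sql.types._

val userSchema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)

val csvDF = spark.readStream
  .schema(userSchema)            // required for file sources unless spark.sql.streaming.schemaInference=true
  .option("sep", ";")
  .csv("/path/to/directory")     // same as format("csv").load("/path/to/directory")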





Stream-stream joins are supported starting from Spark 2.3.0.

(IPC - 2.2.0, EPC - 2.3.0)

Support matrix for joins in streaming queries

Left Input / Right Input / Join Type / Support
Static / Static / All types: Supported, since it is not on streaming data even though it can be present in a streaming query
Stream / Static / Inner: Supported, not stateful
Stream / Static / Left Outer: Supported, not stateful
Stream / Static / Right Outer: Not supported
Stream / Static / Full Outer: Not supported
Static / Stream / Inner: Supported, not stateful
Static / Stream / Left Outer: Not supported
Static / Stream / Right Outer: Supported, not stateful
Static / Stream / Full Outer: Not supported
Stream / Stream / Inner: Supported; optionally specify watermark on both sides + time constraints for state cleanup
Stream / Stream / Left Outer: Conditionally supported; must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup
Stream / Stream / Right Outer: Conditionally supported; must specify watermark on left + time constraints for correct results, optionally specify watermark on right for all state cleanup
Stream / Stream / Full Outer: Not supported
import org.apache.spark.sql.functions.expr
 
val impressions = spark.readStream. ...
val clicks = spark.readStream. ...
 
// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
 
// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
 
 
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"
 )


To summarize: in a stream-stream join, the watermark is optional for INNER joins, but specifying it is good both semantically and for performance.

For OUTER joins the watermark is mandatory, because Spark keeps waiting for a matching row until the last possible moment; if the match turns out to be null, the delay time becomes maximal.


  • Joins can be cascaded, that is, you can do df1.join(df2, ...).join(df3, ...).join(df4, ....).

  • As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.

  • As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used.

    • Cannot use streaming aggregations before joins.

    • Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.


  1. Deduplication

    // Without watermark, using guid column
    streamingDf.dropDuplicates("guid")
     
    // With watermark, using guid and eventTime columns
    streamingDf
      .withWatermark("eventTime", "10 seconds")
      .dropDuplicates("guid", "eventTime")

    With a watermark: deduplicates up to the watermark (note that rows older than the watermark may not be deduplicated).
    Without a watermark: deduplicates for as long as the state is kept.

  2. Arbitrary State
    Since Spark 2.2, this can be done using the operation mapGroupsWithState and the more powerful operation flatMapGroupsWithState. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation (Scala/Java) and the examples (Scala/Java).
    The operations above let you build an arbitrary, user-defined state; the docs say little about how that state ends up being used, though (a sketch follows below).
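    A minimal sketch of a running count kept as user-defined state with mapGroupsWithState (the Event/UserCount case classes and the events streaming Dataset are assumptions, and spark.implicits._ is assumed to be in scope):

    import org.apache.spark.sql.streaming.GroupState

    case class Event(user: String, action: String)
    case class UserCount(user: String, count: Long)

    // events: Dataset[Event], e.g. built from spark.readStream ... .as[Event]
    val counts = events
      .groupByKey(_.user)
      .mapGroupsWithState[Long, UserCount] { (user, rows, state) =>
        val newCount = state.getOption.getOrElse(0L) + rows.size   // add this trigger's rows to the stored count
        state.update(newCount)                                      // keep the new count as the group's state
        UserCount(user, newCount)
      }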

     3. Unsupported!

Unsupported Operations

There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.

  • Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.

  • Limit and take first N rows are not supported on streaming Datasets.

  • Distinct operations on streaming Datasets are not supported.

  • Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

  • Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

  • count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count. (A sketch follows after this list.)

  • foreach() - Instead use ds.writeStream.foreach(...) (see next section).

  • show() - Instead use the console sink (see next section).
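For instance, a hedged sketch of the count() workaround (streamingDs is a placeholder streaming Dataset):

// instead of streamingDs.count(), keep a running count and write it to a sink
val runningCount = streamingDs.groupBy().count()

runningCount.writeStream
  .outputMode("complete")   // the running aggregate needs Complete (or Update) output mode
  .format("console")        // console sink, as suggested above in place of show()
  .start()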


val query = df.writeStream.format("console").start()   // get the query object
 
query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data
 
query.runId       // get the unique id of this run of the query, which will be generated at every start/restart
 
query.name        // get the name of the auto-generated or user-specified name
 
query.explain()   // print detailed explanations of the query
 
query.stop()      // stop the query
 
query.awaitTermination()   // block until query is terminated, with stop() or with error
 
query.exception       // the exception if the query has been terminated with error
 
query.recentProgress  // an array of the most recent progress updates for this query
 
query.lastProgress    // the most recent progress update of this streaming query
 
 
 
 
 
 
val spark: SparkSession = ...
 
spark.streams.active    // get the list of currently active streaming queries
 
spark.streams.get(id)   // get a query object by its unique id
 
spark.streams.awaitAnyTermination()   // block until any one of them terminates


Query Type | Supported Output Modes | Notes (indented below each entry)

Queries with aggregation (on event-time, with watermark) | Append, Update, Complete
  Append mode uses the watermark to drop old aggregation state. But the output of a windowed aggregation is delayed by the late threshold specified in `withWatermark()`, since by the mode's semantics rows can be added to the Result Table only once, after they are finalized (i.e. after the watermark is crossed). See the Late Data section for more details.
  Update mode uses the watermark to drop old aggregation state.
  Complete mode does not drop old aggregation state, since by definition this mode preserves all data in the Result Table.

Queries with aggregation (other aggregations) | Complete, Update
  Since no watermark is defined (it is only defined in the other category), old aggregation state is not dropped.
  Append mode is not supported, as aggregates can update, violating the semantics of this mode.

Queries with mapGroupsWithState | Update

Queries with flatMapGroupsWithState (Append operation mode) | Append
  Aggregations are allowed after flatMapGroupsWithState.
Queries with flatMapGroupsWithState (Update operation mode) | Update
  Aggregations are not allowed after flatMapGroupsWithState.

Queries with joins | Append
  Update and Complete mode are not supported yet. See the support matrix in the Join Operations section for more details on what types of joins are supported.

Other queries | Append, Update
  Complete mode is not supported, as it is infeasible to keep all unaggregated data in the Result Table.
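A minimal sketch tying this to the first row of the table: an event-time windowed aggregation with a watermark, written in Append mode. The rate source and the paths are placeholders of my own, not from the post.

    import org.apache.spark.sql.functions.{window, col}

    val events = spark.readStream
      .format("rate")                             // built-in test source with columns (timestamp, value)
      .option("rowsPerSecond", "10")
      .load()

    val windowedCounts = events
      .withWatermark("timestamp", "10 minutes")   // required for Append mode to finalize windows
      .groupBy(window(col("timestamp"), "5 minutes"))
      .count()

    windowedCounts.writeStream
      .outputMode("append")                       // rows are emitted only after the watermark passes the window end
      .format("parquet")
      .option("path", "path/to/out")
      .option("checkpointLocation", "path/to/ckpt")
      .start()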
Output Sinks
There are a few types of built-in output sinks.
File sink - Stores the output to a directory.
 
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path""path/to/destination/dir")
    .start()
Kafka sink - Stores the output to one or more topics in Kafka.
 
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers""host1:port1,host2:port2")
    .option("topic""updates")
    .start()
Foreach sink - Runs arbitrary computation on the records in the output. See later in the section for more details.
 
writeStream
    .foreach(...)
    .start()
Console sink (for debugging) - Prints the output to the console/stdout every time there is a trigger. Both Append and Complete output modes are supported. This should be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver's memory after every trigger.
 
writeStream
    .format("console")
    .start()
Memory sink (for debugging) - The output is stored in memory as an in-memory table. Both Append and Complete output modes are supported. This should be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver's memory. Hence, use it with caution.
 
writeStream
    .format("memory")
    .queryName("tableName")
    .start()
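
Once started, the in-memory table can then be queried by the query name given above; a small usage sketch, assuming the queryName "tableName" from the snippet:

    spark.sql("SELECT * FROM tableName").show()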


Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are meant for debugging purposes only. See the earlier section on fault-tolerance semantics. Here are the details of all the sinks in Spark.

Sink | Supported Output Modes | Options | Fault-tolerant | Notes

File Sink | Append | path: path to the output directory, must be specified. For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R), e.g. for the "parquet" format see DataFrameWriter.parquet(). | Yes (exactly-once) | Supports writes to partitioned tables. Partitioning by time may be useful.

Kafka Sink | Append, Update, Complete | See the Kafka Integration Guide | Yes (at-least-once) | More details in the Kafka Integration Guide.

Foreach Sink | Append, Update, Complete | None | Depends on the ForeachWriter implementation | More details in the next section.

Console Sink | Append, Update, Complete | numRows: number of rows to print every trigger (default: 20); truncate: whether to truncate the output if too long (default: true) | No |

Memory Sink | Append, Complete | None | No. But in Complete mode, a restarted query will recreate the full table. | Table name is the query name.


https://stackoverflow.com/questions/28700624/spark-submit-scheduling-in-cron


When running the script periodically via crontab -e:


#!/bin/sh
spark-submit                 >> does not work


#!/bin/bash
/path/to/spark-submit        >> works


Changing the script to use bash (and the full path to spark-submit) as shown above makes it run.


bash is the most common shell used as a default shell for users of Linux systems. It is a spiritual descendent of other shells used throughout Unix history. Its name, bash is an abbreviation of Bourne-Again Shell, an homage to the Bourne shell it was designed to replace, though it also incorporates features from the C Shell and the Korn Shell.



/bin/sh is an executable representing the system shell. Actually, it is usually implemented as a symbolic link pointing to the executable for whichever shell is the system shell. The system shell is kind of the default shell that system scripts should use. In Linux distributions, for a long time this was usually a symbolic link to bash, so much so that it has become somewhat of a convention to always link /bin/sh to bash or a bash-compatible shell. However, in the last couple of years Debian (and Ubuntu) decided to switch the system shell from bash to dash - a similar shell - breaking with a long tradition in Linux (well, GNU) of using bash for /bin/sh. Dash is seen as a lighter, and much faster, shell which can be beneficial to boot speed (and other things that require a lot of shell scripts, like package installation scripts).



If your script requires features only supported by bash, use #!/bin/bash.

But if at all possible, it would be good to make sure your script is POSIX-compatible, and use #!/bin/sh, which should always, quite reliably, point to the preferred POSIX-compatible system shell in any installation.

Writes work fine from the spark-shell, but when a function you defined is used during a write under spark-submit, you can sometimes hit a "not serializable" error.

For example, if you define a function inside the same Scala class and use it directly in a transformation, the write fails with:

org.apache.spark.SparkException: Task not serializable


def job(tpath: String): Unit = {

  // Helper defined inside the enclosing class/method: this is the pattern that
  // triggers "Task not serializable" when used inside the map below.
  object getgap {
    def getgap(s1: String, s2: String): Long = {
      if (s1.length < 8 || s2.length < 8) return 0
      val format = new java.text.SimpleDateFormat("HH:mm:ss.SSS")
      val v1 = format.parse(s1)
      val v2 = format.parse(s2)
      (v2.getTime() - v1.getTime()) / 100
    }
  }

  import org.apache.spark.sql.SparkSession
  val spark = SparkSession.builder().master("yarn").appName("message_summerization").getOrCreate()
  import spark.implicits._
  //val inputpath = "/user/sangil55/m_input"
  val rd = spark.read.json(inputpath)

  // df_semi2 is derived from rd in code omitted from the post
  val df_timegap = df_semi2
    .map(row => (row.getString(0), row.getString(1), row.getString(2), getgap.getgap(row.getString(1), row.getString(2))))
    .withColumnRenamed("_1", "jobid").withColumnRenamed("_2", "starttime")
    .withColumnRenamed("_3", "endtime").withColumnRenamed("_4", "gap")
    .sort("starttime")

  // ... write df_timegap, etc.
}

If you pull this getgap function out into a separate top-level object, it is treated like a static class and the job runs fine.

In a Spark job, a function declared inside the same class is not compiled into its own standalone class; when you use it directly inside a map, the closure apparently cannot be distributed to the executors properly, because it drags the enclosing, non-serializable instance along with it.

Errors like this come up often, so under spark-submit it is generally better to move any function used inside a transformation out into its own object, as sketched below.
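
A minimal sketch of that fix. GapUtil is a hypothetical name, not from the original code; the logic is the same as getgap above.

    object GapUtil {
      // Top-level object: the closure only references this object's method, so no
      // enclosing class instance gets captured and serialized.
      def getgap(s1: String, s2: String): Long = {
        if (s1.length < 8 || s2.length < 8) return 0L
        val format = new java.text.SimpleDateFormat("HH:mm:ss.SSS")
        (format.parse(s2).getTime - format.parse(s1).getTime) / 100
      }
    }

    // inside job():
    // df_semi2.map(row => (..., GapUtil.getgap(row.getString(1), row.getString(2))))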




Unlike RDD groupByKey, a Spark Dataset/DataFrame can reduce via groupBy, doing the aggregation through .agg.

However, the function passed into .agg cannot be an ordinary function declaration; you have to implement a UDAF (user-defined aggregate function), and getting its behavior right is somewhat fiddly.

In the shell or in main, you run it as follows:

val td = rd.select($"jobid".alias("jobid"), $"time".alias("starttime"), $"time".alias("endtime"), $"result".alias("report_result"), $"result".alias("error"))
//val td = rd.select($"jobid".alias("jobid"),$"time".alias("starttime"))

import CustomAgg._
val minagg = new minAggregation()
val maxagg = new maxAggregation()
val erragg = new errorAggregation()

val td2 = td.groupBy("jobid").agg(minagg(td("starttime")), maxagg(td("endtime")) , erragg(td("error"))) 


The UDAFs are defined as shown below (an aggregate that finds the max value, one that finds the min value, one that collects errors, and so on).

It appears that update merges rows within a partition, while merge combines the partial buffers across partitions.

package CustomAgg

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import scala.collection.mutable.ArrayBuffer

object CustomAggregation {

  class maxAggregation extends UserDefinedAggregateFunction {
    // Input schema: a single string column
    def inputSchema = new StructType().add("x", StringType)
    // Intermediate buffer schema
    def bufferSchema = new StructType().add("buff", ArrayType(StringType))
    // Returned data type
    def dataType: DataType = StringType

    // Self-explaining
    def deterministic = true

    // This function is called whenever the key changes
    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, ArrayBuffer.empty[String]) // initialize the array buffer
    }

    // Iterate over each entry of a group: keep whichever string compares greater
    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) {
        println(input.getString(0))
        println(buffer.getSeq[String](0).mkString)
        if (buffer.getSeq[String](0).mkString.isEmpty)
          buffer.update(0, Seq(input.getString(0)))
        else if (buffer.getSeq[String](0).mkString.compareTo(input.getString(0)) > 0)
          buffer.update(0, buffer.getSeq[String](0))
        else
          buffer.update(0, Seq(input.getString(0)))
        println(buffer.getSeq[String](0).mkString)
      }
    }

    // Merge two partial aggregates across partitions
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      if (buffer1.getSeq[String](0).mkString.compareTo(buffer2.getSeq[String](0).mkString) > 0)
        buffer1.update(0, buffer1.getSeq[String](0))
      else
        buffer1.update(0, buffer2.getSeq[String](0))
    }

    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getSeq[String](0).mkString(","))
  }

  class minAggregation extends UserDefinedAggregateFunction {
    // Input schema: a single string column
    def inputSchema = new StructType().add("x", StringType)
    // Intermediate buffer schema: a single string
    def bufferSchema = new StructType().add("buff", StringType)
    // Returned data type
    def dataType: DataType = StringType

    // Self-explaining
    def deterministic = true

    // This function is called whenever the key changes
    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, "") // initialize with an empty string
    }

    // Iterate over each entry of a group: keep whichever string compares smaller
    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) {
        if (buffer.getString(0).isEmpty)
          buffer.update(0, input.getString(0))
        else if (buffer.getString(0).compareTo(input.getString(0)) < 0)
          buffer.update(0, buffer.getString(0))
        else
          buffer.update(0, input.getString(0))

        println("updated :" + buffer.getString(0))
      }
    }

    // Merge two partial aggregates across partitions
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      if (buffer1.getString(0).compareTo(buffer2.getString(0)) < 0 && !buffer1.getString(0).isEmpty)
        buffer1.update(0, buffer1.getString(0))
      else
        buffer1.update(0, buffer2.getString(0))
    }

    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getString(0))
  }


  class errorAggregation extends UserDefinedAggregateFunction {
    // Input schema: a single string column
    def inputSchema = new StructType().add("x", StringType)
    // Intermediate buffer schema: the error strings seen so far
    def bufferSchema = new StructType().add("buff", ArrayType(StringType))
    // Returned data type
    def dataType: DataType = StringType

    // Self-explaining
    def deterministic = true

    // This function is called whenever the key changes
    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, Seq("")) // initialize the array
    }

    // A value counts as an error unless it is one of the known "OK" markers
    def checkerror(str: String): Boolean =
      str != "Info" && str != "0" && str != "Success" && str != ""

    // Iterate over each entry of a group: remember the latest error value
    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0) && checkerror(input.getString(0))) {
        buffer.update(0, Seq(input.getString(0)))
      }
    }

    // Merge two partial aggregates across partitions by concatenating them
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      println(buffer1.getSeq[String](0).toString())
      println(buffer2.getSeq[String](0).toString())
      buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
    }

    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getSeq[String](0).mkString(","))
  }


  class GeometricMean extends UserDefinedAggregateFunction {
    // This is the input field for your aggregate function: a numeric value.
    override def inputSchema: org.apache.spark.sql.types.StructType =
      StructType(StructField("value", DoubleType) :: Nil)

    // These are the internal fields you keep for computing your aggregate.
    override def bufferSchema: StructType = StructType(
      StructField("count", LongType) ::
      StructField("product", DoubleType) :: Nil
    )

    // This is the output type of your aggregation function.
    override def dataType: DataType = DoubleType

    override def deterministic: Boolean = true

    // This is the initial value for your buffer schema.
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
      buffer(1) = 1.0
    }

    // This is how to update your buffer schema given an input.
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      buffer(0) = buffer.getAs[Long](0) + 1
      buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
    }

    // This is how to merge two objects with the bufferSchema type.
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
      buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
    }

    // This is where you output the final value, given the final value of your bufferSchema.
    override def evaluate(buffer: Row): Any = {
      math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
    }
  }

} 


https://www.cloudera.com/documentation/enterprise/5-8-x/topics/admin_spark_tuning.html

Spark has several kinds of transformations and actions, and they behave differently depending on the kind:

- map, filter : narrow transformations

- coalesce : still narrow

- groupByKey, reduceByKey : wide transformations >> a lot of data is shuffled and a new stage is created.


The more stages and shuffles you add, the more high disk and network I/O you incur, so the job becomes expensive.

A stage boundary can also be triggered by changing the number of partitions (e.g. via a numPartitions argument). A small sketch follows below.
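
A small sketch of the difference, assuming an active SparkContext sc and a placeholder input path: the narrow transformations stay in one stage, while reduceByKey shuffles and introduces a stage boundary, which is visible in toDebugString.

    val words = sc.textFile("path/to/input")             // narrow: flatMap/filter keep the parent partitioning
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)

    val counts = words.map((_, 1)).reduceByKey(_ + _)    // wide: shuffles and starts a new stage
    println(counts.toDebugString)                        // the ShuffledRDD line marks the stage boundary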


There are several ways to tune Spark.


  1. Choosing optimal transformations
    a. map + reduceByKey   <<<  aggregateByKey
    b. join  <<<  cogroup (if the data is already grouped by key)
    The latter is faster in each case; a sketch of 1a appears below.
  2. Reducing the shuffle itself
    Transformations such as reduceByKey are lazily evaluated; if the parent RDDs are set up with the same number of partitions, the work can happen in a single shuffle.
    That is why the X you pass in reduceByKey(..., numPartitions = X) matters.
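
A rough sketch of item 1a; sc and the sample data are my own illustrative assumptions. It computes a per-key average with a single aggregateByKey instead of a map into intermediate tuples followed by reduceByKey.

    val pairRdd = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

    val avgByKey = pairRdd
      .aggregateByKey((0.0, 0L))(                    // zero value: (sum, count)
        (acc, v) => (acc._1 + v, acc._2 + 1L),       // fold a value into the partition-local accumulator
        (a, b)   => (a._1 + b._1, a._2 + b._2))      // combine accumulators across partitions
      .mapValues { case (sum, cnt) => sum / cnt }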





  3. Usually reducing shuffles improves performance, but not in the following cases:
    - when the input file is large and the number of partitions needs to grow (shuffling grows too);
    - when the reduce work is concentrated in one place: treeReduce / treeAggregate add shuffles but can still improve performance.

  4. Secondary sort
    - repartitionAndSortWithinPartitions >> repartition + sort
    The former pushes the sorting into the shuffle machinery, so it is faster than the separate sort the latter performs afterwards. This kind of sorting is also used in joins. A sketch follows below.
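
A sketch of item 4; sc, the sample data, and the partitioner size are illustrative assumptions. The keys are sorted inside the shuffle itself rather than by a separate sort after repartition.

    import org.apache.spark.HashPartitioner

    val pairRdd = sc.parallelize(Seq(("b", 2L), ("a", 1L), ("c", 3L)))

    // Faster than pairRdd.repartition(8) followed by a separate sort, because the
    // sorting happens as part of the shuffle.
    val sorted = pairRdd.repartitionAndSortWithinPartitions(new HashPartitioner(8))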

  5. Memory tuning
    a. Tune YARN: in particular, reserve resources for the containers via yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores.
    b. Set executor-cores and executor-memory as launch options; note that YARN adds a memory overhead of max(executorMemory * 0.07, 384 MB) on top of the executor heap.
    c. The ApplicationMaster runs in client deploy mode or cluster deploy mode; in the former it gets 1 GB and 1 core on the client by default, while in the latter you have to configure it separately with the --driver-* options, so take care.
    d. It is best to keep the number of cores per executor at 5 or fewer.
    e. Tiny executors degrade performance.
    A configuration sketch follows below.
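
A sketch of items 5b and 5d; the values are illustrative, not recommendations from the post. The same executor settings can also be passed programmatically instead of on the spark-submit command line.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("tuned-job")
      .config("spark.executor.memory", "8g")       // YARN adds roughly max(8g * 0.07, 384 MB) overhead on top
      .config("spark.executor.cores", "5")         // keep cores per executor at 5 or fewer
      .getOrCreate()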

  6. Tuning the number of partitions
    A child RDD usually follows its parent's partition count, but the following operations differ:
    - coalesce → fewer partitions
    - union → the sum of its parents' partition counts
    - cartesian → the product of its parents'

    In particular, join / cogroup / groupByKey use a lot of memory; when it is exceeded, disk I/O and GC kick in and everything slows down (so the per-partition data size matters).
    You can increase the number of partitions with repartition.
    A common heuristic is that the optimal partition count is roughly total cores * 3, but that is not exact; Spark's own advice is to keep multiplying by about 1.5 until performance stops improving.

    In general, having too many partitions is better than having too few. A small sketch of repartitioning follows below.
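
A small sketch of raising the partition count, assuming an active SparkSession spark; the numbers are illustrative, not from the post.

    val df = spark.range(0, 1000000L).toDF("id")
    val totalCores = 48                                  // assumed total executor cores
    val repartitioned = df.repartition(totalCores * 3)   // heuristic starting point: cores * 3
    println(repartitioned.rdd.getNumPartitions)          // 144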


  7. Reduce the data size (this one is obvious).

  8. Use SequenceFiles as the output format.
