RDD Operations


RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
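As a quick illustration of the two kinds of operations and of persist, here is a small sketch (the SparkContext sc and the input path are assumed, purely for illustration):

// Nothing is computed yet: map and filter are lazy transformations that only
// record the lineage on top of the base dataset.
val lines = sc.textFile("data/input.txt")          // illustrative path
val lengths = lines.map(_.length).filter(_ > 0)

// Keep the intermediate RDD around, since several actions will reuse it.
lengths.persist()

// reduce is an action: it triggers the actual computation and returns a value
// to the driver program.
val totalChars = lengths.reduce(_ + _)
val lineCount = lengths.count()
println(s"$lineCount lines, $totalChars characters")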

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Transformation: Meaning

map(func): Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func): Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed): Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset): Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset): Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]): Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance (see the word-count sketch after this table).
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks]): When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset): When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]): Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions): Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions): Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner): Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
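As a small illustration of the entries above, here is a word-count sketch built from flatMap, map, and reduceByKey (reusing the illustrative sc and input path from the earlier sketch); reduceByKey pre-aggregates per key before the shuffle, which is why the groupByKey note recommends it for aggregations:

val words = sc.textFile("data/input.txt")          // illustrative path
  .flatMap(_.split("\\s+"))                        // each line maps to 0..n words
  .filter(_.nonEmpty)

// reduceByKey combines values per key on the map side, unlike groupByKey.
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

counts.take(10).foreach(println)                   // take is an action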





Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Action: Meaning

reduce(func): Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect(): Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count(): Return the number of elements in the dataset.
first(): Return the first element of the dataset (similar to take(1)).
take(n): Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]): Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]): Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path): Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) (Java and Scala): Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path) (Java and Scala): Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey(): Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func): Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

The Spark RDD API also exposes asynchronous versions of some actions, like foreachAsync for foreach, which immediately return a FutureAction to the caller instead of blocking on completion of the action. This can be used to manage or wait for the asynchronous execution of the action.
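For example, a sketch reusing the counts RDD from the word-count snippet above; foreachAsync returns a FutureAction that can be awaited or cancelled:

import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Fire the action without blocking the driver thread; a FutureAction is returned immediately.
val future = counts.foreachAsync(pair => println(pair))

// ...other driver-side work can happen here...

// Block only when completion actually matters (future.cancel() is also available).
Await.ready(future, Duration.Inf)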


Dataset API

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("mapExample").master("local").getOrCreate()
import spark.implicits._
val sc = spark.sparkContext

Each entry below lists the return type, the method signature, and its description, followed by an example, its result, and notes (including any impact on the underlying RDD) where the original table had them.

Dataset<Row>  agg(Column expr, Column... exprs): Aggregates on the entire Dataset without groups. (An operation that gathers the column values into a list per key when used with collect_list?)
Example:
import org.apache.spark.sql.functions._
val ds = Seq((1, 1, 2L), (1, 2, 3L), (1, 3, 4L), (2, 1, 5L), (2, 2, 5L), (4, 4, 5L), (5, 5, 5L), (1, 2, 3L), (9, 9, 9L)).toDS()
ds.groupBy("_1").agg(collect_list("_2") as "list").show(false)
Result: +---+---------+ |_1 |list | +---+---------+ |1 |[1, 2, 3]| |2 |[1] | +---+---------+

Dataset<Row>  agg(Column expr, scala.collection.Seq<Column> exprs): Aggregates on the entire Dataset without groups.
Example: ds.groupBy("_1").agg(count("*"), sum("_2").alias("summing"), sum("_3")).show(false)
Result: +---+--------+-------+-------+ |_1 |count(1)|sum(_2)|sum(_3)| +---+--------+-------+-------+ |1 |3 |6 |9 | |2 |1 |1 |5 | +---+--------+-------+-------+
Dataset<Row>  agg(scala.collection.immutable.Map<String,String> exprs): (Scala-specific) Aggregates on the entire Dataset without groups.
Dataset<Row>  agg(scala.Tuple2<String,String> aggExpr, scala.collection.Seq<scala.Tuple2<String,String>> aggExprs): (Scala-specific) Aggregates on the entire Dataset without groups.
Dataset<T>  alias(String alias): Returns a new Dataset with an alias set.
Example: ds.groupBy("_1").agg(count("*"), sum("_2").alias("summing"), sum("_3")).show(false)
Result: +---+--------+-------+-------+ |_1 |count(1)|summing|sum(_3)| +---+--------+-------+-------+ |1 |3 |6 |9 | |2 |1 |1 |5 | +---+--------+-------+-------+
Dataset<T>  alias(scala.Symbol alias): (Scala-specific) Returns a new Dataset with an alias set.
Column  apply(String colName): Selects a column based on the column name and returns it as a Column. Example: ds.apply("_1").toString()
Dataset<T>  as(String alias): Returns a new Dataset with an alias set.
Dataset<T>  as(scala.Symbol alias): (Scala-specific) Returns a new Dataset with an alias set.
Dataset<T>  cache(): Persist this Dataset with the default storage level (MEMORY_AND_DISK). Example: ds.cache()
Dataset<T>  checkpoint(): Eagerly checkpoint a Dataset and return the new Dataset.
Dataset<T>  checkpoint(boolean eager): Returns a checkpointed version of this Dataset.
scala.reflect.ClassTag<T>  classTag()
Dataset<T>  coalesce(int numPartitions): Returns a new Dataset that has exactly numPartitions partitions.
Column  col(String colName): Selects a column based on the column name and returns it as a Column.
Object  collect(): Returns an array that contains all rows in this Dataset. Example: ds.select("_1").collect()
java.util.List<T>  collectAsList(): Returns a Java list that contains all rows in this Dataset.
String[]  columns(): Returns all column names as an array.
long  count(): Returns the number of rows in the Dataset.
void  createGlobalTempView(String viewName): Creates a global temporary view using the given name. (A global temporary view is shared across all sessions and stays alive until the Spark application terminates; in my case it was stored on the master node's hard disk.)
void  createOrReplaceTempView(String viewName): Creates a local temporary view using the given name.
void  createTempView(String viewName): Creates a local temporary view using the given name.
Dataset<Row>  crossJoin(Dataset<?> right): Explicit cartesian join with another DataFrame.
RelationalGroupedDataset  cube(Column... cols): Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDataset  cube(scala.collection.Seq<Column> cols): Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them. (Shows how cube behaves.)
RelationalGroupedDataset  cube(String col1, scala.collection.Seq<String> cols): Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDataset  cube(String col1, String... cols): Create a multi-dimensional cube for the current Dataset using the specified columns, so we can run aggregation on them.
Dataset<Row>  describe(scala.collection.Seq<String> cols): Computes statistics for numeric and string columns, including count, mean, stddev, min, and max. Example: ds.describe().show (quick summary statistics such as simple averages)
Dataset<Row>  describe(String... cols): Computes statistics for numeric and string columns, including count, mean, stddev, min, and max.
Dataset<T>  distinct(): Returns a new Dataset that contains only the unique rows from this Dataset. (Removes duplicates; only rows that match on every column.)
Dataset<Row>  drop(Column col): Returns a new Dataset with a column dropped.
Example: ds.drop("_2").show
Result: +---+---+ | _1| _3| +---+---+ | 1| 2| | 1| 3| | 1| 4| | 2| 5| +---+---+
Dataset<Row>  drop(scala.collection.Seq<String> colNames): Returns a new Dataset with columns dropped.
Dataset<Row>  drop(String... colNames): Returns a new Dataset with columns dropped.
Dataset<Row>  drop(String colName): Returns a new Dataset with a column dropped.
Dataset<T>  dropDuplicates(): Returns a new Dataset that contains only the unique rows from this Dataset.
Dataset<T>  dropDuplicates(scala.collection.Seq<String> colNames): (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
Example: ds.dropDuplicates.show
Result: +---+---+ | _1| _3| +---+---+ | 1| 2| | 1| 3| | 1| 4| | 2| 5| +---+---+
Dataset<T>  dropDuplicates(String[] colNames): Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
Example: ds.dropDuplicates("_1").show
Result: +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 1| 2| | 2| 1| 5| +---+---+---+
Dataset<T>  dropDuplicates(String col1, scala.collection.Seq<String> cols): Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
Dataset<T>  dropDuplicates(String col1, String... cols): Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
scala.Tuple2<String,String>[]  dtypes(): Returns all column names and their data types as an array. Example: ds.dtypes  Result: Array[(String, String)] = Array((_1,IntegerType), (_2,IntegerType), (_3,LongType))
Dataset<T>  except(Dataset<T> other): Returns a new Dataset containing rows in this Dataset but not in another Dataset.
void  explain(): Prints the physical plan to the console for debugging purposes.
void  explain(boolean extended): Prints the plans (logical and physical) to the console for debugging purposes.
Dataset<T>  filter(Column condition): Filters rows using the given condition.
T  first(): Returns the first row.
void  foreach(ForeachFunction<T> func): (Java-specific) Runs func on each element of this Dataset.
void  foreach(scala.Function1<T,scala.runtime.BoxedUnit> f): Applies a function f to all rows.
void  foreachPartition(ForeachPartitionFunction<T> func): (Java-specific) Runs func on each partition of this Dataset.
void  foreachPartition(scala.Function1<scala.collection.Iterator<T>,scala.runtime.BoxedUnit> f): Applies a function f to each partition of this Dataset.
RelationalGroupedDataset  groupBy(Column... cols): Groups the Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDataset  groupBy(scala.collection.Seq<Column> cols): Groups the Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDataset  groupBy(String col1, scala.collection.Seq<String> cols): Groups the Dataset using the specified columns, so that we can run aggregation on them.
RelationalGroupedDataset  groupBy(String col1, String... cols): Groups the Dataset using the specified columns, so that we can run aggregation on them.
T  head(): Returns the first row.
Object  head(int n): Returns the first n rows.
String[]  inputFiles(): Returns a best-effort snapshot of the files that compose this Dataset. Example: in.inputFiles.foreach(println)  Result: file:///D:/scala/test2/input.txt (reports which input files were read)
Dataset<T>  intersect(Dataset<T> other): Returns a new Dataset containing rows only in both this Dataset and another Dataset.
boolean  isLocal(): Returns true if the collect and take methods can be run locally (without any Spark executors). Example: ds.isLocal; ds.cube(("_1"),("_2")).sum().isLocal  Result: true, false (lets you check whether execution stays local)
boolean  isStreaming(): Returns true if this Dataset contains one or more sources that continuously return data as it arrives. (For streaming datasets; the exact use was unclear to the author.)
JavaRDD<T>  javaRDD(): Returns the content of the Dataset as a JavaRDD of Ts.
Dataset<Row>  join(Dataset<?> right): Join with another DataFrame.
Dataset<Row>  join(Dataset<?> right, Column joinExprs): Inner join with another DataFrame, using the given join expression.
Example:
val ds = Seq((1, 1, "A"), (1, 2, "B"), (1, 3, "C"), (2, 1, "D")).toDS()
val ds2 = Seq(("A", 1), ("B", 2), ("C", 3), ("D", 4)).toDS()
Dataset<Row>  join(Dataset<?> right, Column joinExprs, String joinType): Join with another DataFrame, using the given join expression.
Example:
// inner
employees.join(departments, "DepartmentID").show()
// outer (join types: inner, outer, left_outer, right_outer, leftsemi)
employees.join(departments, Seq("DepartmentID"), "left_outer").show()
// cartesian
employees.join(departments).show(10)
// conditional (non-equi)
orders.join(products, $"product" === $"name" && $"date" >= $"startDate" && $"date" <= $"endDate").show()
Dataset<Row>  join(Dataset<?> right, scala.collection.Seq<String> usingColumns): Inner equi-join with another DataFrame using the given columns.
Dataset<Row>  join(Dataset<?> right, scala.collection.Seq<String> usingColumns, String joinType): Equi-join with another DataFrame using the given columns.
Dataset<Row>  join(Dataset<?> right, String usingColumn): Inner equi-join with another DataFrame using the given column.
Dataset<T>  limit(int n): Returns a new Dataset by taking the first n rows.
DataFrameNaFunctions  na(): Returns a DataFrameNaFunctions for working with missing data. (Allows fill and replace of values; can this be used safely as an update?)
Example:
df.show()
// +-----+----+
// | name| age|
// +-----+----+
// |alice|  35|
// |  bob|null|
// |     |  24|
// +-----+----+
df.na.fill(10.0, Seq("age")).show
// +-----+---+
// | name|age|
// +-----+---+
// |alice| 35|
// |  bob| 10|
// |     | 24|
// +-----+---+
df.na.replace("age", Map(35 -> 61, 24 -> 12)).show()
// +-----+----+
// | name| age|
// +-----+----+
// |alice|  61|
// |  bob|null|
// |     |  12|
// +-----+----+
static Dataset<Row>  ofRows(SparkSession sparkSession, org.apache.spark.sql.catalyst.plans.logical.LogicalPlan logicalPlan)
Dataset<T>  orderBy(Column... sortExprs): Returns a new Dataset sorted by the given expressions. (orderBy is just an alias for the sort function.)
Dataset<T>  orderBy(scala.collection.Seq<Column> sortExprs): Returns a new Dataset sorted by the given expressions. (orderBy is just an alias for the sort function.)
Dataset<T>  orderBy(String sortCol, scala.collection.Seq<String> sortCols): Returns a new Dataset sorted by the given expressions. (orderBy is just an alias for the sort function.)
Dataset<T>  orderBy(String sortCol, String... sortCols): Returns a new Dataset sorted by the given expressions. (orderBy is just an alias for the sort function.)
Dataset<T>  persist(): Persist this Dataset with the default storage level (MEMORY_AND_DISK).
Dataset<T>  persist(StorageLevel newLevel): Persist this Dataset with the given storage level.
void  printSchema(): Prints the schema to the console in a nice tree format. Example: ds.printSchema()  Result: root |-- _1: integer (nullable = false) |-- _2: integer (nullable = false) |-- _3: long (nullable = false)
org.apache.spark.sql.execution.QueryExecution  queryExecution(): Example: ds.queryExecution  Result: == Parsed Logical Plan == LocalRelation [_1#3, _2#4, _3#5L] == Analyzed Logical Plan == _1: int, _2: int, _3: bigint LocalRelation [_1#3, _2#4, _3#5L] == Optimized Logical Plan == LocalRelation [_1#3, _2#4, _3#5L] == Physical Plan == LocalTableScan [_1#3, _2#4, _3#5L] (the exact use was unclear to the author)
Dataset<T>[]  randomSplit(double[] weights): Randomly splits this Dataset with the provided weights.
Example:
val ds = Seq((1, 1, 2L), (1, 2, 3L), (1, 3, 4L), (2, 1, 5L), (2, 2, 5L), (4, 4, 5L), (5, 5, 5L), (1, 2, 3L), (9, 9, 9L)).toDS()
ds.randomSplit(Array(0.3, 0.2, 0.5), 5).foreach(t => t.show)
Result: +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 1| 2| | 5| 5| 5| | 9| 9| 9| +---+---+---+  +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 3| 4| | 2| 1| 5| +---+---+---+  +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 2| 3| | 1| 2| 3| | 2| 2| 5| | 4| 4| 5| +---+---+---+
Dataset<T>[]  randomSplit(double[] weights, long seed): Randomly splits this Dataset with the provided weights.
java.util.List<Dataset<T>>  randomSplitAsList(double[] weights, long seed): Returns a Java list that contains randomly split Datasets with the provided weights.
RDD<T>  rdd(): Represents the content of the Dataset as an RDD of T.
Dataset<T>  repartition(Column... partitionExprs): Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as the number of partitions. (See the RDD section above.)
Dataset<T>  repartition(int numPartitions): Returns a new Dataset that has exactly numPartitions partitions. (See the RDD section above.)
Dataset<T>  repartition(int numPartitions, Column... partitionExprs): Returns a new Dataset partitioned by the given partitioning expressions into numPartitions.
Dataset<T>  repartition(int numPartitions, scala.collection.Seq<Column> partitionExprs): Returns a new Dataset partitioned by the given partitioning expressions into numPartitions.
Dataset<T>  repartition(scala.collection.Seq<Column> partitionExprs): Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as the number of partitions.
RelationalGroupedDataset  rollup(Column... cols): Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.
Example:
val ds = Seq((1, 1, 2L), (1, 2, 3L), (1, 3, 4L), (2, 1, 5L), (2, 2, 5L), (4, 4, 5L), (5, 5, 5L), (1, 2, 3L), (9, 9, 9L)).toDS()
ds.show
ds.rollup("_1").count.show
ds.rollup("_1").agg(sum("_2")).show
Result: +---+---+---+ | _1| _2| _3| +---+---+---+ | 1| 1| 2| | 1| 2| 3| | 1| 3| 4| | 2| 1| 5| | 2| 2| 5| | 4| 4| 5| | 5| 5| 5| | 1| 2| 3| | 9| 9| 9| +---+---+---+  +----+-----+ | _1|count| +----+-----+ | 1| 4| | 4| 1| |null| 9| | 2| 2| | 5| 1| | 9| 1| +----+-----+  +----+-------+ | _1|sum(_2)| +----+-------+ | 1| 8| | 4| 4| |null| 29| | 2| 3| | 5| 5| | 9| 9| +----+-------+
RelationalGroupedDataset  rollup(scala.collection.Seq<Column> cols): Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDataset  rollup(String col1, scala.collection.Seq<String> cols): Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.
RelationalGroupedDataset  rollup(String col1, String... cols): Create a multi-dimensional rollup for the current Dataset using the specified columns, so we can run aggregation on them.
Dataset<T>  sample(boolean withReplacement, double fraction): Returns a new Dataset by sampling a fraction of rows, using a random seed.
Dataset<T>  sample(boolean withReplacement, double fraction, long seed): Returns a new Dataset by sampling a fraction of rows, using a user-supplied seed.
StructType  schema(): Returns the schema of this Dataset.
Dataset<Row>  select(Column... cols): Selects a set of column-based expressions.
Dataset<Row>  select(scala.collection.Seq<Column> cols): Selects a set of column-based expressions.
Dataset<Row>  select(String col, scala.collection.Seq<String> cols): Selects a set of columns.
Dataset<Row>  select(String col, String... cols): Selects a set of columns.
Dataset<Row>  selectExpr(scala.collection.Seq<String> exprs): Selects a set of SQL expressions.
Dataset<Row>  selectExpr(String... exprs): Selects a set of SQL expressions.
void  show(): Displays the top 20 rows of the Dataset in a tabular form.
void  show(boolean truncate): Displays the top 20 rows of the Dataset in a tabular form.
void  show(int numRows): Displays the Dataset in a tabular form.
void  show(int numRows, boolean truncate): Displays the Dataset in a tabular form.
void  show(int numRows, int truncate): Displays the Dataset in a tabular form.
Dataset<T>  sort(Column... sortExprs): Returns a new Dataset sorted by the given expressions. Example: ds.sort($"_1".desc).show  Result: +---+---+---+ | _1| _2| _3| +---+---+---+ | 9| 9| 9| | 5| 5| 5| | 4| 4| 5| | 2| 1| 5| | 2| 2| 5| | 1| 1| 2| | 1| 3| 4| | 1| 2| 3| | 1| 2| 3| +---+---+---+
Dataset<T>  sort(scala.collection.Seq<Column> sortExprs): Returns a new Dataset sorted by the given expressions.
Dataset<T>  sort(String sortCol, scala.collection.Seq<String> sortCols): Returns a new Dataset sorted by the specified column, all in ascending order.
Dataset<T>  sort(String sortCol, String... sortCols): Returns a new Dataset sorted by the specified column, all in ascending order.
Dataset<T>  sortWithinPartitions(Column... sortExprs): Returns a new Dataset with each partition sorted by the given expressions.
Dataset<T>  sortWithinPartitions(scala.collection.Seq<Column> sortExprs): Returns a new Dataset with each partition sorted by the given expressions.
Dataset<T>  sortWithinPartitions(String sortCol, scala.collection.Seq<String> sortCols): Returns a new Dataset with each partition sorted by the given expressions.
Dataset<T>  sortWithinPartitions(String sortCol, String... sortCols): Returns a new Dataset with each partition sorted by the given expressions.
SparkSession  sparkSession()
SQLContext  sqlContext()
DataFrameStatFunctions  stat(): Returns a DataFrameStatFunctions for statistical functions support. (Worth exploring what is available here.)
StorageLevel  storageLevel(): Get the Dataset's current storage level, or StorageLevel.NONE if not persisted.
Object  take(int n): Returns the first n rows in the Dataset.
java.util.List<T>  takeAsList(int n): Returns the first n rows in the Dataset as a list.
Dataset<Row>  toDF(): Converts this strongly typed collection of data to a generic DataFrame.
Dataset<Row>  toDF(scala.collection.Seq<String> colNames): Converts this strongly typed collection of data to a generic DataFrame with columns renamed.
Dataset<Row>  toDF(String... colNames): Converts this strongly typed collection of data to a generic DataFrame with columns renamed.
JavaRDD<T>  toJavaRDD(): Returns the content of the Dataset as a JavaRDD of Ts.
Dataset<String>  toJSON(): Returns the content of the Dataset as a Dataset of JSON strings.
java.util.Iterator<T>  toLocalIterator(): Returns an iterator that contains all rows in this Dataset.
String  toString()
<U> Dataset<U>  transform(scala.Function1<Dataset<T>,Dataset<U>> t): Concise syntax for chaining custom transformations.
Dataset<T>  union(Dataset<T> other): Returns a new Dataset containing the union of rows in this Dataset and another Dataset.
Dataset<T>  unpersist(): Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
Dataset<T>  unpersist(boolean blocking): Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
Dataset<T>  where(Column condition): Filters rows using the given condition.
Dataset<T>  where(String conditionExpr): Filters rows using the given SQL expression.
Dataset<Row>  withColumn(String colName, Column col): Returns a new Dataset by adding a column or replacing the existing column that has the same name.
Dataset<Row>  withColumnRenamed(String existingName, String newName): Returns a new Dataset with a column renamed.
DataFrameWriter<T>  write(): Interface for saving the content of the non-streaming Dataset out into external storage.
DataStreamWriter<T>  writeStream(): (Experimental) Interface for saving the content of the streaming Dataset out into external storage.
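To tie several of the methods above together, here is a short end-to-end sketch (it assumes the spark session and implicits created at the top of this section; the data and column names are illustrative):

import org.apache.spark.sql.functions._

val sales = Seq(("A", 1, 10L), ("A", 2, 20L), ("B", 1, 5L))
  .toDF("shop", "item", "amount")                  // rename the tuple columns

val summary = sales
  .filter($"amount" > 0)
  .groupBy("shop")
  .agg(count("*").alias("orders"), sum("amount").alias("revenue"))
  .orderBy($"revenue".desc)

summary.cache()          // MEMORY_AND_DISK by default for Datasets
summary.show()
summary.explain()        // inspect the physical plan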








  • spark.executor.memory defines the total memory available to an executor (a configuration sketch follows this list).
  • spark.storage.memoryFraction (default ~60%) defines the amount of memory available for storing persisted RDDs.
  • spark.shuffle.memoryFraction (default ~20%) defines the amount of memory reserved for shuffles.
  • spark.storage.unrollFraction and spark.storage.safetyFraction (~30% of total memory): these values are used internally by Spark and should not be changed.
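A sketch of how these might be set on a SparkConf (the values are illustrative only, not recommendations; the two memoryFraction settings belong to Spark's legacy static memory manager):

import org.apache.spark.SparkConf

// Illustrative values only; tune against your own workload.
val conf = new SparkConf()
  .setAppName("memory-tuning-example")
  .set("spark.executor.memory", "4g")              // total memory per executor
  .set("spark.storage.memoryFraction", "0.6")      // legacy setting for persisted RDD storage
  .set("spark.shuffle.memoryFraction", "0.2")      // legacy setting reserved for shuffles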



What is LRU?


Least recently used (LRU)


Discards the least recently used items first. This algorithm requires keeping track of what was used when, which is expensive if one wants to make sure the algorithm always discards the least recently used item. General implementations of this technique require keeping "age bits" for cache-lines and track the "Least Recently Used" cache-line based on age-bits. In such an implementation, every time a cache-line is used, the age of all other cache-lines changes. LRU is actually a family of caching algorithms with members including 2Q by Theodore Johnson and Dennis Shasha,[3] and LRU/K by Pat O'Neil, Betty O'Neil and Gerhard Weikum.[4]


The access sequence for the following example is A B C D E D F.


In this example, once A, B, C, and D are installed in the blocks with sequence numbers (incremented by 1 for each new access) and E is then accessed, it is a miss and must be installed in one of the blocks. According to the LRU algorithm, since A has the lowest rank (A(0)), E will replace A.
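A minimal LRU cache sketch in Scala, backed by java.util.LinkedHashMap in access order; the capacity of 4 mirrors the A B C D E example above:

import java.util.{LinkedHashMap => JLinkedHashMap}

// LinkedHashMap with accessOrder = true keeps entries ordered by last access;
// the eldest (least recently used) entry is evicted once capacity is exceeded.
class LruCache[K, V](capacity: Int) extends JLinkedHashMap[K, V](capacity, 0.75f, true) {
  override def removeEldestEntry(eldest: java.util.Map.Entry[K, V]): Boolean =
    size() > capacity
}

object LruDemo extends App {
  val cache = new LruCache[String, Int](4)
  Seq("A", "B", "C", "D").zipWithIndex.foreach { case (k, v) => cache.put(k, v) }
  cache.get("D")           // D becomes the most recently used entry
  cache.put("E", 4)        // cache is full, so the least recently used key (A) is evicted
  println(cache.keySet())  // [B, C, D, E]
}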

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), or replicate it across nodes. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:


Storage Level: Meaning

MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER (Java and Scala): Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER (Java and Scala): Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY: Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.


  • Difference between cache and persist: cache uses a fixed default storage level (MEMORY_AND_DISK for a Dataset), whereas persist lets you choose any of the storage levels above.
  • The default storage level differs between Dataset and RDD: RDD = MEMORY_ONLY, Dataset = MEMORY_AND_DISK.

※ If local disk space is insufficient, an out-of-space error can occur.
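A short sketch of choosing a storage level explicitly (the HDFS path is illustrative):

import org.apache.spark.storage.StorageLevel

val parsed = sc.textFile("hdfs:///data/events.txt")   // illustrative path
  .map(_.split(","))
  .filter(_.length > 1)

// cache() would use the RDD default (MEMORY_ONLY); persist() lets us choose.
parsed.persist(StorageLevel.MEMORY_AND_DISK_SER)

println(parsed.count())   // the first action computes and caches the partitions
println(parsed.count())   // later actions reuse the cached data
parsed.unpersist()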


Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

  • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.

  • If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)

  • Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.

  • Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.


An SQL join clause combines records from two or more tables. This operation is very common in data processing, and understanding what happens under the hood is important. There are several common join types: INNER, LEFT OUTER, RIGHT OUTER, FULL OUTER, and CROSS or CARTESIAN.

(figure: join type diagrams)

A join that uses the same table on both sides is a self-join. If an operation uses the equality operator, it is an equi-join; otherwise, it is a non-equi-join.

This article covers different join types in Apache Spark, as well as examples of slowly changing dimensions (SCD) and joins on non-unique columns.

Sample data

All subsequent explanations of join types in this article make use of the following two tables, taken from the Wikipedia article on joins. The rows in these tables serve to illustrate the effect of different types of joins and join-predicates.

The Employees table has a nullable column. To express it in statically typed Scala, one needs to use the Option type.

val employees = sc.parallelize(Array[(String, Option[Int])](
  ("Rafferty", Some(31)), ("Jones", Some(33)), ("Heisenberg", Some(33)), ("Robinson", Some(34)), ("Smith", Some(34)), ("Williams", null)
)).toDF("LastName", "DepartmentID")
 
employees.show()
 
+----------+------------+
|  LastName|DepartmentID|
+----------+------------+
|  Rafferty|          31|
|     Jones|          33|
|Heisenberg|          33|
|  Robinson|          34|
|     Smith|          34|
|  Williams|        null|
+----------+------------+
val departments = sc.parallelize(Array(
  (31, "Sales"), (33, "Engineering"), (34, "Clerical"),
  (35, "Marketing")
)).toDF("DepartmentID", "DepartmentName")
 
departments.show()
 
+------------+--------------+
|DepartmentID|DepartmentName|
+------------+--------------+
|          31|         Sales|
|          33|   Engineering|
|          34|      Clerical|
|          35|     Marketing|
+------------+--------------+

The Departments table does not have nullable columns, so the type specification can be omitted.


Inner join

Following SQL code

SELECT * FROM employee INNER JOIN department ON employee.DepartmentID = department.DepartmentID;

could be written in Spark as

employees
  .join(departments, "DepartmentID")
  .show()
 
+------------+----------+--------------+
|DepartmentID|  LastName|DepartmentName|
+------------+----------+--------------+
|          31|  Rafferty|         Sales|
|          33|     Jones|   Engineering|
|          33|Heisenberg|   Engineering|
|          34|  Robinson|      Clerical|
|          34|     Smith|      Clerical|
+------------+----------+--------------+



Beautiful, isn't it? Spark automatically removes the duplicated "DepartmentID" column, so column names are unique and one does not need to use a table prefix to address them.

Left outer join

A left outer join is a very common operation, especially if there are nulls or gaps in the data. Note that the column name should be wrapped in a Scala Seq if the join type is specified.

employees
  .join(departments, Seq("DepartmentID"), "left_outer")
  .show()
 
+------------+----------+--------------+
|DepartmentID|  LastName|DepartmentName|
+------------+----------+--------------+
|          31|  Rafferty|         Sales|
|          33|     Jones|   Engineering|
|          33|Heisenberg|   Engineering|
|          34|  Robinson|      Clerical|
|          34|     Smith|      Clerical|
|        null|  Williams|          null|
+------------+----------+--------------+


Other join types

Spark allows using the following join types: inner, outer, left_outer, right_outer, leftsemi. The interface is the same as for the left outer join in the example above.

For a cartesian join, the column specification should be omitted:

employees
  .join(departments)
  .show(10)
 
+----------+------------+------------+--------------+
|  LastName|DepartmentID|DepartmentID|DepartmentName|
+----------+------------+------------+--------------+
|  Rafferty|          31|          31|         Sales|
|  Rafferty|          31|          33|   Engineering|
|  Rafferty|          31|          34|      Clerical|
|  Rafferty|          31|          35|     Marketing|
|     Jones|          33|          31|         Sales|
|     Jones|          33|          33|   Engineering|
|     Jones|          33|          34|      Clerical|
|     Jones|          33|          35|     Marketing|
|Heisenberg|          33|          31|         Sales|
|Heisenberg|          33|          33|   Engineering|
+----------+------------+------------+--------------+
only showing top 10 rows



Warning: do not use cartesian join with big tables in production.

Join expression, slowly changing dimensions and non-equi join

Spark allows us to specify a join expression instead of a sequence of columns. In general, an expression specification is less readable, so why do we need such flexibility? The reason is the non-equi join.

One application of it is slowly changing dimensions. Assume there is a table with product prices over time:

val products = sc.parallelize(Array(
  ("steak", "1990-01-01", "2000-01-01", 150),
  ("steak", "2000-01-02", "2020-01-01", 180),
  ("fish", "1990-01-01", "2020-01-01", 100)
)).toDF("name", "startDate", "endDate", "price")
 
products.show()
 
+-----+----------+----------+-----+
| name| startDate|   endDate|price|
+-----+----------+----------+-----+
|steak|1990-01-01|2000-01-01|  150|
|steak|2000-01-02|2020-01-01|  180|
| fish|1990-01-01|2020-01-01|  100|
+-----+----------+----------+-----+


There are only two products, steak and fish; the price of steak has changed once. Another table consists of product orders by day:

val orders = sc.parallelize(Array(
  ("1995-01-01", "steak"),
  ("2000-01-01", "fish"),
  ("2005-01-01", "steak")
)).toDF("date", "product")
 
orders.show()
 
+----------+-------+
|      date|product|
+----------+-------+
|1995-01-01|  steak|
|2000-01-01|   fish|
|2005-01-01|  steak|
+----------+-------+


Our goal is to assign an actual price to every record in the orders table. It is not obvious how to do this using only equality operators; however, a Spark join expression allows us to achieve the result in an elegant way:

orders
  .join(products, $"product" === $"name" && $"date" >= $"startDate" && $"date" <= $"endDate")
  .show()
 
+----------+-------+-----+----------+----------+-----+
|      date|product| name| startDate|   endDate|price|
+----------+-------+-----+----------+----------+-----+
|2000-01-01|   fish| fish|1990-01-01|2020-01-01|  100|
|1995-01-01|  steak|steak|1990-01-01|2000-01-01|  150|
|2005-01-01|  steak|steak|2000-01-02|2020-01-01|  180|
+----------+-------+-----+----------+----------+-----+

This technique is very useful, yet not that common. It could save a lot of time both for those who write the code and for those who read it.

Inner join using non-primary keys

The last part of this article is about joins on non-unique columns and the common mistakes related to them. The join (intersection) diagrams at the beginning of this article stick in our heads: because of the visual comparison of set intersections, we assume that the result table after an inner join should be smaller than either of the source tables. This is correct only for joins on unique columns, and wrong if the columns in both tables are not unique. Consider the following DataFrame with duplicated records and its self-join:

val df = sc.parallelize(Array(
  (0), (1), (1)
)).toDF("c1")
 
df.show()
df.join(df, "c1").show()
 
// Original DataFrame
+---+
| c1|
+---+
|  0|
|  1|
|  1|
+---+
 
// Self-joined DataFrame
+---+
| c1|
+---+
|  0|
|  1|
|  1|
|  1|
|  1|
+---+


Note that the size of the result DataFrame is bigger than the source size. It could be as big as n^2, where n is the size of the source.

Conclusion

The article covered different join type implementations in Apache Spark, including join expressions and joins on non-unique keys.

Apache Spark allows developers to write code in a way that is easier to understand. It improves code quality and maintainability.


Total input: 300,000 lines x 66 rows = 19.8 million rows, roughly 5 GB.

Output = ranks => about 19.8 million rows, roughly 549 MB.



Environment / Task / Condition 1 / Condition 2 / Condition 3 / Elapsed time (min) / Result files (rows)

Hadoop / PageRank: 5 PageRank iterations, YARN with 2 GB, 12 containers, map split across 67 files (80 MB each). Total elapsed time: 46.5 min.
  setting: 0
  itr1 (setting + itr1): 5.154917
  itr2: 9.771217
  itr3: 7.103517
  itr4: 9.663967
  itr5: 14.85307

Spark / PageRank: 5 PageRank iterations, YARN with 3 GB, 5 executors with 2 cores each, spark.read with 67 files (80 MB each). Total elapsed time: 13.6 min.
  setting (includes read time and mapPartition.cache()): 0.68
  itr1: 2.583
  itr2: 2.61
  itr3: 2.48
  itr4: 2.61
  itr5: 2.6



Environment / Task / Condition 1 / Condition 2 / Condition 3 / Elapsed time / Result files (rows)

Hadoop / ETL, Hadoop local mode:
  title parsing, local mode: 1,033,586 ms (about 17 min), 17,773,690 rows
  find links for (id, Title) : (id, Title), local mode, id matching: memory exceeded
  find links for (Title) : (Title), local mode, title only: 2,594,021 ms (about 43 min), 16,861,907 rows / 4.4 GB

Hadoop / ETL, Hadoop cluster mode:
  title parsing, cluster mode: 447,268 ms (about 7 min), 16,861,907 rows
  find links for (id, Title) : (id, Title), cluster mode, id matching: 819,340 ms (about 13 min)
  find links for (Title) : (Title), cluster mode, title only, memory = 1024, 23 containers, map/reduce vcores = 2, map/reduces = 5: 13.45 min, 16,861,907 rows / 4.4 GB
  find links for (Title) : (Title), cluster mode, title only, memory = 2048, 12 containers: 13.27 min, 16,861,907 rows / 4.4 GB

ETL by Spark, cluster mode (1-3):
  title parsing, cluster mode: 7 min
  find links for (Title) : (Title), executors = 5, executor cores = 2, memory 3 GB: 10 min 15 sec












Hadoop / PageRank, 3 PageRank iterations, cluster mode, titles:
  itr1: 7 min 20 sec
  itr2: could not be measured

Spark / PageRank, 5 PageRank iterations, cluster mode:
  dataset without persist: 10 min
  dataset with persist, without repartition: 14 min
  dataset with persist, repartition 10: 11 min
  dataset with persist...

* During Spark tests, errors such as "unusable node" appear: when local disk usage gets high, the task manager (YARN) temporarily kills the node and then restores it.

However, when this repeats, the job sometimes fails to produce correct results.

* To address this, an extra 100 GB disk has been allocated to each cluster node, and the temp file path has been changed to point there.

* In Hadoop, shuffle output pours into the local temp path. If the account lacks write permission on the temp folder, errors occur.

* If a Hadoop job is stopped mid-run, logs and intermediate shuffle files can pile up in the temp path (usually /tmp). Those files need to be cleaned up periodically.

* For PageRank on Spark, it needs to be checked how much local disk is consumed as the iterations accumulate, and a way to relieve that is needed.

* In Hadoop, pay attention to configuration settings such as conf.set("mapreduce.textoutputformat.separator", "::"); (the same goes for YARN). See Hadoop MapReduce configuration.

* A currently running process can be killed by checking yarn application -list and then using yarn application -kill.



Hadoop execution time (based on logs)


https://stackoverflow.com/questions/34175386/precise-definitions-of-map-shuffle-merge-and-reduce-times-in-hadoop




Step / time / duration / note
app start: 11:07:03
set-up: 11:07:16 (0:00:13)
first map launched: 11:07:18 (0:00:15), 472 tasks running concurrently
first reducer launched: 11:07:45 (0:00:42), about 27 seconds for the first task to move from map to reduce
map end time: 11:14:19 (0:07:16), time the last map completed (reduce = 32%)
reducer end time: 11:14:37 (0:07:34), the remaining 68% of reducing finished in 18 seconds


Check the default zone

firewall-cmd --get-default-zone

Add a port

Add the port using the --add-port=<portid>[-<portid>]/<protocol> option:

firewall-cmd --permanent --zone=public --add-port=8080/tcp

Reload the configuration

firewall-cmd --reload


  • Controller election
  • Broker metadata
  • Topic metadata
  • Client quota information

Controller election

The controller is the broker responsible for partition management, which covers:

  • Leader election
  • Topic creation
  • Partition creation
  • Replica management

When a node or server goes down, the controller elects a new partition leader from among the followers.

Kafka uses ZooKeeper's metadata to elect the controller.

ZooKeeper guarantees that a new controller is elected if the current controller fails.


Broker metadata

ZooKeeper records state information for each broker that is part of the Kafka cluster, keeping all of each broker's metadata within the cluster.

Producers and consumers obtain broker state information by interacting with ZooKeeper.


Topic metadata

ZooKeeper records topic metadata such as the number of partitions and specific configuration parameters.


Client quota information

Quotas limit the byte-rate thresholds for clients reading and writing messages on Kafka topics, and all of this information and state is managed by ZooKeeper.


Kafka topic ACLs

Kafka has a built-in authorization module (ACL, Access Control List).

These ACLs determine which read and write permissions each user role has on the related topics. Kafka uses ZooKeeper to store the ACLs.


Partitioning rules (default partitioner)

  • With a key: the key is hashed and the record is sent to a specific partition. The key-to-partition mapping is preserved as long as the topic's partition count does not change.
  • Without a key: the record is sent to any available partition, using a round-robin algorithm to balance the load across partitions (see the producer sketch below).
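A sketch of both cases with the plain Kafka producer client, written in Scala; the broker address and topic name are illustrative:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // illustrative broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// Keyed record: hash(key) picks the partition, so the same key always lands on the
// same partition while the topic's partition count stays unchanged.
producer.send(new ProducerRecord[String, String]("events", "user-42", "login"))

// Keyless record: the default partitioner spreads records across available partitions.
producer.send(new ProducerRecord[String, String]("events", "anonymous page view"))

producer.close()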

How to size the number of partitions

Points to consider when deciding the number of partitions:

  • The topic's throughput per unit of time. For example, do you expect 100 KB/s or 1 GB/s?
  • The maximum throughput you target when consuming a single partition. A partition is always consumed by one consumer (even without consumer groups, a consumer must read all messages in the partition), so if a slow consumer writes the partition's data to a database and each writer thread can handle at most 50 MB/s, the maximum consume throughput per partition is capped at 50 MB/s.
  • The maximum per-producer throughput into a single partition can be estimated the same way, but producers are usually processed much faster than consumers, so it is generally safe to skip this.
  • If you write messages to partitions by key, adding partitions later makes the sizing difficult, so estimate throughput based on expected future usage rather than current usage.
  • Consider the number of partitions per broker along with disk capacity and network bandwidth.
  • Do not overestimate the partition count: each partition consumes broker memory and other resources, and more partitions means leader election takes longer.

Taking all of this into account, do not create too many partitions. If you have estimates of the topic's target throughput and the consumers' expected throughput, you can derive the partition count by dividing the target throughput by the expected per-consumer throughput. For example, if you want to read and write the topic at 1 GB/s and each consumer can process only 50 MB/s, you need at least 20 partitions (1000/50); with 20 consumers reading the topic, you can reach the 1 GB/s target. If you do not have this kind of detailed information, limiting the partition size retained on disk to less than 6 GB per day is recommended.
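The arithmetic from the paragraph above as a tiny helper (the numbers are the same illustrative ones):

// Partitions needed = target topic throughput / slowest per-consumer throughput, rounded up.
def partitionsNeeded(targetMbPerSec: Double, perConsumerMbPerSec: Double): Int =
  math.ceil(targetMbPerSec / perConsumerMbPerSec).toInt

println(partitionsNeeded(1000, 50))   // 1 GB/s target, 50 MB/s per consumer => 20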


ref: https://stackoverflow.com/questions/40369238/which-directory-does-apache-kafka-store-the-data-in-broker-nodes

With Ambari, Kafka-related logs are stored in two places.

log.dirs: the message logs themselves, including offsets, kept for message retention.

/var/log/kafka: Kafka's own server logs; the path can be configured in the kafka.env file.


1. Topic data is stored under the log.dirs path (set in the server.properties file) according to the following policy.

Each rolled log segment is stored together with a corresponding index file, and the file name encodes the segment's base offset.

To explain this, you need to understand how the log file is laid out.

The log file stores the actual messages in a structured message format; each message carries its monotonically increasing offset in its first 64 bits.

Because of this, finding a message at a particular offset gets more expensive as the log file grows.

Also, to serve messages, the broker has to remember the latest offset and advance from it so that messages are consumed correctly.

This is why an index file exists alongside the log file: to remember offsets.

The index file stores entries of the following form:

  1. 4 Bytes: Relative Offset
  2. 4 Bytes: Physical Position

The index file name encodes the base offset, and a new index entry is written for every index.interval.bytes of log data (default 4096).
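A rough illustration, not Kafka's actual implementation, of how such a sparse index is used to locate a message by offset:

object OffsetIndexSketch extends App {
  // The base offset is encoded in the segment file name, e.g. 00000000000000368769.log
  val baseOffset = 368769L

  // (relative offset, physical byte position) pairs, one entry per index.interval.bytes of log data
  val index = Vector((0, 0), (120, 4096), (250, 8192), (401, 12288))

  // Find the last index entry at or before the target offset, then scan the log
  // file forward from that byte position (Kafka performs this lookup with a binary search).
  def startPosition(targetOffset: Long): Int = {
    val relative = (targetOffset - baseOffset).toInt
    val i = index.lastIndexWhere(_._1 <= relative)
    index(i)._2
  }

  println(startPosition(369000L))   // scan from byte 4096 to reach offset 369000
}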


2. Kafka's own server logs (INFO / ERROR / WARN, etc.) have their path specified in kafka_env.sh.

They are mostly filled with Java INFO messages, and over time they can grow by roughly 1-2 GB per week.

The volume can be reduced with the log4j settings. Item 2 is, literally, just logging output.


Kafka-manager

github : https://github.com/yahoo/kafka-manager

As of August 2018, it supports Kafka up to version 1.1.0.


Kafka Manager is a Kafka administration tool developed by Yahoo that provides the following features:

  • Manage multiple clusters
  • Easy inspection of cluster state (topics, consumers, offsets, brokers, replica distribution, partition distribution)
  • Run preferred replica election
  • Generate partition assignments with the option to select the brokers to use

  • Run reassignment of partitions (based on generated assignments)
  • Create topics with optional configs (0.8.1.1 has different configs than 0.8.2+)
  • Delete topics (0.8.2+ only, with delete.topic.enable=true set on the broker)
  • List topics marked for deletion (only supported on 0.8.2+)
  • Batch generate partition assignments for multiple topics, with the option to select the brokers to use
  • Batch run reassignment of partitions for multiple topics
  • Add partitions to an existing topic
  • Update the configuration of an existing topic
  • (Optional) enable JMX polling for broker level and topic level metrics.
  • (Optional) filter out consumers that do not have ids/ owners/ & offsets/ directories in zookeeper.

Installation

Install SBT (simple build tool)

  • Windows
  • CentOS
    Reference page
    Use the command below to fetch the repo file, then install with yum:

    $ curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
    sudo yum install sbt

Install Kafka-manager

  1. Download: get the archive (zip or tar.gz) from the download page.
  2. Extract it, then build a distribution with sbt:

    C:\dev\kafka-manager>sbt clean dist
    (build output)

    It can then be run with the `sbt run` command, or you can unzip the package generated under \target\universal to any directory you like.

Change the kafka-manager configuration

Move into the extracted directory and edit the configuration file.

  • Set the ZooKeeper hosts: edit the conf/application.conf file

    conf/application.conf
    # single ZooKeeper server
    $ kafka-manager.zkhosts="localhost:2181" # adjust to the ZooKeeper you are using
     
     
    # multiple ZooKeeper servers
    $ kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181" # multiple ZooKeeper hosts can be listed, separated by commas

    For more details, see https://github.com/yahoo/kafka-manager#configuration

  • Set the JMX port
    To see metric information in kafka-manager, JMX must be enabled.
    Open the `bin/kafka-server-start.sh` file, add the JMX port setting, and restart Kafka.

    ${KAFKA_HOME}/bin/kafka-server-start.sh
    export JMX_PORT=9999


    Open the Kafka Manager UI, click the modify button in the Clusters menu, and enable the JMX polling option.


Run kafka-manager

Move into the extracted directory and start the program.

$ bin/kafka-manager


By default it listens on port 9000; at startup, the options below let you point to a config file or change the port.

$ bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080



Usually, you don't need to modify these settings; however, if you want to extract every last bit of performance from your machines, then changing some of them can help. You may have to tweak some of the values, but these worked in 80% of cases for me:

  • message.max.bytes=1000000
  • num.network.threads=3
  • num.io.threads=8
  • background.threads=10
  • queued.max.requests=500
  • socket.send.buffer.bytes=102400
  • socket.receive.buffer.bytes=102400
  • socket.request.max.bytes=104857600
  • num.partitions=1

Quick explanations of the numbers:

  • message.max.bytes: This sets the maximum size of the message that the server can receive. This should be set to prevent any producer from inadvertently sending extra large messages and swamping the consumers. The default size is 1000000.
  • num.network.threads: This sets the number of threads running to handle network requests. If you are going to have too many requests coming in, then you need to change this value. Else, you are good to go. Its default value is 3.
  • num.io.threads: This sets the number of threads spawned for IO operations. This should be set to at least the number of disks present. Its default value is 8.
  • background.threads: This sets the number of threads that will be running and doing various background jobs. These include deleting old log files. Its default value is 10 and you might not need to change it.
  • queued.max.requests: This sets the queue size that holds the pending messages while others are being processed by the IO threads. If the queue is full, the network threads will stop accepting any more messages. If you have erratic loads in your application, you need to set queued.max.requests to a value at which it will not throttle.
  • socket.send.buffer.bytes: This sets the SO_SNDBUFF buffer size, which is used for socket connections.
  • socket.receive.buffer.bytes: This sets the SO_RCVBUFF buffer size, which is used for socket connections.
  • socket.request.max.bytes: This sets the maximum size of the request that the server can receive. This should be smaller than the Java heap size you have set.
  • num.partitions: This sets the number of default partitions of a topic you create without explicitly giving any partition size.

The number of partitions may have to be higher than 1 for reliability, but for raw performance (even if not realistic :)), 1 is better.

These are no silver bullet :). However, you could test these changes with a test topic at 1,000/10,000/100,000 messages per second to see the difference between the default values and the adjusted values. Vary some of them to see the difference.

You may need to configure your Java installation for maximum performance. This includes the settings for heap, socket size, and so on.

