An SQL join clause combines records from two or more tables. The operation is very common in data processing, and understanding what happens under the hood is important. There are several common join types: INNER, LEFT OUTER, RIGHT OUTER, FULL OUTER, and CROSS (also called CARTESIAN).

(Figure: join type diagrams)

A join that uses the same table on both sides is a self-join. If the join condition uses the equality operator, it is an equi-join; otherwise, it is a non-equi-join.

This article covers the different join types in Apache Spark, as well as examples of slowly changing dimensions (SCD) and joins on non-unique columns.

Sample data

All subsequent explanations of join types in this article use the following two tables, taken from the Wikipedia article on SQL joins. The rows in these tables illustrate the effect of different join types and join predicates.

The employees table has a nullable column. To express this in statically typed Scala, one needs to use the Option type.

val employees = sc.parallelize(Array[(String, Option[Int])](
  ("Rafferty", Some(31)), ("Jones", Some(33)), ("Heisenberg", Some(33)), ("Robinson", Some(34)), ("Smith", Some(34)), ("Williams", None)
)).toDF("LastName", "DepartmentID")
 
employees.show()
 
+----------+------------+
|  LastName|DepartmentID|
+----------+------------+
|  Rafferty|          31|
|     Jones|          33|
|Heisenberg|          33|
|  Robinson|          34|
|     Smith|          34|
|  Williams|        null|
+----------+------------+
val departments = sc.parallelize(Array(
  (31, "Sales"), (33, "Engineering"), (34, "Clerical"),
  (35, "Marketing")
)).toDF("DepartmentID", "DepartmentName")
 
departments.show()
 
+------------+--------------+
|DepartmentID|DepartmentName|
+------------+--------------+
|          31|         Sales|
|          33|   Engineering|
|          34|      Clerical|
|          35|     Marketing|
+------------+--------------+

The departments table does not have nullable columns, so the type specification can be omitted.


Inner join

The following SQL code

SELECT * FROM employee INNER JOIN department ON employee.DepartmentID = department.DepartmentID;

could be written in Spark as

employees
  .join(departments, "DepartmentID")
  .show()
 
+------------+----------+--------------+
|DepartmentID|  LastName|DepartmentName|
+------------+----------+--------------+
|          31|  Rafferty|         Sales|
|          33|     Jones|   Engineering|
|          33|Heisenberg|   Engineering|
|          34|  Robinson|      Clerical|
|          34|     Smith|      Clerical|
+------------+----------+--------------+



Beautiful, isn't it? Spark automatically removes the duplicated “DepartmentID” column, so column names are unique and there is no need to use a table prefix to address them.
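For contrast, here is a minimal sketch (using the same DataFrames) of a join whose condition is given as an expression rather than a column name. In that case both DepartmentID columns are kept in the result and have to be addressed through their parent DataFrames:

employees
  .join(departments, employees("DepartmentID") === departments("DepartmentID"))
  .select($"LastName", departments("DepartmentID"), $"DepartmentName")
  .show()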

Left outer join

A left outer join is a very common operation, especially if there are nulls or gaps in the data. Note that the column name must be wrapped in a Scala Seq when a join type is specified.

employees
  .join(departments, Seq("DepartmentID"), "left_outer")
  .show()
 
+------------+----------+--------------+
|DepartmentID|  LastName|DepartmentName|
+------------+----------+--------------+
|          31|  Rafferty|         Sales|
|          33|     Jones|   Engineering|
|          33|Heisenberg|   Engineering|
|          34|  Robinson|      Clerical|
|          34|     Smith|      Clerical|
|        null|  Williams|          null|
+------------+----------+--------------+


Other join types

Spark allows the following join types: inner, outer, left_outer, right_outer, leftsemi. The interface is the same as for the left outer join in the example above.
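For example, a right outer join uses exactly the same interface; on this data it would also return the Marketing department with a null LastName, since no employee belongs to it:

employees
  .join(departments, Seq("DepartmentID"), "right_outer")
  .show()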

For a Cartesian join, the column specification is omitted:

employees
  .join(departments)
  .show(10)
 
+----------+------------+------------+--------------+
|  LastName|DepartmentID|DepartmentID|DepartmentName|
+----------+------------+------------+--------------+
|  Rafferty|          31|          31|         Sales|
|  Rafferty|          31|          33|   Engineering|
|  Rafferty|          31|          34|      Clerical|
|  Rafferty|          31|          35|     Marketing|
|     Jones|          33|          31|         Sales|
|     Jones|          33|          33|   Engineering|
|     Jones|          33|          34|      Clerical|
|     Jones|          33|          35|     Marketing|
|Heisenberg|          33|          31|         Sales|
|Heisenberg|          33|          33|   Engineering|
+----------+------------+------------+--------------+
only showing top 10 rows



Warning: do not use a Cartesian join with big tables in production.
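A quick sanity check before running a Cartesian join is to multiply the row counts of the inputs; the sketch below is just that multiplication. Here it is only 6 × 4 = 24 rows, but with big tables the product explodes:

employees.count() * departments.count()  // 6 * 4 = 24 rows in the Cartesian product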

Join expression, slowly changing dimensions and non-equi join

Spark allows us to specify a join expression instead of a sequence of columns. In general, an expression specification is less readable, so why do we need such flexibility? The reason is the non-equi join.

One application of it is slowly changing dimensions. Assume there is a table with product prices over time:

val products = sc.parallelize(Array(
  ("steak", "1990-01-01", "2000-01-01", 150),
  ("steak", "2000-01-02", "2020-01-01", 180),
  ("fish", "1990-01-01", "2020-01-01", 100)
)).toDF("name", "startDate", "endDate", "price")
 
products.show()
 
+-----+----------+----------+-----+
| name| startDate|   endDate|price|
+-----+----------+----------+-----+
|steak|1990-01-01|2000-01-01|  150|
|steak|2000-01-02|2020-01-01|  180|
| fish|1990-01-01|2020-01-01|  100|
+-----+----------+----------+-----+


There are only two products: steak and fish; the price of steak has changed once. Another table contains product orders by day:

val orders = sc.parallelize(Array(
  ("1995-01-01", "steak"),
  ("2000-01-01", "fish"),
  ("2005-01-01", "steak")
)).toDF("date", "product")
 
orders.show()
 
+----------+-------+
|      date|product|
+----------+-------+
|1995-01-01|  steak|
|2000-01-01|   fish|
|2005-01-01|  steak|
+----------+-------+


Our goal is to assign the actual price to every record in the orders table. This is not obvious to do using only equality operators; however, a Spark join expression allows us to achieve the result in an elegant way:

orders
  .join(products, $"product" === $"name" && $"date" >= $"startDate" && $"date" <= $"endDate")
  .show()
 
+----------+-------+-----+----------+----------+-----+
|      date|product| name| startDate|   endDate|price|
+----------+-------+-----+----------+----------+-----+
|2000-01-01|   fish| fish|1990-01-01|2020-01-01|  100|
|1995-01-01|  steak|steak|1990-01-01|2000-01-01|  150|
|2005-01-01|  steak|steak|2000-01-02|2020-01-01|  180|
+----------+-------+-----+----------+----------+-----+

This technique is very useful, yet not that common. It can save a lot of time both for those who write the code and for those who read it.
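For comparison, a sketch of the same result without a range condition inside the join itself: an equi-join on the product name followed by a filter on the dates. It produces the same rows here, but the single join expression above keeps the whole matching rule in one place:

orders
  .join(products, $"product" === $"name")
  .filter($"date" >= $"startDate" && $"date" <= $"endDate")
  .show()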

Inner join using non-primary keys

The last part of this article is about joins on non-unique columns and common mistakes related to them. The join (intersection) diagrams at the beginning of this article stick in our heads. Because of the visual comparison of set intersections, we assume that the result of an inner join should be smaller than either of the source tables. This is correct only for joins on unique columns, and wrong if the columns in both tables are not unique. Consider the following DataFrame with duplicated records and its self-join:

val df = sc.parallelize(Array(
  0, 1, 1
)).toDF("c1")
 
df.show()
df.join(df, "c1").show()
 
// Original DataFrame
+---+
| c1|
+---+
|  0|
|  1|
|  1|
+---+
 
// Self-joined DataFrame
+---+
| c1|
+---+
|  0|
|  1|
|  1|
|  1|
|  1|
+---+


Note that the size of the result DataFrame is bigger than the size of the source. It can be as big as n², where n is the size of the source.
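If such a blow-up is not intended, one option is to deduplicate the join key on one side first; a minimal sketch with the DataFrame above:

df.join(df.dropDuplicates(Seq("c1")), "c1").show()  // 3 rows again: 0, 1, 1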

Conclusion

This article covered the different join type implementations in Apache Spark, including join expressions and joins on non-unique keys.

Apache Spark allows developers to write code in a way that is easier to understand, which improves code quality and maintainability.


Appendix: Hadoop vs Spark benchmark notes (PageRank and ETL)

Total input: 300,000 lines × 66 rows = 19.8 million rows ≈ about 5 GB

Output = ranks => about 19.8 million rows ≈ about 549 MB



Environment / task: Hadoop / PageRank, 5 iterations
Conditions: YARN, 2 GB; 12 containers; map split over 67 files (80 MB each)
Total elapsed time: 46.5 min
Per-step times (min): setting 0, itr1 (setting + itr1) 5.154917, itr2 9.771217, itr3 7.103517, itr4 9.663967, itr5 14.85307

Environment / task: Spark / PageRank, 5 iterations
Conditions: YARN, 3 GB; 5 executors with 2 cores each; spark.read over 67 files (80 MB each)
Total elapsed time: 13.6 min
Per-step times (min): setting 0.68 (includes read time and mapPartition.cache()), itr1 2.583, itr2 2.61, itr3 2.48, itr4 2.61, itr5 2.6



Environment: Hadoop / ETL, local mode
- title parsing (local mode): 1,033,586 ms ≈ 17 min, 17,773,690 rows
- find links for (id, Title) : (id, Title) (local mode, id matching): memory exceeded
- find links for (Title) : (Title) (local mode, title only): 2,594,021 ms ≈ 43 min, 16,861,907 rows / 4.4 GB

Environment: Hadoop / ETL, cluster mode
- title parsing (cluster mode): 447,268 ms ≈ 7 min, 16,861,907 rows
- find links for (id, Title) : (id, Title) (cluster mode, id matching): 819,340 ms ≈ 13 min
- find links for (Title) : (Title) (cluster mode, title only; memory = 1024, 23 containers, map/reduce vcores = 2, map/reduces = 5): 13.45 min, 16,861,907 rows / 4.4 GB
- find links for (Title) : (Title) (cluster mode, title only; memory = 2048, 12 containers): 13.27 min, 16,861,907 rows / 4.4 GB

Environment: Spark / ETL, cluster mode (1-3)
- title parsing (cluster mode): 7 min
- find links for (Title) : (Title) (5 executors, 2 cores each, 3 GB memory): 10 min 15 s

Environment: Hadoop / PageRank, 3 iterations, cluster mode, titles
- itr1: 7 min 20 s
- itr2: could not be measured

Environment: Spark / PageRank, 5 iterations, cluster mode
- dataset without persist: 10 min
- dataset with persist, no repartition: 14 min
- dataset with persist, repartition 10: 11 min
- dataset with persist ...

* During the Spark tests, errors such as "unusable node" appear; when local disk usage gets high, the task manager (YARN) temporarily kills the node and then restores it. However, when this repeats, the job sometimes fails to produce correct results.

* To address this, an additional 100 GB disk has been allocated to each cluster node, and the temp file path has been changed to point there.

* In Hadoop, shuffle output pours into the local temp path. If the account does not have write permission on the temp folder, an error occurs.

* If a Hadoop job is stopped mid-run, logs and intermediate shuffle files may pile up in the temp path (usually /tmp). Those files need to be cleaned up periodically.

* For PageRank in Spark, we need to check how much local disk is occupied as the iterations progress, and a mitigation plan is also needed.

* In Hadoop, pay attention to config settings such as conf.set("mapreduce.textoutputformat.separator", "::"); (the same applies to YARN); see the Hadoop map-reduce configuration and the sketch after these notes.

* A currently running process can be killed with yarn application -kill after checking yarn application -list.
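A minimal sketch of that separator setting, assuming a plain MapReduce driver written in Scala (the job name below is a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

// The key/value separator used by TextOutputFormat must be set on the
// Configuration before the Job instance is created and submitted.
val conf = new Configuration()
conf.set("mapreduce.textoutputformat.separator", "::")
val job = Job.getInstance(conf, "pagerank")  // "pagerank" is a placeholder job name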



Hadoop execution times (based on the logs)


https://stackoverflow.com/questions/34175386/precise-definitions-of-map-shuffle-merge-and-reduce-times-in-hadoop




Event | Time | Elapsed | Note
app start | 11:07:03 | |
set-up | 11:07:16 | 0:00:13 |
first map task started | 11:07:18 | 0:00:15 | 472 map tasks running concurrently
first reducer started | 11:07:45 | 0:00:42 | about 27 s for the first task to move from map to reduce
map end time | 11:14:19 | 0:07:16 | time when the last map finished (reduce at 32%)
reducer end time | 11:14:37 | 0:07:34 | the remaining 68% of reducing took 18 s


Check the default zone:

firewall-cmd --get-default-zone

Add a port, using the --add-port=<portid>[-<portid>]/<protocol> option:

firewall-cmd --permanent --zone=public --add-port=8080/tcp

Reload the configuration:

firewall-cmd --reload

