출처 : https://kafka.apache.org/quickstart /


궁극적으로 BIG DATA 실시간 분석이 목적인데,


Spark Streaming 하기 위한 전단계 중 Kafka Infra 를 앞단에 놓고 실시간 Streaming 을 사용 하여 구현하게된다.


Kafka 는 개략적으로

이런 모양을 하고 cluseter, 즉 broker 가 중간 역할을 하여 data 를 전달 한다.


프로듀서 - 컨슈머 동작을 하기위해선 아래 순서로 설치한다.

1.일단 KAFKA 설치 (직접설치 또는 ambari 등을 이용)


2.zookeeper 환경설치 (직접설치 또는 ambari 등을 이용)


3.Kafka 경로의 server.properties 파일에서 설정 확인


Listener = PLAINTEXT://localhost:6667 또는 9092포트

advertisment.Listener(deprecated)

>>으로 설정 이후 Zookeeper 는 6667 포트를 기준으로 consumer / producer 를 보내게 된다.

default.replication.factor = 1

offsets.topic.replication.factor  = 1 (3 으로설정되어있으면 consumer 가 topic 이 3개 기준으로 동작하기 때문에 추후 조정)


4. 토픽 생성 

/usr/bin/kafka-topics.sh --create --zookeeper localhost:6667 --replication-factor 1 --partitions 1 --topic test



5. 프로듀서 테스트

/usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-producer.sh --broker-list swinno01.cs9046cloud.internal:6667 --topic Hello 


6.컨슈머 테스트

/usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-consumer.sh --zookeeper swinno01:2181 --topic Hello

/usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-consumer.sh --bootstrap-server swinno01.cs9046cloud.internal:6667 --topic Hello --from-beginning 


이때 컨슈머가 제대로 윗라인으론 동작을 하는데, 아래 bootstrap 옵션으로 동작 안하는 경우가있다.

그땐 다수의 삽질과 / 구글링등을 통해서 아래를 확인해보자

a.broker 의 netstat -tnlp 를 통해 6667 (9092) 포트가 오픈되있는지 확인

b.실행환경에서 브로커ip:port 로 telnet 확인

c. zookeeper 스크립트를 통한 브로커 상태확인

/usr/hdp/2.6.5.0-292/kafka/bin/zookeeper-shell.sh localhost:2181   

 get /brokers/ids/1001  (ids num 은 확인필요)

d. topic/ broker 모두 삭제후 KAFKA 재시작

e.  offsets.topic.replication.factor  = 1 확인


연결이 상태가 안 좋다면 아래글 참고

https://github.com/wurstmeister/kafka-docker/wiki/Connectivity


지금도 forum / stackoverflow 등에는 위의 이유등으로 컨슈머가 bootstrap 에서 안된다는 이슈는 꾸준히 나오고있다 -_-ㅋ



이런 설정문제이거나/ 연결문제이거나 / 브로커 잔재 문제이거나


컨슈머까지 스크립트에서 동작하는것을 확인하면

spark 를 이용한 streaming 하기 위한 전단계 까지 완료 !





'spark,kafka,hadoop ecosystems > apache.kafka' 카테고리의 다른 글

kafka connect  (0) 2018.11.20
kafka vs flink vs esper vs storm vs spark  (0) 2018.11.20
kafka-개요  (0) 2018.11.20
kafka 로그 설정 에 대하여  (0) 2018.10.17
Kafka Connector , Simple Connector 구현  (0) 2018.07.16

spark-shell command 에서는 write  할때 문제가 되지 않는데,

spark-submit 에서 정의된 함수를 사용해서 write 하는 과정에서 not serializable 에러가 나오는 경우가 있다.


예를들면,


가령 같은 scala class 안에 function 을 정의하고 바로 사용 하게되면,


org.apache.spark.SparkException: Task not serializable when write  에러가 나는데, 


def job(tpath : String) : Unit = {

object getgap {
def getgap(s1:String , s2 : String) : Long = {
if(s1.length<8 || s2.length<8) 0
val format = new java.text.SimpleDateFormat("HH:mm:ss.SSS")
val v1 = format.parse(s1)
val v2 = format.parse(s2)
(v2.getTime() - v1.getTime()) / 100
}
}
  import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("yarn").appName("message_summerization").getOrCreate()
import spark.implicits._
//val inputpath = "/user/sangil55/m_input"
val rd = spark.read.json(inputpath)


val df_timegap = df_semi2.map(row=>(row.getString(0), row.getString(1), row.getString(2), getgap(row.getString(1),row.getString(2)) ) )
.withColumnRenamed("_1", "jobid").withColumnRenamed("_2", "starttime").withColumnRenamed("_3", "endtime").withColumnRenamed("_4", "gap")
.sort("starttime")

이 getgap 함수를 object 로 빼주게되면 static class 로 인식하여

잘돌아간다.


spark 환경에서 같은클래스에서 선언된 함수는 따로 클래스화되지 않고, 임시저장되는데 이걸 바로 사용하게되면

mapper 에서 임시저장된 함수를 제대로 분산시키지 못하는듯하다.


이와 비슷한 에러가 자주나오니,

spark-submit 환경에서 function 으로 동작하는 놈들은 웬만하면 따로 뺴주는게 좋다.




rdd- groupbyKey 와 달리 spark dataset 은 groupby 로 리듀스를 할 수 있는데, 


이때 .agg 형식으로 aggregation 을 한다.


다만  .agg 안에 들어가는 function 은 단한 선언으로 안되고, 

udf (user defined aggregationfuncion) 을 만들어서 구현해주어야 하는데, 동작이 뭔가 까다롭다



일단 shell 이나 main 에서는


아래와 같이 실행 해주면되고

val td = rd.select($"jobid".alias("jobid"), $"time".alias("starttime"), $"time".alias("endtime"), $"result".alias("report_result"), $"result".alias("error"))
//val td = rd.select($"jobid".alias("jobid"),$"time".alias("starttime"))

import CustomAgg._
val minagg = new minAggregation()
val maxagg = new maxAggregation()
val erragg = new errorAggregation()

val td2 = td.groupBy("jobid").agg(minagg(td("starttime")), maxagg(td("endtime")) , erragg(td("error"))) 


UDF 는 아래와 같이 만들어준다.

(max 값찾는 agg / min 값찾는 agg , error 찾는 agg 등이다.)

update 에서는 파티션내의 병합이 

merge 에서는 파티션간의 병합이 이루어지는 것 같다.

ackage CustomAgg

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import scala.collection.mutable.ArrayBuffer

object CustomAggregation
{

class maxAggregation extends UserDefinedAggregateFunction {
// Input Data Type Schema
//def inputSchema: StructType = StructType(Array(StructField("col5", StringType)))

// Intermediate Schema
//def bufferSchema = StructType(Array( StructField("col5_collapsed", StringType)))

def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
// Returned Data Type .
def dataType: DataType = StringType

// Self-explaining
def deterministic = true

// This function is called whenever key changes
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, ArrayBuffer.empty[String])// initialize array
}

// Iterate over each entry of a group
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0)) {
println(input.getString(0))
println(buffer.getSeq[String](0).mkString)
if(buffer.getSeq[String](0).mkString.isEmpty)
buffer.update(0, Seq(input.getString(0)))
else if(buffer.getSeq[String](0).mkString.compareTo(input.getString(0)) > 0)
buffer.update(0, buffer.getSeq[String](0))
else
buffer.update(0, Seq(input.getString(0)))
println(buffer.getSeq[String](0).mkString)
}
}


// Merge two partial aggregates
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
if(buffer1.getSeq[String](0).mkString.compareTo(buffer2.getSeq[String](0).mkString) > 0)
buffer1.update(0, buffer1.getSeq[String](0))
else
buffer1.update(0, buffer2.getSeq[String](0))
}

def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}

class minAggregation extends UserDefinedAggregateFunction {
// Input Data Type Schema
//def inputSchema: StructType = StructType(Array(StructField("col5", StringType)))

// Intermediate Schema
//def bufferSchema = StructType(Array( StructField("col5_collapsed", StringType)))

def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff",StringType)
// Returned Data Type .
def dataType: DataType = StringType

// Self-explaining
def deterministic = true

// This function is called whenever key changes
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, "")// initialize array
}

// Iterate over each entry of a group
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0)) {

if(buffer.getString(0).isEmpty)
buffer.update(0, input.getString(0))
else if(buffer.getString(0).compareTo(input.getString(0)) < 0)
buffer.update(0, buffer.getString(0))
else
buffer.update(0, input.getString(0))

println( "updated :" + buffer.getString(0))
}
}


// Merge two partial aggregates
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {

if(buffer1.getString(0).compareTo(buffer2.getString(0))< 0 && !buffer1.getString(0).isEmpty)
buffer1.update(0, buffer1.getString(0))
else
buffer1.update(0, buffer2.getString(0))
}

def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getString(0))
}


class errorAggregation extends UserDefinedAggregateFunction {
// Input Data Type Schema
//def inputSchema: StructType = StructType(Array(StructField("col5", StringType)))

// Intermediate Schema
//def bufferSchema = StructType(Array( StructField("col5_collapsed", StringType)))

def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
// Returned Data Type .
def dataType: DataType = StringType

// Self-explaining
def deterministic = true

// This function is called whenever key changes
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, Seq(""))// initialize array
}

def checkerror (str : String) : Boolean = {

if(str.compareTo("Info") == 0) false
if(str.compareTo("0") == 0) false
if(str.compareTo("Success") == 0) false
if(str.compareTo("") == 0) false
true
}

// Iterate over each entry of a group
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0)) {
if(checkerror(input.getString(0)))
{
input.getString(0)
buffer.update(0, Seq(input.getString(0)))
}
}
}


// Merge two partial aggregates
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
println(buffer1.getSeq[String](0).toString())
println(buffer2.getSeq[String](0).toString())
buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
}

def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}


class GeometricMean extends UserDefinedAggregateFunction {
// This is the input fields for your aggregate function.
override def inputSchema: org.apache.spark.sql.types.StructType =
StructType(StructField("value", StringType) :: Nil)

// This is the internal fields you keep for computing your aggregate.
override def bufferSchema: StructType = StructType(
StructField("count", LongType) ::
StructField("product", StringType) :: Nil
)

// This is the output type of your aggregatation function.
override def dataType: DataType = StringType

override def deterministic: Boolean = true

// This is the initial value for your buffer schema.
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = ""
buffer(1) = " "
}

// This is how to update your buffer schema given an input.
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[String](0) + 1
buffer(1) = buffer.getAs[String](1) + input.getAs[String](0)
}

// This is how to merge two objects with the bufferSchema type.
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
}

// This is where you output the final value, given the final value of your bufferSchema.
override def evaluate(buffer: Row): Any = {
math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
}
}

} 


+ Recent posts