kafka vs flink  : https://www.confluent.io/blog/apache-flink-apache-kafka-streams-comparison-guideline-users/


'spark,kafka,hadoop ecosystems > apache.kafka' 카테고리의 다른 글

kafka multi brokers  (0) 2018.11.20
kafka connect  (0) 2018.11.20
kafka-개요  (0) 2018.11.20
kafka 로그 설정 에 대하여  (0) 2018.10.17
Kafka Connector , Simple Connector 구현  (0) 2018.07.16

개요

구성요소

Confluent Platform OpenSource

  • kafka core : 카프카 브로커
  • Kafka Streams : 스트림 처리 시스템을 만들기 위한 카프카 라이브러리
  • kafka connector : 카프카를 데이터베이스, 파일시스템 등에 연결하는 프래임워크.
  • kafka client : 카프카에서 카프카로 메시지를 읽고 ㅆ는 라이브러리
  • kafka REST proxy : 애플리케이션을 카프카 클라이언트용 프로그램으로 동작시킬 수 없는 경우 HTTP를 사용해 카프카에 연결하도록 구성 가능.
  • kafka schema registry : 모든 스키마와 그들의 변경사항에 대한 버전을 보관하는 저장소. 관련된 구성요소가 변경을 인지할 수 있게 함.

Confluent Platform Enterprise

  • 컨플루언트 컨트롤 센터 : 웹 그래픽 사용자 인터페이스, 카프카 시스템을 관리하고 모니터링.
  • 컨플루언트 기술지원, 전문서비스, 컨설팅.

as-is








(2번의 host 를 새롭게 추가 할경우)

  1. SSH 설정 및 1번의 ssh 설정 복사필요~ (AMBARI 설치 참고)

  2. Repoistery setting 추가, /etc/yum.repos.d/ 경로의 repo를 local repo로 재구성 필요(다른서버참고)
  3. data01 mount  ambari 설치 참고 (sdb1)


warning 해결 필요(질문)

      3. 2번서버의 JDK 설치 (/usr/jdk/   /usr/jdk64/ 경로에  기존 JDK 파일 복사 또는 yum install(?))

The following registered hosts have issues related to JDK


JDK not found at /usr/jdk64/jdk1.8.0_112/

       4. THP 이슈

The following hosts have Transparent Huge Pages (THP) enabled. THP should be disabled to avoid potential Hadoop performance issues.

Transparent Huge Pages


5.ntpd 이슈

The following services should be up

ntpd or chronyd

'spark,kafka,hadoop ecosystems > hotornworks-ambari' 카테고리의 다른 글

amabari - ssh 접속  (0) 2018.11.19
ambari 설치  (0) 2018.11.19

ssh 접속


cmd 열어서

window ssh 툴킷(?)설치

ex

ssh -p 3000 -D 1080 -q -C root@14.63.172.232

id/pw 치고


chrome 에 아래와 같이 ㅇ파라미터 입력

"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe" swinno01:8080 --proxy-server="socks5://localhost:1080"   --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost"   --user-data-dir=c:/temp

'spark,kafka,hadoop ecosystems > hotornworks-ambari' 카테고리의 다른 글

ambari host 수동 추가시 필요한 부분  (0) 2018.11.19
ambari 설치  (0) 2018.11.19

PTABLES 비활성화

서버간 통신을 위해 iptables 비활성화

# 서버 재기동시 iptable 실행 x
chkconfig iptables off
 
# 실행중인 iptables 종료
service iptables stop

SSH 설정

master node

ssh 공인키 생성

ssh-keygen -t rsa
slave node

root 디렉토리 아래 .ssh폴더를 생성한뒤 authorized_keys라는 파일을 만들어 master node의 id_rsa.pub파일 내용을 붙여넣는다.

AMBARI 설치

기본 설정

  1. slave node에 `authorized_keys` 파일 생성하고, 이 파일에 앞서 생성한 master node의 퍼블릭키 내용을 붙여넣기 한다.

    vim /root/.ssh/authorized_keys
    authorized_keys
    #master node의 public key 내용을 붙여넣기함.
    AAAAB3NzaC1yc2EAAAABIwAAAQEA2va8ON20pT/mMvCW9NAgIwYKM0/fq5tAZKYk
    ...생략...
  2. 디스크 설정

    authorized_keys
    mkfs -t ext4 /dev/xvdb mkdir /data01 mount /dev/xvdb /data01
  3. 마운트 자동 등록

    authorized_keys
    vim /etc/fstab
    fstab
    # 파일내용 끝에 아래 내용 추가
    /dev/xvdb /data01 ext4 noatime 0 0

JDK 설치

JDK 설치

java rpm 설치파일 다운로드

 

rpm 파일 설치

yum localinstall jdk-8-linux-x64.rpm

.bash_profile 파일 수정

vim .bash_profile # 파일 오픈
.bash_profile
# .bash_profile에 아래 내용 추가
export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64
export SPARK_MAJOR_VERSION=2
변경 내용 적용
source .bash_profile


ulimit 설정

오픈 파일 개수 확인 후 높게 설정

#ulimit 확인
ulimit -Sn
#ulimit 10000으로 증가
ulimit -n 10000

NTP 설정

시간 동기화를 위해 ntp 를 설치 및 설정한다.

# 설치 확인
rpm -qa ntp
# 설치 안된 경우 설치
yum install ntp

설치 확인

[root@zetawiki ~]# whereis ntp
ntp: /etc/ntp /etc/ntp.conf
[root@zetawiki ~]# which ntpq
/usr/sbin/ntpq
[root@zetawiki ~]# which ntpdate
/usr/sbin/ntpdate
[root@zetawiki ~]# which ntpd
/usr/sbin/ntpd
[root@zetawiki ~]# service ntpd status
ntpd is stopped

AMBARI 설치

yum repository 설정

아래 링크를 참고하여 ambari버전과 os타입에 맞는 ambari yum repo 파일을 /etc/yum.repos.d/ambari.repo 경로에 복사

https://docs.hortonworks.com/HDPDocuments/Ambari-2.6.1.5/bkambari-installation/content/ambarirepositories.html

ambari 설치

yum install ambari-server

ambari setup

ambari-server setup
ambari 구동
ambari-server start

postgre sql

postgresql 설치

참고: https://www.postgresql.org/download/linux/redhat/

  • version 9.5
  • platform centos6
yum install https://download.postgresql.org/pub/repos/yum/9.5/redhat/rhel-6-x86_64/pgdg-centos95-9.5-3.noarch.rpm
yum install postgresql95 postgresql95-server
service postgresql-9.5 initdb
chkconfig postgresql-9.5 on
service postgresql-9.5 start


jdbc connector 설정

ambari master

jdbc connector 파일을 받아 설정.

wget https://jdbc.postgresql.org/download/postgresql-42.2.2.jar
ambari-server setup --jdbc-db=postgres --jdbc-driver=/postgresql-42.2.2.jar #파일 다운로드 경로


postgre 설치 서버

postgre.conf 설정 변경 (파일위치: /var/lib/pgsql/9.5/data/postgresql.conf)

#listen_address 설정을 아래와 같이 변경
listen_address='*'


pghba.conf 설정 변경 (파일위치: /var/lib/pgsql/9.5/data/pg_hba.conf)

#설정파일 제일 마지막 부분에 아래 설정 추가
host    hive            hive            0.0.0.0/0               md5
# root에서 hdfs 유저로 사용자 변경
su hdfs
# root 사용자가 접근 가능한 hdfs 폴더 생성
[hdfs@swinno01 resources]$ hadoop dfs -mkdir /user/root
[hdfs@swinno01 resources]$ hadoop dfs -chown root /user/root
[hdfs@swinno01 resources]$ hadoop dfs -ls /user/root

postgre - hive 설정

postgre 실행

  • sudo su postgres
  • psql -d postgres -U postgres

hive 사용자 생성

  • create user hive with password 'password';

hive database 생성

  • create database hive;

Ambari cluster 설정

클러스터 생성

로그인 후 나오는 첫 페이지에서 launch install wizard를 실행해 설치 마법사 실행

(그림1)

Get Started

클러스터 이름 설정

(그림2)

이름 : bigdata_mms

Select Version

상관없는 OS 관련 정보는 delete.

repository 주소를 설치한 repository url 로 변경(/etc/yum.repos.d/ambari-hdp-1.repo) 참고

  • http://bigdata07.mms.com/HDP-2.6-repo-1

  • http://bigdata07.mms.com/HDP-UTILS-1.1.0.22-repo-1

(그림3)

Install Options

host name 추가

ssh key(ambari master에서 생성한 private key 정보 입력)

(그림4)

host list

bigdata01.mms.com
bigdata02.mms.com
bigdata03.mms.com
bigdata04.mms.com
bigdata05.mms.com
bigdata06.mms.com
bigdata07.mms.com
bigdata08.mms.com
bigdata09.mms.com
bigdata10.mms.com

Confirm Hosts

기도

(그림5)

kafka 로그 설정

메시지가 브로커 컴퓨터에 보존되도록 구성하는 방법을 설정.

설정항목

  • log.segment.bytes : 바이트 단위로 최대 세그먼트 크기를 정의. 기본값은 1GB. 
    세그먼트 파일이 지정된 크기에 도달하면 새로운 세그먼트 파일이 생성됨. 토픽은 로그 디렉터리에 여러 세그먼트 파일로 저장되며, 토픽단위로 설정이 가능.
  • log.roll.{ms,hours} : 새로운 세그먼트 파일이 생성된 이후 크기한도에 도달하지 않았더라도 다음 파일로 넘어갈 시간 주기를 정의. 기본값은 7일, 토픽단위로 설정가능
  • log.cleanup.policy : delete 또는 compact로 설정하며 기본값은 delete. delete로 설정된 경우 로그 세그먼트는 시간이나 크기제한에 도달할 때 주기적으로 삭제됨.
    compact로 설정된 경우 불필요한 레코드를 없애기 위해 압축을 사용. 토픽단위로 설정 가능.
  • log.retention.{ms,minutes,hours} : 로그 세그먼트를 보유할 시간을 정의. 기본값은 7이며 토픽별로 정의 가능.
  • log.retention.bytes :  삭제하기 전에 보관할 파티션 당 로그 수. 기본값은 -1. 토픽별로 설정이 가능하며, 로그의 시간이나 크기제한에 도달하면 삭제됨.
  • log.retention.check.interval.ms : 로그의 보유 정책을 만족하기 위해 삭제할 대상을 확인하는 시간 주기. 기본값은 5분
  • log.cleaner.enable : 압축을 활성화 하려면 true로 설정
  • log.cleaner.threads : 로그의 압축을 위한 작업자 스레드 수를 지정.
  • log.cleaner.backoff.ms : 정리가 필요한 로그가 있는지 확인하는 주기
  • log.index.size.max.bytes : 바이트 단위로 오프셋 인덱스의 최대 크기를 설정. 기본값은 1GB. 토픽별로 설정 가능.
  • log.index.interval.byte : 새로운 항목이 오프셋 인덱스에 추가되는 주기. 기본값은 4096. 
    데이터를 가져오는 개별요청에서 브로커는 가져오기를 시작하고 끝낼 로그 내의 올바른 위치를 찾기 위한 바이트 수에 대해 일정하게 살펴본다.
    값을 높게 설정한 경우 인덱스 파일이 커지고 더 많은 메모리를 사용하게 되지만 검사하는 횟수는 줄어듬.
  • log.flush.interval.message : 디스크로 내보내기 전에 메모리에 보유할 메시지의 개수. 기본값은 922337036854775807. 확실하게 보유하는지 보장하는 것은 아니나, 알맞게 제어하도록 도와줌.
  • log.flush.interval.ms : 디스크로 내보내기 전에 메모리에 보유할 토픽 내의 메시지에 대한 최대 시간을 밀리초 단위로 설정.

참고



추가 로그 정책 관련 설명

ref: https://stackoverflow.com/questions/40369238/which-directory-does-apache-kafka-store-the-data-in-broker-nodes

ambari 기준으로 kafka 관련된 로그는 2군데에 저장 된다.

log.dirs : 메세지 보관을 위한 오프셋을 포함한 로그이다.

/var/log/kafka : kafka 자체 로그이며 kafka.env 파일에서 설정 할 수 있다.


1. Topic 에 관한 로그 는 아래 정책으로 log.dirs 경로(server.properties 파일에 설정) 에 저장된다.

로그의 잘려진 segment 는 상응하는 index 파일과 같이 저장된다.  그 파일 내임은 Base offset 을 표현하기도 한다.

이것을 설명하자면 log 파일 이저장되는 구조를 이해 해야하는데, 

log 파일은 실제메세지를 strucuted message format 으로 저장한다.  each message 는 처음 64 비트에 증가된 offset 을 포함한다.

그러므로 이파일에서 특정 오프셋의 메세지를 찾는것은 log 파일이 커질 수록 무거워진다.

또한 메세지를 공급하기 위해서는 브로커는 가장 나중의 offset 을 기억하고 메세지를 정확하게 받아들이기 위해 진행 시킨다.

그러므로 로그파일의 offset 기억하기 위해 index 파일이 존재한다. 

index 파일은 아래와 같은 항목을 저장한다.

  1. 4 Bytes: Relative Offset
  2. 4 Bytes: Physical Position

이때 파일내임은 base offset 을 표현한다. index 파일은 로그파일의 index.interval.bytes 의 단위마다 index 파일을 새로 쓴다.(default 4096)


2. kafka 서버자체의 로그 (INFO / ERROR / WARN 등) 은 kafka_env.sh 안에 로그 경로가 지정되어있다.

주로내용은 자바 info 로 채워지는데 이 또한 시간이 지나면 일주일에1~2G 정도 용량이 찬다.

log4j 설정을 이용해 로그양을 줄일 수 있다.  2번 은 말그대로 log 이다.




'spark,kafka,hadoop ecosystems > apache.kafka' 카테고리의 다른 글

kafka connect  (0) 2018.11.20
kafka vs flink vs esper vs storm vs spark  (0) 2018.11.20
kafka-개요  (0) 2018.11.20
Kafka Connector , Simple Connector 구현  (0) 2018.07.16
1. Kafka 설치 및 producer & Consumer test  (0) 2018.06.21

출처 : https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes


Programming Model

The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You will express your streaming computation as standard batch-like query as on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail.

> Struct Streaming 의 idea 는  연속된 append 구조이다. 너는 스트리밍 계산을 노말한 batch query 형태로 계산 할 수 있을 것이다.

Basic Concepts

Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.

Stream as a Table

A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.

Model

The “Output” is defined as what gets written out to the external storage. The output can be defined in a different mode:

  • Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

  • Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

  • Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.

Note that each mode is applicable on certain types of queries. This is discussed in detail later.

>> output mode 는 Complete mode / Append mode / Update 모드 3가지 가 있다.

Complete Mode 는 updated 된 Result Table 이 만들어졌을때졌을때,  외부 저장소에 출력된다. 

Append Mode 는 새로운 Rows 들만 Trigger 에 의해 외부저장소에 출력 된다.

Update Mode 란 새롭거나 수정된 Rows 들만 Trigger 에 의해 외부 저장소에 출력 된다. Completed 모드와 다른점은 만약 query 가 Aggregation 을 포함하지 않으면, 이것은 Append Mode 와 똑같이 똑같이 동작한다. (Complete 모드보다 빠를것으로 예상된다.)

To illustrate the use of this model, let’s understand the model in context of the Quick Example above. The first lines DataFrame is the input table, and the final wordCounts DataFrame is the result table. Note that the query on streaming lines DataFrame to generate wordCounts is exactly the same as it would be a static DataFrame. However, when this query is started, Spark will continuously check for new data from the socket connection. If there is new data, Spark will run an “incremental” query that combines the previous running counts with the new data to compute updated counts, as shown below.

>> 구현하자면  아래와같이 구현한다.

val query = wordCounts.writeStream
.outputMode("complete") or .outputMode("append") or .outputMode("update") 

>>  기존 SQL 과 똑같이 구현하는데, Spark 가 연속적으로 새로운 data 을 체킹한다. (socket 기반) 만약 새로운 Data 가 있으면 Spark 는 증가분에대해서 Query 를 날리게 되고 이전에 돌린 Running count 과 combine 하게 된다. (아래 그림 설명)

Model


Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. It only keeps around the minimal intermediate state data as required to update the result 

>> % Struct Streaming 은 전체 Table 을 구체화 하지 않는다. 그것은 가장 마지막 사용가느한 data 를 읽고 점진적으로 Result 를 update하는 구조이다.

그리고 원본 source data 는 지우게된다. 이것은 가능한 가장 작은 data 를 유지하는데 그것을 state Data 라한다. Result 를 업데이트하기위한 것이 state Data 이기도 하다.

Basic Operations - Selection, Projection, Aggregation

Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are discussed laterin this section.

>> 기본적인 select 등의  Query 를 아래와 같이 지원한다. 다만 몇 몇개는 지원하지 않는다.

case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
 
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data
 
// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs
ds.filter(_.signal > 10).map(_.device)         // using typed APIs
 
// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API
 
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
Note, you can identify whether a DataFrame/Dataset has streaming data or not by using df.isStreaming.


 unsupported Operation

Schema inference and partition of streaming DataFrames/Datasets

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting spark.sql.streaming.schemaInference to true.

Partition discovery does occur when subdirectories that are named /key=value/ are present and listing will automatically recurse into these directories. If these columns appear in the user provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add /data/year=2016/ when /data/year=2015/ was present, but it is invalid to change the partitioning column (i.e. by creating the directory /data/date=2016-04-17/).

>> default 로 Struct Streaming 은 input schema 를 제시하여 입력 받도록 한다. (CSV 의 경우 지원하는데 JSON 은?? kafka 는? )

 여기를 클릭하여 펼치기...


Basic Operation (like dataframe)


case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs 
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API



2.3.0 부터 지원된다.,

(IPC - 2.2.0 , EPC 2.3.0)

Support matrix for joins in streaming queries
Left InputRight InputJoin Type
StaticStaticAll typesSupported, since its not on streaming data even though it can be present in a streaming query
StreamStaticInnerSupported, not stateful
Left OuterSupported, not stateful
Right OuterNot supported
Full OuterNot supported
StaticStreamInnerSupported, not stateful
Left OuterNot supported
Right OuterSupported, not stateful
Full OuterNot supported
StreamStreamInnerSupported, optionally specify watermark on both sides + time constraints for state cleanup
Left OuterConditionally supported, must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup
Right OuterConditionally supported, must specify watermark on left + time constraints for correct results, optionally specify watermark on right for all state cleanup
Full OuterNot supported
import org.apache.spark.sql.functions.expr
 
val impressions = spark.readStream. ...
val clicks = spark.readStream. ...
 
// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime""2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime""3 hours")
 
// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
 
 
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"
 )


정리하자면, STREAM - STREAM JOIN 에서 INNER 조인은 watermark 가 옵셔날이지만 의미적으로도 써주는것이 좋고 성능도개선된다.

OUTER 조인은 Watermark 가 필수인데 ,그이유는 마지막까지 조인할 대상을 찾고 기다리기 때문에,  만약 null 일 경우 delay time 은 최대가된다.


  • Joins can be cascaded, that is, you can do df1.join(df2, ...).join(df3, ...).join(df4, ....).

  • As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.

  • As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used.

    • Cannot use streaming aggregations before joins.

    • Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.


  1. deduplication

    / Without watermark using guid column
    streamingDf.dropDuplicates("guid")
     
    // With watermark using guid and eventTime columns
    streamingDf
      .withWatermark("eventTime""10 seconds")
      .dropDuplicates("guid""eventTime")

    watermark - watermark 까지 (참고로 이전 date 는 중복제거가 안 될 수 있음)
    without - sate 되는대까지

  2. Arbitary State
    Since Spark 2.2, this can be done using the operation mapGroupsWithState and the more powerful operation flatMapGroupsWithState. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation (Scala/Java) and the examples (Scala/Java).
    위의 Operation 은 유저가원하는(임의의) state 를 만든다. 다만 그 State 가 어떻게 활용되는지는 별이야기가없다.

     3. Unsupported!

Unsupported Operations

There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.

  • Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.

  • Limit and take first N rows are not supported on streaming Datasets.

  • Distinct operations on streaming Datasets are not supported.

  • Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

  • Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

  • count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.

  • foreach() - Instead use ds.writeStream.foreach(...) (see next section).

  • show() - Instead use the console sink (see next section).


val query = df.writeStream.format("console").start()   // get the query object
 
query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data
 
query.runId       // get the unique id of this run of the query, which will be generated at every start/restart
 
query.name        // get the name of the auto-generated or user-specified name
 
query.explain()   // print detailed explanations of the query
 
query.stop()      // stop the query
 
query.awaitTermination()   // block until query is terminated, with stop() or with error
 
query.exception       // the exception if the query has been terminated with error
 
query.recentProgress  // an array of the most recent progress updates for this query
 
query.lastProgress    // the most recent progress update of this streaming query
 
 
 
 
 
 
val spark: SparkSession = ...
 
spark.streams.active    // get the list of currently active streaming queries
 
spark.streams.get(id)   // get a query object by its unique id
 
spark.streams.awaitAnyTermination()   // block until any one of them terminates


Query Type
Supported Output ModesNotes
Queries with aggregationAggregation on event-time with watermarkAppend, Update, CompleteAppend mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in `withWatermark()` as by the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed). See the Late Data section for more details. 

Update mode uses watermark to drop old aggregation state. 

Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table.
Other aggregationsComplete, UpdateSince no watermark is defined (only defined in other category), old aggregation state is not dropped. 

Append mode is not supported as aggregates can update thus violating the semantics of this mode.
Queries with mapGroupsWithStateUpdate
Queries with flatMapGroupsWithStateAppend operation modeAppendAggregations are allowed after flatMapGroupsWithState.
Update operation modeUpdateAggregations not allowed after flatMapGroupsWithState.
Queries with joinsAppendUpdate and Complete mode not supported yet. See the support matrix in the Join Operations section for more details on what types of joins are supported.
Other queriesAppend, UpdateComplete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.
Output Sinks
There are a few types of built-in output sinks.
File sink - Stores the output to a directory.
 
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path""path/to/destination/dir")
    .start()
Kafka sink - Stores the output to one or more topics in Kafka.
 
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers""host1:port1,host2:port2")
    .option("topic""updates")
    .start()
Foreach sink - Runs arbitrary computation on the records in the output. See later in the section for more details.
 
writeStream
    .foreach(...)
    .start()
Console sink (for debugging) - Prints the output to the console/stdout every time there is a trigger. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver’s memory after every trigger.
 
writeStream
    .format("console")
    .start()
Memory sink (for debugging) - The output is stored in memory as an in-memory table. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver’s memory. Hence, use it with caution.
 
writeStream
    .format("memory")
    .queryName("tableName")
    .start()


Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are meant for debugging purposes only. See the earlier section on fault-tolerance semantics. Here are the details of all the sinks in Spark.

Sink
Supported Output Modes
Options
Fault-tolerant
Notes
File SinkAppendpath: path to the output directory, must be specified. 

For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R). E.g. for "parquet" format options see DataFrameWriter.parquet()
Yes (exactly-once)Supports writes to partitioned tables. Partitioning by time may be useful.
Kafka SinkAppend, Update, CompleteSee the Kafka Integration GuideYes (at-least-once)More details in the Kafka Integration Guide
Foreach SinkAppend, Update, CompleteNoneDepends on ForeachWriter implementationMore details in the next section
Console SinkAppend, Update, CompletenumRows: Number of rows to print every trigger (default: 20) 
truncate: Whether to truncate the output if too long (default: true)
No
Memory SinkAppend, CompleteNoneNo. But in Complete Mode, restarted query will recreate the full table.Table name is the query name.


https://stackoverflow.com/questions/28700624/spark-submit-scheduling-in-cron


crontab -e 를 통해서 script 를 정기적으로 돌릴때,


#!/bin/sh

spark submit >> 동작안함


#!/bin/bash 
/경로/spark-submit >> 동작함



아래와 같이 bash 로 수정하고 실행시키면 동작한다.


bash is the most common shell used as a default shell for users of Linux systems. It is a spiritual descendent of other shells used throughout Unix history. Its name, bash is an abbreviation of Bourne-Again Shell, an homage to the Bourne shell it was designed to replace, though it also incorporates features from the C Shell and the Korn Shell.



/bin/sh is an executable representing the system shell. Actually, it is usually implemented as a symbolic link pointing to the executable for whichever shell is the system shell. The system shell is kind of the default shell that system scripts should use. In Linux distributions, for a long time this was usually a symbolic link to bash, so much so that it has become somewhat of a convention to always link /bin/sh to bash or a bash-compatible shell. However, in the last couple of years Debian (and Ubuntu) decided to switch the system shell from bash to dash - a similar shell - breaking with a long tradition in Linux (well, GNU) of using bash for /bin/sh. Dash is seen as a lighter, and much faster, shell which can be beneficial to boot speed (and other things that require a lot of shell scripts, like package installation scripts).



If your script requires features only supported by bash, use #!/bin/bash.

But if at all possible, it would be good to make sure your script is POSIX-compatible, and use #!/bin/sh, which should always, quite reliably, point to the preferred POSIX-compatible system shell in any installation.

파일전송을 KAFKA PRODUCER 쪽으로 보내기위한 AGENT 단의 구현이 필요 했다.


보통 JDBC 등의 소스에서는 kafkaconnector 가 지원하는데,


단순 경로에서 spool 하여 Producer 쪽에 쏴주는 kafkaconnector 는 존재 하지 않기 때문에 ,

직접구현하고 github 에 구현하였다.


https://github.com/Sangil55/SimpleFileConnetor



'spark,kafka,hadoop ecosystems > apache.kafka' 카테고리의 다른 글

kafka connect  (0) 2018.11.20
kafka vs flink vs esper vs storm vs spark  (0) 2018.11.20
kafka-개요  (0) 2018.11.20
kafka 로그 설정 에 대하여  (0) 2018.10.17
1. Kafka 설치 및 producer & Consumer test  (0) 2018.06.21

출처 : https://kafka.apache.org/quickstart /


궁극적으로 BIG DATA 실시간 분석이 목적인데,


Spark Streaming 하기 위한 전단계 중 Kafka Infra 를 앞단에 놓고 실시간 Streaming 을 사용 하여 구현하게된다.


Kafka 는 개략적으로

이런 모양을 하고 cluseter, 즉 broker 가 중간 역할을 하여 data 를 전달 한다.


프로듀서 - 컨슈머 동작을 하기위해선 아래 순서로 설치한다.

1.일단 KAFKA 설치 (직접설치 또는 ambari 등을 이용)


2.zookeeper 환경설치 (직접설치 또는 ambari 등을 이용)


3.Kafka 경로의 server.properties 파일에서 설정 확인


Listener = PLAINTEXT://localhost:6667 또는 9092포트

advertisment.Listener(deprecated)

>>으로 설정 이후 Zookeeper 는 6667 포트를 기준으로 consumer / producer 를 보내게 된다.

default.replication.factor = 1

offsets.topic.replication.factor  = 1 (3 으로설정되어있으면 consumer 가 topic 이 3개 기준으로 동작하기 때문에 추후 조정)


4. 토픽 생성 

/usr/bin/kafka-topics.sh --create --zookeeper localhost:6667 --replication-factor 1 --partitions 1 --topic test



5. 프로듀서 테스트

/usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-producer.sh --broker-list swinno01.cs9046cloud.internal:6667 --topic Hello 


6.컨슈머 테스트

/usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-consumer.sh --zookeeper swinno01:2181 --topic Hello

/usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-consumer.sh --bootstrap-server swinno01.cs9046cloud.internal:6667 --topic Hello --from-beginning 


이때 컨슈머가 제대로 윗라인으론 동작을 하는데, 아래 bootstrap 옵션으로 동작 안하는 경우가있다.

그땐 다수의 삽질과 / 구글링등을 통해서 아래를 확인해보자

a.broker 의 netstat -tnlp 를 통해 6667 (9092) 포트가 오픈되있는지 확인

b.실행환경에서 브로커ip:port 로 telnet 확인

c. zookeeper 스크립트를 통한 브로커 상태확인

/usr/hdp/2.6.5.0-292/kafka/bin/zookeeper-shell.sh localhost:2181   

 get /brokers/ids/1001  (ids num 은 확인필요)

d. topic/ broker 모두 삭제후 KAFKA 재시작

e.  offsets.topic.replication.factor  = 1 확인


연결이 상태가 안 좋다면 아래글 참고

https://github.com/wurstmeister/kafka-docker/wiki/Connectivity


지금도 forum / stackoverflow 등에는 위의 이유등으로 컨슈머가 bootstrap 에서 안된다는 이슈는 꾸준히 나오고있다 -_-ㅋ



이런 설정문제이거나/ 연결문제이거나 / 브로커 잔재 문제이거나


컨슈머까지 스크립트에서 동작하는것을 확인하면

spark 를 이용한 streaming 하기 위한 전단계 까지 완료 !





'spark,kafka,hadoop ecosystems > apache.kafka' 카테고리의 다른 글

kafka connect  (0) 2018.11.20
kafka vs flink vs esper vs storm vs spark  (0) 2018.11.20
kafka-개요  (0) 2018.11.20
kafka 로그 설정 에 대하여  (0) 2018.10.17
Kafka Connector , Simple Connector 구현  (0) 2018.07.16

+ Recent posts