출처 : https://www.confluent.io/product/connectors/ , https://docs.confluent.io/current/connect/connect-filestream/filestream_connector.html

소스 업데이트 >  https://github.com/Sangil55/SimpleFileConnetor/

Understanding Kafka Connect and Kafka Stream


Kafka Connect is a tool that provides the ability to move data into and out of the Kafka system. Thousands of use cases use the same kind of source and target system while using Kafka. Kafka Connect is comprised of Connectors for those common source or target systems.

Connect makes it simple to use existing connector implementations for common data sources and sinks to move data into and out of Kafka. A source connector can ingest entire databases and stream table updates to Kafka topics. It can also collect metrics from all of your application servers into Kafka topics, making the data available for stream processing with low latency. A sink connector can deliver data from Kafka topics into secondary indexes such as Elasticsearch or batch systems such as Hadoop for offline analysis.

Kafka Connect can run either as a standalone process for running jobs on a single machine (e.g., log collection), or as a distributed, scalable, fault tolerant service supporting an entire organization. This allows it to scale down to development, testing, and small production deployments with a low barrier to entry and low operational overhead, and to scale up to support a large organization’s data pipeline.


  •  Import Connectors are used to bring data from the source system into Kafka topics. These Connectors accept configuration through property files and bring data into a Kafka topic in any way you wish. You don't need to write your own producer to do such jobs. A few of the popular Connectors are JDBC Connector, file Connector, and so on.
  • Export Connectors: Unlike Import Connectors, Export Connectors are used to copy data from Kafka topics to the target system. This also works based on the configuration property file, which can vary based on which Connector you are using. Some popular Export Connectors are HDFS, S3, Elasticsearch, and so on.


kind of import Connectors 

  • Active MQ -  AMAZONE 서버용 Apache ActiveMQ용 관리형 메시지 브로커 서비스,  
  • IBM MQ -  Source Connector is used to read messages from an IBM MQ cluster and write them to a Kafka topic.
  • JDBC (Source) - for jdbc
  • Kinetica (Source) - for GPU, RDBMS
  • Azure IoTHub (Source)IoT, messaging
  • Files/Directories (Source)File System, Directories, Logs  Community Community 1Community 2 (confluent 사)
  • FileSystem Connector (Source)File System, S3, HDFS  Community Community
  • ..... various

kind of export Connectors 

  • Amazon S3 (Sink)
  • HDFS (Sink)
  • Cassandra (Sink)
  • .. File System out
  • ...various

등등, 여러가지 connector 플랫폼이 지원되 는데, FILE SYSTEM에서도 connetor 사용 용도는 있지만

주 용도는 직접 data 를 직접 stream 하기 어려운 환경을 위한 지원 툴이 Connector 라고 보면된다. AMAZON SERVER / DB / GPU / AMAZON &  IBM MQ(legacy) / IOT 등 다양한 곳에서 producer 로 data를 stream 하기 위한 connector 의 용도별로 컨풀르엔트사에서 공식 지원 하고 있다.


기업형 메세징 플랫폼의 경우에는 사내 서버이고, 별도 AP AGENT 단의 MQ 구축이 어렵기 때문에  (비교적 간단한 MESSAGE 서버로 NATS SERVER  , 간편 CDC 등도 조사필요)


 import 용 Connector 는 confulent 에서 제공하는 connect-api 를 사용 해서 구현 해야한다.
완성된 번들로도 제공 하며, JAVA 로는 아래 dependency 등 SourceTask을 참조하여 링크처럼 커스터마이징 하여 구현 할 수 있다 (참고 . https://docs.confluent.io/current/connect/devguide.html )

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-file</artifactId>
<version>1.0.0</version>
</dependency>


  • *완성된 파일 커넥터를 사용 하기 위해선 기본 컨플루엔트 솔루션을 설치하라고 이야기한다.  https://docs.confluent.io/current/connect/quickstart.html#connect-quickstart
  • 그렇게 하지 않으려면 , JAR 형태로 전체 빌드하여 사용하는것을 생각 해보아야 한다. (다만 파일 읽고 offset 설정하는 부분 직접 구현 필요)


Export 용 Connector 는 우리 상황에서는  Confulent (Default) or  HDFS 용 Export connector 로 고려 할 만하다.

다만 >> KAFKA STREAMS API 를 사용하거나 SPARK - SREAMING 을 사용해서 별도 출력 하게되면, 별도의 output connector 의 사용의미가 없어진다.


KAFKA CONNECTOR   VS   FILEBEAT (ELK)

본격적으로 성능 비교를 하자면, 

ELK 기반의 FILE BEATS AGENT 도 OUT FORMAT 으로 KAFKA topic 을 입력 가능하다.

Filebeat 의 yml 설정파일

filebeat.prospectors:

- input_type: log

paths:
- /root/m_input/*.json
logging.level: debug
logging.to_files: true
logging.to_syslog: false
logging.files:
path: /root/sangil/log
name: mybeat.log
keepfiles: 7

output.kafka:
hosts: ["swinno04.cs9046cloud.internal:6667"]
topic: 'Hello2'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000


KAFKA 에서 제공하는 Connector 사용과 FILEBEAT 를 비교하여 아래 특성등을 비교할 필요가 있다.

  • producer import perfomance

              >> FILE BEAT 의 경우 아래  테스트 진행
SubjecttopicpartitionReplicationBroker Memcoreopt1coreopt1input sizeinput Throughput (MB/S)output workoutput sizeoutput Throughput (MB/S)시작시간종료시간
filebeat(01번서버) - kafka(04번 서버) - jush flush with bundle consumer.sh1111GB~4GBfilebeat   300MB just flush with consumer665MB4.5MB/S10:03:10
KafkaConnector(java-custom),1번  - kafka(04번 서버) - jush flush with bundle consumer.sh1111GB~4GBkafk-connect


324MB

324MB0.98MB/S06:19:4506:20:00
KA














         - bundle connector 의 경우 파일 크기가 일정 크기 이상이면, 동작 안함, 800K 까지는 정상 동작확인, - 약 4166 rows

          -java code 의 경우 에는 그대로 빌드시 Context 관련 초기화 오류 발생 (implement 하여 재구성 필요!)


Confluent 에서 제공하는 CONNECTOR API 소스를 간략히 설명하자면, Java Buffer  reader 를 통해 polling 하는 구조이다.

 관련 소스

          - 해당 class를 참고하여 재구현 할 경우 line input & offset 쓰고 읽는 부분에 대해서 직접 구현이 필요한데, 시간이 더 소요 될듯 하다. 

          - 이것을 Filesystem 으로 기반하여 구현한 OPEN SOURCE 등을 다시 참고해보면

            오픈소스1 :  https://github.com/jcustenborder/kafka-connect-spooldir/blob/master/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnector.java

             오픈소스2 : https://github.com/mmolimar/kafka-connect-fs

             위 2개중에 Connector 는 agent 단에 놓고,  내부 NETWORK 의 KAFKA BROKER 까지로 전송 할 수 있는 오픈소스를 찾아서 성능 테스트 해보아야한다.

             다만 위 오픈소스 2개 모두 , KAFKA CONFULENT 플랫폼 기반이기 때문에, 원정서버(MAS01~26) 에 일부 설치를 고려해야한다.(즉, Kafka Connector (stand alone등이)  자체적으로 실행 되어야 된다.)

  • Replication / Partition  Performance


  • Scalblity


  •  fault tolerance 


  • *source 수정 편의성


'spark,kafka,hadoop ecosystems > apache.kafka' 카테고리의 다른 글

kafka tuning config  (0) 2018.11.20
kafka multi brokers  (0) 2018.11.20
kafka vs flink vs esper vs storm vs spark  (0) 2018.11.20
kafka-개요  (0) 2018.11.20
kafka 로그 설정 에 대하여  (0) 2018.10.17

Kmean clustering lib (1).pptx


  • k-mean 알고리즘 개선 Topic

  • 추가 논의 및 결정 필요 사항

    • 데이터 전처리

  • config 분리 관련 결정사항

    • 파일명 : kmeanconfig.xml
    • 확장자 : xml
    속성명
    설명
    타입
    예시
    기본 값
    k_rangek값 범위 지정숫자, 숫자~숫자4, 10~11
    10 18
    inputdir
    입력파일 경로파일 경로(문자열)/user/hadoop/input
    data/kmeans-input
    outputpath
    결과파일 경로파일 경로(문자열)/user/hadoop/output
    data/kmeans-output_20180205_21
    dimension입력데이터의 dimension숫자3
    2
    maxitr
    최대 Iteration 횟수숫자1060
    convdelta
    반복 종료 조건숫자30.01
  • conf 파일 형식 은아래와 같으며 이곳에 저장 후 사용 한다 $(hadoophome)/data/kmeans/conf

  • Default 셋팅은 아래와 같다.

    $HADOOP_HOME/data/kmeans/conf
    1
    2
    3
    4
    5
    6
    k_range 10 18
    inputdir data/kmeans-input
    outputpath data/kmeans-output_20180205_21
    dimension 2
    maxitr 60
    convdelta 0.01



  • JSON 포맷

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    {
        "k":5,
        "dimension":2,
        "clusters": [
            {
              "center" : [2 5] ,
              "count" 3,
              "points":[ [34], [25], [16]]
             },
        {
              "center" : [2,5] ,
              "count" 3,
              "points":[ [34], [25], [16]]
             },
        {
              "center" : [8 8] ,
              "count" 2,
              "points":[ [109], [88]]
             },
        {
              "center" : [4 5] ,
              "count" 1,
              "points":[ [45] ]
             },
        {
              "center" : [12 12] ,
              "count" 1,
              "points":[ [12,12]]
             }
        ]
     }


'Algorithm' 카테고리의 다른 글

FDM vs FVM vs FEM  (0) 2021.01.04
ML flow  (0) 2018.11.19
1. TSP ( Travel salesman Person ) - Ant colony  (0) 2018.11.19



YARN(Yet Another Resource Negotiator) 이란?

  • A framework for job scheduling and cluster resource management.

YARN 핵심 구성 요소

  • Resource Manager(RM)

    • YARN 클러스터의 Master 서버로 하나 또는 이중화를 위해 두개의 서버에만 실행됨

    • 클러스터 전체의 리소스를 관리

    • YARN 클러스터의 리소스를 사용하고자 하는 다른 플랫롬으로부터 요청을 받아 리소스 할당(스케줄링)

  • Node Manager(NM)

    • YARN 클러스터의 Worker 서버로 Resource Manager를 제외한 모든 서버에 실행

    • 사용자가 요청한 프로그램을 실행하는 Container를 fork 시키고 Container를 모니터링

    • Container 장애 상황 또는 Container가 요청한 리소스보다 많이 사용하고 있는지 감시(요청한 리소스보다 많이 사용하면 해당 Container를 kill 시킴)

  • Application Master(AM)

    • RM과 협상하여 하둡 클러스터에서 자기가 담당하는 어플리케이션에 필요한 리소스를 할당.
    • NM과 협의하여 자기가 담당하는 어플리케이션을 실행하고 그 결과를 주기적으로 모니터
    • 자기가 담당하는 어플이케이션의 실행 현황을 주기적으로 RM에게 보고합니다.

DEV 시스템 구성

  • EPC VM 4 식 (CPU: 2 Core, Mem: 4GB, HDD: 80GB)

  • Hadoop 3.0 (2017-12-13 GA)

    • 주요 특징 : Erasure Coding in HDFS

  • 구현 알고리즘: k-means



'spark,kafka,hadoop ecosystems > apache.hadoop' 카테고리의 다른 글

hadoop 구성  (0) 2018.11.20
hadoop zipfile as input format  (0) 2018.11.20
hadoop - mapreduce  (0) 2018.11.20

+ Recent posts