출처 : https://www.confluent.io/product/connectors/ , https://docs.confluent.io/current/connect/connect-filestream/filestream_connector.html

소스 업데이트 >  https://github.com/Sangil55/SimpleFileConnetor/

Understanding Kafka Connect and Kafka Stream


Kafka Connect is a tool that provides the ability to move data into and out of the Kafka system. Thousands of use cases use the same kind of source and target system while using Kafka. Kafka Connect is comprised of Connectors for those common source or target systems.

Connect makes it simple to use existing connector implementations for common data sources and sinks to move data into and out of Kafka. A source connector can ingest entire databases and stream table updates to Kafka topics. It can also collect metrics from all of your application servers into Kafka topics, making the data available for stream processing with low latency. A sink connector can deliver data from Kafka topics into secondary indexes such as Elasticsearch or batch systems such as Hadoop for offline analysis.

Kafka Connect can run either as a standalone process for running jobs on a single machine (e.g., log collection), or as a distributed, scalable, fault tolerant service supporting an entire organization. This allows it to scale down to development, testing, and small production deployments with a low barrier to entry and low operational overhead, and to scale up to support a large organization’s data pipeline.


  •  Import Connectors are used to bring data from the source system into Kafka topics. These Connectors accept configuration through property files and bring data into a Kafka topic in any way you wish. You don't need to write your own producer to do such jobs. A few of the popular Connectors are JDBC Connector, file Connector, and so on.
  • Export Connectors: Unlike Import Connectors, Export Connectors are used to copy data from Kafka topics to the target system. This also works based on the configuration property file, which can vary based on which Connector you are using. Some popular Export Connectors are HDFS, S3, Elasticsearch, and so on.


kind of import Connectors 

  • Active MQ -  AMAZONE 서버용 Apache ActiveMQ용 관리형 메시지 브로커 서비스,  
  • IBM MQ -  Source Connector is used to read messages from an IBM MQ cluster and write them to a Kafka topic.
  • JDBC (Source) - for jdbc
  • Kinetica (Source) - for GPU, RDBMS
  • Azure IoTHub (Source)IoT, messaging
  • Files/Directories (Source)File System, Directories, Logs  Community Community 1Community 2 (confluent 사)
  • FileSystem Connector (Source)File System, S3, HDFS  Community Community
  • ..... various

kind of export Connectors 

  • Amazon S3 (Sink)
  • HDFS (Sink)
  • Cassandra (Sink)
  • .. File System out
  • ...various

등등, 여러가지 connector 플랫폼이 지원되 는데, FILE SYSTEM에서도 connetor 사용 용도는 있지만

주 용도는 직접 data 를 직접 stream 하기 어려운 환경을 위한 지원 툴이 Connector 라고 보면된다. AMAZON SERVER / DB / GPU / AMAZON &  IBM MQ(legacy) / IOT 등 다양한 곳에서 producer 로 data를 stream 하기 위한 connector 의 용도별로 컨풀르엔트사에서 공식 지원 하고 있다.


기업형 메세징 플랫폼의 경우에는 사내 서버이고, 별도 AP AGENT 단의 MQ 구축이 어렵기 때문에  (비교적 간단한 MESSAGE 서버로 NATS SERVER  , 간편 CDC 등도 조사필요)


 import 용 Connector 는 confulent 에서 제공하는 connect-api 를 사용 해서 구현 해야한다.
완성된 번들로도 제공 하며, JAVA 로는 아래 dependency 등 SourceTask을 참조하여 링크처럼 커스터마이징 하여 구현 할 수 있다 (참고 . https://docs.confluent.io/current/connect/devguide.html )

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-file</artifactId>
<version>1.0.0</version>
</dependency>


  • *완성된 파일 커넥터를 사용 하기 위해선 기본 컨플루엔트 솔루션을 설치하라고 이야기한다.  https://docs.confluent.io/current/connect/quickstart.html#connect-quickstart
  • 그렇게 하지 않으려면 , JAR 형태로 전체 빌드하여 사용하는것을 생각 해보아야 한다. (다만 파일 읽고 offset 설정하는 부분 직접 구현 필요)


Export 용 Connector 는 우리 상황에서는  Confulent (Default) or  HDFS 용 Export connector 로 고려 할 만하다.

다만 >> KAFKA STREAMS API 를 사용하거나 SPARK - SREAMING 을 사용해서 별도 출력 하게되면, 별도의 output connector 의 사용의미가 없어진다.


KAFKA CONNECTOR   VS   FILEBEAT (ELK)

본격적으로 성능 비교를 하자면, 

ELK 기반의 FILE BEATS AGENT 도 OUT FORMAT 으로 KAFKA topic 을 입력 가능하다.

Filebeat 의 yml 설정파일

filebeat.prospectors:

- input_type: log

paths:
- /root/m_input/*.json
logging.level: debug
logging.to_files: true
logging.to_syslog: false
logging.files:
path: /root/sangil/log
name: mybeat.log
keepfiles: 7

output.kafka:
hosts: ["swinno04.cs9046cloud.internal:6667"]
topic: 'Hello2'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000


KAFKA 에서 제공하는 Connector 사용과 FILEBEAT 를 비교하여 아래 특성등을 비교할 필요가 있다.

  • producer import perfomance

              >> FILE BEAT 의 경우 아래  테스트 진행
SubjecttopicpartitionReplicationBroker Memcoreopt1coreopt1input sizeinput Throughput (MB/S)output workoutput sizeoutput Throughput (MB/S)시작시간종료시간
filebeat(01번서버) - kafka(04번 서버) - jush flush with bundle consumer.sh1111GB~4GBfilebeat   300MB just flush with consumer665MB4.5MB/S10:03:10
KafkaConnector(java-custom),1번  - kafka(04번 서버) - jush flush with bundle consumer.sh1111GB~4GBkafk-connect


324MB

324MB0.98MB/S06:19:4506:20:00
KA














         - bundle connector 의 경우 파일 크기가 일정 크기 이상이면, 동작 안함, 800K 까지는 정상 동작확인, - 약 4166 rows

          -java code 의 경우 에는 그대로 빌드시 Context 관련 초기화 오류 발생 (implement 하여 재구성 필요!)


Confluent 에서 제공하는 CONNECTOR API 소스를 간략히 설명하자면, Java Buffer  reader 를 통해 polling 하는 구조이다.

 관련 소스

          - 해당 class를 참고하여 재구현 할 경우 line input & offset 쓰고 읽는 부분에 대해서 직접 구현이 필요한데, 시간이 더 소요 될듯 하다. 

          - 이것을 Filesystem 으로 기반하여 구현한 OPEN SOURCE 등을 다시 참고해보면

            오픈소스1 :  https://github.com/jcustenborder/kafka-connect-spooldir/blob/master/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnector.java

             오픈소스2 : https://github.com/mmolimar/kafka-connect-fs

             위 2개중에 Connector 는 agent 단에 놓고,  내부 NETWORK 의 KAFKA BROKER 까지로 전송 할 수 있는 오픈소스를 찾아서 성능 테스트 해보아야한다.

             다만 위 오픈소스 2개 모두 , KAFKA CONFULENT 플랫폼 기반이기 때문에, 원정서버(MAS01~26) 에 일부 설치를 고려해야한다.(즉, Kafka Connector (stand alone등이)  자체적으로 실행 되어야 된다.)

  • Replication / Partition  Performance


  • Scalblity


  •  fault tolerance 


  • *source 수정 편의성


'spark,kafka,hadoop ecosystems > apache.kafka' 카테고리의 다른 글

kafka tuning config  (0) 2018.11.20
kafka multi brokers  (0) 2018.11.20
kafka vs flink vs esper vs storm vs spark  (0) 2018.11.20
kafka-개요  (0) 2018.11.20
kafka 로그 설정 에 대하여  (0) 2018.10.17

+ Recent posts