지구정복
[Flume] Source, Channel, Sink, Interceptor의 종류 및 예제 본문
[Flume] Source, Channel, Sink, Interceptor의 종류 및 예제
nooh._.jl 2021. 5. 6. 23:18참고 사이트
flume.apache.org/FlumeUserGuide.html#flume-sources
1. 소스, 채널, 싱크 변수 정의하기
-사용법
agent명.sources = 변수명
agent명.channels = 변수명
agent명.sinks = 변수명
-예시
Example_Agent.sources = Example_Source
Example_Agent.channels = Example_Channel
Example_Agent.sinks = Example_Sink
2. 에이전트의 Source 설정하기
-Source 타입선언하기
agent명.sources.변수명.type = 타입
-예시
Example_Agent.sources.Example_Source.type = spooldir
-Source type의 종류
1. spooldir : 지정한 특정 디렉터리를 모니터링하고 있다가 새로운 파일이 생성되면
이벤트를 감지해서 batchsize의 설정값만큼 읽어서 channel에 데이터를 전송한다.
이때 주의할 점으로 spoolDir로 설정한 디렉터리에 읽기, 쓰기, 실행 등의 권한을 다 부여하는 것이 좋다(chmod 777 -R ~)
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.deletePolicy = immediate #입력하면 즉시 데이터 삭제
a1.sources.src-1.batchSize = 1000
a1.sources.src-1.fileHeader = true
2. avro : 외부 avro클라이언트에서 전송하는 데이터 수신해서 사용
(avro란 아파치 아브로이고 데이터 직렬화 프로그램이다.)
(데이터 직렬화(serialize)는 객체의 내용을 바이트 단위로 변환하여 파일 또는 네트워크를 통해서 스트림(송수신)이 가능하도록 하는 것을 의미)
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
3. thrift : 외부 thrift 클라이언트에서 전송하는 데이터 수신해서 사용
(페이스북에서 개발한 다양한 언어를 지원하는 RPC프레임워크)
(RPC프레임워크는 클라이언트-서버간 커뮤니케이션에 필요한 상세 정보는 최대한 감추고
클라이언트는 일반 메소드를 호출하는 것처럼 호출한다.
서버도 마찬가지로 일반 메소드를 다루는 것처럼 하는 방법
# Client
z = function(x, y)
# Server
function(x, y) {
compute x, y
return z
}
)
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
4. exec : 시스템 명령어를 실행하고 출력 내용을 데이터 입력으로 사용
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
5. jms : JMS 메시지를 입력으로 사용
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
6. org.apache.flume.source.kafka.KafkaSource : kafka로부터 데이터를 가져와 입력으로 사용
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
혹은
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$ #토픽 0부터 9까지 가져온다.
7. seq : 일련번호를 생성해서 입력으로 사용
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
8. syslogtcp : 시스템 로그를 입력으로 사용
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
3. Channel 설정
대표적인 채널 설정방법을 알아보자.
-memory : 메모리에 데이터 저장
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
-jdbc : 내장되어 있는 Derby를 사용하여 데이터를 저장
(Derby는 아파치의 관계형 데이터베이스)
a1.channels = c1
a1.channels.c1.type = jdbc
-org.apache.flume.channel.kafka.KafkaChannel : kafka cluster(topic)에 저장
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
-org.apache.flume.channel.kafka.KafkaChannel : kafka cluster(topic)에 저장
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
-file : 로컬 파일에 저장
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
4. Sink 설정하기
-hdfs : HDFS에 데이터를 파일로 저장
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
-logger : 수집한 데이터를 테스트 및 디버깅 목적으로 플럼의 표준 출력 로그파일인 /var/log/flume-ng/~ 에 출력한다.
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
-avro : 다른 avro 서버에 이벤트를 전달
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
-thrift : 다른 Thrift 서버에 이벤트를 전달
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
-file_roll : 로컬 파일에 데이터를 저장
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
-null : 데이터를 파기
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1
-hbase : HBASE에 데이터를 저장
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
-org.apache.flume.sink.elasticsearch.ElasticSearchSink : 데이터를 ElasticSearch 클러스터에 저장
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
-org.apache.flume.sink.kafka.KafkaSink : kafka에 데이터를 저장
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
5. Interceptor 종류
인터셉터 한 개 이상 적용이 가능하다.
-timestamp : 이벤트 헤더에 현재 시간 값 추가
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
-static : 이벤트 헤더에 지정한 값 추가
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
-regex_filter : 정규 표현식에 일치하는지 여부에 따라 이벤트를 버릴지 말지 결정
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three
'데이터 엔지니어링 정복 > Hadoop Ecosystem' 카테고리의 다른 글
[Storm] Storm 개념 및 구성요소 (0) | 2021.05.08 |
---|---|
[Redis] Redis 개념 및 구성요소 (0) | 2021.05.07 |
[Cloudera Manager] 클라우데라 매니저로 Flume, Kafka 설치하기 (0) | 2021.05.06 |
[Flume, Kafka] 개념 및 아키텍처 (0) | 2021.05.06 |
[Hadoop, Zookeeper] 기본명령어 (0) | 2021.05.06 |