지구정복

[Flume] Source, Channel, Sink, Interceptor의 종류 및 예제 본문

데이터 엔지니어링 정복/Hadoop Ecosystem

[Flume] Source, Channel, Sink, Interceptor의 종류 및 예제

nooh._.jl 2021. 5. 6. 23:18
728x90
반응형

참고 사이트

 

flume.apache.org/FlumeUserGuide.html#flume-sources

 

Flume 1.9.0 User Guide — Apache Flume

The file_roll sink and the hdfs sink both support the EventSerializer interface. Details of the EventSerializers that ship with Flume are provided below. Body Text Serializer Alias: text. This interceptor writes the body of the event to an output stream wi

flume.apache.org

cyberx.tistory.com/139

 

데이터 수집 – flume [1/2]

Apache Flume <그림 출처 : https://flume.apache.org/FlumeUserGuide.html> 빅데이터를 시작할 때 가장 기초가 되는 데이터 수집 부분에는 여러 오픈소스들이 존재합니다. 그 중에서 대체적으로 많이 사용되는..

cyberx.tistory.com

 

1. 소스, 채널, 싱크 변수 정의하기

더보기

-사용법

agent명.sources = 변수명
agent명.channels = 변수명
agent명.sinks = 변수명

 

-예시

Example_Agent.sources = Example_Source
Example_Agent.channels = Example_Channel
Example_Agent.sinks = Example_Sink

2. 에이전트의 Source 설정하기

더보기

-Source 타입선언하기

agent명.sources.변수명.type = 타입

 -예시

Example_Agent.sources.Example_Source.type = spooldir

 

-Source type의 종류

1. spooldir : 지정한 특정 디렉터리를 모니터링하고 있다가 새로운 파일이 생성되면
이벤트를 감지해서 batchsize의 설정값만큼 읽어서 channel에 데이터를 전송한다.

이때 주의할 점으로  spoolDir로 설정한 디렉터리에 읽기, 쓰기, 실행 등의 권한을 다 부여하는 것이 좋다(chmod 777 -R ~)

a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.deletePolicy = immediate   #입력하면 즉시 데이터 삭제
a1.sources.src-1.batchSize = 1000
a1.sources.src-1.fileHeader = true

 

2. avro : 외부 avro클라이언트에서 전송하는 데이터 수신해서 사용

(avro란 아파치 아브로이고 데이터 직렬화 프로그램이다.)
(데이터 직렬화(serialize)는 객체의 내용을 바이트 단위로 변환하여 파일 또는 네트워크를 통해서 스트림(송수신)이 가능하도록 하는 것을 의미)

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

 

3. thrift : 외부 thrift 클라이언트에서 전송하는 데이터 수신해서 사용

(페이스북에서 개발한 다양한 언어를 지원하는 RPC프레임워크)
(RPC프레임워크는 클라이언트-서버간 커뮤니케이션에 필요한 상세 정보는 최대한 감추고
클라이언트는 일반 메소드를 호출하는 것처럼 호출한다.
서버도 마찬가지로 일반 메소드를 다루는 것처럼 하는 방법

# Client
z = function(x, y)

# Server
function(x, y) {
 compute x, y
 return z
}
)

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

 

4. exec : 시스템 명령어를 실행하고 출력 내용을 데이터 입력으로 사용

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

 

5. jms : JMS 메시지를 입력으로 사용

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

 

6. org.apache.flume.source.kafka.KafkaSource : kafka로부터 데이터를 가져와 입력으로 사용

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

혹은

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$    #토픽 0부터 9까지 가져온다.

 

7. seq : 일련번호를 생성해서 입력으로 사용

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1

 

8. syslogtcp : 시스템 로그를 입력으로 사용

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

 


3. Channel 설정

더보기

대표적인 채널 설정방법을 알아보자.

-memory : 메모리에 데이터 저장

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

 

-jdbc : 내장되어 있는 Derby를 사용하여 데이터를 저장

(Derby는 아파치의 관계형 데이터베이스)

a1.channels = c1
a1.channels.c1.type = jdbc

-org.apache.flume.channel.kafka.KafkaChannel : kafka cluster(topic)에 저장
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

 

-org.apache.flume.channel.kafka.KafkaChannel : kafka cluster(topic)에 저장

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

 

-file : 로컬 파일에 저장

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

 


4. Sink 설정하기

더보기

-hdfs : HDFS에 데이터를 파일로 저장

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

 

-logger : 수집한 데이터를 테스트 및 디버깅 목적으로 플럼의 표준 출력 로그파일인 /var/log/flume-ng/~ 에 출력한다.

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

 

-avro : 다른 avro 서버에 이벤트를 전달

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

 

-thrift : 다른 Thrift 서버에 이벤트를 전달

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

 

-file_roll : 로컬 파일에 데이터를 저장

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

 

-null : 데이터를 파기

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

 

-hbase : HBASE에 데이터를 저장

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1

 

-org.apache.flume.sink.elasticsearch.ElasticSearchSink : 데이터를 ElasticSearch 클러스터에 저장

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

 

-org.apache.flume.sink.kafka.KafkaSink : kafka에 데이터를 저장

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

5. Interceptor 종류

더보기

인터셉터 한 개 이상 적용이 가능하다.

 

-timestamp : 이벤트 헤더에 현재 시간 값 추가

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

 

-static : 이벤트 헤더에 지정한 값 추가

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

 

-regex_filter : 정규 표현식에 일치하는지 여부에 따라 이벤트를 버릴지 말지 결정

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

 

 

 

 

 

728x90
반응형
Comments