
[Python&Kafka] Receiving UDP Data in Python and Producing It to Kafka

noohhee 2025. 5. 29. 18:47

 

Python 3.8

Kafka 2.8.2
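
The producer code below uses the kafka-python client (`from kafka import KafkaProducer`). If it isn't installed on the nodes yet, something like the following should work (assuming the Python 3.8 binary is exposed as `python38`, as in the scripts below):

python38 -m pip install kafka-python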

 

 

I previously had NiFi receiving UDP data and producing it to Kafka, but data loss occurred.

I can't conclude that NiFi is the cause, so I reimplemented what NiFi was doing in Python to see whether the loss still happens.

 

 

Currently, a load balancer server forwards UDP data to the four NiFi servers on port 8888.

 

LB -(udp, 8888)-> NiFi Nodes
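
Before wiring in Kafka, it can help to confirm that datagrams from the LB actually reach each node. A minimal sanity-check sketch (run it on one node and watch for output; stop it before starting the real listener, since only one process can bind the port):

```
import socket

# Bind to all interfaces on the LB's target port and print whatever arrives.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", 8888))
print("Waiting for UDP datagrams on port 8888...")

while True:
    data, addr = sock.recvfrom(65535)
    print(f"{len(data)} bytes from {addr}")
```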

 

A Python program is created on each NiFi node.

 

On all NiFi nodes (dn001~004, 4 servers):
ssh myuser@dn001~dn004

mkdir -p 01.listenUdp/log 01.listenUdp/src

cd 01.listenUdp/src

vim listen_udp_v01.py

```
import os
import socket
import threading
import queue
import time
import logging
from kafka import KafkaProducer
from datetime import datetime

# Configuration
UDP_IP = "0.0.0.0"
UDP_PORT = 8888
KAFKA_BROKER = "10.10.10.13:6667,10.10.10.14:6667,10.10.10.15:6667,10.10.10.16:6667"
KAFKA_TOPIC = "TEST_TOPIC"
WORKER_COUNT = 4
QUEUE_MAXSIZE = 50000

# Current time (used to name the log directory and file)
now = datetime.now()
log_date = now.strftime("%Y-%m-%d")
log_time = now.strftime("%Y-%m-%d-%H:%M")

# Create the log directory
log_dir = f"/home/myuser/01.listenUdp/log/{log_date}"
os.makedirs(log_dir, exist_ok=True)

# Build the log file name
LOG_FILE = os.path.join(log_dir, f"udp_kafka_stats_{log_time}.log")

# Logging setup
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

# Kafka producer (datagrams are raw bytes, so the serializer is a pass-through)
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: v
)

# Queue and counters
data_queue = queue.Queue(maxsize=QUEUE_MAXSIZE)
stats = {
    "valid_received": 0,
    "sent": 0,
    "queue_drops": 0
}
stats_lock = threading.Lock()

# UDP receiver thread
def udp_receiver():
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Enlarge the kernel receive buffer to 25 MB to absorb bursts
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 26214400)
    sock.bind((UDP_IP, UDP_PORT))
    print(f"[Receiver] Listening on UDP port {UDP_PORT}...")

    while True:
        try:
            data, addr = sock.recvfrom(65535)
            if len(data) > 4:
                with stats_lock:
                    stats["valid_received"] += 1
                try:
                    data_queue.put((data, addr), timeout=1)
                except queue.Full:
                    with stats_lock:
                        stats["queue_drops"] += 1
        except Exception as e:
            print(f"[Receiver Error] {e}")

# Kafka sender threads
def kafka_worker(worker_id):
    while True:
        try:
            data, addr = data_queue.get(timeout=1)
            producer.send(KAFKA_TOPIC, value=data)
            with stats_lock:
                stats["sent"] += 1
        except queue.Empty:
            continue

# Per-minute logging thread, aligned to the top of the minute
def logger():
    with stats_lock:
        prev = stats.copy()

    while True:
        now = datetime.now()
        seconds_to_next_minute = 60 - now.second
        time.sleep(seconds_to_next_minute)

        with stats_lock:
            delta = {k: stats[k] - prev[k] for k in stats}
            prev = stats.copy()

        recv = delta["valid_received"]
        sent = delta["sent"]
        drop = delta["queue_drops"]
        log_time = datetime.now().replace(second=0, microsecond=0)

        logging.info(f"{log_time}, recv={recv}, sent={sent}, dropped={drop}")
        print(f"[1m Log] {log_time} | recv: {recv}, sent: {sent}, dropped: {drop}")

# Start the threads
threading.Thread(target=udp_receiver, daemon=True).start()
for i in range(WORKER_COUNT):
    threading.Thread(target=kafka_worker, args=(i,), daemon=True).start()
threading.Thread(target=logger, daemon=True).start()

# Main loop (keeps the process alive)
try:
    while True:
        time.sleep(5)
except KeyboardInterrupt:
    print("Shutting down...")
finally:
    producer.flush()
    producer.close()
```

chmod +x listen_udp_v01.py
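
A caveat for the loss investigation: `producer.send()` is asynchronous, so the `sent` counter above counts records handed to the producer, not records acknowledged by a broker. Also, kafka-python defaults to `acks=1` and `retries=0`, so a broker hiccup can silently drop buffered records. A more durability-oriented producer could replace the one in listen_udp_v01.py, roughly like this (values are illustrative, not tuned):

```
from kafka import KafkaProducer

# Sketch: trade a little latency for durability.
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: v,
    acks="all",   # wait for all in-sync replicas, not just the leader
    retries=5,    # retry transient broker errors instead of dropping
    linger_ms=5,  # small batching window to keep throughput up
)
```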





# Create shell scripts to start and stop the Python program

cd ~/01.listenUdp

vim start_listen_udp.sh

```
#!/bin/bash

SCRIPT_NAME="/home/myuser/01.listenUdp/src/listen_udp_v01.py"
PYTHON_EXEC="python38"

# Check whether it is already running
PIDS=$(ps -ef | grep "[p]ython38.*$SCRIPT_NAME" | awk '{print $2}')

if [ -n "$PIDS" ]; then
  echo "[$SCRIPT_NAME] is already running (PID: $PIDS)"
  exit 1
fi

# Start it (stdout/stderr go to nohup.out in the current directory)
echo "Starting $SCRIPT_NAME..."
nohup $PYTHON_EXEC $SCRIPT_NAME &

sleep 1
NEW_PID=$(ps -ef | grep "[p]ython38.*$SCRIPT_NAME" | awk '{print $2}')
echo "Started with PID: $NEW_PID"
```

vim stop_listen_udp.sh
```
#!/bin/bash

# Kill every listen_udp_v*.py process started with python38
PIDS=$(ps -ef | grep '[p]ython38.*listen_udp_v[0-9]\+\.py' | awk '{print $2}')

if [ -z "$PIDS" ]; then
  echo "No matching listen_udp_v*.py process found."
else
  echo "Killing listen_udp_v*.py process(es): $PIDS"
  kill -9 $PIDS
fi
```
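
One caveat: `kill -9` (SIGKILL) gives the process no chance to run its `finally` block, so anything still buffered in the producer is lost, which muddies a loss investigation. A gentler option is a plain `kill` (SIGTERM) plus a handler in listen_udp_v01.py that flushes before exiting. A sketch, to be added after the producer is created (`producer` refers to the one defined there):

```
import signal
import sys

# Flush buffered Kafka records before exiting on SIGTERM (plain `kill`).
def handle_sigterm(signum, frame):
    print("SIGTERM received, flushing producer...")
    producer.flush()
    producer.close()
    sys.exit(0)

signal.signal(signal.SIGTERM, handle_sigterm)
```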

chmod +x *.sh



# Now, from gn001, run the scripts on dn001~004 remotely.

ssh myuser@gn001

ssh-keygen
(press Enter at every prompt)

ssh-copy-id -i ~/.ssh/id_rsa.pub myuser@dn001

ssh-copy-id -i ~/.ssh/id_rsa.pub myuser@dn002

ssh-copy-id -i ~/.ssh/id_rsa.pub myuser@dn003

ssh-copy-id -i ~/.ssh/id_rsa.pub myuser@dn004



pwd

mkdir 01.python_listenUdp

cd 01.python_listenUdp

vim control_udp_cluster.sh
```
#!/bin/bash

# Server list
SERVERS=("dn001" "dn002" "dn003" "dn004")

# User and script path
USER="myuser"
SCRIPT_DIR="/home/myuser/01.listenUdp"
ACTION=$1  # start or stop

# Decide which script to run
if [[ "$ACTION" == "start" ]]; then
  TARGET_SCRIPT="start_listen_udp.sh"
elif [[ "$ACTION" == "stop" ]]; then
  TARGET_SCRIPT="stop_listen_udp.sh"
else
  echo "Usage: $0 [start|stop]"
  exit 1
fi

# Connect to each server in parallel and run the script
for HOST in "${SERVERS[@]}"
do
  {
    echo "[$HOST] Running $TARGET_SCRIPT..."
    ssh ${USER}@${HOST} "bash ${SCRIPT_DIR}/${TARGET_SCRIPT}"
    echo "[$HOST] Done."
  } &
done


echo "All $ACTION commands triggered (not waiting)."
```


chmod +x control_udp_cluster.sh





# To start
./control_udp_cluster.sh start



# To stop
./control_udp_cluster.sh stop





# To check the logs
# Connect to each of dn001~004

ssh myuser@dn001

tail -f /home/myuser/01.listenUdp/log/{today's date}/udp_kafka_stats_{date and time the listener started}.log
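
To judge whether loss occurred, the per-minute counters can be summed over a whole log file: recv should equal sent + dropped if nothing vanished between the queue and the producer (this only covers what the program saw; datagrams dropped before the socket won't appear). A quick hypothetical checker, run as `python38 sum_stats.py <logfile>`:

```
import re
import sys

# Sum the recv/sent/dropped counters over one udp_kafka_stats log file.
totals = {"recv": 0, "sent": 0, "dropped": 0}
pattern = re.compile(r"recv=(\d+), sent=(\d+), dropped=(\d+)")

with open(sys.argv[1]) as f:
    for line in f:
        m = pattern.search(line)
        if m:
            totals["recv"] += int(m.group(1))
            totals["sent"] += int(m.group(2))
            totals["dropped"] += int(m.group(3))

print(totals)
print("recv - sent - dropped =",
      totals["recv"] - totals["sent"] - totals["dropped"])
```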


