데이터 엔지니어링 정복/Python
[Python&Kafka] Python으로 UDP데이터 받은 뒤 Kafka Producing
noohhee
2025. 5. 29. 18:47
728x90
반응형
python3.8
kafka 2.8.2
기존에 NiFi로 UDP데이터 받은 뒤 Kafka에 프로듀싱하는 작업을 진행했었는데
데이터 유실이 발생하였다.
NiFi가 문제라고 확정지을 수 없지만 NiFi에서 처리됐던 것들을 Python으로 구현해서 데이터 유실이 생기는지 확인한다.
현재 Load Balancer Server로부터 NiFi 4대 서버로 UDP데이터를 보내주고 있고, 포트는 8888이다.
LB -(udp, 8888)-> NiFi Nodes
각 NiFi node에 python프로그램을 만든다.
모든 NiFi 노드들 (dn001~004, 4대) ssh myuser@dn001~dn004 mkdir -p 01.listenUdp/log 01.listenUdp/src cd 01.listenUdp/src vim listen_udp_v01.py ``` import os import socket import threading import queue import time import logging from kafka import KafkaProducer from datetime import datetime # 설정 UDP_IP = "0.0.0.0" UDP_PORT = 8888 KAFKA_BROKER = "10.10.10.13:6667,10.10.10.14:6667,10.10.10.15:6667,10.10.10.16:6667" KAFKA_TOPIC = "TEST_TOPIC" WORKER_COUNT = 4 QUEUE_MAXSIZE = 50000 LOG_FILE = "udp_kafka_stats.log" # 현재 시각 now = datetime.now() log_date = now.strftime("%Y-%m-%d") log_time = now.strftime("%Y-%m-%d-%H:%M") # 로그 디렉토리 생성 log_dir = f"/home/myuser/01.listenUdp/log/{log_date}" os.makedirs(log_dir, exist_ok=True) # 로그 파일 이름 생성 LOG_FILE = os.path.join(log_dir, f"udp_kafka_stats_{log_time}.log") # 로그 설정 logging.basicConfig( filename=LOG_FILE, level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) # Kafka 프로듀서 producer = KafkaProducer( bootstrap_servers=KAFKA_BROKER, value_serializer=lambda v: v ) # 큐와 통계 data_queue = queue.Queue(maxsize=QUEUE_MAXSIZE) stats = { "valid_received": 0, "sent": 0, "queue_drops": 0 } stats_lock = threading.Lock() # UDP 수신 스레드 def udp_receiver(): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 26214400) sock.bind((UDP_IP, UDP_PORT)) print(f"[Receiver] Listening on UDP port {UDP_PORT}...") while True: try: data, addr = sock.recvfrom(65535) if len(data) > 4: with stats_lock: stats["valid_received"] += 1 try: data_queue.put((data, addr), timeout=1) except queue.Full: with stats_lock: stats["queue_drops"] += 1 except Exception as e: print(f"[Receiver Error] {e}") # Kafka 전송 스레드 def kafka_worker(worker_id): while True: try: data, addr = data_queue.get(timeout=1) producer.send(KAFKA_TOPIC, value=data) with stats_lock: stats["sent"] += 1 except queue.Empty: continue # 정각 기준 1분 단위 로깅 스레드 def logger(): prev = stats.copy() while True: now = datetime.now() seconds_to_next_minute = 60 - now.second time.sleep(seconds_to_next_minute) with stats_lock: delta = {k: stats[k] - prev[k] for k in stats} prev = stats.copy() recv = delta["valid_received"] sent = delta["sent"] drop = delta["queue_drops"] log_time = datetime.now().replace(second=0, microsecond=0) logging.info(f"{log_time}, recv={recv}, sent={sent}, dropped={drop}") print(f"[1m Log] {log_time} | recv: {recv}, sent: {sent}, dropped: {drop}") # 스레드 시작 threading.Thread(target=udp_receiver, daemon=True).start() for i in range(WORKER_COUNT): threading.Thread(target=kafka_worker, args=(i,), daemon=True).start() threading.Thread(target=logger, daemon=True).start() # 메인 루프 (프로세스 유지를 위해) try: while True: time.sleep(5) except KeyboardInterrupt: print("Shutting down...") finally: producer.flush() producer.close() ``` chmod +x listen_udp_v01.py #py를 실행시켜주거나 종료시켜주는 sh제작 cd ~/01.listenUdp vim start_listen_udp.sh ``` #!/bin/bash SCRIPT_NAME="/home/myuser/01.listenUdp/src/listen_udp_v01.py" PYTHON_EXEC="python38" # 이미 실행 중인지 확인 PIDS=$(ps -ef | grep "[p]ython38.*$SCRIPT_NAME" | awk '{print $2}') if [ -n "$PIDS" ]; then echo "[$SCRIPT_NAME] is already running (PID: $PIDS)" exit 1 fi # 실행 echo "Starting $SCRIPT_NAME..." nohup $PYTHON_EXEC $SCRIPT_NAME & sleep 1 NEW_PID=$(ps -ef | grep "[p]ython38.*$SCRIPT_NAME" | awk '{print $2}') echo "Started with PID: $NEW_PID" ``` vim stop_listen_udp.sh ``` #!/bin/bash # python38으로 실행된 listen_udp_v*.py 프로세스 모두 종료 PIDS=$(ps -ef | grep '[p]ython38.*listen_udp_v[0-9]\+\.py' | awk '{print $2}') if [ -z "$PIDS" ]; then echo "No matching listen_udp_v*.py process found." else echo "Killing listen_udp_v*.py process(es): $PIDS" kill -9 $PIDS fi ``` chmod +x *.sh #이제 gn001에서 위 dn001~004 서버에 있는 위 스크립트들을 원격에서 실행시켜주도록 한다. ssh myuser@gn001 ssh-keygen (모두엔터) ssh-copy-id -i ~/.ssh/id_rsa.pub myuser@dn001 ssh-copy-id -i ~/.ssh/id_rsa.pub myuser@dn002 ssh-copy-id -i ~/.ssh/id_rsa.pub myuser@dn003 ssh-copy-id -i ~/.ssh/id_rsa.pub myuser@dn004 pwd mkdir 01.python_listenUdp cd 01.python_listenUdp vim control_udp_cluster.sh ``` #!/bin/bash # 서버 리스트 SERVERS=("dn001" "dn002" "dn003" "dn004") # 사용자 및 스크립트 경로 USER="myuser" SCRIPT_DIR="/home/myuser/01.listenUdp" SCRIPT_NAME=$1 # start 또는 stop # 실행 스크립트 이름 결정 if [[ "$SCRIPT_NAME" == "start" ]]; then TARGET_SCRIPT="start_listen_udp.sh" elif [[ "$SCRIPT_NAME" == "stop" ]]; then TARGET_SCRIPT="stop_listen_udp.sh" else echo "Usage: $0 [start|stop]" exit 1 fi # 병렬로 각 서버 접속하여 스크립트 실행 for HOST in "${SERVERS[@]}" do { echo "[$HOST] Running $TARGET_SCRIPT..." ssh ${USER}@${HOST} "bash ${SCRIPT_DIR}/${TARGET_SCRIPT}" echo "[$HOST] Done." } & done echo "All start commands triggered (not waiting)." ``` chmod +x control_udp_cluster.sh #시작방법 ./control_udp_cluster.sh start #종료방법 ./control_udp_cluster.sh stop #로그확인방법 #각 dn001~004에 접속 ssh myuser@dn001 tail -f /home/myuser/01.listenUdp/log/{오늘날짜}/udp_kafka_stats_{오늘날짜}.log |
728x90
반응형