본문으로 건너뛰기

3 - 데이터 전송

외부에서 데이터를 전송하는 방법을 안내합니다.

데이터 전송은 앞서 생성한 데이터 수집 파이프라인의 종류에 따라 다릅니다.

아래 예제는 Python 코드를 사용하여 데이터를 전송하는 방법을 안내합니다. 다른 언어로 데이터를 전송하는 방법은 각 프로토콜의 공식 문서를 참조하세요.

데이터 전송 값은 앞서 설정한 수집 파이프라인의 스키마에 맞추어 JSON 형식으로 전송되어야 합니다. timestamp 필드는 필수이며, timestamp 필드는 UTC 기준의 마이크로초 단위의 정수값이어야 합니다.

A. Kafka 타입 데이터 전송

데이터 수집 파이프라인을 Kafka로 생성한 경우, Kafka 형식으로 데이터를 전송해야 합니다. 아래에서는 Kafka로 데이터를 전송하는 간단한 Python 예제를 안내합니다. 예제는 Confluent kafka client 라이브러리를 사용하였습니다.

Kafka client를 통해 전송하려면 Kafka 클러스터의 연결 정보(e.g. bootstrap servers, username, password, topic)가 필요합니다. 연결 정보는 데이터 수집 파이프라인 생성 시 이미 제공되었습니다. (연결 정보 확인) Nazare 외부에서의 데이터 전송은 SASL_SSL security protocol과 SCRAM-SHA-512 SASL mechanism을 사용해야만 하며, Nzdata와 client 사이에 데이터는 모두 암호화 됩니다.

아래의 예제는 도구 메뉴의 JupyterHub 서비스 를 실행하여 확인하실 수 있습니다.

import json
import os
from datetime import datetime, timezone
from time import sleep

from confluent_kafka import KafkaException, Producer

KAFKA_BOOTSTRAP_SERVERS="Endpoint:Port (e.g. 192.168.200.12:31092)"
KAFKA_USERNAME="Kafka Username"
KAFKA_PASSWORD="Kafka Password"
KAFKA_TOPIC="Kafka Topic"

# https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
configs = {
"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": KAFKA_USERNAME,
"sasl.password": KAFKA_PASSWORD,
"enable.ssl.certificate.verification": False,
"compression.type": "snappy",
"delivery.timeout.ms": 3000,
"batch.size": 16384,
"linger.ms": 1000,
"acks": 0,
}

producer = Producer(configs)

def delivery_report(err, msg):
if err is not None:
print(f"Message delivery failed: {err}")

print(
f"Produced record to topic={msg.topic()} partition=[{msg.partition()}]"
f" offset={msg.offset()}"
)


while True:
try:
now = datetime.now(timezone.utc)

producer.produce(
KAFKA_TOPIC,
json.dumps(
{
"timestamp": int(now.timestamp() * 1e6),
"field0": "value0",
"field1": 1234,
}
),
"test-key",
on_delivery=delivery_report,
)
except KafkaException as e:
print(f"KafkaException: {e}")

producer.flush()
print("Produced")
sleep(1)

Kafka는 다양한 client 라이브러리를 지원하므로, 사용자가 선호하는 라이브러리를 사용하여 데이터를 전송할 수 있습니다. 지원되는 client 라이브러리는 Apache Kafka Clients 또는 Apache kafka Clients on Confluent를 참조하세요.

B. MQTT 타입 데이터 전송

데이터 수집 파이프라인을 MQTT로 생성한 경우, MQTT 형식으로 데이터를 전송해야 합니다. 아래에서는 MQTT로 데이터를 전송하는 간단한 Python 예제를 안내합니다. 예제는 paho-mqtt client 라이브러리를 사용하였습니다.

MQTT client를 통해 전송하려면 MQTT 클러스터의 연결 정보(e.g. endpoint, port, username, password, topic)가 필요합니다. 연결 정보는 데이터 수집 파이프라인 생성 시 이미 제공되었습니다. (연결 정보 확인) Nazare 외부에서의 데이터 전송은 SSL을 통해 모두 암호화 됩니다.

아래의 예제는 도구 메뉴의 JupyterHub 서비스 를 실행하여 확인하실 수 있습니다.

import json
from datetime import datetime, timezone
from time import sleep

import paho.mqtt.publish as publish
from paho.mqtt.client import MQTTv311

MQTT_HOST = "MQTT Endpoint (e.g. 192.168.200.12)"
MQTT_PORT = 31083
MQTT_USERNAME = "MQTT Username"
MQTT_PASSWORD = "MQTT Password"
MQTT_TOPIC = "MQTT Topic"
MQTT_CLIENT_ID = "MQTT Unique Client ID"

while True:
try:
now = datetime.now(timezone.utc)
publish.single(
topic=MQTT_TOPIC,
payload=json.dumps(
{
"timestamp": int(now.timestamp() * 1e6),
"field0": "value0",
"field1": 1234,
}
),
qos=1,
hostname=MQTT_HOST,
port=MQTT_PORT,
client_id=MQTT_CLIENT_ID,
auth={
"username": MQTT_USERNAME,
"password": MQTT_PASSWORD,
},
tls={"insecure": True},
protocol=MQTTv311,
transport="tcp",
)

print("Published")
except RuntimeError as e:
print(f"RuntimeError: {e}")
print("Published")
sleep(1)

print("Finished")

MQTT 다양한 client 라이브러리를 지원하므로, 사용자가 선호하는 라이브러리를 사용하여 데이터를 전송할 수 있습니다. 지원되는 client 라이브러리는 MQTT Client SDK 또는 MQTT Client Library Encyclopedia를 참조하세요.

다음 단계

지금까지 데이터 수집 파이프라인을 생성하고 데이터를 전송하는 방법을 안내했습니다. 다음 단계에서는 데이터를 읽어오는 방법을 안내합니다.