5 - 데이터 읽기 (2)
외부에서 데이터를 읽기 위한 방법을 안내합니다.
데이터는 실시간 데이터와 과거 데이터로 나누어져 있습니다. 실시간 데이터는 MQTT 프로토콜을 통해 읽을 수 있으며, 과거 데이터는 NazareDB SQL 쿼리를 통해 읽을 수 있습니다.
전송된 데이터는 수집 파이라인 종류와 상관없이 MQTT 프로토콜을 통해 실시간으로 읽을 수 있습니다. 그리고 Delta Lake를 통해 쌓인 데이터는 NazareDB SQL 쿼리를 통해 읽을 수 있습니다.
A. MQTT 프로토콜을 통한 실시간 데이터 읽기
예제는 paho-mqtt client 라이브러리를 사용하였습니다.
MQTT client를 통해 데이터를 읽으려면 MQTT 클러스터의 연결 정보(e.g. endpoint, port, username, password, topic)가 필요합니다. 연결 정보는 데이터 수집 파이프라인 생성 시 이미 제공되었습니다. (연결 정보 확인)
아래의 예제는 도구 메뉴의 JupyterHub 서비스 를 실행하여 확인하실 수 있습니다.
import json
from ssl import create_default_context
import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTTv311, connack_string
MQTT_HOST = "MQTT Endpoint (e.g. 192.168.200.12)"
MQTT_PORT = 31083
MQTT_USERNAME = "MQTT Username"
MQTT_PASSWORD = "MQTT Password"
MQTT_TOPIC = "MQTT Topic"
MQTT_CLIENT_ID = "MQTT Unique Client ID"
MQTT_QOS = 0
def on_connect(client: mqtt.Client, userdata, flags, rc, properties):
if rc != 0:
print("Connection failed: %s", connack_string(rc))
return
print(
"Connected succeed: flags: %s, properties: %s",
flags,
properties,
)
rc, mid = client.subscribe(MQTT_TOPIC, qos=MQTT_QOS)
print(
"Subscribing: %s, {}: rc: %s, mid: %s",
MQTT_TOPIC,
connack_string(rc),
mid,
)
def on_connect_fail(client: mqtt.Client, userdata, rc):
print("Connection failed: %s, userdata", connack_string(rc))
client.loop_stop()
def on_disconnect(client: mqtt.Client, userdata, flags, rc, properties):
print(
"Disconnection succeed: %s, flags: %s, properties: %s",
flags,
connack_string(rc),
properties,
)
client.loop_stop()
def on_unhandled_message(_: mqtt.Client, userdata, msg):
print("Unhandled message received from %s: %s", msg.topic, str(msg.payload))
def on_message(client: mqtt.Client, userdata, msg):
value = json.loads(msg.payload.decode("utf-8"))
print("Message received: %s:%s", msg.topic, value)
def on_subscribe(client: mqtt.Client, userdata, mid, rc_list: list, properties):
for rc in rc_list:
if rc != 0:
print("Subscription failed: %s", connack_string(rc))
print(
"Subscribed succeed: %s, mid: %s, properties: %s",
MQTT_TOPIC,
mid,
properties,
)
def on_unsubscribe(client: mqtt.Client, userdata, mid, rc_list: list, properties):
for rc in rc_list:
if rc != 0:
print("Subscription failed: %s", connack_string(rc))
print("Unsubscribed succeed: mid: %s, properties: %s", mid, properties)
client.disconnect()
client = mqtt.Client(
client_id=MQTT_CLIENT_ID,
userdata="userdata",
protocol=MQTTv311,
transport="tcp",
clean_session=True,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
client.tls_set_context(create_default_context())
client.tls_insecure_set(True)
client.username_pw_set(username=MQTT_USERNAME, password=MQTT_PASSWORD)
client.reconnect_delay_set(1, 120)
client.enable_logger()
client.on_connect = on_connect
client.on_connect_fail = on_connect_fail
client.on_disconnect = on_disconnect
client.on_message = on_unhandled_message
client.on_subscribe = on_subscribe
client.on_unsubscribe = on_unsubscribe
client.message_callback_add(MQTT_TOPIC, on_message)
client.connect(host=MQTT_HOST, port=MQTT_PORT)
client.user_data_set("userdata")
client.loop_forever()
B. NazareDB SQL 쿼리를 통한 과거 데이터 읽기
NazareDB는 Arrow Flight SQL, Postgre SQL 및 HTTP REST 프로토콜 등을 지원합니다.
이 중 HTTP REST를 사용하여 데이터를 읽는 예제를 안내합니다. HTTP REST를 통해 읽을 시에 결과 데이터 형식을 선택할 수 있습니다.
현재 지원되는 데이터 형식은 json
, csv
, arrow/stream
입니다. arrow/stream
은 Apache Arrow에서 제공하는 스트리밍을 위한 binary format으로 가장 빠른 속도를 제공합니다.
예제에서는 arrow/stream
을 사용하여 데이터를 읽습니다.
NazareDB를 사용하기 위해서는 NazareDB 각 프로토콜 별 endpoint를 확인하고 NazareDB 용 앱 클라이언트를 생성해야 합니다.
각 프로토콜 별 endpoint는 도구 > 쿼리 에서 확인할 수 있습니다.
앱 클라이언트 생성을 위해서는 좌측 메뉴에서 설정 > 앱통합를 클릭하여 NazareDB
유형으로 앱 클라이언트를 생성하여 username, password를 발급해 주세요.
아래의 예제는 도구 메뉴의 JupyterHub 서비스 를 실행하여 확인하실 수 있습니다.
import io
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Optional
import pyarrow as pa
import pyarrow.ipc as pi
import requests
NAZAREDB_REST_URL = "NazareDB REST URL endpoint"
NAZAREDB_USERNAME = "NazareDB Username"
NAZAREDB_PASSWORD = "NazareDB Password"
AUTH = (NAZAREDB_USERNAME, NAZAREDB_PASSWORD)
HEADERS = {"Content-Type": "application/json", "Accept-Encoding": "gzip"}
COMPRESSION = "gzip"
TABLE_NAME = "수집 table ID"
COL_NAME = "수집 column name"
def query(
url: str,
auth: tuple,
headers: dict,
sql: str,
verbose: bool = False,
) -> Optional[pa.Table]:
# print(f"Querying: {sql}....")
res = requests.post(
url,
data=json.dumps({"sql": sql, "compression": "gzip"}),
auth=auth,
headers=headers,
)
if res.status_code != 200:
print(f"Query Failed: {res.reason}, {res.content}")
raise RuntimeError(f"Query Failed: {res}")
if verbose:
print(f"response size: {len(res.content)/1024/1024:4f}MB")
print(json.dumps(dict(res.headers), indent=2))
with pi.open_stream(res.content) as reader:
return pa.Table.from_batches(reader)
def get_data(cur_table, cur_column, from_min_delta=10, to_min_delta=5):
print(f"Getting data of {cur_column} from {cur_table}....")
dt = datetime.now(timezone.utc) - timedelta(minutes=from_min_delta)
start = datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
end = start + timedelta(minutes=to_min_delta)
sql = f"SELECT timestamp, \"{cur_column}\" FROM \"{cur_table}\" WHERE date='{start.strftime('%Y-%m-%d')}' AND timestamp BETWEEN '{start}' and '{end}' ORDER BY timestamp"
tb = query(NAZAREDB_REST_URL, AUTH, HEADERS, sql)
df = tb.to_pandas().dropna()
print(df)
get_data(TABLE_NAME, COL_NAME)
NazareDB에 대한 자세한 내용은 NazareDB를 참조하세요.