본문으로 건너뛰기

5 - 데이터 읽기 (2)

외부에서 데이터를 읽기 위한 방법을 안내합니다.

데이터는 실시간 데이터와 과거 데이터로 나누어져 있습니다. 실시간 데이터는 MQTT 프로토콜을 통해 읽을 수 있으며, 과거 데이터는 NazareDB SQL 쿼리를 통해 읽을 수 있습니다.

전송된 데이터는 수집 파이라인 종류와 상관없이 MQTT 프로토콜을 통해 실시간으로 읽을 수 있습니다. 그리고 Delta Lake를 통해 쌓인 데이터는 NazareDB SQL 쿼리를 통해 읽을 수 있습니다.

A. MQTT 프로토콜을 통한 실시간 데이터 읽기

예제는 paho-mqtt client 라이브러리를 사용하였습니다.

MQTT client를 통해 데이터를 읽으려면 MQTT 클러스터의 연결 정보(e.g. endpoint, port, username, password, topic)가 필요합니다. 연결 정보는 데이터 수집 파이프라인 생성 시 이미 제공되었습니다. (연결 정보 확인)

아래의 예제는 도구 메뉴의 JupyterHub 서비스 를 실행하여 확인하실 수 있습니다.

import json
from ssl import create_default_context

import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTTv311, connack_string

MQTT_HOST = "MQTT Endpoint (e.g. 192.168.200.12)"
MQTT_PORT = 31083
MQTT_USERNAME = "MQTT Username"
MQTT_PASSWORD = "MQTT Password"
MQTT_TOPIC = "MQTT Topic"
MQTT_CLIENT_ID = "MQTT Unique Client ID"
MQTT_QOS = 0

def on_connect(client: mqtt.Client, userdata, flags, rc, properties):
if rc != 0:
print("Connection failed: %s", connack_string(rc))
return

print(
"Connected succeed: flags: %s, properties: %s",
flags,
properties,
)

rc, mid = client.subscribe(MQTT_TOPIC, qos=MQTT_QOS)
print(
"Subscribing: %s, {}: rc: %s, mid: %s",
MQTT_TOPIC,
connack_string(rc),
mid,
)


def on_connect_fail(client: mqtt.Client, userdata, rc):
print("Connection failed: %s, userdata", connack_string(rc))
client.loop_stop()


def on_disconnect(client: mqtt.Client, userdata, flags, rc, properties):
print(
"Disconnection succeed: %s, flags: %s, properties: %s",
flags,
connack_string(rc),
properties,
)
client.loop_stop()


def on_unhandled_message(_: mqtt.Client, userdata, msg):
print("Unhandled message received from %s: %s", msg.topic, str(msg.payload))


def on_message(client: mqtt.Client, userdata, msg):
value = json.loads(msg.payload.decode("utf-8"))
print("Message received: %s:%s", msg.topic, value)


def on_subscribe(client: mqtt.Client, userdata, mid, rc_list: list, properties):
for rc in rc_list:
if rc != 0:
print("Subscription failed: %s", connack_string(rc))

print(
"Subscribed succeed: %s, mid: %s, properties: %s",
MQTT_TOPIC,
mid,
properties,
)


def on_unsubscribe(client: mqtt.Client, userdata, mid, rc_list: list, properties):
for rc in rc_list:
if rc != 0:
print("Subscription failed: %s", connack_string(rc))

print("Unsubscribed succeed: mid: %s, properties: %s", mid, properties)
client.disconnect()


client = mqtt.Client(
client_id=MQTT_CLIENT_ID,
userdata="userdata",
protocol=MQTTv311,
transport="tcp",
clean_session=True,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
client.tls_set_context(create_default_context())
client.tls_insecure_set(True)

client.username_pw_set(username=MQTT_USERNAME, password=MQTT_PASSWORD)
client.reconnect_delay_set(1, 120)
client.enable_logger()
client.on_connect = on_connect
client.on_connect_fail = on_connect_fail
client.on_disconnect = on_disconnect
client.on_message = on_unhandled_message
client.on_subscribe = on_subscribe
client.on_unsubscribe = on_unsubscribe
client.message_callback_add(MQTT_TOPIC, on_message)

client.connect(host=MQTT_HOST, port=MQTT_PORT)
client.user_data_set("userdata")

client.loop_forever()

B. NazareDB SQL 쿼리를 통한 과거 데이터 읽기

NazareDB는 Arrow Flight SQL, Postgre SQLHTTP REST 프로토콜 등을 지원합니다. 이 중 HTTP REST를 사용하여 데이터를 읽는 예제를 안내합니다. HTTP REST를 통해 읽을 시에 결과 데이터 형식을 선택할 수 있습니다. 현재 지원되는 데이터 형식은 json, csv, arrow/stream 입니다. arrow/stream 은 Apache Arrow에서 제공하는 스트리밍을 위한 binary format으로 가장 빠른 속도를 제공합니다. 예제에서는 arrow/stream을 사용하여 데이터를 읽습니다.

NazareDB를 사용하기 위해서는 NazareDB 각 프로토콜 별 endpoint를 확인하고 NazareDB 용 앱 클라이언트를 생성해야 합니다. 각 프로토콜 별 endpoint는 도구 > 쿼리 에서 확인할 수 있습니다. 앱 클라이언트 생성을 위해서는 좌측 메뉴에서 설정 > 앱통합를 클릭하여 NazareDB 유형으로 앱 클라이언트를 생성하여 username, password를 발급해 주세요.

아래의 예제는 도구 메뉴의 JupyterHub 서비스 를 실행하여 확인하실 수 있습니다.

import io
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

import pyarrow as pa
import pyarrow.ipc as pi
import requests

NAZAREDB_REST_URL = "NazareDB REST URL endpoint"
NAZAREDB_USERNAME = "NazareDB Username"
NAZAREDB_PASSWORD = "NazareDB Password"
AUTH = (NAZAREDB_USERNAME, NAZAREDB_PASSWORD)
HEADERS = {"Content-Type": "application/json", "Accept-Encoding": "gzip"}
COMPRESSION = "gzip"

TABLE_NAME = "수집 table ID"
COL_NAME = "수집 column name"

def query(
url: str,
auth: tuple,
headers: dict,
sql: str,
verbose: bool = False,
) -> Optional[pa.Table]:
# print(f"Querying: {sql}....")

res = requests.post(
url,
data=json.dumps({"sql": sql, "compression": "gzip"}),
auth=auth,
headers=headers,
)

if res.status_code != 200:
print(f"Query Failed: {res.reason}, {res.content}")
raise RuntimeError(f"Query Failed: {res}")

if verbose:
print(f"response size: {len(res.content)/1024/1024:4f}MB")
print(json.dumps(dict(res.headers), indent=2))

with pi.open_stream(res.content) as reader:
return pa.Table.from_batches(reader)


def get_data(cur_table, cur_column, from_min_delta=10, to_min_delta=5):
print(f"Getting data of {cur_column} from {cur_table}....")

dt = datetime.now(timezone.utc) - timedelta(minutes=from_min_delta)
start = datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
end = start + timedelta(minutes=to_min_delta)
sql = f"SELECT timestamp, \"{cur_column}\" FROM \"{cur_table}\" WHERE date='{start.strftime('%Y-%m-%d')}' AND timestamp BETWEEN '{start}' and '{end}' ORDER BY timestamp"

tb = query(NAZAREDB_REST_URL, AUTH, HEADERS, sql)
df = tb.to_pandas().dropna()
print(df)

get_data(TABLE_NAME, COL_NAME)

NazareDB에 대한 자세한 내용은 NazareDB를 참조하세요.

추가 정보

기술 지원

  • 연락처:

(주)잉클:

  • 주소: 충남 천안시 서북구 천안천4길 32, 그린스타트업타운 3F
  • 연구소: 경기 수원시 영통구 덕영대로 1556번길 16, 디지털엠파이어빌딩 D동 504호
  • 문의 사항: 대표전화 또는 이메일을 통해 연락주시기 바랍니다.

다음 단계

제품 전반에 대한 자세한 정보는 전체 사용자 가이드에서 확인할 수 있습니다.