본문으로 건너뛰기

샘플 코드

개요

NazareDB를 활용하는 다양한 방법을 Python 샘플 기반으로 소개합니다. 이 섹션에서는 Arrow FlightSQL, PostgreSQL, 그리고 REST API를 이용한 데이터 접근 및 조작 방법에 대한 코드 예제를 제공합니다.

각 접근 방식에서 최종적으로 SQL 쿼리를 NazareDB에 전달하고 결과를 받아오는 방법은 동일합니다. 다만 각 방법은 데이터를 전송하고 받아오는 방식이 다르며, 각 방법마다 장단점이 있습니다. 따라서 각 방법을 사용자의 필요와 환경에 맞추어 적절히 선택하여 사용하시기 바랍니다.

아래의 예제 들은 도구 메뉴의 JupyterHub 서비스 를 실행하여 확인하실 수 있습니다.

ADBC DB-API (Arrow Flight SQL)

ADBC API를 통해 NazareDB에 연결하고 Arrow Flight SQL를 프로토콜을 통해 SQL 쿼리를 실행하는 예제입니다. NazareDB가 사용하는 Arrow Flight SQL은 고성능 데이터 전송을 위한 프로토콜이기에 고속으로 대용량 데이터를 처리하는데 효과적입니다. ADBC API에 대한 자세한 설명은 Introducing Arrow ADBC를 참조하세요.

ADBC API는 DB-API와 유사한 인터페이스를 제공하기 때문에 기존 Python 사용자가 쉽게 사용할 수 있습니다.

import base64
import os
from datetime import datetime, timedelta, timezone

from adbc_driver_flightsql import DatabaseOptions
from adbc_driver_flightsql.dbapi import InternalError, connect

NAZAREDB_FLIGHTSQL_HOST = "NazareDB FlightSQL Host"
NAZAREDB_FLIGHTSQL_PORT = 8180
NAZAREDB_USERNAME = "NazareDB Username"
NAZAREDB_PASSWORD = "NazareDB Password"

AUTH = "Basic " + base64.b64encode(
f"{NAZAREDB_USERNAME}:{NAZAREDB_PASSWORD}".encode("utf-8")
).decode("utf-8")

TABLE_NAME = "수집 table ID"
COL_NAME = "수집 column name"

def query(sql):
with connect(
f"grpc://{NAZAREDB_FLIGHTSQL_HOST}:{NAZAREDB_FLIGHTSQL_PORT}",
db_kwargs={
DatabaseOptions.AUTHORIZATION_HEADER.value: AUTH,
DatabaseOptions.WITH_MAX_MSG_SIZE.value: "1073741824",
},
autocommit=True,
) as conn:
with conn.cursor() as cur:
cur.execute(sql)
return cur.fetch_arrow_table()

def get_data(
cur_table, cur_column, from_min_delta=10, to_min_delta=5
):
dt = datetime.now(timezone.utc) - timedelta(minutes=from_min_delta)
start = datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
end = start + timedelta(minutes=to_min_delta)

sql = f"SELECT timestamp, \"{cur_column}\" FROM \"{cur_table}\" WHERE date='{start.strftime('%Y-%m-%d')}' AND timestamp BETWEEN '{start}' and '{end}' ORDER BY timestamp"
return query(sql)

get_data(TABLE_NAME, COL_NAME)

ADBC API 는 Python 이외에 다른 언어에서도 사용할 수 있으며, 다양한 데이터베이스와 연동하여 데이터를 처리할 수 있습니다. 다른 언어 샘플 코드는 Arrow ADBC를 참조하세요.

REST API

NazareDB는 REST API를 통해 데이터에 접근하고 조작할 수 있습니다.

REST API를 이용한 접근 방법은 HTTP 요청을 통해 NazareDB 데이터에 접근하고자 할 때 유용하며 사용자에게 가장 추천되는 방식입니다. 이 방법은 웹 기반 애플리케이션 개발자에게 친숙하며, 프로그래밍 언어와 플랫폼에 상관없이 쉽게 사용할 수 있습니다.

NazareDB는 REST의 결과 값으로 arrow/stream, json, csv, files 형식을 지원합니다.

  • Arrow Stream: Arrow Stream은 Arrow 데이터를 바이너리 형태로 스트리밍하는 방식으로, Arrow FlightSQL과 유사한 방식으로 데이터를 전송합니다. Arrow Stream은 Arrow FlightSQL과 동일한 성능을 제공하며, 대용량 데이터를 처리하는데 효과적입니다.
  • JSON: JSON은 데이터를 텍스트로 표현하는 방식으로, 가장 일반적이고 쉬운 데이터 교환 형식이지만, 성능이 떨어질 수 있습니다.
  • CSV: CSV는 데이터를 텍스트로 표현하는 방식으로, 가장 일반적이고 쉬운 데이터 교환 형식이지만, 성능이 떨어질 수 있습니다.
  • files: 이 방식은 Parquet raw 파일 들의 S3 Presigned URL을 반환합니다. 사용자는 DuckDB 또는 Pandas와 같은 라이브러리를 사용하여 직접 파일을 다운르드하거나 데이터를 처리할 수 있습니다. 이 방식은 사용하기 까다롭고 복잡하면서 중복데이터 발생할 수 있지만 백엔드를 거치지 않기 때문에 가장 속도가 빠른 방식입니다.

이 샘플 코드는 Python의 requests 라이브러리를 사용하여 REST API를 통해 NazareDB에 접근하는 방법을 보여줍니다.

import io
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

import duckdb
import fsspec
import pyarrow as pa
import pyarrow.csv as pcs
import pyarrow.dataset as pda
import pyarrow.ipc as pi
import pyarrow.json as pj
import requests
from sqlglot import exp, parse_one

NAZAREDB_LAYOUT = "arrow/stream" # arrow/stream, json, csv, files
NAZAREDB_REST_URL = "NazareDB REST URL endpoint/{NAZAREDB_LAYOUT}"
NAZAREDB_USERNAME = "NazareDB Username"
NAZAREDB_PASSWORD = "NazareDB Password"
AUTH = (NAZAREDB_USERNAME, NAZAREDB_PASSWORD)
HEADERS = {"Content-Type": "application/json", "Accept-Encoding": "gzip"}
COMPRESSION = "gzip"

TABLE_NAME = "수집 table ID"
COL_NAME = "수집 column name"

def query(
url: str,
auth: tuple,
headers: dict,
sql: str,
):
res = requests.post(
url,
data=json.dumps({"sql": sql, "compression": "gzip"}),
auth=auth,
headers=headers,
)

if res.status_code != 200:
print(f"Query Failed: {res.reason}, {res.content}")
raise RuntimeError(f"Query Failed: {res}")

print(f"response size: {len(res.content)/1024/1024:4f}MB")
print(json.dumps(dict(res.headers), indent=2))

layout = os.path.basename(url)
if layout == "stream":
with pi.open_stream(res.content) as reader:
return pa.Table.from_batches(reader)
elif layout == "json":
return pj.read_json(io.BytesIO(res.content))

elif layout == "csv":
return pcs.read_csv(io.BytesIO(res.content))
elif layout == "files":
table0 = pda.dataset(
source=json.loads(res.text),
format="parquet",
filesystem=fsspec.filesystem("http"),
)
for table in parse_one(sql).find_all(exp.Table):
sql = sql.replace(str(table), table0)

conn = duckdb.connect()
return conn.execute(sql).arrow()


def get_data(cur_table, cur_column, from_min_delta=10, to_min_delta=5):
print(f"Getting data of {cur_column} from {cur_table}....")

dt = datetime.now(timezone.utc) - timedelta(minutes=from_min_delta)
start = datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
end = start + timedelta(minutes=to_min_delta)
sql = f"SELECT timestamp, \"{cur_column}\" FROM \"{cur_table}\" WHERE date='{start.strftime('%Y-%m-%d')}' AND timestamp BETWEEN '{start}' and '{end}' ORDER BY timestamp"

ret = query(NAZAREDB_REST_URL, AUTH, HEADERS, sql)
print(ret)

get_data(TABLE_NAME, COL_NAME)

DB-API (PostgreSQL)

NazareDB는 Legacy 어플리케이션 지원을 위하여 PostgreSQL 프로토콜을 지원합니다. 따라서 기존 PostgreSQL 접근 방식으로 NazareDB에 접근할 수 있습니다. 이 방법은 기존 JDBCDB-API에 친숙한 사용자에게 적합합니다. 다만 PostgreSQL 호환 프로토콜을 사용하는 경우, NazareDB의 고성능 데이터 전송 프로토콜인 Arrow FlightSQL을 사용하는 것보다 성능이 떨어질 수 있으며, 100% 호환되지 않는 기능이 있을 수 있습니다.

이 샘플 코드는 psycopg2 라이브러리를 사용하여 DB-API Python 표준 데이터베이스 인터페이스로 NazareDB에 접근하는 방법을 보여줍니다. psycopg2 라이브러리 외에도 다양한 PostgreSQL 라이브러리가 있으며, 자세한 내용은 PostgreSQL Wiki를 참조하세요.

import os
from datetime import datetime, timedelta, timezone
from psycopg import connect

NAZAREDB_POSTGRESQL_HOST = "NazareDB PostgreSQL Host"
NAZAREDB_POSTGRESQL_PORT = 5432
NAZAREDB_USERNAME = "NazareDB Username"
NAZAREDB_PASSWORD = "NazareDB Password"

TABLE_NAME = "수집 table ID"
COL_NAME = "수집 column name"

def query(sql):
with connect(
f"host={NAZAREDB_POSTGRESQL_HOST} port={NAZAREDB_POSTGRESQL_PORT} user={NAZAREDB_USERNAME}"
) as conn:
with conn.cursor() as cur:
cur.execute(sql)
return cur.fetchall()


def get_data(
cur_table, cur_column, from_min_delta=10, to_min_delta=5
):
dt = datetime.now(timezone.utc) - timedelta(minutes=from_min_delta)
start = datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
end = start + timedelta(minutes=to_min_delta)

sql = f"SELECT timestamp, \"{cur_column}\" FROM \"{cur_table}\" WHERE date='{start.strftime('%Y-%m-%d')}' AND timestamp BETWEEN '{start}' and '{end}' ORDER BY timestamp"
return query(sql)

get_data(TABLE_NAME, COL_NAME)

지금까지 NazareDB에 접근하는 다양한 방법을 Python 샘플 코드로 소개했습니다. 각 방법은 사용자의 환경과 필요에 따라 선택하여 사용하시기 바랍니다.

다음 단계로는 NazareDB가 제공하는 다양한 SQL 문법에 대해 알아보겠습니다.