본문 바로가기
트레이딩

암호화폐 과거 캔들 데이터 DB 에 저장하기 . 파이썬 코드

by i.got.it 2024. 9. 1.

 

 

 

 

개요  

바이비트 거래 종목의 과거 캔들 데이터 확보하여 DB 에 저장하는 파이썬 코드 작성 상세. 

 

 

사전필수 

 

이전 파이썬에서 구현 완료했던 바이빗 거래소로 정보 요청하여 확보된 과거 캔들 데이터를 csv 파일로 저장하는 코드 (상세보기 : https://igotit.tistory.com/5766  )에서 파일 저장하는 기능 대신 DB 로 저장하는 부분 구현한다.

이전 작성 글에서는 바이비트 거래소 api 에 접근하는 기초적인 부분부터 단계적으로 구현 진행하는 스텝 바이 스텝 방식으로 설명되었기에 본 글에서 이 부분 설명 생략하고 바로 완전하게 작동하는 결론적인 코드 정리해둔다. 

 

 

본 예에서 사용되는 SQLite DB 스키마는 https://igotit.tistory.com/5809  에서 정의된 것과 동일하게 한다. 

 

 

DB 저장 파이썬 코드 1. 

 

아래 코드는 이전 작성한 https://igotit.tistory.com/5766  의 코드 5 에서의 CSV 파일로 저장하는 기능을 제거하고 SQLite DB 로 저장하는 완전한 코드. 

 

DB 파일명은 바이빗 거래소의 category , symbol 정보로 자동 생성된다. 

예 : CyMD_Candle_bybit_linear_BTCUSDT.sqlite 

 

DB 내의 테이블명은 interval 에 기록한 내용에 준하여 이름 자동 생성되고  (예 : 1분봉이면 T_M1), 해당테이블이 없는 경우 자동 생성하고 데이터 기록한다. 

예 : T_M1

 

이미 테이블에 기록된 timestamp_s 가 있는 경우에는 기록처리 하지 않는다. 

 

from pybit.unified_trading import HTTP
from datetime import datetime, timezone
import sqlite3
import os # 파일 존재 여부 확인 위해 추가
from enum import Enum
class KlineInterval(Enum):
"""Kline 간격 문자열 형식으로 정의하고, 밀리초 단위 시간 간격 계산하는 Enum 클래스"""
M1 = "1"
M3 = "3"
M5 = "5"
M15 = "15"
M30 = "30"
H1 = "60"
H2 = "120"
H4 = "240"
H6 = "360"
H12 = "720"
D1 = "1440"
W1 = "10080"
def __str__(self):
return self.value
def to_milliseconds(self):
intervals_in_minutes = {
KlineInterval.M1: 1,
KlineInterval.M3: 3,
KlineInterval.M5: 5,
KlineInterval.M15: 15,
KlineInterval.M30: 30,
KlineInterval.H1: 60,
KlineInterval.H2: 120,
KlineInterval.H4: 240,
KlineInterval.H6: 360,
KlineInterval.H12: 720,
KlineInterval.D1: 1440,
KlineInterval.W1: 10080
}
return intervals_in_minutes[self] * 60 * 1000
def utc_datetime_to_epoch_ms(datetime_str, time_format="%Y.%m.%d_%H.%M.%S"):
dt_utc = datetime.strptime(datetime_str, time_format).replace(tzinfo=timezone.utc)
return int(dt_utc.timestamp()) * 1000
def cy_get_kline(category, symbol, interval: KlineInterval, start_epoch_ms, end_epoch_ms):
response = session.get_kline(
category=category,
symbol=symbol,
interval=str(interval),
start=start_epoch_ms,
end=end_epoch_ms,
limit=1000
)
if response["retCode"] != 0:
raise Exception(f"API Error: {response['retMsg']}")
return response["result"]["list"]
def create_database_if_not_exists(db_name):
"""DB 파일이 존재하지 않으면 생성하는 함수"""
if not os.path.exists(db_name):
# DB 파일이 없으면 SQLite 연결 시 자동 생성됨
print(f"Database '{db_name}' does not exist. Creating new database...")
conn = sqlite3.connect(db_name)
conn.close()
def create_table_if_not_exists(db_name, table_name):
"""테이블이 존재하지 않으면 생성하는 함수"""
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {table_name} (
timestamp_s INTEGER PRIMARY KEY,
open TEXT,
high TEXT,
low TEXT,
close TEXT,
volume_base TEXT,
volume_quote TEXT
)
''')
conn.commit()
conn.close()
def save_to_sqlite(db_name, table_name, data):
# SQLite에 연결
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
# 데이터 삽입
for row in data:
# timestamp를 밀리초에서 초로 변환하여 저장
timestamp_s = int(row[0]) // 1000
cursor.execute(f'''
INSERT OR IGNORE INTO {table_name} (timestamp_s, open, high, low, close, volume_base, volume_quote)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (timestamp_s, row[1], row[2], row[3], row[4], row[5], row[6]))
# 변경 사항 저장 및 연결 종료
conn.commit()
conn.close()
def generate_db_filename(category, symbol):
"""카테고리와 심볼 사용하여 SQLite 데이터베이스 파일명 자동 생성"""
return f"CyMD_Candle_bybit_{category}_{symbol}.sqlite"
def generate_table_name(interval: KlineInterval):
"""KlineInterval 사용하여 테이블명 자동 생성"""
return f"T_{interval.name}"
def main():
global session
session = HTTP(testnet=False)
# 데이터 요청할 심볼 및 시간 간격 지정.
category = "spot"
symbol = "BTCUSDT"
interval = KlineInterval.M1
# 시간 범위 설정
utc_datetime_start_str = "2024.08.01_00.00.00"
utc_datetime_end_str = "2024.08.21_00.00.00"
start_ms = utc_datetime_to_epoch_ms(utc_datetime_start_str)
end_ms = utc_datetime_to_epoch_ms(utc_datetime_end_str)
total_number_of_candle_mustbe = 1 + (end_ms - start_ms) // interval.to_milliseconds()
# SQLite DB 파일명 및 테이블명 설정
db_name = generate_db_filename(category, symbol) # 자동으로 DB 파일명 생성
table_name = generate_table_name(interval) # 자동으로 테이블명 생성
# DB 파일이 없으면 생성
create_database_if_not_exists(db_name)
# 테이블이 없으면 생성
create_table_if_not_exists(db_name, table_name)
total_number_of_candle_acquired = 0
while True:
try:
candle_list = cy_get_kline(category, symbol, interval, start_ms, end_ms)
except Exception as e:
print(f"Stop Processing. {str(e)}")
break
if not candle_list:
print("Complete Candle Acquisition. the number of candle 0")
break
total_number_of_candle_acquired += len(candle_list)
print(f"First candle: {candle_list[0]}")
print(f"Last candle: {candle_list[-1]}")
# 데이터를 SQLite DB에 저장
save_to_sqlite(db_name, table_name, candle_list)
oldest_candle_epoch_ms = int(candle_list[-1][0])
end_ms = oldest_candle_epoch_ms - interval.to_milliseconds()
if end_ms < start_ms:
print("Completed Candle Acquisition")
print(f"Total number of acquired candles = {total_number_of_candle_acquired}")
print(f"The number of candle must be {total_number_of_candle_mustbe}")
print(f"The missing number of candle = {total_number_of_candle_mustbe - total_number_of_candle_acquired}")
break
if __name__ == "__main__":
main()

 

위 코드 실행하면 DB 테이블에 완전하게 저장 되고, DB4S 에서 파일 오픈하여 Browse Data 에서 기록된 데이터들 확인 가능하다.  

bybit spot 1분봉 SQLite DB 에 저장

 

 

 

 

코드 2. 캔들 데이터 누락 구간 점검하여 출력하기  추가. 

기능 추가 1. 상기 코드 1에  테이블에 저장된 캔들 데이터의 첫시각과 마지막 시각 사이에 누락된 캔들이 있는 경우 해당 구간 출력하는 기능 추가.

 

기능 추가2. 테이블에 기록된 첫 시각과 마지막 시각 출력 기능 추가. 

 

 

from pybit.unified_trading import HTTP
from datetime import datetime, timezone
import sqlite3
import os # 파일 존재 여부 확인 위해 추가
from enum import Enum
class KlineInterval(Enum):
"""Kline 간격 문자열 형식으로 정의하고, 밀리초 단위 시간 간격 계산하는 Enum 클래스"""
M1 = "1"
M3 = "3"
M5 = "5"
M15 = "15"
M30 = "30"
H1 = "60"
H2 = "120"
H4 = "240"
H6 = "360"
H12 = "720"
D1 = "1440"
W1 = "10080"
def __str__(self):
return self.value
def to_milliseconds(self):
intervals_in_minutes = {
KlineInterval.M1: 1,
KlineInterval.M3: 3,
KlineInterval.M5: 5,
KlineInterval.M15: 15,
KlineInterval.M30: 30,
KlineInterval.H1: 60,
KlineInterval.H2: 120,
KlineInterval.H4: 240,
KlineInterval.H6: 360,
KlineInterval.H12: 720,
KlineInterval.D1: 1440,
KlineInterval.W1: 10080
}
return intervals_in_minutes[self] * 60 * 1000
def utc_datetime_to_epoch_ms(datetime_str, time_format="%Y.%m.%d_%H.%M.%S"):
dt_utc = datetime.strptime(datetime_str, time_format).replace(tzinfo=timezone.utc)
return int(dt_utc.timestamp()) * 1000
def cy_get_kline(category, symbol, interval: KlineInterval, start_epoch_ms, end_epoch_ms):
response = session.get_kline(
category=category,
symbol=symbol,
interval=str(interval),
start=start_epoch_ms,
end=end_epoch_ms,
limit=1000
)
if response["retCode"] != 0:
raise Exception(f"API Error: {response['retMsg']}")
return response["result"]["list"]
def create_database_if_not_exists(db_name):
"""DB 파일이 존재하지 않으면 생성하는 함수"""
if not os.path.exists(db_name):
# DB 파일이 없으면 SQLite 연결 시 자동 생성됨
print(f"Database '{db_name}' does not exist. Creating new database...")
conn = sqlite3.connect(db_name)
conn.close()
def create_table_if_not_exists(db_name, table_name):
"""테이블이 존재하지 않으면 생성하는 함수"""
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {table_name} (
timestamp_s INTEGER PRIMARY KEY,
open TEXT,
high TEXT,
low TEXT,
close TEXT,
volume_base TEXT,
volume_quote TEXT
)
''')
conn.commit()
conn.close()
def save_to_sqlite(db_name, table_name, data):
# SQLite에 연결
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
# 데이터 삽입
for row in data:
# timestamp를 밀리초에서 초로 변환하여 저장
timestamp_s = int(row[0]) // 1000
cursor.execute(f'''
INSERT OR IGNORE INTO {table_name} (timestamp_s, open, high, low, close, volume_base, volume_quote)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (timestamp_s, row[1], row[2], row[3], row[4], row[5], row[6]))
# 변경 사항 저장 및 연결 종료
conn.commit()
conn.close()
def generate_db_filename(category, symbol):
"""카테고리와 심볼 사용하여 SQLite 데이터베이스 파일명 자동 생성"""
return f"CyMD_Candle_bybit_{category}_{symbol}.sqlite"
def generate_table_name(interval: KlineInterval):
"""KlineInterval 사용하여 테이블명 자동 생성"""
return f"T_{interval.name}"
def timestamp_to_utc_datetime(timestamp):
"""Convert a Unix timestamp to a timezone-aware UTC datetime object."""
return datetime.fromtimestamp(timestamp, tz=timezone.utc)
def find_missing_candle_intervals(db_name, table_name, interval: KlineInterval):
"""테이블에서 timestamp_s 값 사이에 누락된 구간을 찾아 출력하는 함수"""
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
# 모든 timestamp_s를 정렬하여 가져오기
cursor.execute(f"SELECT timestamp_s FROM {table_name} ORDER BY timestamp_s")
timestamps = cursor.fetchall()
if not timestamps:
print("No data found in the table.")
conn.close()
return
interval_seconds = interval.to_milliseconds() // 1000
previous_timestamp = timestamps[0][0]
for current_timestamp in timestamps[1:]:
current_timestamp = current_timestamp[0]
if current_timestamp - previous_timestamp > interval_seconds:
# 누락된 구간의 시작과 끝 출력
missing_start = previous_timestamp + interval_seconds
missing_end = current_timestamp - interval_seconds
print(f"Missing interval: {timestamp_to_utc_datetime(missing_start).strftime('%Y-%m-%d %H:%M:%S')} to {timestamp_to_utc_datetime(missing_end).strftime('%Y-%m-%d %H:%M:%S')}")
previous_timestamp = current_timestamp
conn.close()
def print_table_time_range(db_name, table_name):
"""테이블에서 첫 시각과 마지막 시각을 출력하는 함수"""
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
# 가장 오래된 timestamp_s 값 가져오기
cursor.execute(f"SELECT MIN(timestamp_s) FROM {table_name}")
min_timestamp = cursor.fetchone()[0]
# 가장 최신 timestamp_s 값 가져오기
cursor.execute(f"SELECT MAX(timestamp_s) FROM {table_name}")
max_timestamp = cursor.fetchone()[0]
conn.close()
if min_timestamp is None or max_timestamp is None:
print("Table is empty or timestamps are missing.")
return
# UTC 시간으로 변환
min_datetime = timestamp_to_utc_datetime(min_timestamp)
max_datetime = timestamp_to_utc_datetime(max_timestamp)
print(f"First recorded time: {min_datetime.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Last recorded time: {max_datetime.strftime('%Y-%m-%d %H:%M:%S')}")
def main():
global session
session = HTTP(testnet=False)
# 데이터 요청할 심볼 및 시간 간격 지정.
category = "spot"
symbol = "BTCUSDT"
interval = KlineInterval.M1
# 시간 범위 설정
utc_datetime_start_str = "2024.08.01_00.00.00"
utc_datetime_end_str = "2024.08.21_00.00.00"
start_ms = utc_datetime_to_epoch_ms(utc_datetime_start_str)
end_ms = utc_datetime_to_epoch_ms(utc_datetime_end_str)
total_number_of_candle_mustbe = 1 + (end_ms - start_ms) // interval.to_milliseconds()
# SQLite DB 파일명 및 테이블명 설정
db_name = generate_db_filename(category, symbol) # 자동으로 DB 파일명 생성
table_name = generate_table_name(interval) # 자동으로 테이블명 생성
# DB 파일이 없으면 생성
create_database_if_not_exists(db_name)
# 테이블이 없으면 생성
create_table_if_not_exists(db_name, table_name)
total_number_of_candle_acquired = 0
while True:
try:
candle_list = cy_get_kline(category, symbol, interval, start_ms, end_ms)
except Exception as e:
print(f"Stop Processing. {str(e)}")
break
if not candle_list:
print("Complete Candle Acquisition. the number of candle 0")
break
total_number_of_candle_acquired += len(candle_list)
print(f"First candle: {candle_list[0]}")
print(f"Last candle: {candle_list[-1]}")
# 데이터를 SQLite DB에 저장
save_to_sqlite(db_name, table_name, candle_list)
oldest_candle_epoch_ms = int(candle_list[-1][0])
end_ms = oldest_candle_epoch_ms - interval.to_milliseconds()
if end_ms < start_ms:
print("Completed Candle Acquisition")
print(f"Total number of acquired candles = {total_number_of_candle_acquired}")
print(f"The number of candle must be {total_number_of_candle_mustbe}")
print(f"The missing number of candle = {total_number_of_candle_mustbe - total_number_of_candle_acquired}")
break
# 캔들 데이터 누락 구간 찾기
find_missing_candle_intervals(db_name, table_name, interval)
# 첫 시각과 마지막 시각 출력
print_table_time_range(db_name, table_name)
if __name__ == "__main__":
main()

 

코드 2. 실행결과 누락 데이터 있는 경우  출력 예. 

- 본 코드 실행 마지막 시점에, 테이블 기록된 내용 점검하여 누락된 캔들이 있는 경우 해당 시구간을 보여준다. 

 

 

상기 누락된 구간이 포함되도록 시간 설정부를 수정하여 다시 실행시키면 해당 구간 데이터 확보하여 DB 에 기록한다. 

 

  # 시간 범위 설정. 위 출력의 누락된 시구간이 포함되도록 여유있게 설정한다. 
    utc_datetime_start_str = "2024.07.14_00.00.00"
    utc_datetime_end_str = "2024.08.01_00.00.00"

 

위 설정으로 먼저 실행시키고 나서, 2번째 누락 구간 포함하는 시간 설정하여 실행시킨다. 

  # 시간 범위 설정
    utc_datetime_start_str = "2024.08.19_00.00.00"
    utc_datetime_end_str = "2024.08.25_00.00.00"

 

 

 

위와 같은 방식으로 DB 저장 처리하면 누락된 데이터 없는 무결성 고신뢰 데이터 품질  달성 된다. 

 

반면, csv 파일 저장 방식에서는 이와 같은 처리가 쉽지 않기 때문에 DB 활용은 필수다. 

 

 

코드2. 테이블에 기록된 첫캔들의 시각과 마지막 시각 출력 모습. 

 

 

 

 

 

DB 파일 크기  확인 해봄 

이 코드로 바이비트 거래소  리니어 USDT   종목의  2022년1월1일부터 2024년 8월 20일 까지(총 2년 8개월) 1분봉, 15분봉, 1시간봉 모두 저장하면 DB 파일 용량 175Mbyte 정도 됨. 

 

 

 

 

주의 . 서버에 데이터 없는 경우도 있음. 

바이비트 거래소의 현물 BTCUSDT 의 2021년 8월 1일 부터 2024년 8월 1일 구간까지  1분봉, 1시간봉은 모두 캔들데이터 확보가능하나, 15분봉의 경우 아래 2개  시 구간이 누락되어있고, 이 시 구간은 서버에 재요청해도 데이터 제공 안해줌. 

 

Missing interval: 2021-12-31 00:15:00 to 2021-12-31 23:45:00
Missing interval: 2022-02-01 02:45:00 to 2022-02-05 06:30:00
First recorded time: 2021-08-01 00:00:00
Last recorded time: 2024-08-01 00:00:00

 

 

 

 

연관 

 

CSV 파일로 저장하는 파이썬 코드 작성 과정 단계별 상세 설명

 

암호화폐 캔들 데이터 확보 코드 . 파이썬 pybit

바이비트 암호화폐 캔들 데이터 처리 코드 . 파이썬 pybit 이용.   개요 암호화폐 거래소 바이비트 API 통신 위한 공식 파이썬 패키지 pybit 이용하여 파이썬에서  캔들 데이터 확보하기 위한 가

igotit.tistory.com

 

 


첫 등록 : 2024.09.01

최종 수정 : 

단축 주소 : https://igotit.tistory.com/5810


 



비트코인




암호화폐       외환/나스닥/골드         암호화폐/외환/나스닥/골드
     
현물 |선물 인버스 |선물 USDT       전략매니저(카피트레이딩)         프랍 트레이더 온라인 지원가능. MT4,MT5