본문 바로가기
트레이딩/암호화폐

암호화폐 실시간 틱 데이터 수신 코딩 방법 . 파이썬

by i.got.it 2024. 9. 7.

 

 

 

 

개요 

암호화폐 거래소 바이비트 API 통신 위한 공식 파이썬 패키지 pybit 이용하여 파이썬에서  실시간 틱 데이터 수신 위한 가장 쉬운 기본적인 형태에서 출발하여 점진적으로 더 유용한 형식으로 코드 발전시키는 과정  단계별로 모두 정리. 

 

본 글에서의 바이비트 API 버전 : 현재(2024. 08.21) 시점  최신 버전 API V5 

 

 

사전 필수 셋팅 

- 파이썬 개발환경 구축 되어있어야 함. 구축예 :  https://igotit.tistory.com/5761

- 파이썬에 pybit 설치되어있어야 함.  설치구문 :  pip install pybit 

 

 

 

사전 필수 개념 이해

 

- 본 글에서 pybit 이용하여 실시간 틱 데이터 수신하는 것은  bybit 거래소 API v5 의 웹소켓 Trade 를  pybit 으로 편리하게 접근하는 것. 

 

-  API v5 의 웹소켓 Trade  의 subscribe , response 이해하고 있어야 함. 

 

 

코드1. 

- 바이비트 리니어 BTCUSDT 1개 종목의 실시간 틱 수신하면서 출력하기.

 

 

from pybit.unified_trading import WebSocket
from time import sleep
# 메시지 핸들러 함수 정의
def handle_message(message):
print(message)
def main():
# WebSocket 객체 생성
ws = WebSocket(
testnet=False, # 실제 거래가 아닌 테스트넷을 사용할 경우 True로 설정
channel_type="linear" # 리니어 채널 설정
)
# BTCUSDT 거래 데이터 구독
ws.trade_stream(
symbol="BTCUSDT", # 구독할 심볼 설정
callback=handle_message # 데이터 수신 시 호출될 콜백 함수
)
# 데이터 수신 대기
try:
while True:
sleep(1) # 1초마다 반복
except KeyboardInterrupt: # CTRL C
print("Interrupted by user")
if __name__ == "__main__":
main()
view raw pybit_tick_1.py hosted with ❤ by GitHub

 

 

코드1 실행결과

- 위 코드 실행하면 터미널창에 실시간으로 틱 데이터 표시된다. 

- 서버에서 송신한 message 를 그대로 출력한것. 

 

 

 

코드2.  코드1에서 필요한 키 "data" 추출 

코드1에서 수신한 데이터를 보면 우리가 필요한 것은 키 가  'data' 부분이며 리스트 형식이어서 1회 수신당 여러개의 틱 데이터가 있을 수 있다. 이 부분을 분리 추출하자. 

코드1에서 의 함수 def handle_message 내부만 수정하면된다. 

 

# 메시지 핸들러 함수 정의
def handle_message(message):
if 'data' in message:
data = message['data']
for tick_data in data:
print("Received Tick Data:", tick_data)
else:
print("Received Message without 'data' field:", message)

 

 

코드2 실행결과 

- 키 "data" 의 것을 누락없이 모두 확보하여 출력중. 

 

 

 

코드3. 키 "data" 내의 요소 개별적으로  접근하기 좋게하기 

 

앞의 코드 2에서는 우리가 활용할 데이터 요소 개별적으로는 접근하기 좋지 않은 형식이므로, 우리는 자료형 Dictionary 로 저장하여 이후 활용하기 편하게 한다. 

 

 

- 주의 :  코드3과 4의 의  tick_dict 의 키 volume_quote 는 오류임. price 로 해야함. 이후 코드 5에서 모두 수정함. 

 

 

수정된  def handle_message 

 

# Tick 데이터 형식 정의
tick_dict = {
'timestamp_ms': int, # 밀리초 단위의 타임스탬프
'symbol': str, # 거래 심볼
'side': str, # 매수/매도 측 (예: "Buy", "Sell")
'volume_base': str, # 기본 통화의 거래량 (문자열로 처리)
'volume_quote': str, # 기준 통화의 거래량 (문자열로 처리)
'block_trade': str # 블록 거래 여부 (문자열로 처리)
}
# 메시지 핸들러 함수 정의
def handle_message(message):
if 'data' in message:
data = message['data']
for tick_data in data:
# 필요한 항목만 추출하여 dictionary로 정리
tick_info = {
'timestamp_ms': int(tick_data.get('T', 0)), # 정수형
'symbol': str(tick_data.get('s', '')), # 문자열
'side': str(tick_data.get('S', '')), # 문자열
'volume_base': str(tick_data.get('v', '')), # 문자열
'volume_quote': str(tick_data.get('p', '')), # 문자열
'block_trade': str(tick_data.get('BT', '')) # 문자열
}
# 출력 또는 이후 처리
print("Processed Tick Data:", tick_info)
else:
print("Received Message without 'data' field:", message)

 

코드3. 실행결과 

- 이제   timestamp_ms, symbol 등으로 데이터 접근 가능하므로 편리해졌다. 

 

 

 

 

코드 4. 여러 종목 실시간 시세 받기 

 

- 코드 3까지는 단일 종목 요청한 예였는데 2개이상 동시에 요청하고 수신처리하는 코드예. 

 

ws.trade_stream 의 symbol 에 여러종목 요청가능하다. 



    # 다수의 거래 데이터 구독
    symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "SUIUSDT", "XRPUSDT"]
    ws.trade_stream(
        symbol=symbols,    # 구독할 심볼 설정
        callback=handle_message  # 데이터 수신 시 호출될 콜백 함수
    )

 

 

 전체코드. 

from pybit.unified_trading import WebSocket
from time import sleep
# Tick 데이터 형식 정의
tick_dict = {
'timestamp_ms': int, # 밀리초 단위의 타임스탬프
'symbol': str, # 거래 심볼
'side': str, # 매수/매도 측 (예: "Buy", "Sell")
'volume_base': str, # 기본 통화의 거래량 (문자열로 처리)
'volume_quote': str, # 기준 통화의 거래량 (문자열로 처리)
'block_trade': str # 블록 거래 여부 (문자열로 처리)
}
# 메시지 핸들러 함수 정의
def handle_message(message):
if 'data' in message:
data = message['data']
for tick_data in data:
# 필요한 항목만 추출하여 dictionary로 정리
tick_info = {
'timestamp_ms': int(tick_data.get('T', 0)), # 정수형
'symbol': str(tick_data.get('s', '')), # 문자열
'side': str(tick_data.get('S', '')), # 문자열
'volume_base': str(tick_data.get('v', '')), # 문자열
'volume_quote': str(tick_data.get('p', '')), # 문자열
'block_trade': str(tick_data.get('BT', '')) # 문자열
}
# 출력 또는 이후 처리
print("Processed Tick Data:", tick_info)
else:
print("Received Message without 'data' field:", message)
def main():
# WebSocket 객체 생성
ws = WebSocket(
testnet=False, # 실제 거래가 아닌 테스트넷을 사용할 경우 True로 설정
channel_type="linear" # 리니어 채널 설정
)
# 다수의 거래 데이터 구독
symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "SUIUSDT", "XRPUSDT"]
ws.trade_stream(
symbol=symbols, # 구독할 심볼 설정
callback=handle_message # 데이터 수신 시 호출될 콜백 함수
)
# 데이터 수신 대기
try:
while True:
sleep(1) # 1초마다 반복
except KeyboardInterrupt: # CTRL C
print("Interrupted by user")
if __name__ == "__main__":
main()
view raw pybit_tick_4.py hosted with ❤ by GitHub

 

 

코드 4. 실행결과 

 

 

 

코드 5. 여러 종목 개별적으로 접근하기 위한 dictionary 변수 활용. 

 

symbol_tick_last 는 키는 symbol 명으로하고 최신 1개의 틱데이터를 저장하는 파이썬 딕셔너리 변수이다. 

 

오류 수정및 자료형 변경.

- 앞의 코드3, 4 에서 tick_info  에 잘못 기록한 volume_quote   대신  price 로 수정하였고,  side 를 정수로 , block_trade 는 bool 로 자료형 변경함. 

 

 

from pybit.unified_trading import WebSocket
from time import sleep
# multi symbol
symbols_to_get_tick = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "SUIUSDT", "XRPUSDT"]
category = "linear"
# 모든 심볼의 최신 틱 데이터를 저장할 dictionary. key=symbol name
symbol_tick_last = {}
# Tick 데이터 형식 정의 (symbol 제외)
tick_dict = {
'timestamp_ms': int, # 밀리초 단위의 타임스탬프
'price' : str, # 시장가 체결가격
'side': int, # 매수/매도 측 (Buy: +1, Sell: -1로 처리)
'volume_base': str, # 기본 통화의 거래량 (문자열로 처리)
'block_trade': bool # 블록 거래 여부 (True/False로 처리)
}
# 메시지 핸들러 함수 정의
def handle_message(message):
# 틱 데이터 처리
if 'data' in message:
data = message['data']
for tick_data in data:
symbol = tick_data.get('s', '')
# side를 정수형으로 변환: Buy는 +1, Sell은 -1
side = tick_data.get('S', '')
side_int = +1 if side == 'Buy' else -1 if side == 'Sell' else 0
# tick_data를 tick_dict 형식으로 변환 (symbol 제외)
tick_info = {
'timestamp_ms': int(tick_data.get('T', 0)), # 타임스탬프는 정수형
'price' : str(tick_data.get('p', '')),
'side': side_int, # 매수/매도 측은 정수형
'volume_base': str(tick_data.get('v', '')), # 거래량(기본 통화)은 문자열
'block_trade': bool(tick_data.get('BT', False)) # 블록 거래 여부는 bool로 변환
}
# symbol_tick_last에 최신 데이터만 저장 (symbol 없이 저장)
symbol_tick_last[symbol] = tick_info
# 출력 또는 이후 처리
print(f"Updated Tick Data for {symbol}: {tick_info}")
def main():
# WebSocket 객체 생성
ws = WebSocket(
testnet=False, #
channel_type=category # 리니어 채널 설정
)
ws.trade_stream(
symbol=symbols_to_get_tick, # 구독할 심볼 설정
callback=handle_message # 데이터 수신 시 호출될 콜백 함수
)
# 데이터 수신 대기
try:
while True:
sleep(1) # 1초마다 반복
except KeyboardInterrupt:
print("Interrupted by user")
finally:
ws.exit()
if __name__ == "__main__":
main()
view raw pybit_tick_5.py hosted with ❤ by GitHub

 

 

코드 5. 실행결과 

 

 

 

코드6. 큐 , 스레드 추가 

- websocket 내부적으로 기본적으로 큐버퍼가 구현되어있기에 필수는 아니지만 콜백함수인 handle_message 내에서 수신된 데이터를 que 버퍼에 기록하고 즉시 리턴하고 이후 처리는 que 읽고 처리하는 별도의 스레드에서 수행되게 하였다. 

 

 

from pybit.unified_trading import WebSocket
from time import sleep
import queue
import threading
# multi symbol
symbols_to_get_tick = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "SUIUSDT", "XRPUSDT"]
category = "linear"
# 모든 심볼의 최신 틱 데이터를 저장할 dictionary. key=symbol name
symbol_tick_last = {}
# Tick 데이터 형식 정의 (symbol 제외)
tick_dict = {
'timestamp_ms': int, # 밀리초 단위의 타임스탬프
'price': str, # 시장가 체결가격
'side': int, # 매수/매도 측 (Buy: +1, Sell: -1로 처리)
'volume_base': str, # 기본 통화의 거래량 (문자열로 처리)
'block_trade': bool # 블록 거래 여부 (True/False로 처리)
}
# 메시지 큐
message_queue = queue.Queue()
def handle_message(message):
"""WebSocket으로부터 수신된 메시지를 큐에 추가하는 함수."""
if 'data' in message:
data = message['data']
message_queue.put(data) # 메시지를 큐에 넣음
def process_messages():
"""큐에 쌓인 메시지를 처리하는 함수."""
while True:
try:
data = message_queue.get(timeout=1) # 큐에서 메시지를 가져옴
if data:
for tick_data in data:
symbol = tick_data.get('s', '')
# side를 정수형으로 변환: Buy는 +1, Sell은 -1
side = tick_data.get('S', '')
side_int = +1 if side == 'Buy' else -1 if side == 'Sell' else 0
# tick_data를 tick_dict 형식으로 변환 (symbol 제외)
tick_info = {
'timestamp_ms': int(tick_data.get('T', 0)), # 타임스탬프는 정수형
'price': str(tick_data.get('p', '')),
'side': side_int, # 매수/매도 측은 정수형
'volume_base': str(tick_data.get('v', '')), # 거래량(기본 통화)은 문자열
'block_trade': bool(tick_data.get('BT', False)) # 블록 거래 여부는 bool로 변환
}
# symbol_tick_last에 최신 데이터만 저장 (symbol 없이 저장)
symbol_tick_last[symbol] = tick_info
# 출력 또는 이후 처리
print(f"Updated Tick Data for {symbol}: {tick_info}")
except queue.Empty:
continue
def main():
# 메시지 처리 스레드 시작
processing_thread = threading.Thread(target=process_messages, daemon=True)
processing_thread.start()
# WebSocket 객체 생성
ws = WebSocket(
testnet=False, #
channel_type=category # 리니어 채널 설정
)
ws.trade_stream(
symbol=symbols_to_get_tick, # 구독할 심볼 설정
callback=handle_message # 데이터 수신 시 호출될 콜백 함수
)
# 데이터 수신 대기
try:
while True:
sleep(1) # 1초마다 반복
except KeyboardInterrupt:
print("Interrupted by user")
finally:
ws.close()
if __name__ == "__main__":
main()

 

 

 

 

 

 

 

이후 코딩 작업에서는 symbol_tick_last를 사용하면 각 심볼에 대한 최신 틱 데이터를 쉽게 저장하고 관리가능해졌다. 

 

- 최신 데이터 접근: symbol_tick_last를 통해 각 심볼의 최신 틱 데이터에 빠르게 접근할 수 있다. 

 

- 데이터 업데이트: 새로운 틱 데이터가 도착할 때마다 해당 심볼의 데이터만 갱신되므로, 메모리 사용과 데이터 관리가 효율적.

- 후속 처리: 이 구조를 사용하면 데이터베이스 저장, 수치 연산, 실시간 분석 등 이후 작업을 위해 데이터를 손쉽게 활용할 수 있다. 

- 확장성: 더 많은 심볼을 추가하거나 다른 데이터 처리 로직을 추가하는 것이 쉽다. 

 

symbol_tick_last를 기반으로 다음과 같은 작업을 수행할 수 있다. 

 

- DB 저장: 최신 틱 데이터를 데이터베이스에 저장할 때, symbol_tick_last에서 필요한 정보를 추출하여 저장할 수 있다.

 

- 수치 연산: 최신 틱 데이터를 이용해 수치 연산을 수행할 때, symbol_tick_last를 사용하여 각 심볼의 최신 데이터에 직접 접근할 수 있다. 

 

- 알림 및 트리거: 특정 조건이 충족되면 알림을 보내거나 특정 작업을 트리거하는 데 유용.

 

 

 

 

 

 

 

 

 

 


첫 등록 :  2024.09.07

최종 수정 : 

단축 주소 : https://igotit.tistory.com/5831


 



비트코인




암호화폐       외환/나스닥/골드         암호화폐/외환/나스닥/골드
     
현물 |선물 인버스 |선물 USDT       전략매니저(카피트레이딩)         프랍 트레이더 온라인 지원가능. MT4,MT5