Python/파이썬과 주식, 코인
파이썬 - 코인원 - 웹 소켓 데모
컴닥
2025. 4. 12. 08:24
반응형
Python, CoinOne, Websocket
코인원 개발자 센터에 웹 소켓 예제 코드가 없기에 직접 작성해 보았다.
import asyncio
import json
from pprint import pprint
import websockets
async def subscribe(channel, target_currency="BTC", quote_currency="KRW"):
    """Subscribe to one Coinone public WebSocket channel and print its messages.

    Connects to the Coinone stream endpoint, sends a single SUBSCRIBE
    request for the given channel and currency pair, then pretty-prints
    every JSON message received. When a message's ``response_type`` is
    ``"ERROR"``, its ``error_code`` is printed as well.

    The receive loop has no exit condition, so this coroutine only ends
    when an exception propagates out of it or it is cancelled.

    API reference: https://docs.coinone.co.kr/reference/public-websocket-1

    Args:
        channel: Channel name placed in the request (TICKER, TRADE, ORDERBOOK).
        target_currency: Value sent as ``topic.target_currency``.
        quote_currency: Value sent as ``topic.quote_currency``.
    """
    endpoint = "wss://stream.coinone.co.kr"
    topic = {"quote_currency": quote_currency, "target_currency": target_currency}
    # Serialize once up front; the same payload is sent right after connecting.
    payload = json.dumps(
        {
            "request_type": "SUBSCRIBE",
            "channel": channel,  # TICKER, TRADE, ORDERBOOK
            "topic": topic,
        }
    )

    async with websockets.connect(endpoint) as connection:
        await connection.send(payload)
        while True:
            message = json.loads(await connection.recv())
            pprint(message)
            if message["response_type"] != "ERROR":
                continue
            # Error frames are still dumped above; surface the code separately.
            print("error_code", message["error_code"])
async def gather():
    """Run the TRADE, TICKER and ORDERBOOK subscriptions concurrently.

    Each channel gets its own ``subscribe`` coroutine (default currency
    pair); all three are awaited together on the current event loop.
    """
    channels = ("TRADE", "TICKER", "ORDERBOOK")
    await asyncio.gather(*(subscribe(name) for name in channels))
# Script entry point: start an event loop and run all subscriptions in it.
if __name__ == "__main__":
    asyncio.run(gather())
메인 스레드를 막지 않고 이벤트 루프를 백그라운드 스레드에서 실행하려면 threading을 함께 쓰면 되니, 같이 알아두자.
import threading

# Run the asyncio event loop on a background thread so the calling thread
# stays free. daemon=True means this thread will not keep the interpreter
# alive once the main thread finishes.
threading.Thread(
    target=asyncio.run,
    args=(gather(),),
    daemon=True,
).start()
반응형