Python/파이썬과 주식, 코인

파이썬 - 코인원 - 웹 소켓 데모

컴닥 2025. 4. 12. 08:24
반응형

Python, CoinOne, Websocket

코인원 개발자 센터에 웹 소켓 예제 코드가 없기에 직접 작성해 보았다. 

import asyncio
import json
from pprint import pprint

import websockets


async def subscribe(channel, target_currency="BTC", quote_currency="KRW"):
    """Subscribe to one Coinone public WebSocket channel and print its messages.

    Reference: https://docs.coinone.co.kr/reference/public-websocket-1

    Connects to ``wss://stream.coinone.co.kr``, sends a single ``SUBSCRIBE``
    request for the given channel and currency pair, then loops forever:
    each received frame is decoded as JSON and pretty-printed. When a
    message's ``response_type`` is ``"ERROR"``, its ``error_code`` is printed
    as well and the loop keeps going.

    Args:
        channel: Channel name placed in the request, e.g. ``"TICKER"``,
            ``"TRADE"`` or ``"ORDERBOOK"``.
        target_currency: Value sent as ``topic.target_currency``.
        quote_currency: Value sent as ``topic.quote_currency``.

    This coroutine has no normal exit; it ends only when an exception
    propagates out of the loop (for example from ``recv`` or JSON decoding).
    """
    topic = {"quote_currency": quote_currency, "target_currency": target_currency}
    payload = json.dumps(
        {
            "request_type": "SUBSCRIBE",
            "channel": channel,  # TICKER, TRADE, ORDERBOOK
            "topic": topic,
        }
    )
    async with websockets.connect("wss://stream.coinone.co.kr") as ws:
        await ws.send(payload)
        while True:
            message = json.loads(await ws.recv())
            pprint(message)
            if message["response_type"] != "ERROR":
                continue
            # Error messages are reported but do not stop the stream.
            print("error_code", message["error_code"])


async def gather():
    """Run the TRADE, TICKER and ORDERBOOK subscriptions concurrently.

    One ``subscribe`` coroutine is created per channel, using that
    function's default currency arguments, and all three are awaited
    together via ``asyncio.gather``.
    """
    channels = ("TRADE", "TICKER", "ORDERBOOK")
    streams = [subscribe(name) for name in channels]
    await asyncio.gather(*streams)


if __name__ == "__main__":
    # Script entry point: asyncio.run() creates an event loop and drives
    # gather() on it until that coroutine finishes or raises.
    asyncio.run(gather())

threading도 같이 알아두자. 메인 스레드를 막지 않고 백그라운드(데몬) 스레드에서 이벤트 루프를 실행하려면 다음과 같이 한다.

import threading

# Run the asyncio event loop in a background thread so the calling thread
# stays free. The coroutine object is created here by gather() and handed to
# asyncio.run() inside the new thread.
# NOTE: daemon=True means this thread does not keep the process alive; the
# caller must keep the main thread running for the subscriptions to continue.
threading.Thread(target=asyncio.run, args=(gather(),), daemon=True).start()
반응형