シストレどうですか

  Algorithmic Trading for Dummies

FXボット とりあえず動かしてみた編 その4 ー エラーや通知機能をまとめてみた

仮想通貨用のPublic APIを試していたり、とりあえず作成したボットをテストしている間に前回からだいぶ時間がたってしまいました。
とりあえず作成したボットに少しずつ機能を足してきましたが、だんだんとごちゃごちゃしてきた感じがします。そこでほとんどがエンドポイント回りで発生するであろうエラーやそれに関連する通知機能を一つにまとめてみます。

jantzen.hatenablog.com


エンドポイントの呼び出しをまとめる


各エンドポイントによって引数の違いはありますが、APIの呼び出す時は基本どのAPIでも同じような構文ですのでひとつにまとめる方法を考えます。
いままでは以下の例のように、それぞれ必要な引数をセットした後にエンドポイントを呼び出すだけですから、

from oandapyV20 import API
import oandapyV20.endpoints.pricing as pricing
  
(省略)
  
    api = API(access_token= TOKEN)
    params = {
            "instruments": INSTRUMENT
          }
          
    r = pricing.PricingInfo(accountID=ID, params=params)  
    rv = api.request(r)

呼び出したいエンドポイントを引数によって使い分けできるような関数があればよいという事になります。

関数の設計

  1. 機能

    概要としては指定されたエンドポイントを呼び出してその結果を返し、エラー時にはそれを通知するような処理を行う。 さらに104など特定のエラーの場合はリトライする機能も加えます。

    ① 上位から受け取った引数で呼び出すエンドポイントを決定し、受けとった引数をそのまま渡す。
    ② エラーがない場合は、サーバーからの戻り値をそのまま返し関数を終了させる。
    ③ 特定のエラーが発生した場合は、そのまま終了せずに任意の回数を任意の間隔でリトライさせる。
    ④ その他のエラーが発生した場合は、その内容を外部へ通知し例外を発生させ関数を終了させる。

    リトライするエラーとしては、このとりあえずなボットを作成して走らせ続けて出たリトライ可能そうなエラーコードを対象とします。 (104, 502, 503)

  2. 引数

    この関数の引数としては、呼び出すエンドポイントの種類とそのエンドポイントに渡す引数が最低限必要な受け取り項目になります。
    ただしエンドポイントで使う引数は種類によって数も内容も変わって来ますので可変長引数(*args, **kwargs)のほうがわかりやすそうです。

  3. 戻り値

    この関数内で呼び出したエンドポイントから得た情報をそのまま戻します。エラーが発生した場合は、returnで戻さず例外を発生させそこにエラーの内容をのせるようにします。

関数の作成

以上の事から、処理内容を具体的に作成してみます。
引数は、エンドポイントの種類を決める引数とそのエンドポイントに渡す引数を分けてみました。

def Endpoint(ep_type, **kwargs): 
  
  #リトライ条件
  RETRY_LOOP = 3 #回
  RETRY_WAIT = 1 #秒
  RETRY_ERROR = [104, 502, 503]
  
  for i in range(RETRY_LOOP): 
    
    try:
      #エンドポイントの呼び出し判定
      if ep_type == "pricing":  #👈最新レート
        r = pricing.PricingInfo(accountID=kwargs["id"], params=kwargs["params"])  
      elif ep_type == "positions":  #👈ポジション保有
        r = positions.PositionDetails(accountID=kwargs["id"], instrument=kwargs["instrument"])
      elif ep_type == "candles":  #👈ろうそく足
        r = instruments.InstrumentsCandles(instrument=kwargs["instrument"], params=kwargs["params"])
      elif ep_type == "orders":  #👈注文
        r = orders.OrderCreate(accountID=kwargs["id"], data=kwargs["data"])
      else:
        message = "Endpoint type error!"
        print(message)
        raise Exception(message)
  
      #エンドポイント呼び出し
      rv = api.request(r)
      return rv
          
    except V20Error as e:
      err_type = "V20Error"
      status = e.code
      contents = e
  
    except Exception as e:
      err_type = "Exception"
      if hasattr(e, 'code'):
        status = e.code
      else:
        status = 999
      contents = e
    
    #リトライ判定
    if status in RETRY_ERROR:
      time.sleep(RETRY_WAIT)
      message = "リトライ" + str(i+1) + "回目 " + str(status)
      print(message)
      Discord(ep_type, err_type, message, contents)
      
    else:
      Discord(ep_type, err_type, str(status), contents)
      message = "エラー!: %s %s %s %s" %(ep_type,err_type,str(status),contents)
      raise Exception(message)
  
  #リトライ失敗
  message = "Endpoint: %s リトライに失敗! (%d)" %(ep_type, status) 
  #message = "Endpoint: " + ep_type + " リトライに失敗! (" + str(status) + ")" 
  raise Exception(message)

説明

  • 引数は呼び出すエンドポイントの種類を判定する引数とそのエンドポイントで使う引数をキーワード引数(**)でもらうようにします。(キーワードはエンドポイント用の名前をそのまま使用)
  • 関数の頭の部分でリトライする場合の回数・間隔および対象のエラーコードを定義しておく。
  • エンドポイントを呼び出して正常に処理が終了した場合は、そこで受け取ったデータをそのまま戻す。
  • 関数内でエラーかエンドポイント呼び出し処理でエラーが発生した場合は、raiseを使って例外を発生させ呼び出し元に知らせる。

問題点

問題点としては、関数を完成させて以降当初出ていたエラー(104,502,503)が最近出なくなってしまったので、エラーが発生した場合上記のどのexcept構文でエラーを拾えるか検証しきれていません。(104はV20Errorでは拾えないことは確認できています。)
このサンプルではひょっとしたらエラーをうまく拾えなくてリトライできない可能性もあります。という訳ですので今後これらのエラーが実際に発生した時点で修正が入る可能性もあります。

関数の呼び出し

次に元のプログラムでエンドポイントの呼び出していた部分を作成した新しい関数に置き換えていきます。
もとになるソースコード前回参照

具体的にはエンドポイントを呼び出している以下のようなところを置き換えていくだけです。

最新レートの取得の部分であれば

    r = pricing.PricingInfo(accountID=ID, params=params)      
    rv = api.request(r)

  rv = Endpoint("pricing", id=ID, params=params)  

のようにあたらしく作成した関数に置き換えればいいので、

それぞれのエンドポイントは

  #最新レートの取得
  rv = Endpoint("pricing", id=ID, params=params)    
  #ポジションの確認処理追加
  rv = Endpoint("positions", id=ID, instrument=INSTRUMENT)  
  #ろうそく足の取得
  rv = Endpoint("candles", instrument=INSTRUMENT, params=params)  
  #発注
  rv = Endpoint("orders", id=ID, data=data)  

のように書き換えていきます。
また前回はレートの取得処理を変更する際にtry~except構文をわざわざ追加しましたが、それも元に戻しました。

テスト

修正が完了しましたので機能の確認をしてみます。
修正したコードのすべては以下の通り。

#外部モジュール
from oandapyV20 import API
import oandapyV20.endpoints.pricing as pricing
import oandapyV20.endpoints.positions as positions   
import oandapyV20.endpoints.instruments as instruments 
import oandapyV20.endpoints.orders as orders   
from oandapyV20.exceptions import V20Error
    
import json
import time
import datetime
import pandas as pd
import requests
    
#口座情報(自分の情報を入力)
TOKEN = ""
ID = ""
        
#取引通貨  
INSTRUMENT = "USD_JPY"
#レート桁数
DECIMALS = 3 
#Pip桁数
PIP_LOCATION = -2 
    
#最大許容スプレッド  
MAX_SPREAD_PIPS = 2 #Pips
        
#ループ回数
LOOP = 10000 #回
#待機時間
WAIT = 60 #秒
      
#ろうそく足取得用
COUNT = 10 #ろうそく足の取得本数
GRANULARITY = "M5" #時間足(5分)
      
UNITS = 1
N = 2 #Pips
    

def Discord(event_type, exception_type, code, contents):
    
  #Webhook
  DISCORD_URL =  ''  #自分で取得したwebhookにおきかえ
      
  #メッセージの編集
  message = '''\
  エンドポイント種類: %s
  例外種類: %s 
  コード: %s 
  内容: %s 
  時間(UTC): %s 
  ''' %(event_type, exception_type, code, contents, \
  f"{datetime.datetime.now(datetime.timezone.utc):%Y-%m-%d %H:%M:%S}")
    
  data = {'content' :  message}
    
  try:
          #メッセージの送信
          response_body = requests.post(DISCORD_URL, data=data)
          response_body.raise_for_status() #Discordの呼び出しでエラーが出た場合  
  
  except Exception as e:
          print(e)
          raise #ボットを停止させる

    
def Endpoint(ep_type, **kwargs):     #👈
  
  #リトライ条件
  RETRY_LOOP = 3 #回
  RETRY_WAIT = 1 #秒
  RETRY_ERROR = [104, 502, 503]
  
  for i in range(RETRY_LOOP): 
    
    try:
      #エンドポイントの呼び出し判定
      if ep_type == "pricing":  #👈最新レート
        r = pricing.PricingInfo(accountID=kwargs["id"], params=kwargs["params"])  
      elif ep_type == "positions":  #👈ポジション保有
        r = positions.PositionDetails(accountID=kwargs["id"], instrument=kwargs["instrument"])
      elif ep_type == "candles":  #👈ろうそく足
        r = instruments.InstrumentsCandles(instrument=kwargs["instrument"], params=kwargs["params"])
      elif ep_type == "orders":  #👈注文
        r = orders.OrderCreate(accountID=kwargs["id"], data=kwargs["data"])
      else:
        message = "Endpoint type error!"
        print(message)
        raise Exception(message)
  
      #エンドポイント呼び出し
      rv = api.request(r)
      return rv
          
    except V20Error as e:
      err_type = "V20Error"
      status = e.code
      contents = e
  
    except Exception as e:
      err_type = "Exception"
      if hasattr(e, 'code'):
        status = e.code
      else:
        status = 999
      contents = e
    
    #リトライ判定
    if status in RETRY_ERROR:
      time.sleep(RETRY_WAIT)
      message = "リトライ" + str(i+1) + "回目 " + str(status)
      print(message)
      Discord(ep_type, err_type, message, contents)
      
    else:
      Discord(ep_type, err_type, str(status), contents)
      message = "エラー!: %s %s %s %s" %(ep_type,err_type,str(status),contents)
      raise Exception(message)
  
  #リトライ失敗
  message = "Endpoint: %s リトライに失敗! (%d)" %(ep_type, status) 
  #message = "Endpoint: " + ep_type + " リトライに失敗! (" + str(status) + ")" 
  raise Exception(message)

    
def CurrentRate():
          
  #最新レートの取得
  params = {
          "instruments": INSTRUMENT
        }
        
  rv = Endpoint("pricing", id=ID, params=params)  #👈
  
  #スプレッドの計算
  bid = rv['prices'][0]['closeoutBid']
  ask = rv['prices'][0]['closeoutAsk']
  spread = round(float(ask) - float(bid), DECIMALS)
    
  #トレード可能?
  if rv['prices'][0]['tradeable'] == True:
    max_spread = MAX_SPREAD_PIPS * (10 ** PIP_LOCATION)
    if spread < max_spread:
      status = "GO"
    else: #スプレッド拡大中
      status = "SKIP"
  #クローズ/メンテ中
  else:
    status = "CLOSED" #original "STOP" 
    #Discord("CurentRate", status, "", "マーケットクローズ") 
        
  #戻り値  
  return {'status': status, 'bid': bid, 'ask': ask, 'spread': spread}
  
        
def Position():  
        
  #ポジションの確認処理追加
  rv = Endpoint("positions", id=ID, instrument=INSTRUMENT)  #👈
        
  if rv['position']['long']['units'] != "0" or rv['position']['short']['units'] != "0": 
    print("ポジあり。待機")
    status = "SKIP"         
  else:
    print("ポジなし。 継続")
    status =  "GO"
      
  return {'status': status}
      
      
def Signal():  #シグナル判定(赤3黒3)
        
  #ろうそく足の取得
  #引数セット
  params = {
          "count": COUNT,
          "granularity": GRANULARITY
        }
    
  rv = Endpoint("candles", instrument=INSTRUMENT, params=params)   #👈
        
  #データフレームへの変換
  df = pd.json_normalize(rv, record_path='candles', meta=['instrument', 'granularity'], sep='_')
  #コラム名の変更
  df.columns = ['complete', 'volume', 'time_UTC', 'open', 'high', 'low', 'close', 'pair', 'ashi']
          
  #完成形のろうそく足を最後から3本分のみ取得
  df = df[df['complete'] == True].tail(3)
      
  #計算用に属性を変更
  df = df.astype({'open': float, 'close': float, 'high': float, 'low': float})
        
  #3本分のろうそく足毎のトレンドの判定
  df.loc[round(df['close'] - df['open'],DECIMALS) > 0, 'trend'] = 1  #陽線(赤)
  df.loc[round(df['close'] - df['open'],DECIMALS) < 0, 'trend'] = -1 #陰線(黒)
  df.loc[round(df['close'] - df['open'],DECIMALS) == 0, 'trend'] = 0  #同じ
        
  #売買シグナルの判定
  if df.trend.sum() == 3: #すべて陽線(上昇)
    signal = 1
    print("買シグナル!")
  elif df.trend.sum() == -3: #すべて陰線(下降)
    signal = -1
    print("売シグナル!")
  else:
    signal = 0
    print("シグナルなし")
      
  print(df)
        
  #戻り値のセット
  return {'signal': signal, 'df': df}
      
    
def Order(r_rate, r_signal): 
    
  #発注処理
  #最後のローソク足の終値の取得
  xclose = r_signal['df']['close'].iat[-1]
      
  #区間内の最高値と最安値の取得
  xmin = r_signal['df']['low'].min()
  xmax = r_signal['df']['high'].max()
  
  print("終値: %s 最高値: %s 最安値: %s" %(xclose, xmax, xmin))
      
  #Risk(Spread)の計算
  if r_signal['signal'] == 1:
    #買:区間の最安値と直前の足の終値の差にN Pips足す
    distance = round(xclose - xmin, DECIMALS)
  elif r_signal['signal'] == -1:
    #売:区間の最高値と直前の足の終値の差にN Pips足す
    distance = round(xmax - xclose, DECIMALS)
    
  #計算した値幅に一定の値を加える  
  distance2 = round(distance + N * (10 ** PIP_LOCATION), DECIMALS)
      
  print("値幅: %s 値幅+N: %s スプレッド: %s" %(distance, distance2, r_rate['spread']))
    
  #計算結果より現在のスプレッドと比較
  if distance2 < r_rate['spread']:
    print("値幅が足りません。")
    return {'status': 'SKIP'}
    
      
  #発注データのセット      
  data = {
        "order": {
            "type": "MARKET",
            "instrument": INSTRUMENT,
            "units": str(UNITS * r_signal['signal']),
            "takeProfitOnFill": {
                "distance": str(distance2)
            },
            "stopLossOnFill": {
                "distance": str(distance2)
            },
        }
    }
        
  #発注
  rv = Endpoint("orders", id=ID, data=data)   #👈
  
  #結果確認
  print(json.dumps(rv, indent=2))
  
  if "orderFillTransaction" in rv.keys():
    status = "FILLED"
    result = rv['orderFillTransaction']['id']
    contents = rv['orderFillTransaction']
  elif 'orderCancelTransaction' in rv.keys():  
    status = "STOP"
    result = rv['orderCancelTransaction']['reason']
    contents = rv['orderCancelTransaction']  
  else:
    status = "STOP"  
    result = "予期せぬエラー(Status = 201)"
    contents = rv
  
  type="New Order"
  Discord(type, status, result, contents)
  print(status, " : " ,result)  
  
  #戻り値のセット
  return {'status': status}
  
                
if __name__ == "__main__":
          
  try:
         
    api = API(access_token= TOKEN)
    message = ""
    for i in range(LOOP):
          
      #最新レート確認  
      r_rate = CurrentRate()  
      print(r_rate)
      #次の処理
      if r_rate['status'] == "GO":
        print("継続-トレード可能")
        
       #保有ポジションの確認
        r_pos = Position()             
        #ポジション無しの時
        if r_pos['status'] == "GO":   
          #売買シグナルの判定
          r_signal = Signal()
          if r_signal['signal'] != 0: #売買シグナルがでたら注文処理へ
            #注文処理
            r_order = Order(r_rate, r_signal)
            if r_order['status'] != "FILLED" and r_order['status'] != "SKIP":
              raise Exception("停止ー発注エラー") #ボット終了
                                        
      #スプレッドが広すぎる
      elif r_rate['status'] == "SKIP":
        print("スキップ-スプレッド拡大中")    
        
      #マーケットがクローズ(またはメンテ中)
      elif r_rate['status'] == "CLOSED":
        print("スキップーマーケットクローズ中")    
        #break #ボット終了
              
      else:
        raise Exception("停止ー予期せぬエラー発生") #ボット終了
        
      #次のサイクル
      print("待機中 ", i) 
      time.sleep(WAIT)

    message= "Loop上限到達"

  except Exception as e:
    #print(e)
    message = e  
        
  finally:
    Discord("停止", "", "", message)
    print(message, " によりBotが停止しました。 UTC:", datetime.datetime.now(datetime.timezone.utc))

これを動かしてみるとエラーや約定した際に通知メッセージがDiscordに送られた事が無事確認できました。

f:id:jantzen:20220208090619j:plain:w300

試したかった104や502のエラーが再現できなかったので、401の場合の例です。(トークンのエラー)
また今の仕様ではDisocrdへの通知がこの例のように2回でてしまうパターンがありますが、そこはあまり気にしないことにします。


まとめ


このような形でまとめてみれば、少なくともエンドポイント回りで例外処理の扱いなど更なる改造の必要がでてきた場合も修正箇所が少なくなります。少しアイデアを加えただけで何箇所も修正しないといけないようですとバグのもとになります。

とはいえ、今までは全体のソースコードをひつまとめに記述したほうが理解しやすかったのですが、関数を少しずつ追加していくうちに段々長くなってきており逆にわかりずらくなってきました。

次のステップとしては、Discordの通知機能のように完全におまけ機能のようなものやエンドポイントの呼び出し部分は外部モジュール化して呼び出すようにして、使い回しできるようにして簡素化したいところです。
更に2%ルールのようなロットサイズの機能を組み込んでいったり、シグナル判定を手法によって変更する部分をカプセル化して切り替え易いようにする必要もあったりと追加していったほうが良い機能がまだまだあります。
これらの変更を加えるためには、ここまで完成?したボットのプログラム全体の構成を大きく見直さないといけないこともみえて来ました。

ただこれ以上改造を続けても終わりがみえないので、次回は一旦今までの自動売買ボットの作成で得た教訓?をこのシリーズの最後にまとめてみたいと思います。