シストレどうですか

  Algorithmic Trading for Dummies

FXボット とりあえず動かしてみた編 その3 ー エラー発生時の通知処理を追加

クリスマスが終わるとあっという間に新年を迎えます。
最近年末になるといつも考えてしまう事があるのですが、それは、若いころと比べ加速度的に1年がたってしまうのがはやいという事です。 とは言え自分の子供たちと同じ年齢の頃はもっと遅く感じていたわけで、その子供と同じ時間軸の中で過ごしているわけだから他の人に比べて本当に時間がはやくたっているわけではなく、感覚的な問題だからと自分自身を納得させてみたものの、結局その人の時間の感覚がその人の時間を決めるわけだからとわけのわからない思考ループにおちいり、自分に残された時間に何ができるのだろうかとこの時期になると焦りを感じてしまいます。
そのいう事に悩むような年代になってきたからなのか、または映画の「インターステラー」を観て以来なのかはわかりませんが...。
よく考えてみれば、誕生日やクリスマスプレゼントに何が欲しいか聞かれても、「特になし」と答えるのが定番になってしまっているので、生活がただ単調になっているだけかもしれませんね。

話しが変な方向に進みすぎるので、本題に入ります。
とりあえずボットが設計したとおり動く事を確認できたので、次に何かイベントが発生した際に携帯電話に通知する機能を加えてみます。
イベントにはボット上でエラーが発生した時や新しいオーダーがあった場合などを想定しています。
通知するアプリは以前試してみたDiscordを使いたいと思います。 LineでもLine Notifyを使えば結構簡単に作れてしまいますが私の場合は普段使っているLineと混じってしまうと逆にわかりずらくなると思いDiscordにしました。 Discordとの連携の方法は以前のブログを参照。

jantzen.hatenablog.com


Discordへの送信


Discordのアカウント登録やウェブフックのキーを既に取得しているという前提で、まずはDiscordへの送信関数の部分について作成します。

関数の設計

Discordへメッセージの送信を行う関数を呼び出す場合、どんな種類のイベントか、その内容、またいつ発生したかがわかるように編集して、それをパラメータとし渡す事にします。

  1. 呼び出すタイミング

    想定するイベントとしてエラー発生時と約定時の通知用という事ですから、エンドポイント関連のエラーが発生した後のexcept内と発注後の結果が戻って来る部分に処理を追加します。
    ついでに予期せぬエラーも想定してボットが停止する最後の部分にも追加しておきます。

  2. 引数

    関数に渡すパラメータは、①イベントの種類、②エラーのコード、③エラーor約定の内容をセットする。

  3. 引数の編集

    受け取った引数を改行コードでつないで1文に編集したものをDiscord送信用のAPIへ渡す。

関数の作成

通知する機能の内容が決まりましたので、以前のブログから必要な部分をコピーしてそれを修正していきます。
前回はDiscordへの送信が失敗した場合はラインに送信するように二重化しましたが簡略化するためにはずしました。

また、requestsモジュールが必要になりますので先頭の部分に追加しておきます。

import requests
  
#省略

def Discord(event, status, contents):
    
  #Webhook
  DISCORD_URL =  'https://discord.com/api/webhooks/~'    #自分で取得したwebhookにおきかえ
      
  #メッセージの編集
  message = '''\
  種類: %s 
  コード: %s 
  内容: %s 
  時間(UTC): %s 
  ''' %(event, status, contents, \
  f"{datetime.datetime.now(datetime.timezone.utc):%Y-%m-%d %H:%M:%S}")
    
  data = {'content' :  message}
    
  try:
          #メッセージの送信
          response_body = requests.post(DISCORD_URL, data=data)
          response_body.raise_for_status() #Discordの呼び出しでエラーが出た場合  
  
  except Exception as e:
          print(e)
          raise #ボットを停止させる

関数の呼び出し

次に完成した関数をエンドポイントでエラーが発生した時の例外処理と発注時の結果が戻ってきた処理のところに追加します。
修正前のもとになるソースコードこちら

エンドポイントでエラーの時

現行のボットではエンドポイントごとにエラーをキャッチする形式になっていませんので、エンドポイントの呼び出しするごとにtry~except構文を追加し、そこに今回作成したDiscord関数をエラーを拾うexceptの部分とボットが停止してしまうところに追加していきます。

レート取得処理

def CurrentRate():
          
  try: #👈try~except追加
  
    #最新レートの取得
    params = {
            "instruments": INSTRUMENT
          }
          
    r = pricing.PricingInfo(accountID=ID, params=params)  
    rv = api.request(r)
    
    #スプレッドの計算
    bid = rv['prices'][0]['closeoutBid']
    ask = rv['prices'][0]['closeoutAsk']
    spread = round(float(ask) - float(bid), DECIMALS)
      
    #トレード可能?
    if rv['prices'][0]['tradeable'] == True:
      max_spread = MAX_SPREAD_PIPS * (10 ** PIP_LOCATION)
      if spread < max_spread:
        status = "GO"
      else: #スプレッド拡大中
        status = "SKIP"
    #クローズ/メンテ中
    else:
      status = "STOP"
      Discord("CurentRate", status, "マーケットクローズ") #👈
          
    #戻り値  
    return {'status': status, 'bid': bid, 'ask': ask, 'spread': spread}
  
  #👇
  except Exception as e:
    if hasattr(e, 'code'):
      status = e.code
    else:
      status = "999"
    Discord("CurentRate", status, e) 
    raise

        

エンドポイントを呼び出す部分にひとつひとつ修正を加えるのが結構大変な事がわかりましたので今回はレート取得処理の部分だけ修正してみました。

新しい注文の約定

新しいオーダーを発注した後その結果を受け取った部分に関数を追加します。

発注処理

def Order(r_rate, r_signal): 
      
#前半省略
       
  #発注データのセット      
  data = {
        "order": {
            "type": "MARKET",
            "instrument": INSTRUMENT,
            "units": str(UNITS * r_signal['signal']),
            "takeProfitOnFill": {
                "distance": str(distance2)
            },
            "stopLossOnFill": {
                "distance": str(distance2)
            },
        }
    }
        
  #発注
  r = orders.OrderCreate(accountID=ID, data=data)
  rv = api.request(r)
  
  #結果確認
  print(json.dumps(rv, indent=2))
  
  if r.status_code == 201:
    if "orderFillTransaction" in rv.keys():
      status = "ORDERED"
      result = rv['orderFillTransaction']['orderID']
      
    elif 'orderCancelTransaction' in rv.keys():  
      status = "STOP"
      result = rv['orderCancelTransaction']['reason']  
    else:
      status = "STOP"  
      result = "予期せぬエラー(Status = 201)"
  else:
    status = "STOP"
    result = "予期せぬエラー(Status != 201)"
  
  Discord("New Order", status, result) #👈
  print(status, " : " ,result)  
  
  #戻り値のセット
  return {'status': status}

テスト

修正が完了しましたので機能の確認をしてみます。
修正したコードのすべては以下の通り。

#外部モジュール
from oandapyV20 import API
import oandapyV20.endpoints.pricing as pricing
import oandapyV20.endpoints.positions as positions   
import oandapyV20.endpoints.instruments as instruments 
import oandapyV20.endpoints.orders as orders   
  
import json
import time
import datetime
import pandas as pd
  
#👉Discord通知用
import requests
  
#口座情報(自分の情報を入力)
TOKEN = "~"
ID = "~"
        
#取引通貨  
INSTRUMENT = "USD_JPY"
#レート桁数
DECIMALS = 3 
#Pip桁数
PIP_LOCATION = -2 
    
#最大許容スプレッド  
MAX_SPREAD_PIPS = 2 #Pips
        
#ループ回数
LOOP = 10000 #回
#待機時間
WAIT = 60 #秒
      
#ろうそく足取得用
COUNT = 10 #ろうそく足の取得本数
GRANULARITY = "M5" #時間足(5分)
      
UNITS = 1
N = 2 #Pips
      
  
def Discord(event, status, contents):
    
  #Webhook
  DISCORD_URL =  'https://discord.com/api/webhooks/~'  #自分で取得したwebhookにおきかえ
      
  #メッセージの編集
  message = '''\
  種類: %s 
  コード: %s 
  内容: %s 
  時間(UTC): %s 
  ''' %(event, status, contents, \
  f"{datetime.datetime.now(datetime.timezone.utc):%Y-%m-%d %H:%M:%S}")
    
  data = {'content' :  message}
    
  try:
          #メッセージの送信
          response_body = requests.post(DISCORD_URL, data=data)
          response_body.raise_for_status() #Discordの呼び出しでエラーが出た場合  
  
  except Exception as e:
          print(e)
          raise #ボットを停止させる
  
  
def CurrentRate():
          
  try: #👈try~except追加
  
    #最新レートの取得
    params = {
            "instruments": INSTRUMENT
          }
          
    r = pricing.PricingInfo(accountID=ID, params=params)  
    rv = api.request(r)
    
    #スプレッドの計算
    bid = rv['prices'][0]['closeoutBid']
    ask = rv['prices'][0]['closeoutAsk']
    spread = round(float(ask) - float(bid), DECIMALS)
      
    #トレード可能?
    if rv['prices'][0]['tradeable'] == True:
      max_spread = MAX_SPREAD_PIPS * (10 ** PIP_LOCATION)
      if spread < max_spread:
        status = "GO"
      else: #スプレッド拡大中
        status = "SKIP"
    #クローズ/メンテ中
    else:
      status = "STOP"
      Discord("CurentRate", status, "マーケットクローズ") #👈
          
    #戻り値  
    return {'status': status, 'bid': bid, 'ask': ask, 'spread': spread}
  
  #👇
  except Exception as e:
    if hasattr(e, 'code'):
      status = e.code
    else:
      status = "999"
    Discord("CurentRate", status, e) 
    raise
  
        
def Position():  
        
  #ポジションの確認処理追加
  r = positions.PositionDetails(accountID=ID, instrument=INSTRUMENT)
  rv = api.request(r)
        
  if rv['position']['long']['units'] != "0" or rv['position']['short']['units'] != "0": 
    print("ポジあり。待機")
    status = "SKIP"         
  else:
    print("ポジなし。 継続")
    status =  "GO"
      
  return {'status': status}
      
      
def Signal():  #シグナル判定(赤3黒3)
        
  #ろうそく足の取得
  #引数セット
  params = {
          "count": COUNT,
          "granularity": GRANULARITY
        }
  #エンドポイント呼び出し
  r = instruments.InstrumentsCandles(instrument=INSTRUMENT, params=params)
  rv = api.request(r)
        
  #データフレームへの変換
  df = pd.json_normalize(rv, record_path='candles', meta=['instrument', 'granularity'], sep='_')
  #コラム名の変更
  df.columns = ['complete', 'volume', 'time_UTC', 'open', 'high', 'low', 'close', 'pair', 'ashi']
          
  #完成形のろうそく足を最後から3本分のみ取得
  df = df[df['complete'] == True].tail(3)
      
  #計算用に属性を変更
  df = df.astype({'open': float, 'close': float, 'high': float, 'low': float})
        
  #3本分のろうそく足毎のトレンドの判定
  df.loc[round(df['close'] - df['open'],DECIMALS) > 0, 'trend'] = 1  #陽線(赤)
  df.loc[round(df['close'] - df['open'],DECIMALS) < 0, 'trend'] = -1 #陰線(黒)
  df.loc[round(df['close'] - df['open'],DECIMALS) == 0, 'trend'] = 0  #同じ
        
  #売買シグナルの判定
  if df.trend.sum() == 3: #すべて陽線(上昇)
    signal = 1
    print("買シグナル!")
  elif df.trend.sum() == -3: #すべて陰線(下降)
    signal = -1
    print("売シグナル!")
  else:
    signal = 0
    print("シグナルなし")
      
  print(df)
        
  #戻り値のセット
  return {'signal': signal, 'df': df}
      
    
def Order(r_rate, r_signal): 
    
  #発注処理
  #最後のローソク足の終値の取得
  xclose = r_signal['df']['close'].iat[-1]
      
  #区間内の最高値と最安値の取得
  xmin = r_signal['df']['low'].min()
  xmax = r_signal['df']['high'].max()
  
  print("終値: %s 最高値: %s 最安値: %s" %(xclose, xmax, xmin))
      
  #Risk(Spread)の計算
  if r_signal['signal'] == 1:
    #買:区間の最安値と直前の足の終値の差にN Pips足す
    distance = round(xclose - xmin, DECIMALS)
  elif r_signal['signal'] == -1:
    #売:区間の最高値と直前の足の終値の差にN Pips足す
    distance = round(xmax - xclose, DECIMALS)
    
  #計算した値幅に一定の値を加える  
  distance2 = round(distance + N * (10 ** PIP_LOCATION), DECIMALS)
      
  print("値幅: %s 値幅+N: %s スプレッド: %s" %(distance, distance2, r_rate['spread']))
    
  #計算結果より現在のスプレッドと比較
  if distance2 < r_rate['spread']:
    print("値幅が足りません。")
    return {'status': 'SKIP'}
    
      
  #発注データのセット      
  data = {
        "order": {
            "type": "MARKET",
            "instrument": INSTRUMENT,
            "units": str(UNITS * r_signal['signal']),
            "takeProfitOnFill": {
                "distance": str(distance2)
            },
            "stopLossOnFill": {
                "distance": str(distance2)
            },
        }
    }
        
  #発注
  r = orders.OrderCreate(accountID=ID, data=data)
  rv = api.request(r)
  
  #結果確認
  print(json.dumps(rv, indent=2))
  
  if r.status_code == 201:
    if "orderFillTransaction" in rv.keys():
      status = "ORDERED"
      result = rv['orderFillTransaction']['orderID']
      
    elif 'orderCancelTransaction' in rv.keys():  
      status = "STOP"
      result = rv['orderCancelTransaction']['reason']  
    else:
      status = "STOP"  
      result = "予期せぬエラー(Status = 201)"
  else:
    status = "STOP"
    result = "予期せぬエラー(Status != 201)"
  
  Discord("New Order", status, result) #👈
  print(status, " : " ,result)  
  
  #戻り値のセット
  return {'status': status}
  
                
if __name__ == "__main__":
          
  try:
         
    api = API(access_token= TOKEN)
  
    for i in range(LOOP):
          
      #最新レート確認  
      r_rate = CurrentRate()  
      print(r_rate)
      #次の処理
      if r_rate['status'] == "GO":
        print("継続-トレード可能")
        
       #保有ポジションの確認
        r_pos = Position()             
        #ポジション無しの時
        if r_pos['status'] == "GO":   
          #売買シグナルの判定
          r_signal = Signal()
          if r_signal['signal'] != 0: #売買シグナルがでたら注文処理へ
            #注文処理
            r_order = Order(r_rate, r_signal)
            if r_order['status'] != "ORDERED" and r_order['status'] != "SKIP":
              print("停止ー発注エラー")    
              break #ボット終了
                          
      #スプレッドが広すぎる
      elif r_rate['status'] == "SKIP":
        print("スキップ-スプレッド拡大中")    
        
      #マーケットがクローズ(またはメンテ中)
      elif r_rate['status'] == "STOP":
        print("停止ーマーケットクローズ")    
        break #ボット終了
              
      else:
        print("停止ー予期せぬエラー発生")    
        break #ボット終了
        
      #次のサイクル
      print("待機中 ", i) 
      time.sleep(WAIT)
          
  except Exception as e:
    print(e) 
        
  finally:
    Discord("停止", "999", "Botが停止しました。") #👈
    print("Botが停止しました。 UTC:", datetime.datetime.now(datetime.timezone.utc))

これを動かしてみるとエラーや約定した際に通知メッセージがDiscordに送られた事が無事確認できました。

Discordの携帯アプリ

新規オーダーの約定時

エラー発生時


まとめ


これでどの部分でエラーが発生したのかわかりやすくなりました。また、ボットが停止した場合でも気づきやすくなりそうです。 ネットがダウンした場合はそもそも送信できませんがだいぶ改善されたと思います。
ただ送信したい部分に一回一回関数の呼び出し処理を追加するのは結構大変です。 引数の追加等何か変更を加えたい場合でも直す箇所が多くなり間違いのもとになります。

次回は、今回の方法ですと処理を追加するのが面倒なのでエンドポイントを呼び出部分を一つにまとめて、その部分にいろいろな処理をまとめて記述してメンテナンスしやすいように変更してみたいと思います。