]> git.immae.eu Git - perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git/blobdiff - market.py
Merge branch 'dev'
[perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git] / market.py
index eff670cab0b5660fe0de2e37289cffb847160371..9612b17406de5b45bec1e6dabb59e3e3c870d16d 100644 (file)
--- a/market.py
+++ b/market.py
@@ -1,8 +1,7 @@
-from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce
+from ccxt import AuthenticationError, ExchangeError, NotSupported, RequestTimeout, InvalidNonce
 import ccxt_wrapper as ccxt
 import time
-import psycopg2
-import redis
+import dbs
 from store import *
 from cachetools.func import ttl_cache
 from datetime import datetime
@@ -16,6 +15,7 @@ class Market:
     report = None
     trades = None
     balances = None
+    options = None
 
     def __init__(self, ccxt_instance, args, **kwargs):
         self.args = args
@@ -27,7 +27,8 @@ class Market:
         self.balances = BalanceStore(self)
         self.processor = Processor(self)
 
-        for key in ["user_id", "market_id", "pg_config", "redis_config"]:
+        self.options = kwargs.get("options", {})
+        for key in ["user_id", "market_id"]:
             setattr(self, key, kwargs.get(key, None))
 
         self.report.log_market(self.args)
@@ -45,9 +46,9 @@ class Market:
         date = datetime.datetime.now()
         if self.args.report_path is not None:
             self.store_file_report(date)
-        if self.pg_config is not None and self.args.report_db:
+        if dbs.psql_connected() and self.args.report_db:
             self.store_database_report(date)
-        if self.redis_config is not None and self.args.report_redis:
+        if dbs.redis_connected() and self.args.report_redis:
             self.store_redis_report(date)
 
     def store_file_report(self, date):
@@ -64,43 +65,44 @@ class Market:
         try:
             report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;'
             line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);'
-            connection = psycopg2.connect(**self.pg_config)
-            cursor = connection.cursor()
+            cursor = dbs.psql.cursor()
             cursor.execute(report_query, (date, self.market_id, self.debug))
             report_id = cursor.fetchone()[0]
             for date, type_, payload in self.report.to_json_array():
                 cursor.execute(line_query, (date, report_id, type_, payload))
 
-            connection.commit()
+            dbs.psql.commit()
             cursor.close()
-            connection.close()
         except Exception as e:
             print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e))
 
     def store_redis_report(self, date):
         try:
-            conn = redis.Redis(**self.redis_config)
             for type_, log in self.report.to_json_redis():
                 key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_)
-                conn.set(key, log, ex=31*24*60*60)
+                dbs.redis.set(key, log, ex=31*24*60*60)
                 key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_)
-                conn.set(key, log)
+                dbs.redis.set(key, log)
             key = "/cryptoportfolio/{}/latest/date".format(self.market_id)
-            conn.set(key, date.isoformat())
+            dbs.redis.set(key, date.isoformat())
         except Exception as e:
             print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e))
 
     def process(self, actions, before=False, after=False):
         try:
+            self.ccxt.check_required_credentials()
             for action in actions:
                 if bool(before) is bool(after):
-                    self.processor.process(action, steps="all")
+                    self.processor.process(action, steps="all", options=self.options)
                 elif before:
-                    self.processor.process(action, steps="before")
+                    self.processor.process(action, steps="before", options=self.options)
                 elif after:
-                    self.processor.process(action, steps="after")
+                    self.processor.process(action, steps="after", options=self.options)
+        except AuthenticationError:
+            self.report.log_error("market_authentication", message="Impossible to authenticate to market")
         except Exception as e:
-            self.report.log_error("market_process", exception=e)
+            import traceback
+            self.report.log_error("market_process", exception=e, message=traceback.format_exc())
         finally:
             self.store_report()
 
@@ -204,8 +206,11 @@ class Market:
                 if status == "error_disappeared":
                     self.report.log_error("follow_orders",
                             message="{} disappeared, recreating it".format(order))
-                    order.trade.prepare_order(
+                    new_order = order.trade.prepare_order(
                             compute_value=order.trade.tick_actions_recreate(tick))
+                    if new_order is not None:
+                        new_order.run()
+                        self.report.log_order(order, tick, new_order=new_order)
 
         self.report.log_stage("follow_orders_end")
 
@@ -218,23 +223,20 @@ class Market:
                 compute_value=compute_value, only=only,
                 repartition=repartition, available_balance_only=available_balance_only)
 
-        values_in_base = self.balances.in_currency(base_currency,
-                compute_value=compute_value)
         if available_balance_only:
-            balance = self.balances.all.get(base_currency)
-            if balance is None:
-                total_base_value = portfolio.Amount(base_currency, 0)
-            else:
-                total_base_value = balance.exchange_free + balance.margin_available
+            repartition, total_base_value, values_in_base = self.balances.available_balances_for_repartition(
+                    base_currency=base_currency, liquidity=liquidity,
+                    repartition=repartition, compute_value=compute_value)
         else:
+            values_in_base = self.balances.in_currency(base_currency,
+                    compute_value=compute_value)
             total_base_value = sum(values_in_base.values())
         new_repartition = self.balances.dispatch_assets(total_base_value,
                 liquidity=liquidity, repartition=repartition)
         if available_balance_only:
             for currency, amount in values_in_base.items():
-                if currency != base_currency:
-                    new_repartition.setdefault(currency, portfolio.Amount(base_currency, 0))
-                    new_repartition[currency] += amount
+                if currency != base_currency and currency not in new_repartition:
+                    new_repartition[currency] = amount
 
         self.trades.compute_trades(values_in_base, new_repartition, only=only)
 
@@ -258,7 +260,11 @@ class Processor:
                 {
                     "name": "print_balances",
                     "number": 1,
-                    "fetch_balances": ["begin"],
+                    "fetch_balances_begin": {
+                        "log_tickers": True,
+                        "add_usdt": True,
+                        "add_portfolio": True
+                        },
                     "print_tickers": { "base_currency": "BTC" },
                     }
                 ],
@@ -275,25 +281,38 @@ class Processor:
                     "number": 2,
                     "before": False,
                     "after": True,
-                    "fetch_balances": ["begin"],
+                    "fetch_balances_begin": {},
                     "prepare_trades": { "compute_value": "average" },
                     "prepare_orders": { "compute_value": "average" },
                     },
                 ],
             "sell_needed": [
                 {
-                    "name": "wait",
+                    "name": "print_balances",
                     "number": 0,
+                    "before": True,
+                    "after": False,
+                    "fetch_balances_begin": {
+                        "checkpoint": "end",
+                        "log_tickers": True,
+                        "add_usdt": True,
+                        "add_portfolio": True
+                        },
+                    },
+                {
+                    "name": "wait",
+                    "number": 1,
                     "before": False,
                     "after": True,
                     "wait_for_recent": {},
                     },
                 {
                     "name": "sell",
-                    "number": 1,
+                    "number": 2,
                     "before": False,
                     "after": True,
-                    "fetch_balances": ["begin", "end"],
+                    "fetch_balances_begin": {},
+                    "fetch_balances_end": {},
                     "prepare_trades": {},
                     "prepare_orders": { "only": "dispose", "compute_value": "average" },
                     "run_orders": {},
@@ -302,10 +321,15 @@ class Processor:
                     },
                 {
                     "name": "buy",
-                    "number": 2,
+                    "number": 3,
                     "before": False,
                     "after": True,
-                    "fetch_balances": ["begin", "end"],
+                    "fetch_balances_begin": {},
+                    "fetch_balances_end": {
+                        "checkpoint": "begin",
+                        "add_usdt": True,
+                        "log_tickers": True
+                        },
                     "prepare_trades": { "only": "acquire", "available_balance_only": True },
                     "prepare_orders": { "only": "acquire", "compute_value": "average" },
                     "move_balances": {},
@@ -320,7 +344,13 @@ class Processor:
                     "number": 1,
                     "before": True,
                     "after": False,
-                    "fetch_balances": ["begin", "end"],
+                    "fetch_balances_begin": {
+                        "checkpoint": "end",
+                        "log_tickers": True,
+                        "add_usdt": True,
+                        "add_portfolio": True
+                        },
+                    "fetch_balances_end": {},
                     "prepare_trades": { "repartition": { "base_currency": (1, "long") } },
                     "prepare_orders": { "compute_value": "average" },
                     "run_orders": {},
@@ -339,7 +369,12 @@ class Processor:
                     "number": 3,
                     "before": False,
                     "after": True,
-                    "fetch_balances": ["begin", "end"],
+                    "fetch_balances_begin": {},
+                    "fetch_balances_end": {
+                        "checkpoint": "begin",
+                        "add_usdt": True,
+                        "log_tickers": True
+                        },
                     "prepare_trades": { "available_balance_only": True },
                     "prepare_orders": { "compute_value": "average" },
                     "move_balances": {},
@@ -373,7 +408,7 @@ class Processor:
     def can_process(self, scenario_name):
         return scenario_name in self.scenarios
 
-    def process(self, scenario_name, steps="all", **kwargs):
+    def process(self, scenario_name, steps="all", options={}):
         if not self.can_process(scenario_name):
             raise TypeError("Unknown scenario {}".format(scenario_name))
         scenario = self.scenarios[scenario_name]
@@ -385,20 +420,24 @@ class Processor:
             for step in steps:
                 selected_steps += self.select_steps(scenario, step)
         for step in selected_steps:
-            self.process_step(scenario_name, step, kwargs)
+            self.process_step(scenario_name, step, options)
 
-    def process_step(self, scenario_name, step, kwargs):
+    def process_step(self, scenario_name, step, options):
         process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"])
         self.market.report.log_stage("{}_begin".format(process_name))
-        if "begin" in step.get("fetch_balances", []):
-            self.market.balances.fetch_balances(tag="{}_begin".format(process_name))
+
+        if "fetch_balances_begin" in step:
+            self.run_action("fetch_balances", step["fetch_balances_begin"],
+                    dict(options, tag="{}_begin".format(process_name)))
 
         for action in self.ordered_actions:
             if action in step:
-                self.run_action(action, step[action], kwargs)
+                self.run_action(action, step[action], options)
+
+        if "fetch_balances_end" in step:
+            self.run_action("fetch_balances", step["fetch_balances_end"],
+                    dict(options, tag="{}_end".format(process_name)))
 
-        if "end" in step.get("fetch_balances", []):
-            self.market.balances.fetch_balances(tag="{}_end".format(process_name))
         self.market.report.log_stage("{}_end".format(process_name))
 
     def method_arguments(self, action):
@@ -420,6 +459,8 @@ class Processor:
             method = self.market.trades.close_trades
         elif action == "print_tickers":
             method = self.market.print_tickers
+        elif action == "fetch_balances":
+            method = self.market.balances.fetch_balances
 
         signature = inspect.getfullargspec(method)
         defaults = signature.defaults or []
@@ -427,9 +468,9 @@ class Processor:
 
         return [method, kwargs]
 
-    def parse_args(self, action, default_args, kwargs):
+    def parse_args(self, action, default_args, options):
         method, allowed_arguments = self.method_arguments(action)
-        args = {k: v for k, v in {**default_args, **kwargs}.items() if k in allowed_arguments }
+        args = {k: v for k, v in {**default_args, **options}.items() if k in allowed_arguments }
 
         if "repartition" in args and "base_currency" in args["repartition"]:
             r = args["repartition"]
@@ -437,7 +478,7 @@ class Processor:
 
         return method, args
 
-    def run_action(self, action, default_args, kwargs):
-        method, args = self.parse_args(action, default_args, kwargs)
+    def run_action(self, action, default_args, options):
+        method, args = self.parse_args(action, default_args, options)
 
         method(**args)