X-Git-Url: https://git.immae.eu/?p=perso%2FImmae%2FProjets%2FCryptomonnaies%2FCryptoportfolio%2FTrader.git;a=blobdiff_plain;f=market.py;h=9612b17406de5b45bec1e6dabb59e3e3c870d16d;hp=7a37cf622aed28899f4815cd1b388e89e0a98a34;hb=HEAD;hpb=c8df27385e02b22d36b240fe29532e97dbba1f43 diff --git a/market.py b/market.py index 7a37cf6..9612b17 100644 --- a/market.py +++ b/market.py @@ -1,7 +1,7 @@ -from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce +from ccxt import AuthenticationError, ExchangeError, NotSupported, RequestTimeout, InvalidNonce import ccxt_wrapper as ccxt import time -import psycopg2 +import dbs from store import * from cachetools.func import ttl_cache from datetime import datetime @@ -15,6 +15,7 @@ class Market: report = None trades = None balances = None + options = None def __init__(self, ccxt_instance, args, **kwargs): self.args = args @@ -26,7 +27,8 @@ class Market: self.balances = BalanceStore(self) self.processor = Processor(self) - for key in ["user_id", "market_id", "pg_config"]: + self.options = kwargs.get("options", {}) + for key in ["user_id", "market_id"]: setattr(self, key, kwargs.get(key, None)) self.report.log_market(self.args) @@ -44,8 +46,10 @@ class Market: date = datetime.datetime.now() if self.args.report_path is not None: self.store_file_report(date) - if self.pg_config is not None and self.args.report_db: + if dbs.psql_connected() and self.args.report_db: self.store_database_report(date) + if dbs.redis_connected() and self.args.report_redis: + self.store_redis_report(date) def store_file_report(self, date): try: @@ -61,34 +65,44 @@ class Market: try: report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;' line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);' - connection = psycopg2.connect(**self.pg_config) - cursor = connection.cursor() + cursor = dbs.psql.cursor() cursor.execute(report_query, (date, self.market_id, self.debug)) report_id = cursor.fetchone()[0] for date, type_, payload in self.report.to_json_array(): cursor.execute(line_query, (date, report_id, type_, payload)) - connection.commit() + dbs.psql.commit() cursor.close() - connection.close() except Exception as e: print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e)) + def store_redis_report(self, date): + try: + for type_, log in self.report.to_json_redis(): + key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_) + dbs.redis.set(key, log, ex=31*24*60*60) + key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_) + dbs.redis.set(key, log) + key = "/cryptoportfolio/{}/latest/date".format(self.market_id) + dbs.redis.set(key, date.isoformat()) + except Exception as e: + print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e)) + def process(self, actions, before=False, after=False): try: - if len(actions or []) == 0: - if before: - self.processor.process("sell_all", steps="before") - if after: - self.processor.process("sell_all", steps="after") - else: - for action in actions: - if hasattr(self, action): - getattr(self, action)() - else: - self.report.log_error("market_process", message="Unknown action {}".format(action)) + self.ccxt.check_required_credentials() + for action in actions: + if bool(before) is bool(after): + self.processor.process(action, steps="all", options=self.options) + elif before: + self.processor.process(action, steps="before", options=self.options) + elif after: + self.processor.process(action, steps="after", options=self.options) + except AuthenticationError: + self.report.log_error("market_authentication", message="Impossible to authenticate to market") except Exception as e: - self.report.log_error("market_process", exception=e) + import traceback + self.report.log_error("market_process", exception=e, message=traceback.format_exc()) finally: self.store_report() @@ -192,36 +206,41 @@ class Market: if status == "error_disappeared": self.report.log_error("follow_orders", message="{} disappeared, recreating it".format(order)) - order.trade.prepare_order( + new_order = order.trade.prepare_order( compute_value=order.trade.tick_actions_recreate(tick)) + if new_order is not None: + new_order.run() + self.report.log_order(order, tick, new_order=new_order) self.report.log_stage("follow_orders_end") def prepare_trades(self, base_currency="BTC", liquidity="medium", - compute_value="average", repartition=None, only=None): + compute_value="average", repartition=None, only=None, + available_balance_only=False): self.report.log_stage("prepare_trades", base_currency=base_currency, liquidity=liquidity, compute_value=compute_value, only=only, - repartition=repartition) + repartition=repartition, available_balance_only=available_balance_only) - values_in_base = self.balances.in_currency(base_currency, - compute_value=compute_value) - total_base_value = sum(values_in_base.values()) + if available_balance_only: + repartition, total_base_value, values_in_base = self.balances.available_balances_for_repartition( + base_currency=base_currency, liquidity=liquidity, + repartition=repartition, compute_value=compute_value) + else: + values_in_base = self.balances.in_currency(base_currency, + compute_value=compute_value) + total_base_value = sum(values_in_base.values()) new_repartition = self.balances.dispatch_assets(total_base_value, liquidity=liquidity, repartition=repartition) - self.trades.compute_trades(values_in_base, new_repartition, only=only) + if available_balance_only: + for currency, amount in values_in_base.items(): + if currency != base_currency and currency not in new_repartition: + new_repartition[currency] = amount - # Helpers - def print_orders(self, base_currency="BTC"): - self.report.log_stage("print_orders") - self.balances.fetch_balances(tag="print_orders") - self.prepare_trades(base_currency=base_currency, compute_value="average") - self.trades.prepare_orders(compute_value="average") + self.trades.compute_trades(values_in_base, new_repartition, only=only) - def print_balances(self, base_currency="BTC"): - self.report.log_stage("print_balances") - self.balances.fetch_balances() + def print_tickers(self, base_currency="BTC"): if base_currency is not None: self.report.print_log("total:") self.report.print_log(sum(self.balances.in_currency(base_currency).values())) @@ -237,12 +256,24 @@ class Processor: "wait_for_recent": {}, }, ], + "print_balances": [ + { + "name": "print_balances", + "number": 1, + "fetch_balances_begin": { + "log_tickers": True, + "add_usdt": True, + "add_portfolio": True + }, + "print_tickers": { "base_currency": "BTC" }, + } + ], "print_orders": [ { "name": "wait", "number": 1, - "before": False, - "after": True, + "before": True, + "after": False, "wait_for_recent": {}, }, { @@ -250,25 +281,38 @@ class Processor: "number": 2, "before": False, "after": True, - "fetch_balances": ["begin"], + "fetch_balances_begin": {}, "prepare_trades": { "compute_value": "average" }, "prepare_orders": { "compute_value": "average" }, }, ], "sell_needed": [ { - "name": "wait", + "name": "print_balances", "number": 0, + "before": True, + "after": False, + "fetch_balances_begin": { + "checkpoint": "end", + "log_tickers": True, + "add_usdt": True, + "add_portfolio": True + }, + }, + { + "name": "wait", + "number": 1, "before": False, "after": True, "wait_for_recent": {}, }, { "name": "sell", - "number": 1, + "number": 2, "before": False, "after": True, - "fetch_balances": ["begin", "end"], + "fetch_balances_begin": {}, + "fetch_balances_end": {}, "prepare_trades": {}, "prepare_orders": { "only": "dispose", "compute_value": "average" }, "run_orders": {}, @@ -277,11 +321,16 @@ class Processor: }, { "name": "buy", - "number": 2, + "number": 3, "before": False, "after": True, - "fetch_balances": ["begin", "end"], - "prepare_trades": { "only": "acquire" }, + "fetch_balances_begin": {}, + "fetch_balances_end": { + "checkpoint": "begin", + "add_usdt": True, + "log_tickers": True + }, + "prepare_trades": { "only": "acquire", "available_balance_only": True }, "prepare_orders": { "only": "acquire", "compute_value": "average" }, "move_balances": {}, "run_orders": {}, @@ -295,7 +344,13 @@ class Processor: "number": 1, "before": True, "after": False, - "fetch_balances": ["begin", "end"], + "fetch_balances_begin": { + "checkpoint": "end", + "log_tickers": True, + "add_usdt": True, + "add_portfolio": True + }, + "fetch_balances_end": {}, "prepare_trades": { "repartition": { "base_currency": (1, "long") } }, "prepare_orders": { "compute_value": "average" }, "run_orders": {}, @@ -314,8 +369,13 @@ class Processor: "number": 3, "before": False, "after": True, - "fetch_balances": ["begin", "end"], - "prepare_trades": {}, + "fetch_balances_begin": {}, + "fetch_balances_end": { + "checkpoint": "begin", + "add_usdt": True, + "log_tickers": True + }, + "prepare_trades": { "available_balance_only": True }, "prepare_orders": { "compute_value": "average" }, "move_balances": {}, "run_orders": {}, @@ -328,7 +388,7 @@ class Processor: ordered_actions = [ "wait_for_recent", "prepare_trades", "prepare_orders", "move_balances", "run_orders", "follow_orders", - "close_trades"] + "close_trades", "print_tickers"] def __init__(self, market): self.market = market @@ -337,7 +397,7 @@ class Processor: if step == "all": return scenario elif step == "before" or step == "after": - return list(filter(lambda x: step in x and x[step], scenario)) + return list(filter(lambda x: x.get(step, False), scenario)) elif type(step) == int: return [scenario[step-1]] elif type(step) == str: @@ -345,7 +405,12 @@ class Processor: else: raise TypeError("Unknown step {}".format(step)) - def process(self, scenario_name, steps="all", **kwargs): + def can_process(self, scenario_name): + return scenario_name in self.scenarios + + def process(self, scenario_name, steps="all", options={}): + if not self.can_process(scenario_name): + raise TypeError("Unknown scenario {}".format(scenario_name)) scenario = self.scenarios[scenario_name] selected_steps = [] @@ -355,20 +420,24 @@ class Processor: for step in steps: selected_steps += self.select_steps(scenario, step) for step in selected_steps: - self.process_step(scenario_name, step, kwargs) + self.process_step(scenario_name, step, options) - def process_step(self, scenario_name, step, kwargs): + def process_step(self, scenario_name, step, options): process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"]) self.market.report.log_stage("{}_begin".format(process_name)) - if "begin" in step.get("fetch_balances", []): - self.market.balances.fetch_balances(tag="{}_begin".format(process_name)) + + if "fetch_balances_begin" in step: + self.run_action("fetch_balances", step["fetch_balances_begin"], + dict(options, tag="{}_begin".format(process_name))) for action in self.ordered_actions: if action in step: - self.run_action(action, step[action], kwargs) + self.run_action(action, step[action], options) + + if "fetch_balances_end" in step: + self.run_action("fetch_balances", step["fetch_balances_end"], + dict(options, tag="{}_end".format(process_name))) - if "end" in step.get("fetch_balances", []): - self.market.balances.fetch_balances(tag="{}_end".format(process_name)) self.market.report.log_stage("{}_end".format(process_name)) def method_arguments(self, action): @@ -388,6 +457,10 @@ class Processor: method = self.market.follow_orders elif action == "close_trades": method = self.market.trades.close_trades + elif action == "print_tickers": + method = self.market.print_tickers + elif action == "fetch_balances": + method = self.market.balances.fetch_balances signature = inspect.getfullargspec(method) defaults = signature.defaults or [] @@ -395,9 +468,9 @@ class Processor: return [method, kwargs] - def parse_args(self, action, default_args, kwargs): + def parse_args(self, action, default_args, options): method, allowed_arguments = self.method_arguments(action) - args = {k: v for k, v in {**default_args, **kwargs}.items() if k in allowed_arguments } + args = {k: v for k, v in {**default_args, **options}.items() if k in allowed_arguments } if "repartition" in args and "base_currency" in args["repartition"]: r = args["repartition"] @@ -405,7 +478,7 @@ class Processor: return method, args - def run_action(self, action, default_args, kwargs): - method, args = self.parse_args(action, default_args, kwargs) + def run_action(self, action, default_args, options): + method, args = self.parse_args(action, default_args, options) method(**args)