X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=market.py;h=ce418415ad30e30ab6c9e556ee1a3d2b1c4cd94f;hb=17fd3f752d5e37df906abddf1f13fd7ad1de6c00;hp=78ced1a209eea10c181dfd429b238ff7ca30c659;hpb=b4e0ba0b0aa84550d0b06338b59557c3050798c9;p=perso%2FImmae%2FProjets%2FCryptomonnaies%2FCryptoportfolio%2FTrader.git diff --git a/market.py b/market.py index 78ced1a..ce41841 100644 --- a/market.py +++ b/market.py @@ -1,10 +1,13 @@ -from ccxt import ExchangeError, NotSupported +from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce import ccxt_wrapper as ccxt import time import psycopg2 +import redis from store import * from cachetools.func import ttl_cache from datetime import datetime +import datetime +from retry import retry import portfolio class Market: @@ -14,9 +17,7 @@ class Market: trades = None balances = None - def __init__(self, ccxt_instance, args, - user_id=None, market_id=None, - report_path=None, pg_config=None): + def __init__(self, ccxt_instance, args, **kwargs): self.args = args self.debug = args.debug self.ccxt = ccxt_instance @@ -26,45 +27,32 @@ class Market: self.balances = BalanceStore(self) self.processor = Processor(self) - self.user_id = user_id - self.market_id = market_id - self.report_path = report_path - self.pg_config = pg_config + for key in ["user_id", "market_id", "pg_config", "redis_config"]: + setattr(self, key, kwargs.get(key, None)) + + self.report.log_market(self.args) @classmethod - def from_config(cls, config, args, - user_id=None, market_id=None, report_path=None, pg_config=None): + def from_config(cls, config, args, **kwargs): config["apiKey"] = config.pop("key", None) ccxt_instance = ccxt.poloniexE(config) - # For requests logging - ccxt_instance.session.origin_request = ccxt_instance.session.request - ccxt_instance.session._parent = ccxt_instance - - def request_wrap(self, *args, **kwargs): - r = self.origin_request(*args, **kwargs) - self._parent._market.report.log_http_request(args[0], - args[1], kwargs["data"], kwargs["headers"], r) - return r - ccxt_instance.session.request = request_wrap.__get__(ccxt_instance.session, - ccxt_instance.session.__class__) - - return cls(ccxt_instance, args, - user_id=user_id, market_id=market_id, - pg_config=pg_config, report_path=report_path) + return cls(ccxt_instance, args, **kwargs) def store_report(self): self.report.merge(Portfolio.report) - date = datetime.now() - if self.report_path is not None: + date = datetime.datetime.now() + if self.args.report_path is not None: self.store_file_report(date) - if self.pg_config is not None: + if self.pg_config is not None and self.args.report_db: self.store_database_report(date) + if self.redis_config is not None and self.args.report_redis: + self.store_redis_report(date) def store_file_report(self, date): try: - report_file = "{}/{}_{}".format(self.report_path, date.isoformat(), self.user_id) + report_file = "{}/{}_{}".format(self.args.report_path, date.isoformat(), self.user_id) with open(report_file + ".json", "w") as f: f.write(self.report.to_json()) with open(report_file + ".log", "w") as f: @@ -89,24 +77,34 @@ class Market: except Exception as e: print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e)) + def store_redis_report(self, date): + try: + conn = redis.Redis(**self.redis_config) + for type_, log in self.report.to_json_redis(): + key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_) + conn.set(key, log, ex=31*24*60*60) + key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_) + conn.set(key, log) + key = "/cryptoportfolio/{}/latest/date".format(self.market_id) + conn.set(key, date.isoformat()) + except Exception as e: + print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e)) + def process(self, actions, before=False, after=False): try: - if len(actions or []) == 0: - if before: - self.processor.process("sell_all", steps="before") - if after: - self.processor.process("sell_all", steps="after") - else: - for action in actions: - if hasattr(self, action): - getattr(self, action)() - else: - self.report.log_error("market_process", message="Unknown action {}".format(action)) + for action in actions: + if bool(before) is bool(after): + self.processor.process(action, steps="all") + elif before: + self.processor.process(action, steps="before") + elif after: + self.processor.process(action, steps="after") except Exception as e: self.report.log_error("market_process", exception=e) finally: self.store_report() + @retry((RequestTimeout, InvalidNonce), tries=5) def move_balances(self): needed_in_margin = {} moving_to_margin = {} @@ -121,13 +119,21 @@ class Market: current_balance = self.balances.all[currency].margin_available moving_to_margin[currency] = (needed - current_balance) delta = moving_to_margin[currency].value + action = "Moving {} from exchange to margin".format(moving_to_margin[currency]) + if self.debug and delta != 0: - self.report.log_debug_action("Moving {} from exchange to margin".format(moving_to_margin[currency])) + self.report.log_debug_action(action) continue - if delta > 0: - self.ccxt.transfer_balance(currency, delta, "exchange", "margin") - elif delta < 0: - self.ccxt.transfer_balance(currency, -delta, "margin", "exchange") + try: + if delta > 0: + self.ccxt.transfer_balance(currency, delta, "exchange", "margin") + elif delta < 0: + self.ccxt.transfer_balance(currency, -delta, "margin", "exchange") + except (RequestTimeout, InvalidNonce) as e: + self.report.log_error(action, message="Retrying", exception=e) + self.report.log_move_balances(needed_in_margin, moving_to_margin) + self.balances.fetch_balances() + raise e self.report.log_move_balances(needed_in_margin, moving_to_margin) self.balances.fetch_balances() @@ -190,10 +196,17 @@ class Market: self.report.log_stage("follow_orders_tick_{}".format(tick)) self.report.log_orders(open_orders, tick=tick) for order in open_orders: - if order.get_status() != "open": + status = order.get_status() + if status != "open": self.report.log_order(order, tick, finished=True) else: order.trade.update_order(order, tick) + if status == "error_disappeared": + self.report.log_error("follow_orders", + message="{} disappeared, recreating it".format(order)) + order.trade.prepare_order( + compute_value=order.trade.tick_actions_recreate(tick)) + self.report.log_stage("follow_orders_end") def prepare_trades(self, base_currency="BTC", liquidity="medium", @@ -211,16 +224,7 @@ class Market: liquidity=liquidity, repartition=repartition) self.trades.compute_trades(values_in_base, new_repartition, only=only) - # Helpers - def print_orders(self, base_currency="BTC"): - self.report.log_stage("print_orders") - self.balances.fetch_balances(tag="print_orders") - self.prepare_trades(base_currency=base_currency, compute_value="average") - self.trades.prepare_orders(compute_value="average") - - def print_balances(self, base_currency="BTC"): - self.report.log_stage("print_balances") - self.balances.fetch_balances() + def print_tickers(self, base_currency="BTC"): if base_currency is not None: self.report.print_log("total:") self.report.print_log(sum(self.balances.in_currency(base_currency).values())) @@ -236,12 +240,20 @@ class Processor: "wait_for_recent": {}, }, ], + "print_balances": [ + { + "name": "print_balances", + "number": 1, + "fetch_balances": ["begin"], + "print_tickers": { "base_currency": "BTC" }, + } + ], "print_orders": [ { "name": "wait", "number": 1, - "before": False, - "after": True, + "before": True, + "after": False, "wait_for_recent": {}, }, { @@ -327,7 +339,7 @@ class Processor: ordered_actions = [ "wait_for_recent", "prepare_trades", "prepare_orders", "move_balances", "run_orders", "follow_orders", - "close_trades"] + "close_trades", "print_tickers"] def __init__(self, market): self.market = market @@ -336,7 +348,7 @@ class Processor: if step == "all": return scenario elif step == "before" or step == "after": - return list(filter(lambda x: step in x and x[step], scenario)) + return list(filter(lambda x: x.get(step, False), scenario)) elif type(step) == int: return [scenario[step-1]] elif type(step) == str: @@ -344,7 +356,12 @@ class Processor: else: raise TypeError("Unknown step {}".format(step)) + def can_process(self, scenario_name): + return scenario_name in self.scenarios + def process(self, scenario_name, steps="all", **kwargs): + if not self.can_process(scenario_name): + raise TypeError("Unknown scenario {}".format(scenario_name)) scenario = self.scenarios[scenario_name] selected_steps = [] @@ -387,6 +404,8 @@ class Processor: method = self.market.follow_orders elif action == "close_trades": method = self.market.trades.close_trades + elif action == "print_tickers": + method = self.market.print_tickers signature = inspect.getfullargspec(method) defaults = signature.defaults or []