X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=market.py;h=10d1ad8936b3f671ca2ca7e9a1623a856dce29db;hb=90d7423eec074a0ed0af680c223180f8d7e1d4e6;hp=931de09e350c1a37b0bcd54baf312d433641a527;hpb=f86ee14037646bedc3a3dee4a48f085308981757;p=perso%2FImmae%2FProjets%2FCryptomonnaies%2FCryptoportfolio%2FTrader.git diff --git a/market.py b/market.py index 931de09..10d1ad8 100644 --- a/market.py +++ b/market.py @@ -1,7 +1,12 @@ -from ccxt import ExchangeError +from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce import ccxt_wrapper as ccxt import time +import psycopg2 from store import * +from cachetools.func import ttl_cache +from datetime import datetime +from retry import retry +import portfolio class Market: debug = False @@ -10,67 +15,129 @@ class Market: trades = None balances = None - def __init__(self, ccxt_instance, debug=False): - self.debug = debug + def __init__(self, ccxt_instance, args, **kwargs): + self.args = args + self.debug = args.debug self.ccxt = ccxt_instance self.ccxt._market = self - self.report = ReportStore(self) + self.report = ReportStore(self, verbose_print=(not args.quiet)) self.trades = TradeStore(self) self.balances = BalanceStore(self) + self.processor = Processor(self) + + for key in ["user_id", "market_id", "report_path", "pg_config"]: + setattr(self, key, kwargs.get(key, None)) + + self.report.log_market(self.args, self.user_id, self.market_id, + self.report_path, self.debug) @classmethod - def from_config(cls, config, debug=False): + def from_config(cls, config, args, **kwargs): + config["apiKey"] = config.pop("key", None) + ccxt_instance = ccxt.poloniexE(config) - # For requests logging - ccxt_instance.session.origin_request = ccxt_instance.session.request - ccxt_instance.session._parent = ccxt_instance + return cls(ccxt_instance, args, **kwargs) + + def store_report(self): + self.report.merge(Portfolio.report) + date = datetime.now() + if self.report_path is not None: + self.store_file_report(date) + if self.pg_config is not None: + self.store_database_report(date) + + def store_file_report(self, date): + try: + report_file = "{}/{}_{}".format(self.report_path, date.isoformat(), self.user_id) + with open(report_file + ".json", "w") as f: + f.write(self.report.to_json()) + with open(report_file + ".log", "w") as f: + f.write("\n".join(map(lambda x: x[1], self.report.print_logs))) + except Exception as e: + print("impossible to store report file: {}; {}".format(e.__class__.__name__, e)) - def request_wrap(self, *args, **kwargs): - r = self.origin_request(*args, **kwargs) - self._parent._market.report.log_http_request(args[0], - args[1], kwargs["data"], kwargs["headers"], r) - return r - ccxt_instance.session.request = request_wrap.__get__(ccxt_instance.session, - ccxt_instance.session.__class__) + def store_database_report(self, date): + try: + report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;' + line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);' + connection = psycopg2.connect(**self.pg_config) + cursor = connection.cursor() + cursor.execute(report_query, (date, self.market_id, self.debug)) + report_id = cursor.fetchone()[0] + for date, type_, payload in self.report.to_json_array(): + cursor.execute(line_query, (date, report_id, type_, payload)) - return cls(ccxt_instance, debug=debug) + connection.commit() + cursor.close() + connection.close() + except Exception as e: + print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e)) + def process(self, actions, before=False, after=False): + try: + if len(actions or []) == 0: + if before: + self.processor.process("sell_all", steps="before") + if after: + self.processor.process("sell_all", steps="after") + else: + for action in actions: + if hasattr(self, action): + getattr(self, action)() + else: + self.report.log_error("market_process", message="Unknown action {}".format(action)) + except Exception as e: + self.report.log_error("market_process", exception=e) + finally: + self.store_report() + + @retry((RequestTimeout, InvalidNonce), tries=5) def move_balances(self): needed_in_margin = {} moving_to_margin = {} - for currency in self.balances.all: - if self.balances.all[currency].margin_free != 0: - needed_in_margin[currency] = 0 - for trade in self.trades.all: - if trade.value_to.currency not in needed_in_margin: - needed_in_margin[trade.value_to.currency] = 0 + for currency, balance in self.balances.all.items(): + needed_in_margin[currency] = balance.margin_in_position - balance.margin_pending_gain + for trade in self.trades.pending: + needed_in_margin.setdefault(trade.base_currency, 0) if trade.trade_type == "short": - needed_in_margin[trade.value_to.currency] += abs(trade.value_to) + needed_in_margin[trade.base_currency] -= trade.delta for currency, needed in needed_in_margin.items(): - current_balance = self.balances.all[currency].margin_free + current_balance = self.balances.all[currency].margin_available moving_to_margin[currency] = (needed - current_balance) delta = moving_to_margin[currency].value - if self.debug: - self.report.log_debug_action("Moving {} from exchange to margin".format(moving_to_margin[currency])) + action = "Moving {} from exchange to margin".format(moving_to_margin[currency]) + + if self.debug and delta != 0: + self.report.log_debug_action(action) continue - if delta > 0: - self.ccxt.transfer_balance(currency, delta, "exchange", "margin") - elif delta < 0: - self.ccxt.transfer_balance(currency, -delta, "margin", "exchange") + try: + if delta > 0: + self.ccxt.transfer_balance(currency, delta, "exchange", "margin") + elif delta < 0: + self.ccxt.transfer_balance(currency, -delta, "margin", "exchange") + except (RequestTimeout, InvalidNonce) as e: + self.report.log_error(action, message="Retrying", exception=e) + self.report.log_move_balances(needed_in_margin, moving_to_margin) + self.balances.fetch_balances() + raise e self.report.log_move_balances(needed_in_margin, moving_to_margin) self.balances.fetch_balances() - fees_cache = {} + @ttl_cache(ttl=3600) def fetch_fees(self): - if self.ccxt.__class__ not in self.fees_cache: - self.fees_cache[self.ccxt.__class__] = self.ccxt.fetch_fees() - return self.fees_cache[self.ccxt.__class__] + return self.ccxt.fetch_fees() - ticker_cache = {} - ticker_cache_timestamp = time.time() + @ttl_cache(maxsize=20, ttl=5) + def get_tickers(self, refresh=False): + try: + return self.ccxt.fetch_tickers() + except NotSupported: + return None + + @ttl_cache(maxsize=20, ttl=5) def get_ticker(self, c1, c2, refresh=False): def invert(ticker): return { @@ -83,26 +150,25 @@ class Market: "inverted": False, "average": (ticker["bid"] + ticker["ask"] ) / 2, }) + return ticker - if time.time() - self.ticker_cache_timestamp > 5: - self.ticker_cache = {} - self.ticker_cache_timestamp = time.time() - elif not refresh: - if (c1, c2, self.ccxt.__class__) in self.ticker_cache: - return self.ticker_cache[(c1, c2, self.ccxt.__class__)] - if (c2, c1, self.ccxt.__class__) in self.ticker_cache: - return invert(self.ticker_cache[(c2, c1, self.ccxt.__class__)]) - - try: - self.ticker_cache[(c1, c2, self.ccxt.__class__)] = self.ccxt.fetch_ticker("{}/{}".format(c1, c2)) - augment_ticker(self.ticker_cache[(c1, c2, self.ccxt.__class__)]) - except ExchangeError: + tickers = self.get_tickers() + if tickers is None: try: - self.ticker_cache[(c2, c1, self.ccxt.__class__)] = self.ccxt.fetch_ticker("{}/{}".format(c2, c1)) - augment_ticker(self.ticker_cache[(c2, c1, self.ccxt.__class__)]) + ticker = augment_ticker(self.ccxt.fetch_ticker("{}/{}".format(c1, c2))) except ExchangeError: - self.ticker_cache[(c1, c2, self.ccxt.__class__)] = None - return self.get_ticker(c1, c2) + try: + ticker = invert(augment_ticker(self.ccxt.fetch_ticker("{}/{}".format(c2, c1)))) + except ExchangeError: + ticker = None + else: + if "{}/{}".format(c1, c2) in tickers: + ticker = augment_ticker(tickers["{}/{}".format(c1, c2)]) + elif "{}/{}".format(c2, c1) in tickers: + ticker = invert(augment_ticker(tickers["{}/{}".format(c2, c1)])) + else: + ticker = None + return ticker def follow_orders(self, sleep=None): if sleep is None: @@ -118,33 +184,228 @@ class Market: self.report.log_stage("follow_orders_tick_{}".format(tick)) self.report.log_orders(open_orders, tick=tick) for order in open_orders: - if order.get_status() != "open": + status = order.get_status() + if status != "open": self.report.log_order(order, tick, finished=True) else: order.trade.update_order(order, tick) + if status == "error_disappeared": + self.report.log_error("follow_orders", + message="{} disappeared, recreating it".format(order)) + order.trade.prepare_order( + compute_value=order.trade.tick_actions_recreate(tick)) + self.report.log_stage("follow_orders_end") - def prepare_trades(self, base_currency="BTC", liquidity="medium", compute_value="average"): - self.report.log_stage("prepare_trades") - values_in_base = self.balances.in_currency(base_currency, compute_value=compute_value) - total_base_value = sum(values_in_base.values()) - new_repartition = self.balances.dispatch_assets(total_base_value, liquidity=liquidity) - # Recompute it in case we have new currencies - values_in_base = self.balances.in_currency(base_currency, compute_value=compute_value) - self.trades.compute_trades(values_in_base, new_repartition) - - def update_trades(self, base_currency="BTC", liquidity="medium", compute_value="average", only=None): - self.report.log_stage("update_trades") - values_in_base = self.balances.in_currency(base_currency, compute_value=compute_value) + def prepare_trades(self, base_currency="BTC", liquidity="medium", + compute_value="average", repartition=None, only=None): + + self.report.log_stage("prepare_trades", + base_currency=base_currency, liquidity=liquidity, + compute_value=compute_value, only=only, + repartition=repartition) + + values_in_base = self.balances.in_currency(base_currency, + compute_value=compute_value) total_base_value = sum(values_in_base.values()) - new_repartition = self.balances.dispatch_assets(total_base_value, liquidity=liquidity) + new_repartition = self.balances.dispatch_assets(total_base_value, + liquidity=liquidity, repartition=repartition) self.trades.compute_trades(values_in_base, new_repartition, only=only) - def prepare_trades_to_sell_all(self, base_currency="BTC", compute_value="average"): - self.report.log_stage("prepare_trades_to_sell_all") - values_in_base = self.balances.in_currency(base_currency, compute_value=compute_value) - total_base_value = sum(values_in_base.values()) - new_repartition = self.balances.dispatch_assets(total_base_value, repartition={ base_currency: (1, "long") }) - self.trades.compute_trades(values_in_base, new_repartition) + # Helpers + def print_orders(self, base_currency="BTC"): + self.report.log_stage("print_orders") + self.balances.fetch_balances(tag="print_orders") + self.prepare_trades(base_currency=base_currency, compute_value="average") + self.trades.prepare_orders(compute_value="average") + + def print_balances(self, base_currency="BTC"): + self.report.log_stage("print_balances") + self.balances.fetch_balances() + if base_currency is not None: + self.report.print_log("total:") + self.report.print_log(sum(self.balances.in_currency(base_currency).values())) + +class Processor: + scenarios = { + "wait_for_cryptoportfolio": [ + { + "name": "wait", + "number": 1, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + ], + "print_orders": [ + { + "name": "wait", + "number": 1, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + { + "name": "make_orders", + "number": 2, + "before": False, + "after": True, + "fetch_balances": ["begin"], + "prepare_trades": { "compute_value": "average" }, + "prepare_orders": { "compute_value": "average" }, + }, + ], + "sell_needed": [ + { + "name": "wait", + "number": 0, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + { + "name": "sell", + "number": 1, + "before": False, + "after": True, + "fetch_balances": ["begin", "end"], + "prepare_trades": {}, + "prepare_orders": { "only": "dispose", "compute_value": "average" }, + "run_orders": {}, + "follow_orders": {}, + "close_trades": {}, + }, + { + "name": "buy", + "number": 2, + "before": False, + "after": True, + "fetch_balances": ["begin", "end"], + "prepare_trades": { "only": "acquire" }, + "prepare_orders": { "only": "acquire", "compute_value": "average" }, + "move_balances": {}, + "run_orders": {}, + "follow_orders": {}, + "close_trades": {}, + }, + ], + "sell_all": [ + { + "name": "all_sell", + "number": 1, + "before": True, + "after": False, + "fetch_balances": ["begin", "end"], + "prepare_trades": { "repartition": { "base_currency": (1, "long") } }, + "prepare_orders": { "compute_value": "average" }, + "run_orders": {}, + "follow_orders": {}, + "close_trades": {}, + }, + { + "name": "wait", + "number": 2, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + { + "name": "all_buy", + "number": 3, + "before": False, + "after": True, + "fetch_balances": ["begin", "end"], + "prepare_trades": {}, + "prepare_orders": { "compute_value": "average" }, + "move_balances": {}, + "run_orders": {}, + "follow_orders": {}, + "close_trades": {}, + }, + ] + } + + ordered_actions = [ + "wait_for_recent", "prepare_trades", "prepare_orders", + "move_balances", "run_orders", "follow_orders", + "close_trades"] + + def __init__(self, market): + self.market = market + + def select_steps(self, scenario, step): + if step == "all": + return scenario + elif step == "before" or step == "after": + return list(filter(lambda x: step in x and x[step], scenario)) + elif type(step) == int: + return [scenario[step-1]] + elif type(step) == str: + return list(filter(lambda x: x["name"] == step, scenario)) + else: + raise TypeError("Unknown step {}".format(step)) + + def process(self, scenario_name, steps="all", **kwargs): + scenario = self.scenarios[scenario_name] + selected_steps = [] + + if type(steps) == str or type(steps) == int: + selected_steps += self.select_steps(scenario, steps) + else: + for step in steps: + selected_steps += self.select_steps(scenario, step) + for step in selected_steps: + self.process_step(scenario_name, step, kwargs) + + def process_step(self, scenario_name, step, kwargs): + process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"]) + self.market.report.log_stage("{}_begin".format(process_name)) + if "begin" in step.get("fetch_balances", []): + self.market.balances.fetch_balances(tag="{}_begin".format(process_name)) + + for action in self.ordered_actions: + if action in step: + self.run_action(action, step[action], kwargs) + + if "end" in step.get("fetch_balances", []): + self.market.balances.fetch_balances(tag="{}_end".format(process_name)) + self.market.report.log_stage("{}_end".format(process_name)) + + def method_arguments(self, action): + import inspect + + if action == "wait_for_recent": + method = Portfolio.wait_for_recent + elif action == "prepare_trades": + method = self.market.prepare_trades + elif action == "prepare_orders": + method = self.market.trades.prepare_orders + elif action == "move_balances": + method = self.market.move_balances + elif action == "run_orders": + method = self.market.trades.run_orders + elif action == "follow_orders": + method = self.market.follow_orders + elif action == "close_trades": + method = self.market.trades.close_trades + + signature = inspect.getfullargspec(method) + defaults = signature.defaults or [] + kwargs = signature.args[-len(defaults):] + + return [method, kwargs] + + def parse_args(self, action, default_args, kwargs): + method, allowed_arguments = self.method_arguments(action) + args = {k: v for k, v in {**default_args, **kwargs}.items() if k in allowed_arguments } + + if "repartition" in args and "base_currency" in args["repartition"]: + r = args["repartition"] + r[args.get("base_currency", "BTC")] = r.pop("base_currency") + + return method, args + def run_action(self, action, default_args, kwargs): + method, args = self.parse_args(action, default_args, kwargs) + method(**args)