from ccxt import AuthenticationError, ExchangeError, NotSupported, RequestTimeout, InvalidNonce import ccxt_wrapper as ccxt import time import dbs from store import * from cachetools.func import ttl_cache from datetime import datetime import datetime from retry import retry import portfolio class Market: debug = False ccxt = None report = None trades = None balances = None options = None def __init__(self, ccxt_instance, args, **kwargs): self.args = args self.debug = args.debug self.ccxt = ccxt_instance self.ccxt._market = self self.report = ReportStore(self, verbose_print=(not args.quiet)) self.trades = TradeStore(self) self.balances = BalanceStore(self) self.processor = Processor(self) self.options = kwargs.get("options", {}) for key in ["user_id", "market_id"]: setattr(self, key, kwargs.get(key, None)) self.report.log_market(self.args) @classmethod def from_config(cls, config, args, **kwargs): config["apiKey"] = config.pop("key", None) ccxt_instance = ccxt.poloniexE(config) return cls(ccxt_instance, args, **kwargs) def store_report(self): self.report.merge(Portfolio.report) date = datetime.datetime.now() if self.args.report_path is not None: self.store_file_report(date) if dbs.psql_connected() and self.args.report_db: self.store_database_report(date) if dbs.redis_connected() and self.args.report_redis: self.store_redis_report(date) def store_file_report(self, date): try: report_file = "{}/{}_{}".format(self.args.report_path, date.isoformat(), self.user_id) with open(report_file + ".json", "w") as f: f.write(self.report.to_json()) with open(report_file + ".log", "w") as f: f.write("\n".join(map(lambda x: x[1], self.report.print_logs))) except Exception as e: print("impossible to store report file: {}; {}".format(e.__class__.__name__, e)) def store_database_report(self, date): try: report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;' line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);' cursor = dbs.psql.cursor() cursor.execute(report_query, (date, self.market_id, self.debug)) report_id = cursor.fetchone()[0] for date, type_, payload in self.report.to_json_array(): cursor.execute(line_query, (date, report_id, type_, payload)) dbs.psql.commit() cursor.close() except Exception as e: print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e)) def store_redis_report(self, date): try: for type_, log in self.report.to_json_redis(): key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_) dbs.redis.set(key, log, ex=31*24*60*60) key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_) dbs.redis.set(key, log) key = "/cryptoportfolio/{}/latest/date".format(self.market_id) dbs.redis.set(key, date.isoformat()) except Exception as e: print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e)) def process(self, actions, before=False, after=False): try: self.ccxt.check_required_credentials() for action in actions: if bool(before) is bool(after): self.processor.process(action, steps="all", options=self.options) elif before: self.processor.process(action, steps="before", options=self.options) elif after: self.processor.process(action, steps="after", options=self.options) except AuthenticationError: self.report.log_error("market_authentication", message="Impossible to authenticate to market") except Exception as e: import traceback self.report.log_error("market_process", exception=e, message=traceback.format_exc()) finally: self.store_report() @retry((RequestTimeout, InvalidNonce), tries=5) def move_balances(self): needed_in_margin = {} moving_to_margin = {} for currency, balance in self.balances.all.items(): needed_in_margin[currency] = balance.margin_in_position - balance.margin_pending_gain for trade in self.trades.pending: needed_in_margin.setdefault(trade.base_currency, 0) if trade.trade_type == "short": needed_in_margin[trade.base_currency] -= trade.delta for currency, needed in needed_in_margin.items(): current_balance = self.balances.all[currency].margin_available moving_to_margin[currency] = (needed - current_balance) delta = moving_to_margin[currency].value action = "Moving {} from exchange to margin".format(moving_to_margin[currency]) if self.debug and delta != 0: self.report.log_debug_action(action) continue try: if delta > 0: self.ccxt.transfer_balance(currency, delta, "exchange", "margin") elif delta < 0: self.ccxt.transfer_balance(currency, -delta, "margin", "exchange") except (RequestTimeout, InvalidNonce) as e: self.report.log_error(action, message="Retrying", exception=e) self.report.log_move_balances(needed_in_margin, moving_to_margin) self.balances.fetch_balances() raise e self.report.log_move_balances(needed_in_margin, moving_to_margin) self.balances.fetch_balances() @ttl_cache(ttl=3600) def fetch_fees(self): return self.ccxt.fetch_fees() @ttl_cache(maxsize=20, ttl=5) def get_tickers(self, refresh=False): try: return self.ccxt.fetch_tickers() except NotSupported: return None @ttl_cache(maxsize=20, ttl=5) def get_ticker(self, c1, c2, refresh=False): def invert(ticker): return { "inverted": True, "average": (1/ticker["bid"] + 1/ticker["ask"]) / 2, "original": ticker, } def augment_ticker(ticker): ticker.update({ "inverted": False, "average": (ticker["bid"] + ticker["ask"] ) / 2, }) return ticker tickers = self.get_tickers() if tickers is None: try: ticker = augment_ticker(self.ccxt.fetch_ticker("{}/{}".format(c1, c2))) except ExchangeError: try: ticker = invert(augment_ticker(self.ccxt.fetch_ticker("{}/{}".format(c2, c1)))) except ExchangeError: ticker = None else: if "{}/{}".format(c1, c2) in tickers: ticker = augment_ticker(tickers["{}/{}".format(c1, c2)]) elif "{}/{}".format(c2, c1) in tickers: ticker = invert(augment_ticker(tickers["{}/{}".format(c2, c1)])) else: ticker = None return ticker def follow_orders(self, sleep=None): if sleep is None: sleep = 7 if self.debug else 30 if self.debug: self.report.log_debug_action("Set follow_orders tick to {}s".format(sleep)) tick = 0 self.report.log_stage("follow_orders_begin") while len(self.trades.all_orders(state="open")) > 0: time.sleep(sleep) tick += 1 open_orders = self.trades.all_orders(state="open") self.report.log_stage("follow_orders_tick_{}".format(tick)) self.report.log_orders(open_orders, tick=tick) for order in open_orders: status = order.get_status() if status != "open": self.report.log_order(order, tick, finished=True) else: order.trade.update_order(order, tick) if status == "error_disappeared": self.report.log_error("follow_orders", message="{} disappeared, recreating it".format(order)) new_order = order.trade.prepare_order( compute_value=order.trade.tick_actions_recreate(tick)) if new_order is not None: new_order.run() self.report.log_order(order, tick, new_order=new_order) self.report.log_stage("follow_orders_end") def prepare_trades(self, base_currency="BTC", liquidity="medium", compute_value="average", repartition=None, only=None, available_balance_only=False): self.report.log_stage("prepare_trades", base_currency=base_currency, liquidity=liquidity, compute_value=compute_value, only=only, repartition=repartition, available_balance_only=available_balance_only) if available_balance_only: repartition, total_base_value, values_in_base = self.balances.available_balances_for_repartition( base_currency=base_currency, liquidity=liquidity, repartition=repartition, compute_value=compute_value) else: values_in_base = self.balances.in_currency(base_currency, compute_value=compute_value) total_base_value = sum(values_in_base.values()) new_repartition = self.balances.dispatch_assets(total_base_value, liquidity=liquidity, repartition=repartition) if available_balance_only: for currency, amount in values_in_base.items(): if currency != base_currency and currency not in new_repartition: new_repartition[currency] = amount self.trades.compute_trades(values_in_base, new_repartition, only=only) def print_tickers(self, base_currency="BTC"): if base_currency is not None: self.report.print_log("total:") self.report.print_log(sum(self.balances.in_currency(base_currency).values())) class Processor: scenarios = { "wait_for_cryptoportfolio": [ { "name": "wait", "number": 1, "before": False, "after": True, "wait_for_recent": {}, }, ], "print_balances": [ { "name": "print_balances", "number": 1, "fetch_balances_begin": { "log_tickers": True, "add_usdt": True, "add_portfolio": True }, "print_tickers": { "base_currency": "BTC" }, } ], "print_orders": [ { "name": "wait", "number": 1, "before": True, "after": False, "wait_for_recent": {}, }, { "name": "make_orders", "number": 2, "before": False, "after": True, "fetch_balances_begin": {}, "prepare_trades": { "compute_value": "average" }, "prepare_orders": { "compute_value": "average" }, }, ], "sell_needed": [ { "name": "print_balances", "number": 0, "before": True, "after": False, "fetch_balances_begin": { "checkpoint": "end", "log_tickers": True, "add_usdt": True, "add_portfolio": True }, }, { "name": "wait", "number": 1, "before": False, "after": True, "wait_for_recent": {}, }, { "name": "sell", "number": 2, "before": False, "after": True, "fetch_balances_begin": {}, "fetch_balances_end": {}, "prepare_trades": {}, "prepare_orders": { "only": "dispose", "compute_value": "average" }, "run_orders": {}, "follow_orders": {}, "close_trades": {}, }, { "name": "buy", "number": 3, "before": False, "after": True, "fetch_balances_begin": {}, "fetch_balances_end": { "checkpoint": "begin", "add_usdt": True, "log_tickers": True }, "prepare_trades": { "only": "acquire", "available_balance_only": True }, "prepare_orders": { "only": "acquire", "compute_value": "average" }, "move_balances": {}, "run_orders": {}, "follow_orders": {}, "close_trades": {}, }, ], "sell_all": [ { "name": "all_sell", "number": 1, "before": True, "after": False, "fetch_balances_begin": { "checkpoint": "end", "log_tickers": True, "add_usdt": True, "add_portfolio": True }, "fetch_balances_end": {}, "prepare_trades": { "repartition": { "base_currency": (1, "long") } }, "prepare_orders": { "compute_value": "average" }, "run_orders": {}, "follow_orders": {}, "close_trades": {}, }, { "name": "wait", "number": 2, "before": False, "after": True, "wait_for_recent": {}, }, { "name": "all_buy", "number": 3, "before": False, "after": True, "fetch_balances_begin": {}, "fetch_balances_end": { "checkpoint": "begin", "add_usdt": True, "log_tickers": True }, "prepare_trades": { "available_balance_only": True }, "prepare_orders": { "compute_value": "average" }, "move_balances": {}, "run_orders": {}, "follow_orders": {}, "close_trades": {}, }, ] } ordered_actions = [ "wait_for_recent", "prepare_trades", "prepare_orders", "move_balances", "run_orders", "follow_orders", "close_trades", "print_tickers"] def __init__(self, market): self.market = market def select_steps(self, scenario, step): if step == "all": return scenario elif step == "before" or step == "after": return list(filter(lambda x: x.get(step, False), scenario)) elif type(step) == int: return [scenario[step-1]] elif type(step) == str: return list(filter(lambda x: x["name"] == step, scenario)) else: raise TypeError("Unknown step {}".format(step)) def can_process(self, scenario_name): return scenario_name in self.scenarios def process(self, scenario_name, steps="all", options={}): if not self.can_process(scenario_name): raise TypeError("Unknown scenario {}".format(scenario_name)) scenario = self.scenarios[scenario_name] selected_steps = [] if type(steps) == str or type(steps) == int: selected_steps += self.select_steps(scenario, steps) else: for step in steps: selected_steps += self.select_steps(scenario, step) for step in selected_steps: self.process_step(scenario_name, step, options) def process_step(self, scenario_name, step, options): process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"]) self.market.report.log_stage("{}_begin".format(process_name)) if "fetch_balances_begin" in step: self.run_action("fetch_balances", step["fetch_balances_begin"], dict(options, tag="{}_begin".format(process_name))) for action in self.ordered_actions: if action in step: self.run_action(action, step[action], options) if "fetch_balances_end" in step: self.run_action("fetch_balances", step["fetch_balances_end"], dict(options, tag="{}_end".format(process_name))) self.market.report.log_stage("{}_end".format(process_name)) def method_arguments(self, action): import inspect if action == "wait_for_recent": method = Portfolio.wait_for_recent elif action == "prepare_trades": method = self.market.prepare_trades elif action == "prepare_orders": method = self.market.trades.prepare_orders elif action == "move_balances": method = self.market.move_balances elif action == "run_orders": method = self.market.trades.run_orders elif action == "follow_orders": method = self.market.follow_orders elif action == "close_trades": method = self.market.trades.close_trades elif action == "print_tickers": method = self.market.print_tickers elif action == "fetch_balances": method = self.market.balances.fetch_balances signature = inspect.getfullargspec(method) defaults = signature.defaults or [] kwargs = signature.args[-len(defaults):] return [method, kwargs] def parse_args(self, action, default_args, options): method, allowed_arguments = self.method_arguments(action) args = {k: v for k, v in {**default_args, **options}.items() if k in allowed_arguments } if "repartition" in args and "base_currency" in args["repartition"]: r = args["repartition"] r[args.get("base_currency", "BTC")] = r.pop("base_currency") return method, args def run_action(self, action, default_args, options): method, args = self.parse_args(action, default_args, options) method(**args)