X-Git-Url: https://git.immae.eu/?p=perso%2FImmae%2FProjets%2FCryptomonnaies%2FCryptoportfolio%2FTrader.git;a=blobdiff_plain;f=market.py;h=9612b17406de5b45bec1e6dabb59e3e3c870d16d;hp=1e1e083447bd4b319a7edb35b749592be0408cf3;hb=HEAD;hpb=350ed24de673dc125be9e2fdecb0f1abc7835b41 diff --git a/market.py b/market.py index 1e1e083..9612b17 100644 --- a/market.py +++ b/market.py @@ -1,202 +1,484 @@ -import ccxt -import decimal - -def exchange_sum(self, *args): - return sum([arg for arg in args if isinstance(arg, (float, int, decimal.Decimal))]) -ccxt.Exchange.sum = exchange_sum - -def poloniex_fetch_balance(self, params={}): - self.load_markets() - balances = self.privatePostReturnCompleteBalances(self.extend({ - 'account': 'all', - }, params)) - result = {'info': balances} - currencies = list(balances.keys()) - for c in range(0, len(currencies)): - id = currencies[c] - balance = balances[id] - currency = self.common_currency_code(id) - account = { - 'free': decimal.Decimal(balance['available']), - 'used': decimal.Decimal(balance['onOrders']), - 'total': decimal.Decimal(0.0), - } - account['total'] = self.sum(account['free'], account['used']) - result[currency] = account - return self.parse_balance(result) -ccxt.poloniex.fetch_balance = poloniex_fetch_balance - -def poloniex_fetch_margin_balances(self): - positions = self.privatePostGetMarginPosition({"currencyPair": "all"}) - parsed = {} - for symbol, position in positions.items(): - if position["type"] == "none": - continue - base_currency, currency = symbol.split("_") - parsed[currency] = { - "amount": decimal.Decimal(position["amount"]), - "borrowedPrice": decimal.Decimal(position["basePrice"]), - "lendingFees": decimal.Decimal(position["lendingFees"]), - "pl": decimal.Decimal(position["pl"]), - "liquidationPrice": decimal.Decimal(position["liquidationPrice"]), - "type": position["type"], - "total": decimal.Decimal(position["total"]), - "base_currency": base_currency, - } - return parsed -ccxt.poloniex.fetch_margin_balances = poloniex_fetch_margin_balances - -def poloniex_fetch_balance_with_margin(self, params={}): - exchange_balance = self.fetch_balance(params=params) - margin_balances = self.fetch_margin_balances() - - for currency, balance in margin_balances.items(): - assert exchange_balance[currency]["total"] == 0 - assert balance["type"] == "short" - exchange_balance[currency]["total"] = balance["amount"] - exchange_balance[currency]["marginPosition"] = balance - return exchange_balance -ccxt.poloniex.fetch_balance_with_margin = poloniex_fetch_balance_with_margin - - -def poloniex_fetch_balance_per_type(self): - balances = self.privatePostReturnAvailableAccountBalances() - result = {'info': balances} - for key, balance in balances.items(): - result[key] = {} - for currency, amount in balance.items(): - if currency not in result: - result[currency] = {} - result[currency][key] = decimal.Decimal(amount) - result[key][currency] = decimal.Decimal(amount) - return result -ccxt.poloniex.fetch_balance_per_type = poloniex_fetch_balance_per_type - -def poloniex_parse_ticker(self, ticker, market=None): - timestamp = self.milliseconds() - symbol = None - if market: - symbol = market['symbol'] - return { - 'symbol': symbol, - 'timestamp': timestamp, - 'datetime': self.iso8601(timestamp), - 'high': decimal.Decimal(ticker['high24hr']), - 'low': decimal.Decimal(ticker['low24hr']), - 'bid': decimal.Decimal(ticker['highestBid']), - 'ask': decimal.Decimal(ticker['lowestAsk']), - 'vwap': None, - 'open': None, - 'close': None, - 'first': None, - 'last': decimal.Decimal(ticker['last']), - 'change': decimal.Decimal(ticker['percentChange']), - 'percentage': None, - 'average': None, - 'baseVolume': decimal.Decimal(ticker['quoteVolume']), - 'quoteVolume': decimal.Decimal(ticker['baseVolume']), - 'info': ticker, - } -ccxt.poloniex.parse_ticker = poloniex_parse_ticker - -def poloniex_create_margin_order(self, symbol, type, side, amount, price=None, lending_rate=None, params={}): - if type == 'market': - raise ccxt.ExchangeError(self.id + ' allows limit orders only') - self.load_markets() - method = 'privatePostMargin' + self.capitalize(side) - market = self.market(symbol) - price = float(price) - amount = float(amount) - if lending_rate is not None: - params = self.extend({"lendingRate": lending_rate}, params) - response = getattr(self, method)(self.extend({ - 'currencyPair': market['id'], - 'rate': self.price_to_precision(symbol, price), - 'amount': self.amount_to_precision(symbol, amount), - }, params)) - timestamp = self.milliseconds() - order = self.parse_order(self.extend({ - 'timestamp': timestamp, - 'status': 'open', - 'type': type, - 'side': side, - 'price': price, - 'amount': amount, - }, response), market) - id = order['id'] - self.orders[id] = order - return self.extend({'info': response}, order) -ccxt.poloniex.create_margin_order = poloniex_create_margin_order - -def poloniex_create_order(self, symbol, type, side, amount, price=None, account="exchange", lending_rate=None, params={}): - if account == "exchange": - return self.create_exchange_order(symbol, type, side, amount, price=price, params=params) - elif account == "margin": - return self.create_margin_order(symbol, type, side, amount, price=price, lending_rate=lending_rate, params=params) - else: - raise NotImplementedError - -def poloniex_order_precision(self, symbol): - return 8 - -ccxt.poloniex.create_exchange_order = ccxt.poloniex.create_order -ccxt.poloniex.create_order = poloniex_create_order -ccxt.poloniex.order_precision = poloniex_order_precision - -def poloniex_transfer_balance(self, currency, amount, from_account, to_account): - result = self.privatePostTransferBalance({ - "currency": currency, - "amount": amount, - "fromAccount": from_account, - "toAccount": to_account, - "confirmed": 1}) - return result["success"] == 1 -ccxt.poloniex.transfer_balance = poloniex_transfer_balance - -# portfolio.market.create_order("DASH/BTC", "limit", "sell", 0.1, price=0.06828800, account="margin") - -# portfolio.market.privatePostReturnTradableBalances() -# Returns tradable balances in margin -# 'BTC_DASH': {'BTC': '0.01266999', 'DASH': '0.08574839'}, -# Je peux emprunter jusqu’à 0.08574839 DASH ou 0.01266999 BTC (une position est -# déjà ouverte) -# 'BTC_CLAM': {'BTC': '0.00585143', 'CLAM': '7.79300395'}, -# Je peux emprunter 7.7 CLAM pour les vendre contre des BTC, ou emprunter -# 0.00585143 BTC pour acheter des CLAM - -# portfolio.market.privatePostReturnMarginAccountSummary() -# Returns current informations for margin -# {'currentMargin': '1.49680968', -> marge (ne doit pas descendre sous 20% / 0.2) -# = netValue / totalBorrowedValue -# 'lendingFees': '0.00000000', -> fees totaux -# 'netValue': '0.01008254', -> balance + plus-value -# 'pl': '0.00008254', -> plus value latente (somme des positions) -# 'totalBorrowedValue': '0.00673602', -> valeur en BTC empruntée -# 'totalValue': '0.01000000'} -> valeur totale en compte - - -# portfolio.market.privatePostGetMarginPosition({"currencyPair": "BTC_DASH"}) -# See DASH/BTC positions -# {'amount': '-0.10000000', -> DASH empruntés -# 'basePrice': '0.06818560', -> à ce prix là (0.06828800 demandé * (1-0.15%)) -# 'lendingFees': '0.00000000', -> ce que je dois à mon créditeur -# 'liquidationPrice': '0.15107132', -> prix auquel ça sera liquidé (dépend de ce que j’ai déjà sur mon compte margin) -# 'pl': '-0.00000371', -> plus-value latente si je rachète tout de suite (négatif = perdu) -# 'total': '0.00681856', -> valeur totale empruntée en BTC -# 'type': 'short'} - - -# closeMarginPosition({"currencyPair": "BTC_DASH"}) : fermer la position au prix -# du marché -# Nécessaire à la fin -# portfolio.market.create_order("DASH/BTC", "limit", "buy", 0.1, price=0.06726487, account="margin") - -# portfolio.market.fetch_balance_per_type() -# Ne suffit pas pour calculer les positions: ne contient que les 0.01 envoyés -# TODO: vérifier si fetch_balance marque ces 0.01 comme disponibles -> oui - -market = ccxt.poloniex({ - "apiKey": "XXXXXXXX-XXXXXXXX-XXXXXXXX-XXXXXXXX", - "secret": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef", - }) +from ccxt import AuthenticationError, ExchangeError, NotSupported, RequestTimeout, InvalidNonce +import ccxt_wrapper as ccxt +import time +import dbs +from store import * +from cachetools.func import ttl_cache +from datetime import datetime +import datetime +from retry import retry +import portfolio +class Market: + debug = False + ccxt = None + report = None + trades = None + balances = None + options = None + + def __init__(self, ccxt_instance, args, **kwargs): + self.args = args + self.debug = args.debug + self.ccxt = ccxt_instance + self.ccxt._market = self + self.report = ReportStore(self, verbose_print=(not args.quiet)) + self.trades = TradeStore(self) + self.balances = BalanceStore(self) + self.processor = Processor(self) + + self.options = kwargs.get("options", {}) + for key in ["user_id", "market_id"]: + setattr(self, key, kwargs.get(key, None)) + + self.report.log_market(self.args) + + @classmethod + def from_config(cls, config, args, **kwargs): + config["apiKey"] = config.pop("key", None) + + ccxt_instance = ccxt.poloniexE(config) + + return cls(ccxt_instance, args, **kwargs) + + def store_report(self): + self.report.merge(Portfolio.report) + date = datetime.datetime.now() + if self.args.report_path is not None: + self.store_file_report(date) + if dbs.psql_connected() and self.args.report_db: + self.store_database_report(date) + if dbs.redis_connected() and self.args.report_redis: + self.store_redis_report(date) + + def store_file_report(self, date): + try: + report_file = "{}/{}_{}".format(self.args.report_path, date.isoformat(), self.user_id) + with open(report_file + ".json", "w") as f: + f.write(self.report.to_json()) + with open(report_file + ".log", "w") as f: + f.write("\n".join(map(lambda x: x[1], self.report.print_logs))) + except Exception as e: + print("impossible to store report file: {}; {}".format(e.__class__.__name__, e)) + + def store_database_report(self, date): + try: + report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;' + line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);' + cursor = dbs.psql.cursor() + cursor.execute(report_query, (date, self.market_id, self.debug)) + report_id = cursor.fetchone()[0] + for date, type_, payload in self.report.to_json_array(): + cursor.execute(line_query, (date, report_id, type_, payload)) + + dbs.psql.commit() + cursor.close() + except Exception as e: + print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e)) + + def store_redis_report(self, date): + try: + for type_, log in self.report.to_json_redis(): + key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_) + dbs.redis.set(key, log, ex=31*24*60*60) + key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_) + dbs.redis.set(key, log) + key = "/cryptoportfolio/{}/latest/date".format(self.market_id) + dbs.redis.set(key, date.isoformat()) + except Exception as e: + print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e)) + + def process(self, actions, before=False, after=False): + try: + self.ccxt.check_required_credentials() + for action in actions: + if bool(before) is bool(after): + self.processor.process(action, steps="all", options=self.options) + elif before: + self.processor.process(action, steps="before", options=self.options) + elif after: + self.processor.process(action, steps="after", options=self.options) + except AuthenticationError: + self.report.log_error("market_authentication", message="Impossible to authenticate to market") + except Exception as e: + import traceback + self.report.log_error("market_process", exception=e, message=traceback.format_exc()) + finally: + self.store_report() + + @retry((RequestTimeout, InvalidNonce), tries=5) + def move_balances(self): + needed_in_margin = {} + moving_to_margin = {} + + for currency, balance in self.balances.all.items(): + needed_in_margin[currency] = balance.margin_in_position - balance.margin_pending_gain + for trade in self.trades.pending: + needed_in_margin.setdefault(trade.base_currency, 0) + if trade.trade_type == "short": + needed_in_margin[trade.base_currency] -= trade.delta + for currency, needed in needed_in_margin.items(): + current_balance = self.balances.all[currency].margin_available + moving_to_margin[currency] = (needed - current_balance) + delta = moving_to_margin[currency].value + action = "Moving {} from exchange to margin".format(moving_to_margin[currency]) + + if self.debug and delta != 0: + self.report.log_debug_action(action) + continue + try: + if delta > 0: + self.ccxt.transfer_balance(currency, delta, "exchange", "margin") + elif delta < 0: + self.ccxt.transfer_balance(currency, -delta, "margin", "exchange") + except (RequestTimeout, InvalidNonce) as e: + self.report.log_error(action, message="Retrying", exception=e) + self.report.log_move_balances(needed_in_margin, moving_to_margin) + self.balances.fetch_balances() + raise e + self.report.log_move_balances(needed_in_margin, moving_to_margin) + + self.balances.fetch_balances() + + @ttl_cache(ttl=3600) + def fetch_fees(self): + return self.ccxt.fetch_fees() + + @ttl_cache(maxsize=20, ttl=5) + def get_tickers(self, refresh=False): + try: + return self.ccxt.fetch_tickers() + except NotSupported: + return None + + @ttl_cache(maxsize=20, ttl=5) + def get_ticker(self, c1, c2, refresh=False): + def invert(ticker): + return { + "inverted": True, + "average": (1/ticker["bid"] + 1/ticker["ask"]) / 2, + "original": ticker, + } + def augment_ticker(ticker): + ticker.update({ + "inverted": False, + "average": (ticker["bid"] + ticker["ask"] ) / 2, + }) + return ticker + + tickers = self.get_tickers() + if tickers is None: + try: + ticker = augment_ticker(self.ccxt.fetch_ticker("{}/{}".format(c1, c2))) + except ExchangeError: + try: + ticker = invert(augment_ticker(self.ccxt.fetch_ticker("{}/{}".format(c2, c1)))) + except ExchangeError: + ticker = None + else: + if "{}/{}".format(c1, c2) in tickers: + ticker = augment_ticker(tickers["{}/{}".format(c1, c2)]) + elif "{}/{}".format(c2, c1) in tickers: + ticker = invert(augment_ticker(tickers["{}/{}".format(c2, c1)])) + else: + ticker = None + return ticker + + def follow_orders(self, sleep=None): + if sleep is None: + sleep = 7 if self.debug else 30 + if self.debug: + self.report.log_debug_action("Set follow_orders tick to {}s".format(sleep)) + tick = 0 + self.report.log_stage("follow_orders_begin") + while len(self.trades.all_orders(state="open")) > 0: + time.sleep(sleep) + tick += 1 + open_orders = self.trades.all_orders(state="open") + self.report.log_stage("follow_orders_tick_{}".format(tick)) + self.report.log_orders(open_orders, tick=tick) + for order in open_orders: + status = order.get_status() + if status != "open": + self.report.log_order(order, tick, finished=True) + else: + order.trade.update_order(order, tick) + if status == "error_disappeared": + self.report.log_error("follow_orders", + message="{} disappeared, recreating it".format(order)) + new_order = order.trade.prepare_order( + compute_value=order.trade.tick_actions_recreate(tick)) + if new_order is not None: + new_order.run() + self.report.log_order(order, tick, new_order=new_order) + + self.report.log_stage("follow_orders_end") + + def prepare_trades(self, base_currency="BTC", liquidity="medium", + compute_value="average", repartition=None, only=None, + available_balance_only=False): + + self.report.log_stage("prepare_trades", + base_currency=base_currency, liquidity=liquidity, + compute_value=compute_value, only=only, + repartition=repartition, available_balance_only=available_balance_only) + + if available_balance_only: + repartition, total_base_value, values_in_base = self.balances.available_balances_for_repartition( + base_currency=base_currency, liquidity=liquidity, + repartition=repartition, compute_value=compute_value) + else: + values_in_base = self.balances.in_currency(base_currency, + compute_value=compute_value) + total_base_value = sum(values_in_base.values()) + new_repartition = self.balances.dispatch_assets(total_base_value, + liquidity=liquidity, repartition=repartition) + if available_balance_only: + for currency, amount in values_in_base.items(): + if currency != base_currency and currency not in new_repartition: + new_repartition[currency] = amount + + self.trades.compute_trades(values_in_base, new_repartition, only=only) + + def print_tickers(self, base_currency="BTC"): + if base_currency is not None: + self.report.print_log("total:") + self.report.print_log(sum(self.balances.in_currency(base_currency).values())) + +class Processor: + scenarios = { + "wait_for_cryptoportfolio": [ + { + "name": "wait", + "number": 1, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + ], + "print_balances": [ + { + "name": "print_balances", + "number": 1, + "fetch_balances_begin": { + "log_tickers": True, + "add_usdt": True, + "add_portfolio": True + }, + "print_tickers": { "base_currency": "BTC" }, + } + ], + "print_orders": [ + { + "name": "wait", + "number": 1, + "before": True, + "after": False, + "wait_for_recent": {}, + }, + { + "name": "make_orders", + "number": 2, + "before": False, + "after": True, + "fetch_balances_begin": {}, + "prepare_trades": { "compute_value": "average" }, + "prepare_orders": { "compute_value": "average" }, + }, + ], + "sell_needed": [ + { + "name": "print_balances", + "number": 0, + "before": True, + "after": False, + "fetch_balances_begin": { + "checkpoint": "end", + "log_tickers": True, + "add_usdt": True, + "add_portfolio": True + }, + }, + { + "name": "wait", + "number": 1, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + { + "name": "sell", + "number": 2, + "before": False, + "after": True, + "fetch_balances_begin": {}, + "fetch_balances_end": {}, + "prepare_trades": {}, + "prepare_orders": { "only": "dispose", "compute_value": "average" }, + "run_orders": {}, + "follow_orders": {}, + "close_trades": {}, + }, + { + "name": "buy", + "number": 3, + "before": False, + "after": True, + "fetch_balances_begin": {}, + "fetch_balances_end": { + "checkpoint": "begin", + "add_usdt": True, + "log_tickers": True + }, + "prepare_trades": { "only": "acquire", "available_balance_only": True }, + "prepare_orders": { "only": "acquire", "compute_value": "average" }, + "move_balances": {}, + "run_orders": {}, + "follow_orders": {}, + "close_trades": {}, + }, + ], + "sell_all": [ + { + "name": "all_sell", + "number": 1, + "before": True, + "after": False, + "fetch_balances_begin": { + "checkpoint": "end", + "log_tickers": True, + "add_usdt": True, + "add_portfolio": True + }, + "fetch_balances_end": {}, + "prepare_trades": { "repartition": { "base_currency": (1, "long") } }, + "prepare_orders": { "compute_value": "average" }, + "run_orders": {}, + "follow_orders": {}, + "close_trades": {}, + }, + { + "name": "wait", + "number": 2, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + { + "name": "all_buy", + "number": 3, + "before": False, + "after": True, + "fetch_balances_begin": {}, + "fetch_balances_end": { + "checkpoint": "begin", + "add_usdt": True, + "log_tickers": True + }, + "prepare_trades": { "available_balance_only": True }, + "prepare_orders": { "compute_value": "average" }, + "move_balances": {}, + "run_orders": {}, + "follow_orders": {}, + "close_trades": {}, + }, + ] + } + + ordered_actions = [ + "wait_for_recent", "prepare_trades", "prepare_orders", + "move_balances", "run_orders", "follow_orders", + "close_trades", "print_tickers"] + + def __init__(self, market): + self.market = market + + def select_steps(self, scenario, step): + if step == "all": + return scenario + elif step == "before" or step == "after": + return list(filter(lambda x: x.get(step, False), scenario)) + elif type(step) == int: + return [scenario[step-1]] + elif type(step) == str: + return list(filter(lambda x: x["name"] == step, scenario)) + else: + raise TypeError("Unknown step {}".format(step)) + + def can_process(self, scenario_name): + return scenario_name in self.scenarios + + def process(self, scenario_name, steps="all", options={}): + if not self.can_process(scenario_name): + raise TypeError("Unknown scenario {}".format(scenario_name)) + scenario = self.scenarios[scenario_name] + selected_steps = [] + + if type(steps) == str or type(steps) == int: + selected_steps += self.select_steps(scenario, steps) + else: + for step in steps: + selected_steps += self.select_steps(scenario, step) + for step in selected_steps: + self.process_step(scenario_name, step, options) + + def process_step(self, scenario_name, step, options): + process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"]) + self.market.report.log_stage("{}_begin".format(process_name)) + + if "fetch_balances_begin" in step: + self.run_action("fetch_balances", step["fetch_balances_begin"], + dict(options, tag="{}_begin".format(process_name))) + + for action in self.ordered_actions: + if action in step: + self.run_action(action, step[action], options) + + if "fetch_balances_end" in step: + self.run_action("fetch_balances", step["fetch_balances_end"], + dict(options, tag="{}_end".format(process_name))) + + self.market.report.log_stage("{}_end".format(process_name)) + + def method_arguments(self, action): + import inspect + + if action == "wait_for_recent": + method = Portfolio.wait_for_recent + elif action == "prepare_trades": + method = self.market.prepare_trades + elif action == "prepare_orders": + method = self.market.trades.prepare_orders + elif action == "move_balances": + method = self.market.move_balances + elif action == "run_orders": + method = self.market.trades.run_orders + elif action == "follow_orders": + method = self.market.follow_orders + elif action == "close_trades": + method = self.market.trades.close_trades + elif action == "print_tickers": + method = self.market.print_tickers + elif action == "fetch_balances": + method = self.market.balances.fetch_balances + + signature = inspect.getfullargspec(method) + defaults = signature.defaults or [] + kwargs = signature.args[-len(defaults):] + + return [method, kwargs] + + def parse_args(self, action, default_args, options): + method, allowed_arguments = self.method_arguments(action) + args = {k: v for k, v in {**default_args, **options}.items() if k in allowed_arguments } + + if "repartition" in args and "base_currency" in args["repartition"]: + r = args["repartition"] + r[args.get("base_currency", "BTC")] = r.pop("base_currency") + + return method, args + + def run_action(self, action, default_args, options): + method, args = self.parse_args(action, default_args, options) + + method(**args)