X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=helper.py;h=f14fd580f967c2e64985235c14af1bd4d80873fb;hb=9db7d156833cd384baa64b6148b5c646bfcc41f8;hp=fa92ac71ab631d78f36ffeacc4ad1d94bfecb63f;hpb=eb9c92e155941b51042ba57e23f651454bd8e55a;p=perso%2FImmae%2FProjets%2FCryptomonnaies%2FCryptoportfolio%2FTrader.git diff --git a/helper.py b/helper.py index fa92ac7..f14fd58 100644 --- a/helper.py +++ b/helper.py @@ -1,178 +1,317 @@ -import time -from ccxt import ExchangeError -from store import * - -def move_balances(market, debug=False): - needed_in_margin = {} - moving_to_margin = {} - - for currency in BalanceStore.all: - if BalanceStore.all[currency].margin_free != 0: - needed_in_margin[currency] = 0 - for trade in TradeStore.all: - if trade.value_to.currency not in needed_in_margin: - needed_in_margin[trade.value_to.currency] = 0 - if trade.trade_type == "short": - needed_in_margin[trade.value_to.currency] += abs(trade.value_to) - for currency, needed in needed_in_margin.items(): - current_balance = BalanceStore.all[currency].margin_free - moving_to_margin[currency] = (needed - current_balance) - delta = moving_to_margin[currency].value - if debug: - ReportStore.log_debug_action("Moving {} from exchange to margin".format(moving_to_margin[currency])) - continue - if delta > 0: - market.transfer_balance(currency, delta, "exchange", "margin") - elif delta < 0: - market.transfer_balance(currency, -delta, "margin", "exchange") - ReportStore.log_move_balances(needed_in_margin, moving_to_margin, debug) - - BalanceStore.fetch_balances(market) - -ticker_cache = {} -ticker_cache_timestamp = time.time() -def get_ticker(c1, c2, market, refresh=False): - global ticker_cache, ticker_cache_timestamp - def invert(ticker): - return { - "inverted": True, - "average": (1/ticker["bid"] + 1/ticker["ask"]) / 2, - "original": ticker, - } - def augment_ticker(ticker): - ticker.update({ - "inverted": False, - "average": (ticker["bid"] + ticker["ask"] ) / 2, - }) - - if time.time() - ticker_cache_timestamp > 5: - ticker_cache = {} - ticker_cache_timestamp = time.time() - elif not refresh: - if (c1, c2, market.__class__) in ticker_cache: - return ticker_cache[(c1, c2, market.__class__)] - if (c2, c1, market.__class__) in ticker_cache: - return invert(ticker_cache[(c2, c1, market.__class__)]) +from datetime import datetime +import argparse +import configparser +import psycopg2 +import os +import sys - try: - ticker_cache[(c1, c2, market.__class__)] = market.fetch_ticker("{}/{}".format(c1, c2)) - augment_ticker(ticker_cache[(c1, c2, market.__class__)]) - except ExchangeError: - try: - ticker_cache[(c2, c1, market.__class__)] = market.fetch_ticker("{}/{}".format(c2, c1)) - augment_ticker(ticker_cache[(c2, c1, market.__class__)]) - except ExchangeError: - ticker_cache[(c1, c2, market.__class__)] = None - return get_ticker(c1, c2, market) - -fees_cache = {} -def fetch_fees(market): - global fees_cache - if market.__class__ not in fees_cache: - fees_cache[market.__class__] = market.fetch_fees() - return fees_cache[market.__class__] - -def prepare_trades(market, base_currency="BTC", liquidity="medium", compute_value="average", debug=False): - ReportStore.log_stage("prepare_trades") - values_in_base = BalanceStore.in_currency(base_currency, market, compute_value=compute_value) - total_base_value = sum(values_in_base.values()) - new_repartition = BalanceStore.dispatch_assets(total_base_value, liquidity=liquidity) - # Recompute it in case we have new currencies - values_in_base = BalanceStore.in_currency(base_currency, market, compute_value=compute_value) - TradeStore.compute_trades(values_in_base, new_repartition, market=market, debug=debug) - -def update_trades(market, base_currency="BTC", liquidity="medium", compute_value="average", only=None, debug=False): - ReportStore.log_stage("update_trades") - values_in_base = BalanceStore.in_currency(base_currency, market, compute_value=compute_value) - total_base_value = sum(values_in_base.values()) - new_repartition = BalanceStore.dispatch_assets(total_base_value, liquidity=liquidity) - TradeStore.compute_trades(values_in_base, new_repartition, only=only, market=market, debug=debug) - -def prepare_trades_to_sell_all(market, base_currency="BTC", compute_value="average", debug=False): - ReportStore.log_stage("prepare_trades_to_sell_all") - values_in_base = BalanceStore.in_currency(base_currency, market, compute_value=compute_value) - total_base_value = sum(values_in_base.values()) - new_repartition = BalanceStore.dispatch_assets(total_base_value, repartition={ base_currency: (1, "long") }) - TradeStore.compute_trades(values_in_base, new_repartition, market=market, debug=debug) - -def follow_orders(sleep=None): - if sleep is None: - sleep = 7 if TradeStore.debug else 30 - if TradeStore.debug: - ReportStore.log_debug_action("Set follow_orders tick to {}s".format(sleep)) - tick = 0 - ReportStore.log_stage("follow_orders_begin") - while len(TradeStore.all_orders(state="open")) > 0: - time.sleep(sleep) - tick += 1 - open_orders = TradeStore.all_orders(state="open") - ReportStore.log_stage("follow_orders_tick_{}".format(tick)) - ReportStore.log_orders(open_orders, tick=tick) - for order in open_orders: - if order.get_status() != "open": - ReportStore.log_order(order, tick, finished=True) +import portfolio + +def make_order(market, value, currency, action="acquire", + close_if_possible=False, base_currency="BTC", follow=True, + compute_value="average"): + """ + Make an order on market + "market": The market on which to place the order + "value": The value in *base_currency* to acquire, + or in *currency* to dispose. + use negative for margin trade. + "action": "acquire" or "dispose". + "acquire" will buy long or sell short, + "dispose" will sell long or buy short. + "currency": The currency to acquire or dispose + "base_currency": The base currency. The value is expressed in that + currency (default: BTC) + "follow": Whether to follow the order once run (default: True) + "close_if_possible": Whether to try to close the position at the end + of the trade, i.e. reach exactly 0 at the end + (only meaningful in "dispose"). May have + unwanted effects if the end value of the + currency is not 0. + "compute_value": Compute value to place the order + """ + market.report.log_stage("make_order_begin") + market.balances.fetch_balances(tag="make_order_begin") + if action == "acquire": + trade = portfolio.Trade( + portfolio.Amount(base_currency, 0), + portfolio.Amount(base_currency, value), + currency, market) + else: + amount = portfolio.Amount(currency, value) + trade = portfolio.Trade( + amount.in_currency(base_currency, market, compute_value=compute_value), + portfolio.Amount(base_currency, 0), + currency, market) + market.trades.all.append(trade) + order = trade.prepare_order( + close_if_possible=close_if_possible, + compute_value=compute_value) + market.report.log_orders([order], None, compute_value) + market.trades.run_orders() + if follow: + market.follow_orders() + market.balances.fetch_balances(tag="make_order_end") + else: + market.report.log_stage("make_order_end_not_followed") + return order + market.report.log_stage("make_order_end") + +def get_user_market(config_path, user_id, debug=False): + import market + pg_config, report_path = main_parse_config(config_path) + market_config = list(main_fetch_markets(pg_config, str(user_id)))[0][0] + return market.Market.from_config(market_config, debug=debug) + +def main_parse_args(argv): + parser = argparse.ArgumentParser( + description="Run the trade bot") + + parser.add_argument("-c", "--config", + default="config.ini", + required=False, + help="Config file to load (default: config.ini)") + parser.add_argument("--before", + default=False, action='store_const', const=True, + help="Run the steps before the cryptoportfolio update") + parser.add_argument("--after", + default=False, action='store_const', const=True, + help="Run the steps after the cryptoportfolio update") + parser.add_argument("--debug", + default=False, action='store_const', const=True, + help="Run in debug mode") + parser.add_argument("--user", + default=None, required=False, help="Only run for that user") + parser.add_argument("--action", + action='append', + help="Do a different action than trading (add several times to chain)") + + args = parser.parse_args(argv) + + if not os.path.exists(args.config): + print("no config file found, exiting") + sys.exit(1) + + return args + +def main_parse_config(config_file): + config = configparser.ConfigParser() + config.read(config_file) + + if "postgresql" not in config: + print("no configuration for postgresql in config file") + sys.exit(1) + + if "app" in config and "report_path" in config["app"]: + report_path = config["app"]["report_path"] + + if not os.path.exists(report_path): + os.makedirs(report_path) + else: + report_path = None + + return [config["postgresql"], report_path] + +def main_fetch_markets(pg_config, user): + connection = psycopg2.connect(**pg_config) + cursor = connection.cursor() + + if user is None: + cursor.execute("SELECT config,user_id FROM market_configs") + else: + cursor.execute("SELECT config,user_id FROM market_configs WHERE user_id = %s", user) + + for row in cursor: + yield row + +def main_process_market(user_market, actions, before=False, after=False): + if len(actions or []) == 0: + if before: + Processor(user_market).process("sell_all", steps="before") + if after: + Processor(user_market).process("sell_all", steps="after") + else: + for action in actions: + if action in globals(): + (globals()[action])(user_market) else: - order.trade.update_order(order, tick) - ReportStore.log_stage("follow_orders_end") + raise NotImplementedError("Unknown action {}".format(action)) + +def main_store_report(report_path, user_id, user_market): + try: + if report_path is not None: + report_file = "{}/{}_{}.json".format(report_path, datetime.now().isoformat(), user_id) + with open(report_file, "w") as f: + f.write(user_market.report.to_json()) + except Exception as e: + print("impossible to store report file: {}; {}".format(e.__class__.__name__, e)) def print_orders(market, base_currency="BTC"): - ReportStore.log_stage("print_orders") - BalanceStore.fetch_balances(market, tag="print_orders") - prepare_trades(market, base_currency=base_currency, compute_value="average", debug=True) - TradeStore.prepare_orders(compute_value="average") + market.report.log_stage("print_orders") + market.balances.fetch_balances(tag="print_orders") + market.prepare_trades(base_currency=base_currency, compute_value="average") + market.trades.prepare_orders(compute_value="average") def print_balances(market, base_currency="BTC"): - BalanceStore.fetch_balances(market) + market.report.log_stage("print_balances") + market.balances.fetch_balances() if base_currency is not None: - ReportStore.print_log("total:") - ReportStore.print_log(sum(BalanceStore.in_currency(base_currency, market).values())) - -def reset_all(): - # use them as regular classes, sub-object of market - ReportStore.logs = [] - BalanceStore.all = {} - TradeStore.all = [] - -def process_sell_needed__1_sell(market, liquidity="medium", base_currency="BTC", debug=False): - ReportStore.log_stage("process_sell_needed__1_sell_begin") - BalanceStore.fetch_balances(market, tag="process_sell_needed__1_sell_begin") - prepare_trades(market, liquidity=liquidity, base_currency=base_currency, debug=debug) - TradeStore.prepare_orders(compute_value="average", only="dispose") - TradeStore.run_orders() - follow_orders() - BalanceStore.fetch_balances(market, tag="process_sell_needed__1_sell_end") - ReportStore.log_stage("process_sell_needed__1_sell_end") - -def process_sell_needed__2_buy(market, liquidity="medium", base_currency="BTC", debug=False): - ReportStore.log_stage("process_sell_needed__2_buy_begin") - BalanceStore.fetch_balances(market, tag="process_sell_needed__2_buy_begin") - update_trades(market, base_currency=base_currency, liquidity=liquidity, debug=debug, only="acquire") - TradeStore.prepare_orders(compute_value="average", only="acquire") - move_balances(market, debug=debug) - TradeStore.run_orders() - follow_orders() - BalanceStore.fetch_balances(market, tag="process_sell_needed__2_buy_end") - ReportStore.log_stage("process_sell_needed__2_buy_end") - -def process_sell_all__1_all_sell(market, base_currency="BTC", debug=False, liquidity="medium"): - ReportStore.log_stage("process_sell_all__1_all_sell_begin") - BalanceStore.fetch_balances(market, tag="process_sell_all__1_all_sell_begin") - prepare_trades_to_sell_all(market, base_currency=base_currency, debug=debug) - TradeStore.prepare_orders(compute_value="average") - TradeStore.run_orders() - follow_orders() - BalanceStore.fetch_balances(market, tag="process_sell_all__1_all_sell_end") - ReportStore.log_stage("process_sell_all__1_all_sell_end") - -def process_sell_all__2_all_buy(market, base_currency="BTC", debug=False, liquidity="medium"): - ReportStore.log_stage("process_sell_all__2_all_buy_begin") - BalanceStore.fetch_balances(market, tag="process_sell_all__2_all_buy_begin") - prepare_trades(market, liquidity=liquidity, base_currency=base_currency, debug=debug) - TradeStore.prepare_orders(compute_value="average") - move_balances(market, debug=debug) - TradeStore.run_orders() - follow_orders() - BalanceStore.fetch_balances(market, tag="process_sell_all__2_all_buy_end") - ReportStore.log_stage("process_sell_all__2_all_buy_end") + market.report.print_log("total:") + market.report.print_log(sum(market.balances.in_currency(base_currency).values())) + +class Processor: + scenarios = { + "sell_needed": [ + { + "name": "wait", + "number": 0, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + { + "name": "sell", + "number": 1, + "before": False, + "after": True, + "fetch_balances": ["begin", "end"], + "prepare_trades": {}, + "prepare_orders": { "only": "dispose", "compute_value": "average" }, + "run_orders": {}, + "follow_orders": {}, + }, + { + "name": "buy", + "number": 2, + "before": False, + "after": True, + "fetch_balances": ["begin", "end"], + "prepare_trades": { "only": "acquire" }, + "prepare_orders": { "only": "acquire", "compute_value": "average" }, + "move_balances": {}, + "run_orders": {}, + "follow_orders": {}, + }, + ], + "sell_all": [ + { + "name": "all_sell", + "number": 1, + "before": True, + "after": False, + "fetch_balances": ["begin", "end"], + "prepare_trades": { "repartition": { "BTC": (1, "long") } }, + "prepare_orders": { "compute_value": "average" }, + "run_orders": {}, + "follow_orders": {}, + }, + { + "name": "wait", + "number": 2, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + { + "name": "all_buy", + "number": 3, + "before": False, + "after": True, + "fetch_balances": ["begin", "end"], + "prepare_trades": {}, + "prepare_orders": { "compute_value": "average" }, + "move_balances": {}, + "run_orders": {}, + "follow_orders": {}, + }, + ] + } + + allowed_arguments = { + "wait_for_recent": ["delta"], + "prepare_trades": ["base_currency", "liquidity", "compute_value", "repartition", "only"], + "prepare_orders": ["only", "compute_value"], + "move_balances": [], + "run_orders": [], + "follow_orders": ["sleep"], + } + + def __init__(self, market): + self.market = market + + def select_steps(self, scenario, step): + if step == "all": + return scenario + elif step == "before" or step == "after": + return list(filter(lambda x: step in x and x[step], scenario)) + elif type(step) == int: + return [scenario[step-1]] + elif type(step) == str: + return list(filter(lambda x: x["name"] == step, scenario)) + else: + raise TypeError("Unknown step {}".format(step)) + + def process(self, scenario_name, steps="all", **kwargs): + scenario = self.scenarios[scenario_name] + selected_steps = [] + + if type(steps) == str or type(steps) == int: + selected_steps += self.select_steps(scenario, steps) + else: + for step in steps: + selected_steps += self.select_steps(scenario, step) + for step in selected_steps: + self.process_step(scenario_name, step, **kwargs) + + def process_step(self, scenario_name, step, **kwargs): + process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"]) + self.market.report.log_stage("{}_begin".format(process_name)) + if "begin" in step.get("fetch_balances", []): + self.market.balances.fetch_balances(tag="{}_begin".format(process_name)) + + for action in ["wait_for_recent", "prepare_trades", + "prepare_orders", "move_balances", "run_orders", + "follow_orders"]: + if action in step: + self.run_action(action, step[action], **kwargs) + + if "end" in step.get("fetch_balances", []): + self.market.balances.fetch_balances(tag="{}_end".format(process_name)) + self.market.report.log_stage("{}_end".format(process_name)) + + def run_action(self, action, default_args, **kwargs): + args = {k: v for k, v in {**default_args, **kwargs}.items() if k in self.allowed_arguments[action] } + + if action == "wait_for_recent": + portfolio.Portfolio.wait_for_recent(self.market, **args) + if action == "prepare_trades": + self.market.prepare_trades(**args) + if action == "prepare_orders": + self.market.trades.prepare_orders(**args) + if action == "move_balances": + self.market.move_balances(**args) + if action == "run_orders": + self.market.trades.run_orders(**args) + if action == "follow_orders": + self.market.follow_orders(**args) + +def process_sell_needed__1_sell(market, liquidity="medium", base_currency="BTC"): + Processor(market).process("sell_needed", steps="sell", + liquidity=liquidity, base_currency=base_currency) + +def process_sell_needed__2_buy(market, liquidity="medium", base_currency="BTC"): + Processor(market).process("sell_needed", steps="buy", + liquidity=liquidity, base_currency=base_currency) + +def process_sell_all__1_all_sell(market, base_currency="BTC", liquidity="medium"): + Processor(market).process("sell_all", steps="all_sell", + liquidity=liquidity, base_currency=base_currency) + +def process_sell_all__2_wait(market, liquidity="medium", base_currency="BTC"): + Processor(market).process("sell_all", steps="wait", + liquidity=liquidity, base_currency=base_currency) +def process_sell_all__3_all_buy(market, base_currency="BTC", liquidity="medium"): + Processor(market).process("sell_all", steps="all_buy", + liquidity=liquidity, base_currency=base_currency)