X-Git-Url: https://git.immae.eu/?p=perso%2FImmae%2FProjets%2FCryptomonnaies%2FCryptoportfolio%2FTrader.git;a=blobdiff_plain;f=market.py;h=6c14ae208af3add2eb1bf65ab6627696945e8b22;hp=3381d1e2643e03cb35019cbd14eb29f0f9108cbf;hb=1f117ac79e10c3c9728d3b267d134dec2a165603;hpb=f9226903cb53a9b303a26de562e321159349f8df diff --git a/market.py b/market.py index 3381d1e..6c14ae2 100644 --- a/market.py +++ b/market.py @@ -3,6 +3,8 @@ import ccxt_wrapper as ccxt import time from store import * from cachetools.func import ttl_cache +from datetime import datetime +import portfolio class Market: debug = False @@ -11,17 +13,21 @@ class Market: trades = None balances = None - def __init__(self, ccxt_instance, debug=False): + def __init__(self, ccxt_instance, debug=False, user_id=None, report_path=None): self.debug = debug self.ccxt = ccxt_instance self.ccxt._market = self self.report = ReportStore(self) self.trades = TradeStore(self) self.balances = BalanceStore(self) + self.processor = Processor(self) + + self.user_id = user_id + self.report_path = report_path @classmethod - def from_config(cls, config, debug=False): - config["apiKey"] = config.pop("key") + def from_config(cls, config, debug=False, user_id=None, report_path=None): + config["apiKey"] = config.pop("key", None) ccxt_instance = ccxt.poloniexE(config) @@ -37,7 +43,34 @@ class Market: ccxt_instance.session.request = request_wrap.__get__(ccxt_instance.session, ccxt_instance.session.__class__) - return cls(ccxt_instance, debug=debug) + return cls(ccxt_instance, debug=debug, user_id=user_id, report_path=report_path) + + def store_report(self): + try: + if self.report_path is not None: + report_file = "{}/{}_{}.json".format(self.report_path, datetime.now().isoformat(), self.user_id) + with open(report_file, "w") as f: + f.write(self.report.to_json()) + except Exception as e: + print("impossible to store report file: {}; {}".format(e.__class__.__name__, e)) + + def process(self, actions, before=False, after=False): + try: + if len(actions or []) == 0: + if before: + self.processor.process("sell_all", steps="before") + if after: + self.processor.process("sell_all", steps="after") + else: + for action in actions: + if hasattr(self, action): + getattr(self, action)() + else: + self.report.log_error("market_process", message="Unknown action {}".format(action)) + except Exception as e: + self.report.log_error("market_process", exception=e) + finally: + self.store_report() def move_balances(self): needed_in_margin = {} @@ -143,3 +176,177 @@ class Market: liquidity=liquidity, repartition=repartition) self.trades.compute_trades(values_in_base, new_repartition, only=only) + # Helpers + def print_orders(self, base_currency="BTC"): + self.report.log_stage("print_orders") + self.balances.fetch_balances(tag="print_orders") + self.prepare_trades(base_currency=base_currency, compute_value="average") + self.trades.prepare_orders(compute_value="average") + + def print_balances(self, base_currency="BTC"): + self.report.log_stage("print_balances") + self.balances.fetch_balances() + if base_currency is not None: + self.report.print_log("total:") + self.report.print_log(sum(self.balances.in_currency(base_currency).values())) + +class Processor: + scenarios = { + "sell_needed": [ + { + "name": "wait", + "number": 0, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + { + "name": "sell", + "number": 1, + "before": False, + "after": True, + "fetch_balances": ["begin", "end"], + "prepare_trades": {}, + "prepare_orders": { "only": "dispose", "compute_value": "average" }, + "run_orders": {}, + "follow_orders": {}, + "close_trades": {}, + }, + { + "name": "buy", + "number": 2, + "before": False, + "after": True, + "fetch_balances": ["begin", "end"], + "prepare_trades": { "only": "acquire" }, + "prepare_orders": { "only": "acquire", "compute_value": "average" }, + "move_balances": {}, + "run_orders": {}, + "follow_orders": {}, + "close_trades": {}, + }, + ], + "sell_all": [ + { + "name": "all_sell", + "number": 1, + "before": True, + "after": False, + "fetch_balances": ["begin", "end"], + "prepare_trades": { "repartition": { "base_currency": (1, "long") } }, + "prepare_orders": { "compute_value": "average" }, + "run_orders": {}, + "follow_orders": {}, + "close_trades": {}, + }, + { + "name": "wait", + "number": 2, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + { + "name": "all_buy", + "number": 3, + "before": False, + "after": True, + "fetch_balances": ["begin", "end"], + "prepare_trades": {}, + "prepare_orders": { "compute_value": "average" }, + "move_balances": {}, + "run_orders": {}, + "follow_orders": {}, + "close_trades": {}, + }, + ] + } + + ordered_actions = [ + "wait_for_recent", "prepare_trades", "prepare_orders", + "move_balances", "run_orders", "follow_orders", + "close_trades"] + + def __init__(self, market): + self.market = market + + def select_steps(self, scenario, step): + if step == "all": + return scenario + elif step == "before" or step == "after": + return list(filter(lambda x: step in x and x[step], scenario)) + elif type(step) == int: + return [scenario[step-1]] + elif type(step) == str: + return list(filter(lambda x: x["name"] == step, scenario)) + else: + raise TypeError("Unknown step {}".format(step)) + + def process(self, scenario_name, steps="all", **kwargs): + scenario = self.scenarios[scenario_name] + selected_steps = [] + + if type(steps) == str or type(steps) == int: + selected_steps += self.select_steps(scenario, steps) + else: + for step in steps: + selected_steps += self.select_steps(scenario, step) + for step in selected_steps: + self.process_step(scenario_name, step, kwargs) + + def process_step(self, scenario_name, step, kwargs): + process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"]) + self.market.report.log_stage("{}_begin".format(process_name)) + if "begin" in step.get("fetch_balances", []): + self.market.balances.fetch_balances(tag="{}_begin".format(process_name)) + + for action in self.ordered_actions: + if action in step: + self.run_action(action, step[action], kwargs) + + if "end" in step.get("fetch_balances", []): + self.market.balances.fetch_balances(tag="{}_end".format(process_name)) + self.market.report.log_stage("{}_end".format(process_name)) + + def method_arguments(self, action): + import inspect + + if action == "wait_for_recent": + method = portfolio.Portfolio.wait_for_recent + elif action == "prepare_trades": + method = self.market.prepare_trades + elif action == "prepare_orders": + method = self.market.trades.prepare_orders + elif action == "move_balances": + method = self.market.move_balances + elif action == "run_orders": + method = self.market.trades.run_orders + elif action == "follow_orders": + method = self.market.follow_orders + elif action == "close_trades": + method = self.market.trades.close_trades + + signature = inspect.getfullargspec(method) + defaults = signature.defaults or [] + kwargs = signature.args[-len(defaults):] + + return [method, kwargs] + + def parse_args(self, action, default_args, kwargs): + method, allowed_arguments = self.method_arguments(action) + args = {k: v for k, v in {**default_args, **kwargs}.items() if k in allowed_arguments } + + if "repartition" in args and "base_currency" in args["repartition"]: + r = args["repartition"] + r[args.get("base_currency", "BTC")] = r.pop("base_currency") + + return method, args + + def run_action(self, action, default_args, kwargs): + method, args = self.parse_args(action, default_args, kwargs) + + if action == "wait_for_recent": + method(self.market, **args) + else: + method(**args) +