+class Processor:
+ scenarios = {
+ "sell_needed": [
+ {
+ "name": "wait",
+ "number": 0,
+ "before": False,
+ "after": True,
+ "wait_for_recent": {},
+ },
+ {
+ "name": "sell",
+ "number": 1,
+ "before": False,
+ "after": True,
+ "fetch_balances": ["begin", "end"],
+ "prepare_trades": {},
+ "prepare_orders": { "only": "dispose", "compute_value": "average" },
+ "run_orders": {},
+ "follow_orders": {},
+ },
+ {
+ "name": "buy",
+ "number": 2,
+ "before": False,
+ "after": True,
+ "fetch_balances": ["begin", "end"],
+ "prepare_trades": { "only": "acquire" },
+ "prepare_orders": { "only": "acquire", "compute_value": "average" },
+ "move_balances": {},
+ "run_orders": {},
+ "follow_orders": {},
+ },
+ ],
+ "sell_all": [
+ {
+ "name": "all_sell",
+ "number": 1,
+ "before": True,
+ "after": False,
+ "fetch_balances": ["begin", "end"],
+ "prepare_trades": { "repartition": { "BTC": (1, "long") } },
+ "prepare_orders": { "compute_value": "average" },
+ "run_orders": {},
+ "follow_orders": {},
+ },
+ {
+ "name": "wait",
+ "number": 2,
+ "before": False,
+ "after": True,
+ "wait_for_recent": {},
+ },
+ {
+ "name": "all_buy",
+ "number": 3,
+ "before": False,
+ "after": True,
+ "fetch_balances": ["begin", "end"],
+ "prepare_trades": {},
+ "prepare_orders": { "compute_value": "average" },
+ "move_balances": {},
+ "run_orders": {},
+ "follow_orders": {},
+ },
+ ]
+ }
+
+ allowed_arguments = {
+ "wait_for_recent": ["delta"],
+ "prepare_trades": ["base_currency", "liquidity", "compute_value", "repartition", "only"],
+ "prepare_orders": ["only", "compute_value"],
+ "move_balances": [],
+ "run_orders": [],
+ "follow_orders": ["sleep"],
+ }
+
+ def __init__(self, market):
+ self.market = market
+
+ def select_steps(self, scenario, step):
+ if step == "all":
+ return scenario
+ elif step == "before" or step == "after":
+ return list(filter(lambda x: step in x and x[step], scenario))
+ elif type(step) == int:
+ return [scenario[step-1]]
+ elif type(step) == str:
+ return list(filter(lambda x: x["name"] == step, scenario))
+ else:
+ raise TypeError("Unknown step {}".format(step))
+
+ def process(self, scenario_name, steps="all", **kwargs):
+ scenario = self.scenarios[scenario_name]
+ selected_steps = []
+
+ if type(steps) == str or type(steps) == int:
+ selected_steps += self.select_steps(scenario, steps)
+ else:
+ for step in steps:
+ selected_steps += self.select_steps(scenario, step)
+ for step in selected_steps:
+ self.process_step(scenario_name, step, **kwargs)
+
+ def process_step(self, scenario_name, step, **kwargs):
+ process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"])
+ self.market.report.log_stage("{}_begin".format(process_name))
+ if "begin" in step.get("fetch_balances", []):
+ self.market.balances.fetch_balances(tag="{}_begin".format(process_name))
+
+ for action in ["wait_for_recent", "prepare_trades",
+ "prepare_orders", "move_balances", "run_orders",
+ "follow_orders"]:
+ if action in step:
+ self.run_action(action, step[action], **kwargs)
+
+ if "end" in step.get("fetch_balances", []):
+ self.market.balances.fetch_balances(tag="{}_end".format(process_name))
+ self.market.report.log_stage("{}_end".format(process_name))
+
+ def run_action(self, action, default_args, **kwargs):
+ args = {k: v for k, v in {**default_args, **kwargs}.items() if k in self.allowed_arguments[action] }
+
+ if action == "wait_for_recent":
+ portfolio.Portfolio.wait_for_recent(self.market, **args)
+ if action == "prepare_trades":
+ self.market.prepare_trades(**args)
+ if action == "prepare_orders":
+ self.market.trades.prepare_orders(**args)
+ if action == "move_balances":
+ self.market.move_balances(**args)
+ if action == "run_orders":
+ self.market.trades.run_orders(**args)
+ if action == "follow_orders":
+ self.market.follow_orders(**args)
+