X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;ds=sidebyside;f=market.py;h=fc6f9f667fb320e469f7fa103d398d2c40d184b2;hb=2b1ee8f4d54fa1672510141a71a5817120ac031c;hp=4593eb1e3ecbe2f2d29c5de091a3cbe003d3957c;hpb=ceb7fc4c9e76857fefbe1dfe3f4dd3830d065a6f;p=perso%2FImmae%2FProjets%2FCryptomonnaies%2FCryptoportfolio%2FTrader.git diff --git a/market.py b/market.py index 4593eb1..fc6f9f6 100644 --- a/market.py +++ b/market.py @@ -2,6 +2,7 @@ from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce import ccxt_wrapper as ccxt import time import psycopg2 +import redis from store import * from cachetools.func import ttl_cache from datetime import datetime @@ -26,7 +27,7 @@ class Market: self.balances = BalanceStore(self) self.processor = Processor(self) - for key in ["user_id", "market_id", "pg_config"]: + for key in ["user_id", "market_id", "pg_config", "redis_config"]: setattr(self, key, kwargs.get(key, None)) self.report.log_market(self.args) @@ -46,6 +47,8 @@ class Market: self.store_file_report(date) if self.pg_config is not None and self.args.report_db: self.store_database_report(date) + if self.redis_config is not None and self.args.report_redis: + self.store_redis_report(date) def store_file_report(self, date): try: @@ -74,6 +77,19 @@ class Market: except Exception as e: print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e)) + def store_redis_report(self, date): + try: + conn = redis.Redis(**self.redis_config) + for type_, log in self.report.to_json_redis(): + key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_) + conn.set(key, log, ex=31*24*60*60) + key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_) + conn.set(key, log) + key = "/cryptoportfolio/{}/latest/date".format(self.market_id) + conn.set(key, date.isoformat()) + except Exception as e: + print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e)) + def process(self, actions, before=False, after=False): try: for action in actions: @@ -194,18 +210,32 @@ class Market: self.report.log_stage("follow_orders_end") def prepare_trades(self, base_currency="BTC", liquidity="medium", - compute_value="average", repartition=None, only=None): + compute_value="average", repartition=None, only=None, + available_balance_only=False): self.report.log_stage("prepare_trades", base_currency=base_currency, liquidity=liquidity, compute_value=compute_value, only=only, - repartition=repartition) + repartition=repartition, available_balance_only=available_balance_only) values_in_base = self.balances.in_currency(base_currency, compute_value=compute_value) - total_base_value = sum(values_in_base.values()) + if available_balance_only: + balance = self.balances.all.get(base_currency) + if balance is None: + total_base_value = portfolio.Amount(base_currency, 0) + else: + total_base_value = balance.exchange_free + balance.margin_available + else: + total_base_value = sum(values_in_base.values()) new_repartition = self.balances.dispatch_assets(total_base_value, liquidity=liquidity, repartition=repartition) + if available_balance_only: + for currency, amount in values_in_base.items(): + if currency != base_currency: + new_repartition.setdefault(currency, portfolio.Amount(base_currency, 0)) + new_repartition[currency] += amount + self.trades.compute_trades(values_in_base, new_repartition, only=only) def print_tickers(self, base_currency="BTC"): @@ -276,7 +306,7 @@ class Processor: "before": False, "after": True, "fetch_balances": ["begin", "end"], - "prepare_trades": { "only": "acquire" }, + "prepare_trades": { "only": "acquire", "available_balance_only": True }, "prepare_orders": { "only": "acquire", "compute_value": "average" }, "move_balances": {}, "run_orders": {}, @@ -310,7 +340,7 @@ class Processor: "before": False, "after": True, "fetch_balances": ["begin", "end"], - "prepare_trades": {}, + "prepare_trades": { "available_balance_only": True }, "prepare_orders": { "compute_value": "average" }, "move_balances": {}, "run_orders": {}, @@ -361,14 +391,14 @@ class Processor: process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"]) self.market.report.log_stage("{}_begin".format(process_name)) if "begin" in step.get("fetch_balances", []): - self.market.balances.fetch_balances(tag="{}_begin".format(process_name)) + self.market.balances.fetch_balances(tag="{}_begin".format(process_name), log_tickers=True) for action in self.ordered_actions: if action in step: self.run_action(action, step[action], kwargs) if "end" in step.get("fetch_balances", []): - self.market.balances.fetch_balances(tag="{}_end".format(process_name)) + self.market.balances.fetch_balances(tag="{}_end".format(process_name), log_tickers=True) self.market.report.log_stage("{}_end".format(process_name)) def method_arguments(self, action):