]> git.immae.eu Git - perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git/blobdiff - market.py
Add latest date to redis
[perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git] / market.py
index e16641c476865bc3977ceaa8f30ccce5296925ab..ce418415ad30e30ab6c9e556ee1a3d2b1c4cd94f 100644 (file)
--- a/market.py
+++ b/market.py
@@ -2,9 +2,11 @@ from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce
 import ccxt_wrapper as ccxt
 import time
 import psycopg2
+import redis
 from store import *
 from cachetools.func import ttl_cache
 from datetime import datetime
+import datetime
 from retry import retry
 import portfolio
 
@@ -25,10 +27,10 @@ class Market:
         self.balances = BalanceStore(self)
         self.processor = Processor(self)
 
-        for key in ["user_id", "market_id", "pg_config"]:
+        for key in ["user_id", "market_id", "pg_config", "redis_config"]:
             setattr(self, key, kwargs.get(key, None))
 
-        self.report.log_market(self.args, self.user_id, self.market_id)
+        self.report.log_market(self.args)
 
     @classmethod
     def from_config(cls, config, args, **kwargs):
@@ -40,11 +42,13 @@ class Market:
 
     def store_report(self):
         self.report.merge(Portfolio.report)
-        date = datetime.now()
+        date = datetime.datetime.now()
         if self.args.report_path is not None:
             self.store_file_report(date)
         if self.pg_config is not None and self.args.report_db:
             self.store_database_report(date)
+        if self.redis_config is not None and self.args.report_redis:
+            self.store_redis_report(date)
 
     def store_file_report(self, date):
         try:
@@ -73,19 +77,28 @@ class Market:
         except Exception as e:
             print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e))
 
+    def store_redis_report(self, date):
+        try:
+            conn = redis.Redis(**self.redis_config)
+            for type_, log in self.report.to_json_redis():
+                key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_)
+                conn.set(key, log, ex=31*24*60*60)
+                key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_)
+                conn.set(key, log)
+            key = "/cryptoportfolio/{}/latest/date".format(self.market_id)
+            conn.set(key, date.isoformat())
+        except Exception as e:
+            print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e))
+
     def process(self, actions, before=False, after=False):
         try:
-            if len(actions or []) == 0:
-                if before:
-                    self.processor.process("sell_all", steps="before")
-                if after:
-                    self.processor.process("sell_all", steps="after")
-            else:
-                for action in actions:
-                    if hasattr(self, action):
-                        getattr(self, action)()
-                    else:
-                        self.report.log_error("market_process", message="Unknown action {}".format(action))
+            for action in actions:
+                if bool(before) is bool(after):
+                    self.processor.process(action, steps="all")
+                elif before:
+                    self.processor.process(action, steps="before")
+                elif after:
+                    self.processor.process(action, steps="after")
         except Exception as e:
             self.report.log_error("market_process", exception=e)
         finally:
@@ -211,16 +224,7 @@ class Market:
                 liquidity=liquidity, repartition=repartition)
         self.trades.compute_trades(values_in_base, new_repartition, only=only)
 
-    # Helpers
-    def print_orders(self, base_currency="BTC"):
-        self.report.log_stage("print_orders")
-        self.balances.fetch_balances(tag="print_orders")
-        self.prepare_trades(base_currency=base_currency, compute_value="average")
-        self.trades.prepare_orders(compute_value="average")
-
-    def print_balances(self, base_currency="BTC"):
-        self.report.log_stage("print_balances")
-        self.balances.fetch_balances()
+    def print_tickers(self, base_currency="BTC"):
         if base_currency is not None:
             self.report.print_log("total:")
             self.report.print_log(sum(self.balances.in_currency(base_currency).values()))
@@ -236,12 +240,20 @@ class Processor:
                     "wait_for_recent": {},
                     },
                 ],
+            "print_balances": [
+                {
+                    "name": "print_balances",
+                    "number": 1,
+                    "fetch_balances": ["begin"],
+                    "print_tickers": { "base_currency": "BTC" },
+                    }
+                ],
             "print_orders": [
                 {
                     "name": "wait",
                     "number": 1,
-                    "before": False,
-                    "after": True,
+                    "before": True,
+                    "after": False,
                     "wait_for_recent": {},
                     },
                 {
@@ -327,7 +339,7 @@ class Processor:
     ordered_actions = [
             "wait_for_recent", "prepare_trades", "prepare_orders",
             "move_balances", "run_orders", "follow_orders",
-            "close_trades"]
+            "close_trades", "print_tickers"]
 
     def __init__(self, market):
         self.market = market
@@ -336,7 +348,7 @@ class Processor:
         if step == "all":
             return scenario
         elif step == "before" or step == "after":
-            return list(filter(lambda x: step in x and x[step], scenario))
+            return list(filter(lambda x: x.get(step, False), scenario))
         elif type(step) == int:
             return [scenario[step-1]]
         elif type(step) == str:
@@ -344,7 +356,12 @@ class Processor:
         else:
             raise TypeError("Unknown step {}".format(step))
 
+    def can_process(self, scenario_name):
+        return scenario_name in self.scenarios
+
     def process(self, scenario_name, steps="all", **kwargs):
+        if not self.can_process(scenario_name):
+            raise TypeError("Unknown scenario {}".format(scenario_name))
         scenario = self.scenarios[scenario_name]
         selected_steps = []
 
@@ -387,6 +404,8 @@ class Processor:
             method = self.market.follow_orders
         elif action == "close_trades":
             method = self.market.trades.close_trades
+        elif action == "print_tickers":
+            method = self.market.print_tickers
 
         signature = inspect.getfullargspec(method)
         defaults = signature.defaults or []