]> git.immae.eu Git - perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git/blobdiff - market.py
Merge branch 'dev'
[perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git] / market.py
index fc6f9f667fb320e469f7fa103d398d2c40d184b2..5876071875750df01a1d658698216b2c30670bb0 100644 (file)
--- a/market.py
+++ b/market.py
@@ -1,8 +1,7 @@
 from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce
 import ccxt_wrapper as ccxt
 import time
-import psycopg2
-import redis
+import dbs
 from store import *
 from cachetools.func import ttl_cache
 from datetime import datetime
@@ -27,7 +26,7 @@ class Market:
         self.balances = BalanceStore(self)
         self.processor = Processor(self)
 
-        for key in ["user_id", "market_id", "pg_config", "redis_config"]:
+        for key in ["user_id", "market_id"]:
             setattr(self, key, kwargs.get(key, None))
 
         self.report.log_market(self.args)
@@ -45,9 +44,9 @@ class Market:
         date = datetime.datetime.now()
         if self.args.report_path is not None:
             self.store_file_report(date)
-        if self.pg_config is not None and self.args.report_db:
+        if dbs.psql_connected() and self.args.report_db:
             self.store_database_report(date)
-        if self.redis_config is not None and self.args.report_redis:
+        if dbs.redis_connected() and self.args.report_redis:
             self.store_redis_report(date)
 
     def store_file_report(self, date):
@@ -64,29 +63,26 @@ class Market:
         try:
             report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;'
             line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);'
-            connection = psycopg2.connect(**self.pg_config)
-            cursor = connection.cursor()
+            cursor = dbs.psql.cursor()
             cursor.execute(report_query, (date, self.market_id, self.debug))
             report_id = cursor.fetchone()[0]
             for date, type_, payload in self.report.to_json_array():
                 cursor.execute(line_query, (date, report_id, type_, payload))
 
-            connection.commit()
+            dbs.psql.commit()
             cursor.close()
-            connection.close()
         except Exception as e:
             print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e))
 
     def store_redis_report(self, date):
         try:
-            conn = redis.Redis(**self.redis_config)
             for type_, log in self.report.to_json_redis():
                 key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_)
-                conn.set(key, log, ex=31*24*60*60)
+                dbs.redis.set(key, log, ex=31*24*60*60)
                 key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_)
-                conn.set(key, log)
+                dbs.redis.set(key, log)
             key = "/cryptoportfolio/{}/latest/date".format(self.market_id)
-            conn.set(key, date.isoformat())
+            dbs.redis.set(key, date.isoformat())
         except Exception as e:
             print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e))
 
@@ -259,6 +255,7 @@ class Processor:
                     "name": "print_balances",
                     "number": 1,
                     "fetch_balances": ["begin"],
+                    "fetch_balances_args": { "add_portfolio": True },
                     "print_tickers": { "base_currency": "BTC" },
                     }
                 ],
@@ -390,15 +387,19 @@ class Processor:
     def process_step(self, scenario_name, step, kwargs):
         process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"])
         self.market.report.log_stage("{}_begin".format(process_name))
+
+        fetch_args = step.get("fetch_balances_args", {})
         if "begin" in step.get("fetch_balances", []):
-            self.market.balances.fetch_balances(tag="{}_begin".format(process_name), log_tickers=True)
+            self.market.balances.fetch_balances(tag="{}_begin".format(process_name),
+                    log_tickers=True, **fetch_args)
 
         for action in self.ordered_actions:
             if action in step:
                 self.run_action(action, step[action], kwargs)
 
         if "end" in step.get("fetch_balances", []):
-            self.market.balances.fetch_balances(tag="{}_end".format(process_name), log_tickers=True)
+            self.market.balances.fetch_balances(tag="{}_end".format(process_name),
+                    log_tickers=True, **fetch_args)
         self.market.report.log_stage("{}_end".format(process_name))
 
     def method_arguments(self, action):