aboutsummaryrefslogtreecommitdiff
path: root/market.py
diff options
context:
space:
mode:
Diffstat (limited to 'market.py')
-rw-r--r--market.py31
1 files changed, 16 insertions, 15 deletions
diff --git a/market.py b/market.py
index fc6f9f6..5876071 100644
--- a/market.py
+++ b/market.py
@@ -1,8 +1,7 @@
1from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce 1from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce
2import ccxt_wrapper as ccxt 2import ccxt_wrapper as ccxt
3import time 3import time
4import psycopg2 4import dbs
5import redis
6from store import * 5from store import *
7from cachetools.func import ttl_cache 6from cachetools.func import ttl_cache
8from datetime import datetime 7from datetime import datetime
@@ -27,7 +26,7 @@ class Market:
27 self.balances = BalanceStore(self) 26 self.balances = BalanceStore(self)
28 self.processor = Processor(self) 27 self.processor = Processor(self)
29 28
30 for key in ["user_id", "market_id", "pg_config", "redis_config"]: 29 for key in ["user_id", "market_id"]:
31 setattr(self, key, kwargs.get(key, None)) 30 setattr(self, key, kwargs.get(key, None))
32 31
33 self.report.log_market(self.args) 32 self.report.log_market(self.args)
@@ -45,9 +44,9 @@ class Market:
45 date = datetime.datetime.now() 44 date = datetime.datetime.now()
46 if self.args.report_path is not None: 45 if self.args.report_path is not None:
47 self.store_file_report(date) 46 self.store_file_report(date)
48 if self.pg_config is not None and self.args.report_db: 47 if dbs.psql_connected() and self.args.report_db:
49 self.store_database_report(date) 48 self.store_database_report(date)
50 if self.redis_config is not None and self.args.report_redis: 49 if dbs.redis_connected() and self.args.report_redis:
51 self.store_redis_report(date) 50 self.store_redis_report(date)
52 51
53 def store_file_report(self, date): 52 def store_file_report(self, date):
@@ -64,29 +63,26 @@ class Market:
64 try: 63 try:
65 report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;' 64 report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;'
66 line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);' 65 line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);'
67 connection = psycopg2.connect(**self.pg_config) 66 cursor = dbs.psql.cursor()
68 cursor = connection.cursor()
69 cursor.execute(report_query, (date, self.market_id, self.debug)) 67 cursor.execute(report_query, (date, self.market_id, self.debug))
70 report_id = cursor.fetchone()[0] 68 report_id = cursor.fetchone()[0]
71 for date, type_, payload in self.report.to_json_array(): 69 for date, type_, payload in self.report.to_json_array():
72 cursor.execute(line_query, (date, report_id, type_, payload)) 70 cursor.execute(line_query, (date, report_id, type_, payload))
73 71
74 connection.commit() 72 dbs.psql.commit()
75 cursor.close() 73 cursor.close()
76 connection.close()
77 except Exception as e: 74 except Exception as e:
78 print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e)) 75 print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e))
79 76
80 def store_redis_report(self, date): 77 def store_redis_report(self, date):
81 try: 78 try:
82 conn = redis.Redis(**self.redis_config)
83 for type_, log in self.report.to_json_redis(): 79 for type_, log in self.report.to_json_redis():
84 key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_) 80 key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_)
85 conn.set(key, log, ex=31*24*60*60) 81 dbs.redis.set(key, log, ex=31*24*60*60)
86 key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_) 82 key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_)
87 conn.set(key, log) 83 dbs.redis.set(key, log)
88 key = "/cryptoportfolio/{}/latest/date".format(self.market_id) 84 key = "/cryptoportfolio/{}/latest/date".format(self.market_id)
89 conn.set(key, date.isoformat()) 85 dbs.redis.set(key, date.isoformat())
90 except Exception as e: 86 except Exception as e:
91 print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e)) 87 print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e))
92 88
@@ -259,6 +255,7 @@ class Processor:
259 "name": "print_balances", 255 "name": "print_balances",
260 "number": 1, 256 "number": 1,
261 "fetch_balances": ["begin"], 257 "fetch_balances": ["begin"],
258 "fetch_balances_args": { "add_portfolio": True },
262 "print_tickers": { "base_currency": "BTC" }, 259 "print_tickers": { "base_currency": "BTC" },
263 } 260 }
264 ], 261 ],
@@ -390,15 +387,19 @@ class Processor:
390 def process_step(self, scenario_name, step, kwargs): 387 def process_step(self, scenario_name, step, kwargs):
391 process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"]) 388 process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"])
392 self.market.report.log_stage("{}_begin".format(process_name)) 389 self.market.report.log_stage("{}_begin".format(process_name))
390
391 fetch_args = step.get("fetch_balances_args", {})
393 if "begin" in step.get("fetch_balances", []): 392 if "begin" in step.get("fetch_balances", []):
394 self.market.balances.fetch_balances(tag="{}_begin".format(process_name), log_tickers=True) 393 self.market.balances.fetch_balances(tag="{}_begin".format(process_name),
394 log_tickers=True, **fetch_args)
395 395
396 for action in self.ordered_actions: 396 for action in self.ordered_actions:
397 if action in step: 397 if action in step:
398 self.run_action(action, step[action], kwargs) 398 self.run_action(action, step[action], kwargs)
399 399
400 if "end" in step.get("fetch_balances", []): 400 if "end" in step.get("fetch_balances", []):
401 self.market.balances.fetch_balances(tag="{}_end".format(process_name), log_tickers=True) 401 self.market.balances.fetch_balances(tag="{}_end".format(process_name),
402 log_tickers=True, **fetch_args)
402 self.market.report.log_stage("{}_end".format(process_name)) 403 self.market.report.log_stage("{}_end".format(process_name))
403 404
404 def method_arguments(self, action): 405 def method_arguments(self, action):