diff options
Diffstat (limited to 'market.py')
-rw-r--r-- | market.py | 31 |
1 files changed, 16 insertions, 15 deletions
@@ -1,8 +1,7 @@ | |||
1 | from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce | 1 | from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce |
2 | import ccxt_wrapper as ccxt | 2 | import ccxt_wrapper as ccxt |
3 | import time | 3 | import time |
4 | import psycopg2 | 4 | import dbs |
5 | import redis | ||
6 | from store import * | 5 | from store import * |
7 | from cachetools.func import ttl_cache | 6 | from cachetools.func import ttl_cache |
8 | from datetime import datetime | 7 | from datetime import datetime |
@@ -27,7 +26,7 @@ class Market: | |||
27 | self.balances = BalanceStore(self) | 26 | self.balances = BalanceStore(self) |
28 | self.processor = Processor(self) | 27 | self.processor = Processor(self) |
29 | 28 | ||
30 | for key in ["user_id", "market_id", "pg_config", "redis_config"]: | 29 | for key in ["user_id", "market_id"]: |
31 | setattr(self, key, kwargs.get(key, None)) | 30 | setattr(self, key, kwargs.get(key, None)) |
32 | 31 | ||
33 | self.report.log_market(self.args) | 32 | self.report.log_market(self.args) |
@@ -45,9 +44,9 @@ class Market: | |||
45 | date = datetime.datetime.now() | 44 | date = datetime.datetime.now() |
46 | if self.args.report_path is not None: | 45 | if self.args.report_path is not None: |
47 | self.store_file_report(date) | 46 | self.store_file_report(date) |
48 | if self.pg_config is not None and self.args.report_db: | 47 | if dbs.psql_connected() and self.args.report_db: |
49 | self.store_database_report(date) | 48 | self.store_database_report(date) |
50 | if self.redis_config is not None and self.args.report_redis: | 49 | if dbs.redis_connected() and self.args.report_redis: |
51 | self.store_redis_report(date) | 50 | self.store_redis_report(date) |
52 | 51 | ||
53 | def store_file_report(self, date): | 52 | def store_file_report(self, date): |
@@ -64,29 +63,26 @@ class Market: | |||
64 | try: | 63 | try: |
65 | report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;' | 64 | report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;' |
66 | line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);' | 65 | line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);' |
67 | connection = psycopg2.connect(**self.pg_config) | 66 | cursor = dbs.psql.cursor() |
68 | cursor = connection.cursor() | ||
69 | cursor.execute(report_query, (date, self.market_id, self.debug)) | 67 | cursor.execute(report_query, (date, self.market_id, self.debug)) |
70 | report_id = cursor.fetchone()[0] | 68 | report_id = cursor.fetchone()[0] |
71 | for date, type_, payload in self.report.to_json_array(): | 69 | for date, type_, payload in self.report.to_json_array(): |
72 | cursor.execute(line_query, (date, report_id, type_, payload)) | 70 | cursor.execute(line_query, (date, report_id, type_, payload)) |
73 | 71 | ||
74 | connection.commit() | 72 | dbs.psql.commit() |
75 | cursor.close() | 73 | cursor.close() |
76 | connection.close() | ||
77 | except Exception as e: | 74 | except Exception as e: |
78 | print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e)) | 75 | print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e)) |
79 | 76 | ||
80 | def store_redis_report(self, date): | 77 | def store_redis_report(self, date): |
81 | try: | 78 | try: |
82 | conn = redis.Redis(**self.redis_config) | ||
83 | for type_, log in self.report.to_json_redis(): | 79 | for type_, log in self.report.to_json_redis(): |
84 | key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_) | 80 | key = "/cryptoportfolio/{}/{}/{}".format(self.market_id, date.isoformat(), type_) |
85 | conn.set(key, log, ex=31*24*60*60) | 81 | dbs.redis.set(key, log, ex=31*24*60*60) |
86 | key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_) | 82 | key = "/cryptoportfolio/{}/latest/{}".format(self.market_id, type_) |
87 | conn.set(key, log) | 83 | dbs.redis.set(key, log) |
88 | key = "/cryptoportfolio/{}/latest/date".format(self.market_id) | 84 | key = "/cryptoportfolio/{}/latest/date".format(self.market_id) |
89 | conn.set(key, date.isoformat()) | 85 | dbs.redis.set(key, date.isoformat()) |
90 | except Exception as e: | 86 | except Exception as e: |
91 | print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e)) | 87 | print("impossible to store report to redis: {}; {}".format(e.__class__.__name__, e)) |
92 | 88 | ||
@@ -259,6 +255,7 @@ class Processor: | |||
259 | "name": "print_balances", | 255 | "name": "print_balances", |
260 | "number": 1, | 256 | "number": 1, |
261 | "fetch_balances": ["begin"], | 257 | "fetch_balances": ["begin"], |
258 | "fetch_balances_args": { "add_portfolio": True }, | ||
262 | "print_tickers": { "base_currency": "BTC" }, | 259 | "print_tickers": { "base_currency": "BTC" }, |
263 | } | 260 | } |
264 | ], | 261 | ], |
@@ -390,15 +387,19 @@ class Processor: | |||
390 | def process_step(self, scenario_name, step, kwargs): | 387 | def process_step(self, scenario_name, step, kwargs): |
391 | process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"]) | 388 | process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"]) |
392 | self.market.report.log_stage("{}_begin".format(process_name)) | 389 | self.market.report.log_stage("{}_begin".format(process_name)) |
390 | |||
391 | fetch_args = step.get("fetch_balances_args", {}) | ||
393 | if "begin" in step.get("fetch_balances", []): | 392 | if "begin" in step.get("fetch_balances", []): |
394 | self.market.balances.fetch_balances(tag="{}_begin".format(process_name), log_tickers=True) | 393 | self.market.balances.fetch_balances(tag="{}_begin".format(process_name), |
394 | log_tickers=True, **fetch_args) | ||
395 | 395 | ||
396 | for action in self.ordered_actions: | 396 | for action in self.ordered_actions: |
397 | if action in step: | 397 | if action in step: |
398 | self.run_action(action, step[action], kwargs) | 398 | self.run_action(action, step[action], kwargs) |
399 | 399 | ||
400 | if "end" in step.get("fetch_balances", []): | 400 | if "end" in step.get("fetch_balances", []): |
401 | self.market.balances.fetch_balances(tag="{}_end".format(process_name), log_tickers=True) | 401 | self.market.balances.fetch_balances(tag="{}_end".format(process_name), |
402 | log_tickers=True, **fetch_args) | ||
402 | self.market.report.log_stage("{}_end".format(process_name)) | 403 | self.market.report.log_stage("{}_end".format(process_name)) |
403 | 404 | ||
404 | def method_arguments(self, action): | 405 | def method_arguments(self, action): |