X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=store.py;h=072d3a2f02ccbc7dd17c1e92b51a786f57d7c6a5;hb=17fd3f752d5e37df906abddf1f13fd7ad1de6c00;hp=2b5c18a1a1a65eb241af2677d322195a5202d18d;hpb=90d7423eec074a0ed0af680c223180f8d7e1d4e6;p=perso%2FImmae%2FProjets%2FCryptomonnaies%2FCryptoportfolio%2FTrader.git diff --git a/store.py b/store.py index 2b5c18a..072d3a2 100644 --- a/store.py +++ b/store.py @@ -3,7 +3,7 @@ import requests import portfolio import simplejson as json from decimal import Decimal as D, ROUND_DOWN -from datetime import date, datetime, timedelta +import datetime import inspect from json import JSONDecodeError from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError @@ -11,12 +11,16 @@ from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError __all__ = ["Portfolio", "BalanceStore", "ReportStore", "TradeStore"] class ReportStore: - def __init__(self, market, verbose_print=True): + def __init__(self, market, verbose_print=True, no_http_dup=False): self.market = market self.verbose_print = verbose_print self.print_logs = [] self.logs = [] + self.redis_status = [] + + self.no_http_dup = no_http_dup + self.last_http = None def merge(self, other_report): self.logs += other_report.logs @@ -26,19 +30,30 @@ class ReportStore: self.print_logs.sort(key=lambda x: x[0]) def print_log(self, message): - now = datetime.now() + now = datetime.datetime.now() message = "{:%Y-%m-%d %H:%M:%S}: {}".format(now, str(message)) self.print_logs.append([now, message]) if self.verbose_print: print(message) def add_log(self, hash_): - hash_["date"] = datetime.now() + hash_["date"] = datetime.datetime.now() + if self.market is not None: + hash_["user_id"] = self.market.user_id + hash_["market_id"] = self.market.market_id + else: + hash_["user_id"] = None + hash_["market_id"] = None self.logs.append(hash_) + return hash_ + + def add_redis_status(self, hash_): + self.redis_status.append(hash_) + return hash_ @staticmethod def default_json_serial(obj): - if isinstance(obj, (datetime, date)): + if isinstance(obj, (datetime.datetime, datetime.date)): return obj.isoformat() return str(obj) @@ -53,6 +68,13 @@ class ReportStore: json.dumps(log, default=self.default_json_serial, indent=" ") ) + def to_json_redis(self): + for log in (x.copy() for x in self.redis_status): + yield ( + log.pop("type"), + json.dumps(log, default=self.default_json_serial) + ) + def set_verbose(self, verbose_print): self.verbose_print = verbose_print @@ -81,11 +103,14 @@ class ReportStore: for currency, balance in self.market.balances.all.items(): self.print_log("\t{}".format(balance)) - self.add_log({ - "type": "balance", - "tag": tag, - "balances": self.market.balances.as_json() - }) + log = { + "type": "balance", + "tag": tag, + "balances": self.market.balances.as_json() + } + + self.add_log(log.copy()) + self.add_redis_status(log) def log_tickers(self, amounts, other_currency, compute_value, type): @@ -97,15 +122,18 @@ class ReportStore: for currency, amount in amounts.items(): values[currency] = amount.as_json()["value"] rates[currency] = amount.rate - self.add_log({ - "type": "tickers", - "compute_value": compute_value, - "balance_type": type, - "currency": other_currency, - "balances": values, - "rates": rates, - "total": sum(amounts.values()).as_json()["value"] - }) + log = { + "type": "tickers", + "compute_value": compute_value, + "balance_type": type, + "currency": other_currency, + "balances": values, + "rates": rates, + "total": sum(amounts.values()).as_json()["value"] + } + + self.add_log(log.copy()) + self.add_redis_status(log) def log_dispatch(self, amount, amounts, liquidity, repartition): self.add_log({ @@ -176,15 +204,47 @@ class ReportStore: }) def log_http_request(self, method, url, body, headers, response): - self.add_log({ - "type": "http_request", - "method": method, - "url": url, - "body": body, - "headers": headers, - "status": response.status_code, - "response": response.text - }) + if isinstance(response, Exception): + self.add_log({ + "type": "http_request", + "method": method, + "url": url, + "body": body, + "headers": headers, + "status": -1, + "response": None, + "error": response.__class__.__name__, + "error_message": str(response), + }) + self.last_http = None + elif self.no_http_dup and \ + self.last_http is not None and \ + self.last_http["url"] == url and \ + self.last_http["method"] == method and \ + self.last_http["response"] == response.text: + self.add_log({ + "type": "http_request", + "method": method, + "url": url, + "body": body, + "headers": headers, + "status": response.status_code, + "duration": response.elapsed.total_seconds(), + "response": None, + "response_same_as": self.last_http["date"] + }) + else: + self.last_http = self.add_log({ + "type": "http_request", + "method": method, + "url": url, + "body": body, + "headers": headers, + "status": response.status_code, + "duration": response.elapsed.total_seconds(), + "response": response.text, + "response_same_as": None, + }) def log_error(self, action, message=None, exception=None): self.print_log("[Error] {}".format(action)) @@ -209,15 +269,11 @@ class ReportStore: "action": action, }) - def log_market(self, args, user_id, market_id, report_path, debug): + def log_market(self, args): self.add_log({ "type": "market", "commit": "$Format:%H$", "args": vars(args), - "user_id": user_id, - "market_id": market_id, - "report_path": report_path, - "debug": debug, }) class BalanceStore: @@ -371,8 +427,9 @@ class Portfolio: data = LockedVar(None) liquidities = LockedVar({}) last_date = LockedVar(None) - report = LockedVar(ReportStore(None)) + report = LockedVar(ReportStore(None, no_http_dup=True)) worker = None + worker_tag = "" worker_started = False worker_notify = None callback = None @@ -390,6 +447,7 @@ class Portfolio: cls.liquidities.start_lock() cls.report.start_lock() + cls.worker_tag = "[Worker] " cls.worker_started = True cls.worker.start() @@ -407,11 +465,17 @@ class Portfolio: raise RuntimeError("This method needs to be ran with the worker") while cls.worker_started: cls.worker_notify.wait() - cls.worker_notify.clear() - cls.report.print_log("Fetching cryptoportfolio") - cls.get_cryptoportfolio(refetch=True) - cls.callback.set() - time.sleep(poll) + if cls.worker_started: + cls.worker_notify.clear() + cls.report.print_log("[Worker] Fetching cryptoportfolio") + cls.get_cryptoportfolio(refetch=True) + cls.callback.set() + time.sleep(poll) + + @classmethod + def stop_worker(cls): + cls.worker_started = False + cls.worker_notify.set() @classmethod def notify_and_wait(cls): @@ -422,7 +486,7 @@ class Portfolio: @classmethod def wait_for_recent(cls, delta=4, poll=30): cls.get_cryptoportfolio() - while cls.last_date.get() is None or datetime.now() - cls.last_date.get() > timedelta(delta): + while cls.last_date.get() is None or datetime.datetime.now() - cls.last_date.get() > datetime.timedelta(delta): if cls.worker is None: time.sleep(poll) cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio") @@ -446,7 +510,7 @@ class Portfolio: cls.report.log_http_request(r.request.method, r.request.url, r.request.body, r.request.headers, r) except Exception as e: - cls.report.log_error("get_cryptoportfolio", exception=e) + cls.report.log_error("{}get_cryptoportfolio".format(cls.worker_tag), exception=e) return try: cls.data.set(r.json(parse_int=D, parse_float=D)) @@ -479,7 +543,7 @@ class Portfolio: weights_hash = portfolio_hash["weights"] weights = {} for i in range(len(weights_hash["_row"])): - date = datetime.strptime(weights_hash["_row"][i], "%Y-%m-%d") + date = datetime.datetime.strptime(weights_hash["_row"][i], "%Y-%m-%d") weights[date] = dict(filter( filter_weights, map(clean_weights(i), weights_hash.items()))) @@ -493,8 +557,7 @@ class Portfolio: "high": high_liquidity, }) cls.last_date.set(max( - max(medium_liquidity.keys(), default=datetime(1, 1, 1)), - max(high_liquidity.keys(), default=datetime(1, 1, 1)) + max(medium_liquidity.keys(), default=datetime.datetime(1, 1, 1)), + max(high_liquidity.keys(), default=datetime.datetime(1, 1, 1)) )) -