X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=store.py;h=45a52339dc688bb9311451c476ec149b23f09a5f;hb=a0dcf4e0978331709da164fb0e29ae008b90fc88;hp=f655be538b66f3d81d7fb121a0595ee4ff63ddc7;hpb=34eb08f759a440af0376727664d9422041dfbd18;p=perso%2FImmae%2FProjets%2FCryptomonnaies%2FCryptoportfolio%2FTrader.git diff --git a/store.py b/store.py index f655be5..45a5233 100644 --- a/store.py +++ b/store.py @@ -3,7 +3,7 @@ import requests import portfolio import simplejson as json from decimal import Decimal as D, ROUND_DOWN -from datetime import date, datetime, timedelta +import datetime import inspect from json import JSONDecodeError from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError @@ -11,31 +11,57 @@ from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError __all__ = ["Portfolio", "BalanceStore", "ReportStore", "TradeStore"] class ReportStore: - def __init__(self, market, verbose_print=True): + def __init__(self, market, verbose_print=True, no_http_dup=False): self.market = market self.verbose_print = verbose_print + self.print_logs = [] self.logs = [] + self.no_http_dup = no_http_dup + self.last_http = None + def merge(self, other_report): self.logs += other_report.logs self.logs.sort(key=lambda x: x["date"]) + self.print_logs += other_report.print_logs + self.print_logs.sort(key=lambda x: x[0]) + def print_log(self, message): - message = str(message) + now = datetime.datetime.now() + message = "{:%Y-%m-%d %H:%M:%S}: {}".format(now, str(message)) + self.print_logs.append([now, message]) if self.verbose_print: print(message) def add_log(self, hash_): - hash_["date"] = datetime.now() + hash_["date"] = datetime.datetime.now() + if self.market is not None: + hash_["user_id"] = self.market.user_id + hash_["market_id"] = self.market.market_id + else: + hash_["user_id"] = None + hash_["market_id"] = None self.logs.append(hash_) + return hash_ + + @staticmethod + def default_json_serial(obj): + if isinstance(obj, (datetime.datetime, datetime.date)): + return obj.isoformat() + return str(obj) def to_json(self): - def default_json_serial(obj): - if isinstance(obj, (datetime, date)): - return obj.isoformat() - return str(obj) - return json.dumps(self.logs, default=default_json_serial, indent=" ") + return json.dumps(self.logs, default=self.default_json_serial, indent=" ") + + def to_json_array(self): + for log in (x.copy() for x in self.logs): + yield ( + log.pop("date"), + log.pop("type"), + json.dumps(log, default=self.default_json_serial, indent=" ") + ) def set_verbose(self, verbose_print): self.verbose_print = verbose_print @@ -160,15 +186,45 @@ class ReportStore: }) def log_http_request(self, method, url, body, headers, response): - self.add_log({ - "type": "http_request", - "method": method, - "url": url, - "body": body, - "headers": headers, - "status": response.status_code, - "response": response.text - }) + if isinstance(response, Exception): + self.add_log({ + "type": "http_request", + "method": method, + "url": url, + "body": body, + "headers": headers, + "status": -1, + "response": None, + "error": response.__class__.__name__, + "error_message": str(response), + }) + self.last_http = None + elif self.no_http_dup and \ + self.last_http is not None and \ + self.last_http["url"] == url and \ + self.last_http["method"] == method and \ + self.last_http["response"] == response.text: + self.add_log({ + "type": "http_request", + "method": method, + "url": url, + "body": body, + "headers": headers, + "status": response.status_code, + "response": None, + "response_same_as": self.last_http["date"] + }) + else: + self.last_http = self.add_log({ + "type": "http_request", + "method": method, + "url": url, + "body": body, + "headers": headers, + "status": response.status_code, + "response": response.text, + "response_same_as": None, + }) def log_error(self, action, message=None, exception=None): self.print_log("[Error] {}".format(action)) @@ -193,6 +249,13 @@ class ReportStore: "action": action, }) + def log_market(self, args): + self.add_log({ + "type": "market", + "commit": "$Format:%H$", + "args": vars(args), + }) + class BalanceStore: def __init__(self, market): self.market = market @@ -344,8 +407,9 @@ class Portfolio: data = LockedVar(None) liquidities = LockedVar({}) last_date = LockedVar(None) - report = LockedVar(ReportStore(None)) + report = LockedVar(ReportStore(None, no_http_dup=True)) worker = None + worker_tag = "" worker_started = False worker_notify = None callback = None @@ -363,6 +427,7 @@ class Portfolio: cls.liquidities.start_lock() cls.report.start_lock() + cls.worker_tag = "[Worker] " cls.worker_started = True cls.worker.start() @@ -380,11 +445,17 @@ class Portfolio: raise RuntimeError("This method needs to be ran with the worker") while cls.worker_started: cls.worker_notify.wait() - cls.worker_notify.clear() - cls.report.print_log("Fetching cryptoportfolio") - cls.get_cryptoportfolio(refetch=True) - cls.callback.set() - time.sleep(poll) + if cls.worker_started: + cls.worker_notify.clear() + cls.report.print_log("[Worker] Fetching cryptoportfolio") + cls.get_cryptoportfolio(refetch=True) + cls.callback.set() + time.sleep(poll) + + @classmethod + def stop_worker(cls): + cls.worker_started = False + cls.worker_notify.set() @classmethod def notify_and_wait(cls): @@ -395,7 +466,7 @@ class Portfolio: @classmethod def wait_for_recent(cls, delta=4, poll=30): cls.get_cryptoportfolio() - while cls.last_date.get() is None or datetime.now() - cls.last_date.get() > timedelta(delta): + while cls.last_date.get() is None or datetime.datetime.now() - cls.last_date.get() > datetime.timedelta(delta): if cls.worker is None: time.sleep(poll) cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio") @@ -419,7 +490,7 @@ class Portfolio: cls.report.log_http_request(r.request.method, r.request.url, r.request.body, r.request.headers, r) except Exception as e: - cls.report.log_error("get_cryptoportfolio", exception=e) + cls.report.log_error("{}get_cryptoportfolio".format(cls.worker_tag), exception=e) return try: cls.data.set(r.json(parse_int=D, parse_float=D)) @@ -452,7 +523,7 @@ class Portfolio: weights_hash = portfolio_hash["weights"] weights = {} for i in range(len(weights_hash["_row"])): - date = datetime.strptime(weights_hash["_row"][i], "%Y-%m-%d") + date = datetime.datetime.strptime(weights_hash["_row"][i], "%Y-%m-%d") weights[date] = dict(filter( filter_weights, map(clean_weights(i), weights_hash.items()))) @@ -466,8 +537,7 @@ class Portfolio: "high": high_liquidity, }) cls.last_date.set(max( - max(medium_liquidity.keys(), default=datetime(1, 1, 1)), - max(high_liquidity.keys(), default=datetime(1, 1, 1)) + max(medium_liquidity.keys(), default=datetime.datetime(1, 1, 1)), + max(high_liquidity.keys(), default=datetime.datetime(1, 1, 1)) )) -