X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=store.py;h=072d3a2f02ccbc7dd17c1e92b51a786f57d7c6a5;hb=17fd3f752d5e37df906abddf1f13fd7ad1de6c00;hp=f655be538b66f3d81d7fb121a0595ee4ff63ddc7;hpb=dc1ca9a306f09886c6c57f8d426c59a9d084b2b3;p=perso%2FImmae%2FProjets%2FCryptomonnaies%2FCryptoportfolio%2FTrader.git

diff --git a/store.py b/store.py
index f655be5..072d3a2 100644
--- a/store.py
+++ b/store.py
@@ -3,7 +3,7 @@ import requests
 import portfolio
 import simplejson as json
 from decimal import Decimal as D, ROUND_DOWN
-from datetime import date, datetime, timedelta
+import datetime
 import inspect
 from json import JSONDecodeError
 from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError
@@ -11,31 +11,69 @@ from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError
 __all__ = ["Portfolio", "BalanceStore", "ReportStore", "TradeStore"]
 
 class ReportStore:
-    def __init__(self, market, verbose_print=True):
+    def __init__(self, market, verbose_print=True, no_http_dup=False):
         self.market = market
         self.verbose_print = verbose_print
 
+        self.print_logs = []
         self.logs = []
+        self.redis_status = []
+
+        self.no_http_dup = no_http_dup
+        self.last_http = None
 
     def merge(self, other_report):
         self.logs += other_report.logs
         self.logs.sort(key=lambda x: x["date"])
 
+        self.print_logs += other_report.print_logs
+        self.print_logs.sort(key=lambda x: x[0])
+
     def print_log(self, message):
-        message = str(message)
+        now = datetime.datetime.now()
+        message = "{:%Y-%m-%d %H:%M:%S}: {}".format(now, str(message))
+        self.print_logs.append([now, message])
         if self.verbose_print:
             print(message)
 
     def add_log(self, hash_):
-        hash_["date"] = datetime.now()
+        hash_["date"] = datetime.datetime.now()
+        if self.market is not None:
+            hash_["user_id"] = self.market.user_id
+            hash_["market_id"] = self.market.market_id
+        else:
+            hash_["user_id"] = None
+            hash_["market_id"] = None
         self.logs.append(hash_)
+        return hash_
+
+    def add_redis_status(self, hash_):
+        self.redis_status.append(hash_)
+        return hash_
+
+    @staticmethod
+    def default_json_serial(obj):
+        if isinstance(obj, (datetime.datetime, datetime.date)):
+            return obj.isoformat()
+        return str(obj)
 
     def to_json(self):
-        def default_json_serial(obj):
-            if isinstance(obj, (datetime, date)):
-                return obj.isoformat()
-            return str(obj)
-        return json.dumps(self.logs, default=default_json_serial, indent=" ")
+        return json.dumps(self.logs, default=self.default_json_serial, indent=" ")
+
+    def to_json_array(self):
+        for log in (x.copy() for x in self.logs):
+            yield (
+                log.pop("date"),
+                log.pop("type"),
+                json.dumps(log, default=self.default_json_serial, indent=" ")
+                )
+
+    def to_json_redis(self):
+        for log in (x.copy() for x in self.redis_status):
+            yield (
+                log.pop("type"),
+                json.dumps(log, default=self.default_json_serial)
+                )
 
     def set_verbose(self, verbose_print):
         self.verbose_print = verbose_print
@@ -65,11 +103,14 @@ class ReportStore:
         for currency, balance in self.market.balances.all.items():
             self.print_log("\t{}".format(balance))
 
-        self.add_log({
-            "type": "balance",
-            "tag": tag,
-            "balances": self.market.balances.as_json()
-            })
+        log = {
+            "type": "balance",
+            "tag": tag,
+            "balances": self.market.balances.as_json()
+            }
+
+        self.add_log(log.copy())
+        self.add_redis_status(log)
 
     def log_tickers(self, amounts, other_currency,
             compute_value, type):
@@ -81,15 +122,18 @@ class ReportStore:
         for currency, amount in amounts.items():
             values[currency] = amount.as_json()["value"]
             rates[currency] = amount.rate
-        self.add_log({
-            "type": "tickers",
-            "compute_value": compute_value,
-            "balance_type": type,
-            "currency": other_currency,
-            "balances": values,
-            "rates": rates,
-            "total": sum(amounts.values()).as_json()["value"]
-            })
+        log = {
+            "type": "tickers",
+            "compute_value": compute_value,
+            "balance_type": type,
+            "currency": other_currency,
+            "balances": values,
+            "rates": rates,
+            "total": sum(amounts.values()).as_json()["value"]
+            }
+
+        self.add_log(log.copy())
+        self.add_redis_status(log)
 
     def log_dispatch(self, amount, amounts, liquidity, repartition):
         self.add_log({
@@ -160,15 +204,47 @@ class ReportStore:
             })
 
     def log_http_request(self, method, url, body, headers, response):
-        self.add_log({
-            "type": "http_request",
-            "method": method,
-            "url": url,
-            "body": body,
-            "headers": headers,
-            "status": response.status_code,
-            "response": response.text
-            })
+        if isinstance(response, Exception):
+            self.add_log({
+                "type": "http_request",
+                "method": method,
+                "url": url,
+                "body": body,
+                "headers": headers,
+                "status": -1,
+                "response": None,
+                "error": response.__class__.__name__,
+                "error_message": str(response),
+                })
+            self.last_http = None
+        elif self.no_http_dup and \
+                self.last_http is not None and \
+                self.last_http["url"] == url and \
+                self.last_http["method"] == method and \
+                self.last_http["response"] == response.text:
+            self.add_log({
+                "type": "http_request",
+                "method": method,
+                "url": url,
+                "body": body,
+                "headers": headers,
+                "status": response.status_code,
+                "duration": response.elapsed.total_seconds(),
+                "response": None,
+                "response_same_as": self.last_http["date"]
+                })
+        else:
+            self.last_http = self.add_log({
+                "type": "http_request",
+                "method": method,
+                "url": url,
+                "body": body,
+                "headers": headers,
+                "status": response.status_code,
+                "duration": response.elapsed.total_seconds(),
+                "response": response.text,
+                "response_same_as": None,
+                })
 
     def log_error(self, action, message=None, exception=None):
         self.print_log("[Error] {}".format(action))
@@ -193,6 +269,13 @@ class ReportStore:
             "action": action,
             })
 
+    def log_market(self, args):
+        self.add_log({
+            "type": "market",
+            "commit": "$Format:%H$",
+            "args": vars(args),
+            })
+
 class BalanceStore:
     def __init__(self, market):
         self.market = market
@@ -344,8 +427,9 @@ class Portfolio:
     data = LockedVar(None)
     liquidities = LockedVar({})
     last_date = LockedVar(None)
-    report = LockedVar(ReportStore(None))
+    report = LockedVar(ReportStore(None, no_http_dup=True))
     worker = None
+    worker_tag = ""
     worker_started = False
     worker_notify = None
     callback = None
@@ -363,6 +447,7 @@ class Portfolio:
         cls.liquidities.start_lock()
         cls.report.start_lock()
 
+        cls.worker_tag = "[Worker] "
         cls.worker_started = True
         cls.worker.start()
 
@@ -380,11 +465,17 @@ class Portfolio:
             raise RuntimeError("This method needs to be ran with the worker")
         while cls.worker_started:
             cls.worker_notify.wait()
-            cls.worker_notify.clear()
-            cls.report.print_log("Fetching cryptoportfolio")
-            cls.get_cryptoportfolio(refetch=True)
-            cls.callback.set()
-            time.sleep(poll)
+            if cls.worker_started:
+                cls.worker_notify.clear()
+                cls.report.print_log("[Worker] Fetching cryptoportfolio")
+                cls.get_cryptoportfolio(refetch=True)
+                cls.callback.set()
+                time.sleep(poll)
+
+    @classmethod
+    def stop_worker(cls):
+        cls.worker_started = False
+        cls.worker_notify.set()
 
     @classmethod
     def notify_and_wait(cls):
@@ -395,7 +486,7 @@ class Portfolio:
     @classmethod
     def wait_for_recent(cls, delta=4, poll=30):
         cls.get_cryptoportfolio()
-        while cls.last_date.get() is None or datetime.now() - cls.last_date.get() > timedelta(delta):
+        while cls.last_date.get() is None or datetime.datetime.now() - cls.last_date.get() > datetime.timedelta(delta):
             if cls.worker is None:
                 time.sleep(poll)
             cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio")
@@ -419,7 +510,7 @@ class Portfolio:
             cls.report.log_http_request(r.request.method, r.request.url,
                     r.request.body, r.request.headers, r)
         except Exception as e:
-            cls.report.log_error("get_cryptoportfolio", exception=e)
+            cls.report.log_error("{}get_cryptoportfolio".format(cls.worker_tag), exception=e)
             return
         try:
             cls.data.set(r.json(parse_int=D, parse_float=D))
@@ -452,7 +543,7 @@ class Portfolio:
         weights_hash = portfolio_hash["weights"]
         weights = {}
         for i in range(len(weights_hash["_row"])):
-            date = datetime.strptime(weights_hash["_row"][i], "%Y-%m-%d")
+            date = datetime.datetime.strptime(weights_hash["_row"][i], "%Y-%m-%d")
             weights[date] = dict(filter(
                 filter_weights,
                 map(clean_weights(i), weights_hash.items())))
@@ -466,8 +557,7 @@ class Portfolio:
             "high": high_liquidity,
             })
         cls.last_date.set(max(
-            max(medium_liquidity.keys(), default=datetime(1, 1, 1)),
-            max(high_liquidity.keys(), default=datetime(1, 1, 1))
+            max(medium_liquidity.keys(), default=datetime.datetime(1, 1, 1)),
+            max(high_liquidity.keys(), default=datetime.datetime(1, 1, 1))
             ))
 
-