]> git.immae.eu Git - perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git/blame - store.py
Merge branch 'dev'
[perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git] / store.py
CommitLineData
ada1b5f1
IB
1import time
2import requests
6ca5a1ec 3import portfolio
3d0247f9
IB
4import simplejson as json
5from decimal import Decimal as D, ROUND_DOWN
e7d7c0e5 6import datetime
aca4d437 7import inspect
ada1b5f1
IB
8from json import JSONDecodeError
9from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError
40d0fa27 10import dbs
6ca5a1ec 11
ada1b5f1 12__all__ = ["Portfolio", "BalanceStore", "ReportStore", "TradeStore"]
3d0247f9
IB
13
class ReportStore:
    """Accumulate structured log entries and their printable counterparts.

    Entries appended through ``add_log`` are plain dicts stamped with the
    current date and with the ``user_id``/``market_id`` of ``market`` (or
    ``None`` for both when no market is attached).
    """

    def __init__(self, market, verbose_print=True, no_http_dup=False):
        """Create an empty report.

        ``verbose_print`` makes ``print_log`` echo to stdout; ``no_http_dup``
        makes ``log_http_request`` drop a response body identical to the
        previously logged one for the same method and url.
        """
        self.market = market
        self.verbose_print = verbose_print

        self.print_logs = []
        self.logs = []
        self.redis_status = []

        self.no_http_dup = no_http_dup
        self.last_http = None

    @staticmethod
    def _source_if_callable(compute_value):
        """Return the stripped source of a callable, anything else unchanged."""
        if callable(compute_value):
            return inspect.getsource(compute_value).strip()
        return compute_value

    def merge(self, other_report):
        """Absorb the logs and print logs of ``other_report``, sorted by date.

        ``redis_status`` entries are not merged.
        """
        self.logs.extend(other_report.logs)
        self.logs.sort(key=lambda entry: entry["date"])

        self.print_logs.extend(other_report.print_logs)
        self.print_logs.sort(key=lambda entry: entry[0])

    def print_log(self, message):
        """Record a timestamped line, printing it when verbose."""
        timestamp = datetime.datetime.now()
        line = "{:%Y-%m-%d %H:%M:%S}: {}".format(timestamp, str(message))
        self.print_logs.append([timestamp, line])
        if not self.verbose_print:
            return
        print(line)

    def add_log(self, hash_):
        """Stamp ``hash_`` in place, append it to the logs and return it."""
        market = self.market
        hash_["date"] = datetime.datetime.now()
        hash_["user_id"] = None if market is None else market.user_id
        hash_["market_id"] = None if market is None else market.market_id
        self.logs.append(hash_)
        return hash_

    def add_redis_status(self, hash_):
        """Append ``hash_`` to the redis status list and return it."""
        self.redis_status.append(hash_)
        return hash_

    @staticmethod
    def default_json_serial(obj):
        """JSON fallback: ISO format for dates/datetimes, ``str`` otherwise."""
        is_temporal = isinstance(obj, (datetime.datetime, datetime.date))
        return obj.isoformat() if is_temporal else str(obj)

    def to_json(self):
        """Serialize every log entry as one indented JSON document."""
        return json.dumps(self.logs, default=self.default_json_serial, indent=" ")

    def to_json_array(self):
        """Yield ``(date, type, json)`` per log entry.

        The date and type are removed from a copy of the entry before the
        remainder is serialized; stored entries are left untouched.
        """
        for stored in self.logs:
            entry = stored.copy()
            date = entry.pop("date")
            type_ = entry.pop("type")
            yield (
                date,
                type_,
                json.dumps(entry, default=self.default_json_serial, indent=" "),
            )

    def to_json_redis(self):
        """Yield ``(type, json)`` per redis status entry (copies are used)."""
        for stored in self.redis_status:
            entry = stored.copy()
            type_ = entry.pop("type")
            yield (
                type_,
                json.dumps(entry, default=self.default_json_serial),
            )

    def set_verbose(self, verbose_print):
        """Switch stdout echoing of ``print_log`` on or off."""
        self.verbose_print = verbose_print

    def log_stage(self, stage, **kwargs):
        """Log the start of a stage together with its keyword arguments.

        Callables are logged as their source code, objects exposing
        ``as_json`` through that method, anything else as is.
        """
        def as_json(element):
            if callable(element):
                return inspect.getsource(element).strip()
            if hasattr(element, "as_json"):
                return element.as_json()
            return element

        args = {name: as_json(value) for name, value in kwargs.items()}
        described = ", ".join(
            "{}={}".format(name, value) for name, value in args.items())
        self.print_log("-" * (len(stage) + 8))
        self.print_log("[Stage] {} {}".format(stage, described))

        self.add_log({
            "type": "stage",
            "stage": stage,
            "args": args,
        })

    def log_balances(self, tag=None, checkpoint=None, tickers=None,
            ticker_currency=None, compute_value=None, type=None):
        """Print and log the market balances, optionally with ticker values.

        The entry goes to the regular logs (as a shallow copy) and to the
        redis status list.
        """
        self.print_log("[Balance]")
        for balance in self.market.balances.all.values():
            self.print_log("\t{}".format(balance))

        log = {
            "type": "balance",
            "tag": tag,
            "checkpoint": checkpoint,
            "balances": self.market.balances.as_json()
        }

        if tickers is not None:
            log["tickers"] = self._ticker_hash(
                tickers, ticker_currency, compute_value, type)

        # add_log stamps its argument in place, so the redis entry is kept
        # separate from the one stored in the logs.
        self.add_log(log.copy())
        self.add_redis_status(log)

    def log_tickers(self, amounts, other_currency,
            compute_value, type):
        """Log the conversion of ``amounts`` into ``other_currency``."""
        log = self._ticker_hash(amounts, other_currency, compute_value, type)
        log["type"] = "tickers"

        self.add_log(log)

    def _ticker_hash(self, amounts, other_currency, compute_value, type):
        """Build the dict describing converted amounts, their rates and total."""
        compute_value = self._source_if_callable(compute_value)

        values = {}
        rates = {}
        for currency, amount in amounts.items():
            values[currency] = amount.as_json()["value"]
            rates[currency] = amount.rate

        return {
            "compute_value": compute_value,
            "balance_type": type,
            "currency": other_currency,
            "balances": values,
            "rates": rates,
            "total": sum(amounts.values()).as_json()["value"]
        }

    def log_dispatch(self, amount, amounts, liquidity, repartition):
        """Log how ``amount`` was split into per-currency ``amounts``."""
        per_currency = {
            currency: share.as_json()["value"]
            for currency, share in amounts.items()
        }
        self.add_log({
            "type": "dispatch",
            "liquidity": liquidity,
            "repartition_ratio": repartition,
            "total_amount": amount.as_json(),
            "repartition": per_currency
        })

    def log_trades(self, matching_and_trades, only):
        """Log computed trades; non-matching ones are flagged as skipped."""
        def describe(matching, trade):
            trade_json = trade.as_json()
            trade_json["skipped"] = not matching
            return trade_json

        trades = [describe(matching, trade)
                  for matching, trade in matching_and_trades]

        self.add_log({
            "type": "trades",
            "only": only,
            "debug": self.market.debug,
            "trades": trades
        })

    def log_orders(self, orders, tick=None, only=None, compute_value=None):
        """Print all trades with their orders and log ``orders``.

        ``None`` items in ``orders`` are left out of the logged list.
        """
        compute_value = self._source_if_callable(compute_value)
        self.print_log("[Orders]")
        self.market.trades.print_all_with_order(ind="\t")
        self.add_log({
            "type": "orders",
            "only": only,
            "compute_value": compute_value,
            "tick": tick,
            "orders": [order.as_json() for order in orders if order is not None]
        })

    def log_order(self, order, tick, finished=False, update=None,
            new_order=None, compute_value=None):
        """Print a line for the order's update (if known) and log the order.

        ``finished`` takes precedence over ``update`` for the printed line.
        """
        compute_value = self._source_if_callable(compute_value)

        # Templates are formatted with (order, tick, new_order); those that
        # have fewer placeholders simply ignore the extra arguments.
        update_messages = {
            "waiting": "[Order] {}, tick {}, waiting",
            "adjusting": "[Order] {}, tick {}, cancelling and adjusting to {}",
            "market_fallback": "[Order] {}, tick {}, fallbacking to market value",
            "market_adjust": "[Order] {}, tick {}, market value, cancelling and adjusting to {}",
        }
        if finished:
            self.print_log("[Order] Finished {}".format(order))
        elif update in update_messages:
            self.print_log(update_messages[update].format(order, tick, new_order))

        self.add_log({
            "type": "order",
            "tick": tick,
            "update": update,
            "order": order.as_json(),
            "compute_value": compute_value,
            "new_order": new_order.as_json() if new_order is not None else None
        })

    def log_move_balances(self, needed, moving):
        """Log balances to move; ``portfolio.Amount`` values become raw values."""
        def plain(value):
            if isinstance(value, portfolio.Amount):
                return value.as_json()["value"]
            return value

        self.add_log({
            "type": "move_balances",
            "debug": self.market.debug,
            "needed": {currency: plain(value) for currency, value in needed.items()},
            "moving": {currency: plain(value) for currency, value in moving.items()},
        })

    def log_http_request(self, method, url, body, headers, response):
        """Log an HTTP exchange; ``response`` may be an exception.

        With ``no_http_dup`` set, a response whose method, url and text match
        the last fully logged one is stored without its body and points to
        the date of that earlier entry instead.
        """
        entry = {
            "type": "http_request",
            "method": method,
            "url": url,
            "body": body,
            "headers": headers,
        }

        if isinstance(response, Exception):
            entry["status"] = -1
            entry["response"] = None
            entry["error"] = response.__class__.__name__
            entry["error_message"] = str(response)
            self.add_log(entry)
            self.last_http = None
            return

        previous = self.last_http
        is_duplicate = (
            self.no_http_dup
            and previous is not None
            and previous["url"] == url
            and previous["method"] == method
            and previous["response"] == response.text
        )

        entry["status"] = response.status_code
        entry["duration"] = response.elapsed.total_seconds()
        if is_duplicate:
            entry["response"] = None
            entry["response_same_as"] = previous["date"]
            self.add_log(entry)
        else:
            entry["response"] = response.text
            entry["response_same_as"] = None
            self.last_http = self.add_log(entry)

    def log_error(self, action, message=None, exception=None):
        """Print and log an error with its optional exception and message."""
        has_exception = exception is not None

        self.print_log("[Error] {}".format(action))
        if has_exception:
            self.print_log("\t{}: {}".format(exception.__class__.__name__, exception))
        if message is not None:
            self.print_log("\t{}".format(message))

        self.add_log({
            "type": "error",
            "action": action,
            "exception_class": exception.__class__.__name__ if has_exception else None,
            "exception_message": str(exception) if has_exception else None,
            "message": message,
        })

    def log_debug_action(self, action):
        """Print and log an action recorded for debugging."""
        self.print_log("[Debug] {}".format(action))

        self.add_log({
            "type": "debug_action",
            "action": action,
        })

    def log_market(self, args):
        """Log the attributes of ``args`` (through ``vars``)."""
        self.add_log({
            "type": "market",
            # NOTE(review): looks like a git export-subst placeholder that is
            # only replaced in archives; confirm against .gitattributes.
            "commit": "$Format:%H$",
            "args": vars(args),
        })
288
class BalanceStore:
    """Keep the balances of a market, indexed by currency, in ``self.all``."""

    def __init__(self, market):
        self.market = market
        self.all = {}

    def currencies(self):
        """Return the view of currencies currently tracked."""
        return self.all.keys()

    def in_currency(self, other_currency, compute_value="average", type="total"):
        """Convert the ``type`` attribute of each balance to ``other_currency``.

        The conversion is logged as tickers on the market report; the
        per-currency converted amounts are returned.
        """
        amounts = {
            currency: getattr(balance, type).in_currency(
                other_currency, self.market, compute_value=compute_value)
            for currency, balance in self.all.items()
        }
        self.market.report.log_tickers(amounts, other_currency,
                compute_value, type)
        return amounts

    def fetch_balances(self, tag=None, add_portfolio=False, liquidity="medium",
            checkpoint=None, log_tickers=False, add_usdt=False,
            ticker_currency="BTC", ticker_compute_value="average", ticker_type="total"):
        """Refresh the balances from the market and log them.

        A fetched currency is stored when it has a non-zero exchange or margin
        total, or when it is already tracked. ``add_portfolio`` and
        ``add_usdt`` add empty balances for currencies not yet tracked.
        """
        fetched = self.market.ccxt.fetch_all_balances()
        for currency, balance in fetched.items():
            if balance["exchange_total"] == 0 and balance["margin_total"] == 0 \
                    and currency not in self.all:
                continue
            self.all[currency] = portfolio.Balance(currency, balance)

        if add_portfolio:
            for currency in Portfolio.repartition(from_cache=True, liquidity=liquidity):
                self.all.setdefault(currency, portfolio.Balance(currency, {}))
        if add_usdt:
            self.all.setdefault("USDT", portfolio.Balance("USDT", {}))

        log_kwargs = {"tag": tag, "checkpoint": checkpoint}
        if log_tickers:
            log_kwargs["tickers"] = self.in_currency(ticker_currency,
                    compute_value=ticker_compute_value, type=ticker_type)
            log_kwargs["ticker_currency"] = ticker_currency
            log_kwargs["compute_value"] = ticker_compute_value
            log_kwargs["type"] = ticker_type
        self.market.report.log_balances(**log_kwargs)

    def available_balances_for_repartition(self,
            compute_value="average", base_currency="BTC",
            liquidity="medium", repartition=None):
        """Compute what can be dispatched given positions already held.

        Returns ``(repartition, total_base_value, amount_in_position)``.
        Entries whose held amount exceeds their share are deleted from
        ``repartition`` itself (a caller-supplied dict is modified).
        """
        if repartition is None:
            repartition = Portfolio.repartition(liquidity=liquidity)

        base_balance = self.all.get(base_currency)
        if base_balance is not None:
            total_base_value = base_balance.exchange_free + \
                    base_balance.margin_available - \
                    base_balance.margin_in_position
        else:
            total_base_value = portfolio.Amount(base_currency, 0)

        # Value, in base currency, of what is already held for each target.
        amount_in_position = {}
        for currency, (ptt, trade_type) in repartition.items():
            amount_in_position[currency] = portfolio.Amount(base_currency, 0)
            balance = self.all.get(currency)
            if currency == base_currency or balance is None:
                continue
            if trade_type == "short":
                held = balance.margin_borrowed
            else:
                held = balance.exchange_free + balance.exchange_used
            amount_in_position[currency] = held.in_currency(base_currency,
                    self.market, compute_value=compute_value)
            total_base_value += amount_in_position[currency]

        # Drop more-than-filled positions, repeating until a full pass
        # removes nothing (each removal changes the remaining shares).
        while True:
            removed_any = False
            sum_ratio = sum(entry[0] for entry in repartition.values())
            current_base_value = total_base_value
            for currency, (ptt, trade_type) in repartition.copy().items():
                if amount_in_position[currency] > current_base_value * ptt / sum_ratio:
                    removed_any = True
                    del repartition[currency]
                    total_base_value -= amount_in_position[currency]
            if not removed_any:
                break

        return repartition, total_base_value, amount_in_position

    def dispatch_assets(self, amount, liquidity="medium", repartition=None):
        """Split ``amount`` across the repartition, proportionally to weights.

        Short positions get a negated amount. Every target currency gets an
        empty balance if it is not tracked yet, and the dispatch is logged.
        """
        if repartition is None:
            repartition = Portfolio.repartition(liquidity=liquidity)

        sum_ratio = sum(entry[0] for entry in repartition.values())
        amounts = {}
        for currency, (ptt, trade_type) in repartition.items():
            share = ptt * amount / sum_ratio
            amounts[currency] = - share if trade_type == "short" else share
            self.all.setdefault(currency, portfolio.Balance(currency, {}))

        self.market.report.log_dispatch(amount, amounts, liquidity, repartition)
        return amounts

    def as_json(self):
        """Return the JSON form of every balance, keyed by currency."""
        return {currency: balance.as_json()
                for currency, balance in self.all.items()}
3d0247f9 386
class TradeStore:
    """Hold the trades computed for a market and drive their orders."""

    def __init__(self, market):
        self.market = market
        self.all = []

    @property
    def pending(self):
        """List of the stored trades whose ``pending`` attribute is truthy."""
        return [trade for trade in self.all if trade.pending]

    def compute_trades(self, values_in_base, new_repartition, only=None):
        """Compute the trades leading from current values to the new ones.

        Every currency known to the market balances, except the base one, is
        considered. When the current and target values have opposite signs,
        two trades are produced, passing through a zero amount. Trades
        matching ``only`` are stored; all of them are logged.
        """
        computed_trades = []
        base_currency = sum(values_in_base.values()).currency
        for currency in self.market.balances.currencies():
            if currency == base_currency:
                continue
            value_from = values_in_base.get(currency, portfolio.Amount(base_currency, 0))
            value_to = new_repartition.get(currency, portfolio.Amount(base_currency, 0))

            if value_from.value * value_to.value < 0:
                computed_trades.append(self.trade_if_matching(
                    value_from, portfolio.Amount(base_currency, 0),
                    currency, only=only))
                computed_trades.append(self.trade_if_matching(
                    portfolio.Amount(base_currency, 0), value_to,
                    currency, only=only))
            else:
                computed_trades.append(self.trade_if_matching(
                    value_from, value_to,
                    currency, only=only))

        self.all.extend(trade for matching, trade in computed_trades if matching)
        self.market.report.log_trades(computed_trades, only)

    def trade_if_matching(self, value_from, value_to, currency,
            only=None):
        """Build a trade and return ``[matching, trade]``.

        ``matching`` is true when ``only`` is ``None`` or equals the action
        of the trade.
        """
        trade = portfolio.Trade(value_from, value_to, currency,
                self.market)
        matching = only is None or trade.action == only
        return [matching, trade]

    def prepare_orders(self, only=None, compute_value="default"):
        """Prepare an order for each pending trade matching ``only``, then log.

        ``only`` and ``compute_value`` are passed to ``log_orders`` by
        keyword: its second positional parameter is ``tick``, so passing them
        positionally would log them under the wrong fields.
        """
        orders = [
            trade.prepare_order(compute_value=compute_value)
            for trade in self.pending
            if only is None or trade.action == only
        ]
        self.market.report.log_orders(orders, only=only,
                compute_value=compute_value)

    def close_trades(self):
        """Close every stored trade."""
        for trade in self.all:
            trade.close()

    def print_all_with_order(self, ind=""):
        """Print every stored trade with its orders, prefixed by ``ind``."""
        for trade in self.all:
            trade.print_with_order(ind=ind)

    def run_orders(self):
        """Run the orders whose status is "pending", then log them."""
        orders = self.all_orders(state="pending")
        for order in orders:
            order.run()
        self.market.report.log_stage("run_orders")
        self.market.report.log_orders(orders)

    def all_orders(self, state=None):
        """Return the orders of all trades, optionally filtered on status."""
        # Flattened in a single pass rather than by repeated list addition.
        all_orders = [order for trade in self.all for order in trade.orders]
        if state is None:
            return all_orders
        return [order for order in all_orders if order.status == state]

    def update_all_orders_status(self):
        """Ask every order whose status is "open" to refresh its status."""
        for order in self.all_orders(state="open"):
            order.get_status()
460
dc1ca9a3
IB
class NoopLock:
    """Context manager doing nothing on entry or exit."""

    def __enter__(self, *args):
        return None

    def __exit__(self, *args):
        # Returning None lets an exception raised in the body propagate.
        return None
466
class LockedVar:
    """Wrap a value so that every access goes through ``self.lock``.

    The lock is a ``NoopLock`` until ``start_lock`` swaps in a real
    ``threading.Lock``.
    """

    def __init__(self, value):
        self.lock = NoopLock()
        self.val = value

    def start_lock(self):
        """Replace the lock with a new ``threading.Lock``."""
        import threading
        self.lock = threading.Lock()

    def set(self, value):
        """Store ``value`` while holding the lock."""
        with self.lock:
            self.val = value

    def get(self, key=None):
        """Return the value, or ``value.get(key)`` for a dict and a given key.

        ``key`` is ignored when the wrapped value is not a dict.
        """
        with self.lock:
            current = self.val
            if key is None or not isinstance(current, dict):
                return current
            return current.get(key)

    def __getattr__(self, key):
        """Forward unknown attributes to the wrapped value, under the lock."""
        with self.lock:
            wrapped = self.val
            return getattr(wrapped, key)
490
ada1b5f1
IB
class Portfolio:
    """Class-level cache of the cryptoportfolio repartitions.

    All state lives on the class. The data can be fetched either directly
    by the caller, or by a dedicated worker thread started with
    ``start_worker`` and woken up through ``notify_and_wait``.
    """

    URL = "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json"
    data = LockedVar(None)
    liquidities = LockedVar({})
    last_date = LockedVar(None)
    report = LockedVar(ReportStore(None, no_http_dup=True))
    worker = None
    worker_tag = ""
    worker_started = False
    worker_notify = None
    callback = None
    poll_started_at = None

    @classmethod
    def next_wait_time(cls):
        """Return the number of seconds to wait before the next poll.

        The delay grows with the time elapsed since the first call: 30s
        during the first 30 minutes, 60s up to one hour, 5 minutes up to
        four hours, one hour up to one day.

        Raises:
            Exception: when polling started a day ago or more.
        """
        now = datetime.datetime.now()
        if cls.poll_started_at is None:
            cls.poll_started_at = now
        delta = now - cls.poll_started_at

        if delta < datetime.timedelta(minutes=30):
            return 30
        elif delta < datetime.timedelta(hours=1):
            return 60
        elif delta < datetime.timedelta(hours=4):
            return 5*60
        elif delta < datetime.timedelta(days=1):
            return 60*60
        else:
            raise Exception("Too long waiting")

    @classmethod
    def start_worker(cls):
        """Start the daemon thread fetching the portfolio on notification."""
        import threading

        cls.worker = threading.Thread(name="portfolio", daemon=True,
                target=cls.wait_for_notification)
        cls.worker_notify = threading.Event()
        cls.callback = threading.Event()

        cls.last_date.start_lock()
        cls.liquidities.start_lock()
        cls.report.start_lock()

        cls.worker_tag = "[Worker] "
        cls.worker_started = True
        cls.worker.start()

    @classmethod
    def is_worker_thread(cls):
        """Tell whether the current thread is the worker thread."""
        if cls.worker is None:
            return False
        else:
            import threading
            return cls.worker == threading.current_thread()

    @classmethod
    def wait_for_notification(cls):
        """Worker loop: refetch on each notification until stopped.

        After each fetch the waiter is released through ``callback``, then
        the worker sleeps for ``next_wait_time()`` seconds; when that call
        raises, the worker stops itself.

        Raises:
            RuntimeError: when called from another thread than the worker.
        """
        if not cls.is_worker_thread():
            raise RuntimeError("This method needs to be ran with the worker")
        while cls.worker_started:
            cls.worker_notify.wait()
            # stop_worker also sets the event, hence the second check.
            if cls.worker_started:
                cls.worker_notify.clear()
                cls.report.print_log("[Worker] Fetching cryptoportfolio")
                cls.get_cryptoportfolio(refetch=True)
                cls.callback.set()
                try:
                    time.sleep(cls.next_wait_time())
                except Exception:
                    cls.stop_worker()

    @classmethod
    def stop_worker(cls):
        """Flag the worker as stopped and wake it up so its loop can end."""
        cls.worker_started = False
        cls.worker_notify.set()

    @classmethod
    def notify_and_wait(cls):
        """Wake the worker up and block until it signals ``callback``."""
        cls.callback.clear()
        cls.worker_notify.set()
        cls.callback.wait()

    @classmethod
    def wait_for_recent(cls, delta=4):
        """Refetch until the last known date is at most ``delta`` days old.

        Without a worker, the caller sleeps ``next_wait_time()`` seconds
        between attempts (and that call may raise once it gives up).
        """
        max_age = datetime.timedelta(delta)

        def is_stale():
            # Read once so the None check and the comparison see one value.
            last_date = cls.last_date.get()
            return last_date is None or \
                    datetime.datetime.now() - last_date > max_age

        cls.get_cryptoportfolio()
        while is_stale():
            if cls.worker is None:
                time.sleep(cls.next_wait_time())
                cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio")
            cls.get_cryptoportfolio(refetch=True)

    @classmethod
    def repartition(cls, liquidity="medium", from_cache=False):
        """Return a copy of the latest repartition for ``liquidity``.

        ``from_cache`` first tries to load the repartition stored in redis.
        ``None`` is returned when nothing is known for this liquidity at the
        last date.
        """
        if from_cache:
            cls.retrieve_cryptoportfolio()
        cls.get_cryptoportfolio()
        liquidities = cls.liquidities.get(liquidity)
        # Read once so the membership test and the lookup use the same date.
        last_date = cls.last_date.get()
        if liquidities is not None and last_date in liquidities:
            return liquidities[last_date].copy()
        return None

    @classmethod
    def get_cryptoportfolio(cls, refetch=False):
        """Fetch and parse the portfolio unless data is already present.

        When a worker exists and the caller is another thread, the fetch is
        delegated to the worker. Request failures are logged and leave the
        state untouched; undecodable or empty data resets the state.

        Raises:
            Exception: when the worker exists but is no longer running.
        """
        if cls.data.get() is not None and not refetch:
            return
        if cls.worker is not None and not cls.is_worker_thread():
            if not cls.worker_started:
                raise Exception("Portfolio worker is down and no usable data is present")
            cls.notify_and_wait()
            return
        try:
            # NOTE(review): no timeout is given, so a stalled connection
            # would block here; consider passing one.
            r = requests.get(cls.URL)
            cls.report.log_http_request(r.request.method,
                    r.request.url, r.request.body, r.request.headers, r)
        except Exception as e:
            cls.report.log_error("{}get_cryptoportfolio".format(cls.worker_tag), exception=e)
            return
        try:
            cls.data.set(r.json(parse_int=D, parse_float=D))
            cls.parse_cryptoportfolio()
            cls.store_cryptoportfolio()
        except (AssertionError, JSONDecodeError, SimpleJSONDecodeError):
            cls.data.set(None)
            cls.last_date.set(None)
            cls.liquidities.set({})

    @classmethod
    def retrieve_cryptoportfolio(cls):
        """Load the repartition stored in redis, when both keys are present.

        ``data`` is set to an empty string so that ``get_cryptoportfolio``
        considers data to be present and does not fetch again.
        """
        if not dbs.redis_connected():
            return
        repartition = dbs.redis.get("/cryptoportfolio/repartition/latest")
        date = dbs.redis.get("/cryptoportfolio/repartition/date")
        if date is None or repartition is None:
            return

        date = datetime.datetime.strptime(date.decode(), "%Y-%m-%d")
        repartition = json.loads(repartition, parse_int=D, parse_float=D)
        repartition = { k: { date: v } for k, v in repartition.items() }

        cls.data.set("")
        cls.last_date.set(date)
        cls.liquidities.set(repartition)

    @classmethod
    def store_cryptoportfolio(cls):
        """Store the repartitions of the last date in redis, if connected.

        A liquidity with no entry at the last date is left out instead of
        raising ``KeyError``, in line with ``repartition`` which treats that
        case as "nothing known".
        """
        if not dbs.redis_connected():
            return
        last_date = cls.last_date.get()
        if last_date is None:
            return

        hash_ = {
            liquidity: repartitions[last_date]
            for liquidity, repartitions in cls.liquidities.get().items()
            if last_date in repartitions
        }
        dump = json.dumps(hash_)
        key = "/cryptoportfolio/repartition/latest"
        dbs.redis.set(key, dump)
        key = "/cryptoportfolio/repartition/date"
        dbs.redis.set(key, last_date.date().isoformat())

    @classmethod
    def parse_cryptoportfolio(cls):
        """Extract the dated weights of both liquidities from ``data``.

        Raises:
            AssertionError: when either liquidity has no weights at all.
        """
        def parse_day(weights_hash, i):
            # Weights of row i: zero weights are dropped, and a name ending
            # with "s" is a short position on the name without that suffix.
            day = {}
            for name, column in weights_hash.items():
                if name == "_row":
                    continue
                weight = column[i]
                if weight == 0:
                    continue
                if name.endswith("s"):
                    day[name[0:-1]] = (weight, "short")
                else:
                    day[name] = (weight, "long")
            return day

        def parse_weights(portfolio_hash):
            # A missing portfolio is handled like one without weights.
            if not portfolio_hash or "weights" not in portfolio_hash:
                return {}
            weights_hash = portfolio_hash["weights"]
            weights = {}
            for i, row in enumerate(weights_hash["_row"]):
                date = datetime.datetime.strptime(row, "%Y-%m-%d")
                weights[date] = parse_day(weights_hash, i)
            return weights

        high_liquidity = parse_weights(cls.data.get("portfolio_1"))
        medium_liquidity = parse_weights(cls.data.get("portfolio_2"))

        # Raised explicitly (not with ``assert``) so the check survives -O;
        # get_cryptoportfolio relies on AssertionError to reset the state.
        if len(high_liquidity) == 0:
            raise AssertionError("no weights found for high liquidity")
        if len(medium_liquidity) == 0:
            raise AssertionError("no weights found for medium liquidity")

        cls.liquidities.set({
            "medium": medium_liquidity,
            "high": high_liquidity,
        })
        cls.last_date.set(max(
            max(medium_liquidity.keys(), default=datetime.datetime(1, 1, 1)),
            max(high_liquidity.keys(), default=datetime.datetime(1, 1, 1))
        ))
ada1b5f1 685