]> git.immae.eu Git - perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git/blame - store.py
Refactor databases access
[perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git] / store.py
CommitLineData
ada1b5f1
IB
1import time
2import requests
6ca5a1ec 3import portfolio
3d0247f9
IB
4import simplejson as json
5from decimal import Decimal as D, ROUND_DOWN
e7d7c0e5 6import datetime
aca4d437 7import inspect
ada1b5f1
IB
8from json import JSONDecodeError
9from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError
6ca5a1ec 10
ada1b5f1 11__all__ = ["Portfolio", "BalanceStore", "ReportStore", "TradeStore"]
3d0247f9
IB
12
13class ReportStore:
e7d7c0e5 14 def __init__(self, market, verbose_print=True, no_http_dup=False):
f86ee140
IB
15 self.market = market
16 self.verbose_print = verbose_print
3d0247f9 17
718e3e91 18 self.print_logs = []
f86ee140 19 self.logs = []
1593c7a9 20 self.redis_status = []
f86ee140 21
e7d7c0e5
IB
22 self.no_http_dup = no_http_dup
23 self.last_http = None
24
9bde69bf
IB
25 def merge(self, other_report):
26 self.logs += other_report.logs
27 self.logs.sort(key=lambda x: x["date"])
28
718e3e91
IB
29 self.print_logs += other_report.print_logs
30 self.print_logs.sort(key=lambda x: x[0])
31
f86ee140 32 def print_log(self, message):
e7d7c0e5 33 now = datetime.datetime.now()
718e3e91
IB
34 message = "{:%Y-%m-%d %H:%M:%S}: {}".format(now, str(message))
35 self.print_logs.append([now, message])
f86ee140 36 if self.verbose_print:
3d0247f9
IB
37 print(message)
38
f86ee140 39 def add_log(self, hash_):
e7d7c0e5
IB
40 hash_["date"] = datetime.datetime.now()
41 if self.market is not None:
42 hash_["user_id"] = self.market.user_id
43 hash_["market_id"] = self.market.market_id
44 else:
45 hash_["user_id"] = None
46 hash_["market_id"] = None
f86ee140 47 self.logs.append(hash_)
e7d7c0e5 48 return hash_
3d0247f9 49
1593c7a9
IB
50 def add_redis_status(self, hash_):
51 self.redis_status.append(hash_)
52 return hash_
53
b4e0ba0b
IB
54 @staticmethod
55 def default_json_serial(obj):
e7d7c0e5 56 if isinstance(obj, (datetime.datetime, datetime.date)):
b4e0ba0b
IB
57 return obj.isoformat()
58 return str(obj)
59
f86ee140 60 def to_json(self):
b4e0ba0b
IB
61 return json.dumps(self.logs, default=self.default_json_serial, indent=" ")
62
63 def to_json_array(self):
64 for log in (x.copy() for x in self.logs):
65 yield (
66 log.pop("date"),
67 log.pop("type"),
68 json.dumps(log, default=self.default_json_serial, indent=" ")
69 )
3d0247f9 70
1593c7a9
IB
71 def to_json_redis(self):
72 for log in (x.copy() for x in self.redis_status):
73 yield (
74 log.pop("type"),
75 json.dumps(log, default=self.default_json_serial)
76 )
77
f86ee140
IB
78 def set_verbose(self, verbose_print):
79 self.verbose_print = verbose_print
3d0247f9 80
7bd830a8
IB
81 def log_stage(self, stage, **kwargs):
82 def as_json(element):
83 if callable(element):
84 return inspect.getsource(element).strip()
85 elif hasattr(element, "as_json"):
86 return element.as_json()
87 else:
88 return element
89
90 args = { k: as_json(v) for k, v in kwargs.items() }
91 args_str = ["{}={}".format(k, v) for k, v in args.items()]
f86ee140 92 self.print_log("-" * (len(stage) + 8))
7bd830a8 93 self.print_log("[Stage] {} {}".format(stage, ", ".join(args_str)))
3d0247f9 94
f86ee140 95 self.add_log({
3d0247f9
IB
96 "type": "stage",
97 "stage": stage,
7bd830a8 98 "args": args,
3d0247f9
IB
99 })
100
2b1ee8f4
IB
101 def log_balances(self, tag=None, tickers=None,
102 ticker_currency=None, compute_value=None, type=None):
f86ee140
IB
103 self.print_log("[Balance]")
104 for currency, balance in self.market.balances.all.items():
105 self.print_log("\t{}".format(balance))
3d0247f9 106
1593c7a9
IB
107 log = {
108 "type": "balance",
109 "tag": tag,
110 "balances": self.market.balances.as_json()
111 }
112
2b1ee8f4
IB
113 if tickers is not None:
114 log["tickers"] = self._ticker_hash(tickers, ticker_currency,
115 compute_value, type)
116
1593c7a9
IB
117 self.add_log(log.copy())
118 self.add_redis_status(log)
3d0247f9 119
f86ee140 120 def log_tickers(self, amounts, other_currency,
3d0247f9 121 compute_value, type):
2b1ee8f4
IB
122 log = self._ticker_hash(amounts, other_currency, compute_value,
123 type)
124 log["type"] = "tickers"
125
126 self.add_log(log)
127
128 def _ticker_hash(self, amounts, other_currency, compute_value, type):
3d0247f9
IB
129 values = {}
130 rates = {}
aca4d437
IB
131 if callable(compute_value):
132 compute_value = inspect.getsource(compute_value).strip()
133
3d0247f9
IB
134 for currency, amount in amounts.items():
135 values[currency] = amount.as_json()["value"]
136 rates[currency] = amount.rate
2b1ee8f4 137 return {
1593c7a9
IB
138 "compute_value": compute_value,
139 "balance_type": type,
140 "currency": other_currency,
141 "balances": values,
142 "rates": rates,
143 "total": sum(amounts.values()).as_json()["value"]
144 }
145
f86ee140
IB
146 def log_dispatch(self, amount, amounts, liquidity, repartition):
147 self.add_log({
3d0247f9
IB
148 "type": "dispatch",
149 "liquidity": liquidity,
150 "repartition_ratio": repartition,
151 "total_amount": amount.as_json(),
152 "repartition": { k: v.as_json()["value"] for k, v in amounts.items() }
153 })
154
f86ee140 155 def log_trades(self, matching_and_trades, only):
3d0247f9
IB
156 trades = []
157 for matching, trade in matching_and_trades:
158 trade_json = trade.as_json()
159 trade_json["skipped"] = not matching
160 trades.append(trade_json)
161
f86ee140 162 self.add_log({
3d0247f9
IB
163 "type": "trades",
164 "only": only,
f86ee140 165 "debug": self.market.debug,
3d0247f9
IB
166 "trades": trades
167 })
168
f86ee140 169 def log_orders(self, orders, tick=None, only=None, compute_value=None):
aca4d437
IB
170 if callable(compute_value):
171 compute_value = inspect.getsource(compute_value).strip()
f86ee140
IB
172 self.print_log("[Orders]")
173 self.market.trades.print_all_with_order(ind="\t")
174 self.add_log({
3d0247f9
IB
175 "type": "orders",
176 "only": only,
177 "compute_value": compute_value,
178 "tick": tick,
179 "orders": [order.as_json() for order in orders if order is not None]
180 })
181
f86ee140 182 def log_order(self, order, tick, finished=False, update=None,
3d0247f9 183 new_order=None, compute_value=None):
aca4d437
IB
184 if callable(compute_value):
185 compute_value = inspect.getsource(compute_value).strip()
3d0247f9 186 if finished:
f86ee140 187 self.print_log("[Order] Finished {}".format(order))
3d0247f9 188 elif update == "waiting":
f86ee140 189 self.print_log("[Order] {}, tick {}, waiting".format(order, tick))
3d0247f9 190 elif update == "adjusting":
f86ee140 191 self.print_log("[Order] {}, tick {}, cancelling and adjusting to {}".format(order, tick, new_order))
3d0247f9 192 elif update == "market_fallback":
f86ee140 193 self.print_log("[Order] {}, tick {}, fallbacking to market value".format(order, tick))
3d0247f9 194 elif update == "market_adjust":
f86ee140 195 self.print_log("[Order] {}, tick {}, market value, cancelling and adjusting to {}".format(order, tick, new_order))
3d0247f9 196
f86ee140 197 self.add_log({
3d0247f9
IB
198 "type": "order",
199 "tick": tick,
200 "update": update,
201 "order": order.as_json(),
202 "compute_value": compute_value,
203 "new_order": new_order.as_json() if new_order is not None else None
204 })
205
f86ee140
IB
206 def log_move_balances(self, needed, moving):
207 self.add_log({
3d0247f9 208 "type": "move_balances",
f86ee140 209 "debug": self.market.debug,
3d0247f9
IB
210 "needed": { k: v.as_json()["value"] if isinstance(v, portfolio.Amount) else v for k, v in needed.items() },
211 "moving": { k: v.as_json()["value"] if isinstance(v, portfolio.Amount) else v for k, v in moving.items() },
212 })
213
f86ee140 214 def log_http_request(self, method, url, body, headers, response):
d8e233ac
IB
215 if isinstance(response, Exception):
216 self.add_log({
217 "type": "http_request",
218 "method": method,
219 "url": url,
220 "body": body,
221 "headers": headers,
222 "status": -1,
223 "response": None,
224 "error": response.__class__.__name__,
225 "error_message": str(response),
226 })
e7d7c0e5
IB
227 self.last_http = None
228 elif self.no_http_dup and \
229 self.last_http is not None and \
230 self.last_http["url"] == url and \
231 self.last_http["method"] == method and \
232 self.last_http["response"] == response.text:
d8e233ac
IB
233 self.add_log({
234 "type": "http_request",
235 "method": method,
236 "url": url,
237 "body": body,
238 "headers": headers,
239 "status": response.status_code,
9fe90554 240 "duration": response.elapsed.total_seconds(),
e7d7c0e5
IB
241 "response": None,
242 "response_same_as": self.last_http["date"]
243 })
244 else:
245 self.last_http = self.add_log({
246 "type": "http_request",
247 "method": method,
248 "url": url,
249 "body": body,
250 "headers": headers,
251 "status": response.status_code,
9fe90554 252 "duration": response.elapsed.total_seconds(),
e7d7c0e5
IB
253 "response": response.text,
254 "response_same_as": None,
d8e233ac 255 })
3d0247f9 256
f86ee140
IB
257 def log_error(self, action, message=None, exception=None):
258 self.print_log("[Error] {}".format(action))
3d0247f9 259 if exception is not None:
f86ee140 260 self.print_log(str("\t{}: {}".format(exception.__class__.__name__, exception)))
3d0247f9 261 if message is not None:
f86ee140 262 self.print_log("\t{}".format(message))
3d0247f9 263
f86ee140 264 self.add_log({
3d0247f9
IB
265 "type": "error",
266 "action": action,
267 "exception_class": exception.__class__.__name__ if exception is not None else None,
268 "exception_message": str(exception) if exception is not None else None,
269 "message": message,
270 })
271
f86ee140
IB
272 def log_debug_action(self, action):
273 self.print_log("[Debug] {}".format(action))
3d0247f9 274
f86ee140 275 self.add_log({
3d0247f9
IB
276 "type": "debug_action",
277 "action": action,
278 })
6ca5a1ec 279
e7d7c0e5 280 def log_market(self, args):
90d7423e
IB
281 self.add_log({
282 "type": "market",
283 "commit": "$Format:%H$",
284 "args": vars(args),
90d7423e
IB
285 })
286
6ca5a1ec 287class BalanceStore:
f86ee140
IB
288 def __init__(self, market):
289 self.market = market
290 self.all = {}
6ca5a1ec 291
f86ee140
IB
292 def currencies(self):
293 return self.all.keys()
6ca5a1ec 294
f86ee140 295 def in_currency(self, other_currency, compute_value="average", type="total"):
6ca5a1ec 296 amounts = {}
f86ee140 297 for currency, balance in self.all.items():
6ca5a1ec 298 other_currency_amount = getattr(balance, type)\
f86ee140 299 .in_currency(other_currency, self.market, compute_value=compute_value)
6ca5a1ec 300 amounts[currency] = other_currency_amount
f86ee140 301 self.market.report.log_tickers(amounts, other_currency,
3d0247f9 302 compute_value, type)
6ca5a1ec
IB
303 return amounts
304
2b1ee8f4
IB
305 def fetch_balances(self, tag=None, log_tickers=False,
306 ticker_currency="BTC", ticker_compute_value="average", ticker_type="total"):
f86ee140 307 all_balances = self.market.ccxt.fetch_all_balances()
6ca5a1ec
IB
308 for currency, balance in all_balances.items():
309 if balance["exchange_total"] != 0 or balance["margin_total"] != 0 or \
f86ee140
IB
310 currency in self.all:
311 self.all[currency] = portfolio.Balance(currency, balance)
2b1ee8f4
IB
312 if log_tickers:
313 tickers = self.in_currency(ticker_currency, compute_value=ticker_compute_value, type=ticker_type)
314 self.market.report.log_balances(tag=tag,
315 tickers=tickers, ticker_currency=ticker_currency,
316 compute_value=ticker_compute_value, type=ticker_type)
317 else:
318 self.market.report.log_balances(tag=tag)
6ca5a1ec 319
f86ee140 320 def dispatch_assets(self, amount, liquidity="medium", repartition=None):
6ca5a1ec 321 if repartition is None:
ada1b5f1 322 repartition = Portfolio.repartition(liquidity=liquidity)
6ca5a1ec
IB
323 sum_ratio = sum([v[0] for k, v in repartition.items()])
324 amounts = {}
325 for currency, (ptt, trade_type) in repartition.items():
326 amounts[currency] = ptt * amount / sum_ratio
327 if trade_type == "short":
328 amounts[currency] = - amounts[currency]
aca4d437 329 self.all.setdefault(currency, portfolio.Balance(currency, {}))
f86ee140 330 self.market.report.log_dispatch(amount, amounts, liquidity, repartition)
6ca5a1ec
IB
331 return amounts
332
f86ee140
IB
333 def as_json(self):
334 return { k: v.as_json() for k, v in self.all.items() }
3d0247f9 335
6ca5a1ec 336class TradeStore:
f86ee140
IB
337 def __init__(self, market):
338 self.market = market
339 self.all = []
6ca5a1ec 340
aca4d437
IB
341 @property
342 def pending(self):
17598517 343 return list(filter(lambda t: t.pending, self.all))
aca4d437 344
f86ee140 345 def compute_trades(self, values_in_base, new_repartition, only=None):
3d0247f9 346 computed_trades = []
6ca5a1ec 347 base_currency = sum(values_in_base.values()).currency
f86ee140 348 for currency in self.market.balances.currencies():
6ca5a1ec
IB
349 if currency == base_currency:
350 continue
351 value_from = values_in_base.get(currency, portfolio.Amount(base_currency, 0))
352 value_to = new_repartition.get(currency, portfolio.Amount(base_currency, 0))
1aa7d4fa 353
6ca5a1ec 354 if value_from.value * value_to.value < 0:
f86ee140 355 computed_trades.append(self.trade_if_matching(
1aa7d4fa 356 value_from, portfolio.Amount(base_currency, 0),
f86ee140
IB
357 currency, only=only))
358 computed_trades.append(self.trade_if_matching(
1aa7d4fa 359 portfolio.Amount(base_currency, 0), value_to,
f86ee140 360 currency, only=only))
6ca5a1ec 361 else:
f86ee140 362 computed_trades.append(self.trade_if_matching(
3d0247f9 363 value_from, value_to,
f86ee140 364 currency, only=only))
3d0247f9
IB
365 for matching, trade in computed_trades:
366 if matching:
f86ee140
IB
367 self.all.append(trade)
368 self.market.report.log_trades(computed_trades, only)
1aa7d4fa 369
f86ee140
IB
370 def trade_if_matching(self, value_from, value_to, currency,
371 only=None):
1aa7d4fa 372 trade = portfolio.Trade(value_from, value_to, currency,
f86ee140 373 self.market)
3d0247f9
IB
374 matching = only is None or trade.action == only
375 return [matching, trade]
6ca5a1ec 376
f86ee140 377 def prepare_orders(self, only=None, compute_value="default"):
3d0247f9 378 orders = []
aca4d437 379 for trade in self.pending:
6ca5a1ec 380 if only is None or trade.action == only:
3d0247f9 381 orders.append(trade.prepare_order(compute_value=compute_value))
f86ee140 382 self.market.report.log_orders(orders, only, compute_value)
6ca5a1ec 383
17598517
IB
384 def close_trades(self):
385 for trade in self.all:
386 trade.close()
387
f86ee140
IB
388 def print_all_with_order(self, ind=""):
389 for trade in self.all:
3d0247f9 390 trade.print_with_order(ind=ind)
6ca5a1ec 391
f86ee140
IB
392 def run_orders(self):
393 orders = self.all_orders(state="pending")
3d0247f9 394 for order in orders:
6ca5a1ec 395 order.run()
f86ee140
IB
396 self.market.report.log_stage("run_orders")
397 self.market.report.log_orders(orders)
6ca5a1ec 398
f86ee140
IB
399 def all_orders(self, state=None):
400 all_orders = sum(map(lambda v: v.orders, self.all), [])
6ca5a1ec
IB
401 if state is None:
402 return all_orders
403 else:
404 return list(filter(lambda o: o.status == state, all_orders))
405
f86ee140
IB
406 def update_all_orders_status(self):
407 for order in self.all_orders(state="open"):
6ca5a1ec
IB
408 order.get_status()
409
dc1ca9a3
IB
410class NoopLock:
411 def __enter__(self, *args):
412 pass
413 def __exit__(self, *args):
414 pass
415
416class LockedVar:
417 def __init__(self, value):
418 self.lock = NoopLock()
419 self.val = value
420
421 def start_lock(self):
422 import threading
423 self.lock = threading.Lock()
424
425 def set(self, value):
426 with self.lock:
427 self.val = value
428
429 def get(self, key=None):
430 with self.lock:
431 if key is not None and isinstance(self.val, dict):
432 return self.val.get(key)
433 else:
434 return self.val
435
436 def __getattr__(self, key):
437 with self.lock:
438 return getattr(self.val, key)
439
ada1b5f1
IB
440class Portfolio:
441 URL = "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json"
dc1ca9a3
IB
442 data = LockedVar(None)
443 liquidities = LockedVar({})
444 last_date = LockedVar(None)
e7d7c0e5 445 report = LockedVar(ReportStore(None, no_http_dup=True))
dc1ca9a3 446 worker = None
a0dcf4e0 447 worker_tag = ""
dc1ca9a3
IB
448 worker_started = False
449 worker_notify = None
450 callback = None
451
452 @classmethod
453 def start_worker(cls, poll=30):
454 import threading
455
456 cls.worker = threading.Thread(name="portfolio", daemon=True,
457 target=cls.wait_for_notification, kwargs={"poll": poll})
458 cls.worker_notify = threading.Event()
459 cls.callback = threading.Event()
460
461 cls.last_date.start_lock()
462 cls.liquidities.start_lock()
463 cls.report.start_lock()
464
a0dcf4e0 465 cls.worker_tag = "[Worker] "
dc1ca9a3
IB
466 cls.worker_started = True
467 cls.worker.start()
468
469 @classmethod
470 def is_worker_thread(cls):
471 if cls.worker is None:
472 return False
473 else:
474 import threading
475 return cls.worker == threading.current_thread()
476
477 @classmethod
478 def wait_for_notification(cls, poll=30):
479 if not cls.is_worker_thread():
480 raise RuntimeError("This method needs to be ran with the worker")
481 while cls.worker_started:
482 cls.worker_notify.wait()
e7d7c0e5
IB
483 if cls.worker_started:
484 cls.worker_notify.clear()
a0dcf4e0 485 cls.report.print_log("[Worker] Fetching cryptoportfolio")
e7d7c0e5
IB
486 cls.get_cryptoportfolio(refetch=True)
487 cls.callback.set()
488 time.sleep(poll)
489
490 @classmethod
491 def stop_worker(cls):
492 cls.worker_started = False
493 cls.worker_notify.set()
ada1b5f1
IB
494
495 @classmethod
dc1ca9a3
IB
496 def notify_and_wait(cls):
497 cls.callback.clear()
498 cls.worker_notify.set()
499 cls.callback.wait()
500
501 @classmethod
502 def wait_for_recent(cls, delta=4, poll=30):
ada1b5f1 503 cls.get_cryptoportfolio()
e7d7c0e5 504 while cls.last_date.get() is None or datetime.datetime.now() - cls.last_date.get() > datetime.timedelta(delta):
dc1ca9a3
IB
505 if cls.worker is None:
506 time.sleep(poll)
507 cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio")
ada1b5f1
IB
508 cls.get_cryptoportfolio(refetch=True)
509
510 @classmethod
511 def repartition(cls, liquidity="medium"):
512 cls.get_cryptoportfolio()
dc1ca9a3
IB
513 liquidities = cls.liquidities.get(liquidity)
514 return liquidities[cls.last_date.get()]
ada1b5f1
IB
515
516 @classmethod
517 def get_cryptoportfolio(cls, refetch=False):
dc1ca9a3
IB
518 if cls.data.get() is not None and not refetch:
519 return
520 if cls.worker is not None and not cls.is_worker_thread():
521 cls.notify_and_wait()
ada1b5f1
IB
522 return
523 try:
524 r = requests.get(cls.URL)
525 cls.report.log_http_request(r.request.method,
526 r.request.url, r.request.body, r.request.headers, r)
527 except Exception as e:
a0dcf4e0 528 cls.report.log_error("{}get_cryptoportfolio".format(cls.worker_tag), exception=e)
ada1b5f1
IB
529 return
530 try:
dc1ca9a3 531 cls.data.set(r.json(parse_int=D, parse_float=D))
ada1b5f1
IB
532 cls.parse_cryptoportfolio()
533 except (JSONDecodeError, SimpleJSONDecodeError):
dc1ca9a3
IB
534 cls.data.set(None)
535 cls.last_date.set(None)
536 cls.liquidities.set({})
ada1b5f1
IB
537
538 @classmethod
539 def parse_cryptoportfolio(cls):
540 def filter_weights(weight_hash):
541 if weight_hash[1][0] == 0:
542 return False
543 if weight_hash[0] == "_row":
544 return False
545 return True
546
547 def clean_weights(i):
548 def clean_weights_(h):
549 if h[0].endswith("s"):
550 return [h[0][0:-1], (h[1][i], "short")]
551 else:
552 return [h[0], (h[1][i], "long")]
553 return clean_weights_
554
555 def parse_weights(portfolio_hash):
dc1ca9a3
IB
556 if "weights" not in portfolio_hash:
557 return {}
ada1b5f1
IB
558 weights_hash = portfolio_hash["weights"]
559 weights = {}
560 for i in range(len(weights_hash["_row"])):
e7d7c0e5 561 date = datetime.datetime.strptime(weights_hash["_row"][i], "%Y-%m-%d")
ada1b5f1
IB
562 weights[date] = dict(filter(
563 filter_weights,
564 map(clean_weights(i), weights_hash.items())))
565 return weights
566
dc1ca9a3
IB
567 high_liquidity = parse_weights(cls.data.get("portfolio_1"))
568 medium_liquidity = parse_weights(cls.data.get("portfolio_2"))
ada1b5f1 569
dc1ca9a3
IB
570 cls.liquidities.set({
571 "medium": medium_liquidity,
572 "high": high_liquidity,
573 })
574 cls.last_date.set(max(
e7d7c0e5
IB
575 max(medium_liquidity.keys(), default=datetime.datetime(1, 1, 1)),
576 max(high_liquidity.keys(), default=datetime.datetime(1, 1, 1))
dc1ca9a3 577 ))
ada1b5f1 578