git.immae.eu Git - perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git/blob - store.py
Merge branch 'refactor_db' into dev
[perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git] / store.py
1 import time
2 import requests
3 import portfolio
4 import simplejson as json
5 from decimal import Decimal as D, ROUND_DOWN
6 import datetime
7 import inspect
8 from json import JSONDecodeError
9 from simplejson.errors import JSONDecodeError as SimpleJSONDecodeError
10 import dbs
11
12 __all__ = ["Portfolio", "BalanceStore", "ReportStore", "TradeStore"]
13
class ReportStore:
    """Accumulate human-readable messages and structured log entries.

    Three lists are maintained:

    * ``print_logs``: ``[datetime, message]`` pairs produced by ``print_log``;
    * ``logs``: dicts produced by ``add_log`` (every ``log_*`` helper ends up
      there);
    * ``redis_status``: dicts produced by ``add_redis_status``.
    """

    def __init__(self, market, verbose_print=True, no_http_dup=False):
        """Create an empty store.

        market: object whose ``user_id`` / ``market_id`` are stamped on every
            log entry, or None (both fields are then logged as None).
        verbose_print: when true, ``print_log`` also writes to stdout.
        no_http_dup: when true, ``log_http_request`` does not store a
            response body equal to the last stored one for the same
            url/method.
        """
        self.market = market
        self.verbose_print = verbose_print

        self.print_logs = []
        self.logs = []
        self.redis_status = []

        self.no_http_dup = no_http_dup
        self.last_http = None

    def merge(self, other_report):
        """Append the logs and printed messages of other_report, by date.

        ``redis_status`` entries of other_report are not merged.
        """
        self.logs += other_report.logs
        self.logs.sort(key=lambda x: x["date"])

        self.print_logs += other_report.print_logs
        self.print_logs.sort(key=lambda x: x[0])

    def print_log(self, message):
        """Record message prefixed by the current time; print it if verbose."""
        now = datetime.datetime.now()
        message = "{:%Y-%m-%d %H:%M:%S}: {}".format(now, str(message))
        self.print_logs.append([now, message])
        if self.verbose_print:
            print(message)

    def add_log(self, hash_):
        """Stamp hash_ (in place) with date/user/market, store and return it."""
        hash_["date"] = datetime.datetime.now()
        if self.market is not None:
            hash_["user_id"] = self.market.user_id
            hash_["market_id"] = self.market.market_id
        else:
            hash_["user_id"] = None
            hash_["market_id"] = None
        self.logs.append(hash_)
        return hash_

    def add_redis_status(self, hash_):
        """Store hash_ in ``redis_status`` and return it."""
        self.redis_status.append(hash_)
        return hash_

    @staticmethod
    def default_json_serial(obj):
        """JSON fallback: ISO format for dates/datetimes, ``str`` otherwise."""
        if isinstance(obj, (datetime.datetime, datetime.date)):
            return obj.isoformat()
        return str(obj)

    @staticmethod
    def _callable_source(element):
        """Return the stripped source of a callable, or its repr.

        ``inspect.getsource`` raises TypeError for builtins and callable
        instances, and OSError when the source cannot be retrieved; a logging
        helper must not fail for that reason, so fall back to ``repr``.
        """
        try:
            return inspect.getsource(element).strip()
        except (OSError, TypeError):
            return repr(element)

    @classmethod
    def _as_loggable(cls, element):
        """Convert a ``log_stage`` argument into something loggable."""
        if callable(element):
            return cls._callable_source(element)
        if hasattr(element, "as_json"):
            return element.as_json()
        return element

    def to_json(self):
        """Return all log entries as a single JSON document."""
        return json.dumps(self.logs, default=self.default_json_serial, indent=" ")

    def to_json_array(self):
        """Yield ``(date, type, json_of_remaining_keys)`` for each log entry."""
        # Work on copies: pop() must not alter the stored entries.
        for log in (x.copy() for x in self.logs):
            yield (
                log.pop("date"),
                log.pop("type"),
                json.dumps(log, default=self.default_json_serial, indent=" ")
            )

    def to_json_redis(self):
        """Yield ``(type, json_of_remaining_keys)`` for each redis status."""
        for log in (x.copy() for x in self.redis_status):
            yield (
                log.pop("type"),
                json.dumps(log, default=self.default_json_serial)
            )

    def set_verbose(self, verbose_print):
        """Enable or disable printing in ``print_log``."""
        self.verbose_print = verbose_print

    def log_stage(self, stage, **kwargs):
        """Print and log the start of a stage together with its arguments.

        Callable arguments are logged as their source code (or their repr
        when no source is available); objects exposing ``as_json`` are logged
        through it.
        """
        args = {k: self._as_loggable(v) for k, v in kwargs.items()}
        args_str = ["{}={}".format(k, v) for k, v in args.items()]
        self.print_log("-" * (len(stage) + 8))
        self.print_log("[Stage] {} {}".format(stage, ", ".join(args_str)))

        self.add_log({
            "type": "stage",
            "stage": stage,
            "args": args,
        })

    def log_balances(self, tag=None, tickers=None,
            ticker_currency=None, compute_value=None, type=None):
        """Print the market balances and log them (also as a redis status).

        When tickers is given, a ticker summary built by ``_ticker_hash`` is
        attached under the ``"tickers"`` key.
        """
        self.print_log("[Balance]")
        for balance in self.market.balances.all.values():
            self.print_log("\t{}".format(balance))

        log = {
            "type": "balance",
            "tag": tag,
            "balances": self.market.balances.as_json()
        }

        if tickers is not None:
            log["tickers"] = self._ticker_hash(tickers, ticker_currency,
                    compute_value, type)

        # add_log mutates its argument: give it its own copy so that the
        # redis status keeps only the three keys above.
        self.add_log(log.copy())
        self.add_redis_status(log)

    def log_tickers(self, amounts, other_currency,
            compute_value, type):
        """Log the conversion of amounts into other_currency."""
        log = self._ticker_hash(amounts, other_currency, compute_value,
                type)
        log["type"] = "tickers"

        self.add_log(log)

    def _ticker_hash(self, amounts, other_currency, compute_value, type):
        """Build the dict describing converted amounts, rates and total.

        amounts maps a currency to an object exposing ``as_json()`` (with a
        ``"value"`` key) and ``rate``. The values must be summable and the
        sum must itself expose ``as_json()``.
        """
        values = {}
        rates = {}
        if callable(compute_value):
            compute_value = self._callable_source(compute_value)

        for currency, amount in amounts.items():
            values[currency] = amount.as_json()["value"]
            rates[currency] = amount.rate
        return {
            "compute_value": compute_value,
            "balance_type": type,
            "currency": other_currency,
            "balances": values,
            "rates": rates,
            "total": sum(amounts.values()).as_json()["value"]
        }

    def log_dispatch(self, amount, amounts, liquidity, repartition):
        """Log how amount has been split between currencies."""
        self.add_log({
            "type": "dispatch",
            "liquidity": liquidity,
            "repartition_ratio": repartition,
            "total_amount": amount.as_json(),
            "repartition": { k: v.as_json()["value"] for k, v in amounts.items() }
        })

    def log_trades(self, matching_and_trades, only):
        """Log computed trades; non-matching ones are flagged as skipped.

        matching_and_trades: iterable of ``(matching, trade)`` pairs.
        """
        trades = []
        for matching, trade in matching_and_trades:
            trade_json = trade.as_json()
            trade_json["skipped"] = not matching
            trades.append(trade_json)

        self.add_log({
            "type": "trades",
            "only": only,
            "debug": self.market.debug,
            "trades": trades
        })

    def log_orders(self, orders, tick=None, only=None, compute_value=None):
        """Print all trades with their orders and log the given orders.

        None entries of orders are ignored.
        """
        if callable(compute_value):
            compute_value = self._callable_source(compute_value)
        self.print_log("[Orders]")
        self.market.trades.print_all_with_order(ind="\t")
        self.add_log({
            "type": "orders",
            "only": only,
            "compute_value": compute_value,
            "tick": tick,
            "orders": [order.as_json() for order in orders if order is not None]
        })

    def log_order(self, order, tick, finished=False, update=None,
            new_order=None, compute_value=None):
        """Print and log a status change of a single order.

        A message is printed only when finished is true or update is one of
        "waiting", "adjusting", "market_fallback" or "market_adjust"; the log
        entry is always added.
        """
        if callable(compute_value):
            compute_value = self._callable_source(compute_value)
        if finished:
            self.print_log("[Order] Finished {}".format(order))
        elif update == "waiting":
            self.print_log("[Order] {}, tick {}, waiting".format(order, tick))
        elif update == "adjusting":
            self.print_log("[Order] {}, tick {}, cancelling and adjusting to {}".format(order, tick, new_order))
        elif update == "market_fallback":
            self.print_log("[Order] {}, tick {}, fallbacking to market value".format(order, tick))
        elif update == "market_adjust":
            self.print_log("[Order] {}, tick {}, market value, cancelling and adjusting to {}".format(order, tick, new_order))

        self.add_log({
            "type": "order",
            "tick": tick,
            "update": update,
            "order": order.as_json(),
            "compute_value": compute_value,
            "new_order": new_order.as_json() if new_order is not None else None
        })

    def log_move_balances(self, needed, moving):
        """Log needed/moving balances; ``portfolio.Amount`` values are
        reduced to their JSON ``"value"``."""
        self.add_log({
            "type": "move_balances",
            "debug": self.market.debug,
            "needed": { k: v.as_json()["value"] if isinstance(v, portfolio.Amount) else v for k, v in needed.items() },
            "moving": { k: v.as_json()["value"] if isinstance(v, portfolio.Amount) else v for k, v in moving.items() },
        })

    def log_http_request(self, method, url, body, headers, response):
        """Log an HTTP exchange.

        response is either the exception raised by the request (logged with
        status -1) or a response object exposing ``status_code``, ``elapsed``
        and ``text``. With ``no_http_dup``, a body equal to the last stored
        one for the same url and method is replaced by a reference to the
        date of that previous entry.
        """
        entry = {
            "type": "http_request",
            "method": method,
            "url": url,
            "body": body,
            "headers": headers,
        }

        if isinstance(response, Exception):
            entry["status"] = -1
            entry["response"] = None
            entry["error"] = response.__class__.__name__
            entry["error_message"] = str(response)
            self.add_log(entry)
            # A failure breaks the "same as previous response" chain.
            self.last_http = None
            return

        entry["status"] = response.status_code
        entry["duration"] = response.elapsed.total_seconds()

        is_duplicate = (
            self.no_http_dup
            and self.last_http is not None
            and self.last_http["url"] == url
            and self.last_http["method"] == method
            and self.last_http["response"] == response.text
        )
        if is_duplicate:
            entry["response"] = None
            entry["response_same_as"] = self.last_http["date"]
            self.add_log(entry)
        else:
            entry["response"] = response.text
            entry["response_same_as"] = None
            self.last_http = self.add_log(entry)

    def log_error(self, action, message=None, exception=None):
        """Print and log an error that happened while doing action."""
        self.print_log("[Error] {}".format(action))
        if exception is not None:
            self.print_log(str("\t{}: {}".format(exception.__class__.__name__, exception)))
        if message is not None:
            self.print_log("\t{}".format(message))

        self.add_log({
            "type": "error",
            "action": action,
            "exception_class": exception.__class__.__name__ if exception is not None else None,
            "exception_message": str(exception) if exception is not None else None,
            "message": message,
        })

    def log_debug_action(self, action):
        """Print and log an action taken in debug mode."""
        self.print_log("[Debug] {}".format(action))

        self.add_log({
            "type": "debug_action",
            "action": action,
        })

    def log_market(self, args):
        """Log the arguments (``vars(args)``) the market was started with."""
        self.add_log({
            "type": "market",
            # Placeholder kept verbatim; presumably substituted by
            # `git archive` (export-subst) -- TODO confirm.
            "commit": "$Format:%H$",
            "args": vars(args),
        })
287
class BalanceStore:
    """Balances of a market, indexed by currency in ``self.all``."""

    def __init__(self, market):
        self.market = market
        self.all = {}

    def currencies(self):
        """Return a view of the currencies currently known to the store."""
        return self.all.keys()

    def in_currency(self, other_currency, compute_value="average", type="total"):
        """Convert the `type` attribute of every balance to other_currency.

        The resulting ``{currency: amount}`` dict is logged through the
        market report before being returned.
        """
        converted = {
            code: getattr(held, type).in_currency(
                other_currency, self.market, compute_value=compute_value)
            for code, held in self.all.items()
        }
        self.market.report.log_tickers(
            converted, other_currency, compute_value, type)
        return converted

    def fetch_balances(self, tag=None, add_portfolio=False, log_tickers=False,
            ticker_currency="BTC", ticker_compute_value="average", ticker_type="total"):
        """Refresh ``self.all`` from the exchange and log the result.

        A fetched currency is stored when one of its exchange/margin totals
        is non-zero or when it is already known. With add_portfolio, the
        currencies of the cached portfolio repartition are added with an
        empty balance when missing.
        """
        fetched = self.market.ccxt.fetch_all_balances()
        for code, raw in fetched.items():
            has_funds = raw["exchange_total"] != 0 or raw["margin_total"] != 0
            if has_funds or code in self.all:
                self.all[code] = portfolio.Balance(code, raw)

        if add_portfolio:
            for code in Portfolio.repartition(from_cache=True):
                self.all.setdefault(code, portfolio.Balance(code, {}))

        report = self.market.report
        if not log_tickers:
            report.log_balances(tag=tag)
            return

        tickers = self.in_currency(
            ticker_currency,
            compute_value=ticker_compute_value,
            type=ticker_type)
        report.log_balances(
            tag=tag,
            tickers=tickers,
            ticker_currency=ticker_currency,
            compute_value=ticker_compute_value,
            type=ticker_type)

    def dispatch_assets(self, amount, liquidity="medium", repartition=None):
        """Split amount between currencies according to repartition.

        repartition maps a currency to a ``(weight, trade_type)`` pair and
        defaults to the portfolio repartition for liquidity. Shares of
        "short" entries are negated. Every currency of the repartition gets
        an (empty) balance in the store when it had none.
        """
        if repartition is None:
            repartition = Portfolio.repartition(liquidity=liquidity)

        weight_total = sum(entry[0] for entry in repartition.values())

        amounts = {}
        for code, (weight, direction) in repartition.items():
            share = weight * amount / weight_total
            amounts[code] = -share if direction == "short" else share
            self.all.setdefault(code, portfolio.Balance(code, {}))

        self.market.report.log_dispatch(amount, amounts, liquidity, repartition)
        return amounts

    def as_json(self):
        """Return ``{currency: balance.as_json()}`` for every balance."""
        return {code: held.as_json() for code, held in self.all.items()}
339
class TradeStore:
    """Trades of a market (``self.all``) and helpers acting on their orders."""

    def __init__(self, market):
        self.market = market
        self.all = []

    @property
    def pending(self):
        """List of the trades whose ``pending`` attribute is true."""
        return [trade for trade in self.all if trade.pending]

    def compute_trades(self, values_in_base, new_repartition, only=None):
        """Create the trades leading from values_in_base to new_repartition.

        Both arguments map a currency to an amount expressed in the base
        currency, which is taken from the sum of values_in_base (so that
        dict must not be empty). When the current and target values of a
        currency have opposite signs, two trades are created: one back to
        zero, one from zero to the target. Only trades matching `only` are
        stored; all of them are logged.
        """
        computed_trades = []
        base_currency = sum(values_in_base.values()).currency
        for currency in self.market.balances.currencies():
            if currency == base_currency:
                continue
            value_from = values_in_base.get(currency, portfolio.Amount(base_currency, 0))
            value_to = new_repartition.get(currency, portfolio.Amount(base_currency, 0))

            if value_from.value * value_to.value < 0:
                computed_trades.append(self.trade_if_matching(
                    value_from, portfolio.Amount(base_currency, 0),
                    currency, only=only))
                computed_trades.append(self.trade_if_matching(
                    portfolio.Amount(base_currency, 0), value_to,
                    currency, only=only))
            else:
                computed_trades.append(self.trade_if_matching(
                    value_from, value_to,
                    currency, only=only))
        for matching, trade in computed_trades:
            if matching:
                self.all.append(trade)
        self.market.report.log_trades(computed_trades, only)

    def trade_if_matching(self, value_from, value_to, currency,
            only=None):
        """Build a trade and return ``[matching, trade]``.

        matching is true when `only` is None or equals the trade's action.
        """
        trade = portfolio.Trade(value_from, value_to, currency,
                self.market)
        matching = only is None or trade.action == only
        return [matching, trade]

    def prepare_orders(self, only=None, compute_value="default"):
        """Prepare an order for each pending trade matching `only`, then log.
        """
        orders = []
        for trade in self.pending:
            if only is None or trade.action == only:
                orders.append(trade.prepare_order(compute_value=compute_value))
        # Keywords are required here: the second positional parameter of
        # log_orders is `tick`, not `only`.
        self.market.report.log_orders(orders, only=only,
                compute_value=compute_value)

    def close_trades(self):
        """Close every trade of the store."""
        for trade in self.all:
            trade.close()

    def print_all_with_order(self, ind=""):
        """Print every trade and its orders, prefixed by ind."""
        for trade in self.all:
            trade.print_with_order(ind=ind)

    def run_orders(self):
        """Run all the orders in "pending" state and log them."""
        orders = self.all_orders(state="pending")
        for order in orders:
            order.run()
        self.market.report.log_stage("run_orders")
        self.market.report.log_orders(orders)

    def all_orders(self, state=None):
        """Return a new list of the orders of all trades.

        When state is given, only orders whose ``status`` equals it are kept.
        """
        all_orders = [order for trade in self.all for order in trade.orders]
        if state is None:
            return all_orders
        return [order for order in all_orders if order.status == state]

    def update_all_orders_status(self):
        """Refresh the status of every order in "open" state."""
        for order in self.all_orders(state="open"):
            order.get_status()
413
class NoopLock:
    """Context manager usable in place of a lock, but doing nothing."""

    def __enter__(self, *args):
        # Nothing to acquire.
        return None

    def __exit__(self, *args):
        # Nothing to release; a falsy result lets exceptions propagate.
        return None
419
class LockedVar:
    """Value holder whose accesses go through ``self.lock``.

    The lock is a ``NoopLock`` until ``start_lock`` replaces it with a
    ``threading.Lock``. Attributes not found on the holder itself are looked
    up on the wrapped value.
    """

    def __init__(self, value):
        self.lock = NoopLock()
        self.val = value

    def start_lock(self):
        """Switch to a real ``threading.Lock``."""
        import threading
        self.lock = threading.Lock()

    def set(self, value):
        """Replace the wrapped value."""
        with self.lock:
            self.val = value

    def get(self, key=None):
        """Return the wrapped value, or ``value.get(key)`` when a key is
        given and the value is a dict."""
        with self.lock:
            if key is not None and isinstance(self.val, dict):
                return self.val.get(key)
            else:
                return self.val

    def __getattr__(self, key):
        """Delegate unknown attributes to the wrapped value.

        Only called when normal lookup fails. If that happens for ``lock``
        or ``val`` themselves (instance created without ``__init__``, e.g.
        while being copied or unpickled), delegating would look them up
        again and recurse until RecursionError: raise AttributeError
        instead.
        """
        if key in ("lock", "val"):
            raise AttributeError(key)
        with self.lock:
            return getattr(self.val, key)
443
class Portfolio:
    """Class-level cache of the repartitions published at ``URL``.

    All state lives on the class. The data can be fetched inline by the
    calling thread or, once ``start_worker`` has been called, by a dedicated
    worker thread that other threads trigger through ``notify_and_wait``.
    """
    URL = "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json"
    # Seconds given to the HTTP fetch; without a timeout `requests` may wait
    # forever, blocking the worker and every thread waiting for it.
    REQUEST_TIMEOUT = 30
    data = LockedVar(None)
    liquidities = LockedVar({})
    last_date = LockedVar(None)
    report = LockedVar(ReportStore(None, no_http_dup=True))
    worker = None
    worker_tag = ""
    worker_started = False
    worker_notify = None
    callback = None

    @classmethod
    def start_worker(cls, poll=30):
        """Start the daemon thread running ``wait_for_notification``.

        poll: seconds the worker sleeps after each fetch.
        """
        import threading

        cls.worker = threading.Thread(name="portfolio", daemon=True,
                target=cls.wait_for_notification, kwargs={"poll": poll})
        cls.worker_notify = threading.Event()
        cls.callback = threading.Event()

        cls.last_date.start_lock()
        cls.liquidities.start_lock()
        cls.report.start_lock()

        cls.worker_tag = "[Worker] "
        cls.worker_started = True
        cls.worker.start()

    @classmethod
    def is_worker_thread(cls):
        """Tell whether the current thread is the worker thread."""
        if cls.worker is None:
            return False
        else:
            import threading
            return cls.worker == threading.current_thread()

    @classmethod
    def wait_for_notification(cls, poll=30):
        """Worker loop: refetch the data each time ``worker_notify`` is set.

        Raises RuntimeError when called from another thread than the worker.
        """
        if not cls.is_worker_thread():
            raise RuntimeError("This method needs to be ran with the worker")
        while cls.worker_started:
            cls.worker_notify.wait()
            if cls.worker_started:
                cls.worker_notify.clear()
                cls.report.print_log("[Worker] Fetching cryptoportfolio")
                try:
                    cls.get_cryptoportfolio(refetch=True)
                except Exception as e:
                    # An unexpected failure must not kill the worker:
                    # log it and keep serving notifications.
                    cls.report.log_error("{}get_cryptoportfolio".format(cls.worker_tag), exception=e)
                finally:
                    # Always release the thread blocked in notify_and_wait,
                    # otherwise it would wait forever.
                    cls.callback.set()
                time.sleep(poll)

    @classmethod
    def stop_worker(cls):
        """Ask the worker loop to stop and wake it up."""
        cls.worker_started = False
        cls.worker_notify.set()

    @classmethod
    def notify_and_wait(cls):
        """Trigger a fetch by the worker and block until it has been done."""
        cls.callback.clear()
        cls.worker_notify.set()
        cls.callback.wait()

    @classmethod
    def wait_for_recent(cls, delta=4, poll=30):
        """Block until the last known date is at most delta days old.

        Without a worker, sleeps poll seconds between two attempts.
        """
        cls.get_cryptoportfolio()
        max_age = datetime.timedelta(delta)
        while True:
            # Read once: another thread may reset the value between a None
            # check and the subtraction.
            last_date = cls.last_date.get()
            if last_date is not None and datetime.datetime.now() - last_date <= max_age:
                break
            if cls.worker is None:
                time.sleep(poll)
                cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio")
            cls.get_cryptoportfolio(refetch=True)

    @classmethod
    def repartition(cls, liquidity="medium", from_cache=False):
        """Return the repartition of the last known date for liquidity.

        from_cache: first try to load the repartition stored in redis.
        """
        if from_cache:
            cls.retrieve_cryptoportfolio()
        cls.get_cryptoportfolio()
        liquidities = cls.liquidities.get(liquidity)
        return liquidities[cls.last_date.get()]

    @classmethod
    def get_cryptoportfolio(cls, refetch=False):
        """Fetch and parse the data unless it is already loaded.

        When a worker exists and the caller is not the worker, the fetch is
        delegated to it. HTTP errors are logged and leave the current state
        untouched; an undecodable JSON body resets the state.
        """
        if cls.data.get() is not None and not refetch:
            return
        if cls.worker is not None and not cls.is_worker_thread():
            cls.notify_and_wait()
            return
        try:
            r = requests.get(cls.URL, timeout=cls.REQUEST_TIMEOUT)
            cls.report.log_http_request(r.request.method,
                    r.request.url, r.request.body, r.request.headers, r)
        except Exception as e:
            cls.report.log_error("{}get_cryptoportfolio".format(cls.worker_tag), exception=e)
            return
        try:
            cls.data.set(r.json(parse_int=D, parse_float=D))
            cls.parse_cryptoportfolio()
            cls.store_cryptoportfolio()
        except (JSONDecodeError, SimpleJSONDecodeError):
            cls.data.set(None)
            cls.last_date.set(None)
            cls.liquidities.set({})

    @classmethod
    def retrieve_cryptoportfolio(cls):
        """Load the last stored repartition from redis, when available."""
        if dbs.redis_connected():
            repartition = dbs.redis.get("/cryptoportfolio/repartition/latest")
            date = dbs.redis.get("/cryptoportfolio/repartition/date")
            if date is not None and repartition is not None:
                date = datetime.datetime.strptime(date.decode(), "%Y-%m-%d")
                repartition = json.loads(repartition, parse_int=D, parse_float=D)
                repartition = { k: { date: v } for k, v in repartition.items() }

                # Any non-None value makes get_cryptoportfolio() consider
                # the data as loaded and skip the HTTP fetch.
                cls.data.set("")
                cls.last_date.set(date)
                cls.liquidities.set(repartition)

    @classmethod
    def store_cryptoportfolio(cls):
        """Store the repartitions of the last date (and that date) in redis."""
        if dbs.redis_connected():
            hash_ = {}
            for liquidity, repartitions in cls.liquidities.items():
                hash_[liquidity] = repartitions[cls.last_date.get()]
            dump = json.dumps(hash_)
            key = "/cryptoportfolio/repartition/latest"
            dbs.redis.set(key, dump)
            key = "/cryptoportfolio/repartition/date"
            dbs.redis.set(key, cls.last_date.date().isoformat())

    @classmethod
    def parse_cryptoportfolio(cls):
        """Build ``liquidities`` and ``last_date`` from the fetched data.

        Each liquidity becomes ``{date: {currency: (weight, direction)}}``;
        entries whose weight is 0 are dropped. A key ending with "s" is
        recorded as a "short" entry of the key without that suffix, any
        other key as "long".
        """
        def filter_weights(weight_hash):
            if weight_hash[1][0] == 0:
                return False
            if weight_hash[0] == "_row":
                return False
            return True

        def clean_weights(i):
            def clean_weights_(h):
                if h[0].endswith("s"):
                    return [h[0][0:-1], (h[1][i], "short")]
                else:
                    return [h[0], (h[1][i], "long")]
            return clean_weights_

        def parse_weights(portfolio_hash):
            if "weights" not in portfolio_hash:
                return {}
            weights_hash = portfolio_hash["weights"]
            weights = {}
            # "_row" holds the dates; every other key holds one weight per
            # date, at the same index.
            for i, row_date in enumerate(weights_hash["_row"]):
                date = datetime.datetime.strptime(row_date, "%Y-%m-%d")
                weights[date] = dict(filter(
                        filter_weights,
                        map(clean_weights(i), weights_hash.items())))
            return weights

        high_liquidity = parse_weights(cls.data.get("portfolio_1"))
        medium_liquidity = parse_weights(cls.data.get("portfolio_2"))

        cls.liquidities.set({
            "medium": medium_liquidity,
            "high": high_liquidity,
            })
        cls.last_date.set(max(
            max(medium_liquidity.keys(), default=datetime.datetime(1, 1, 1)),
            max(high_liquidity.keys(), default=datetime.datetime(1, 1, 1))
            ))
611