4 import simplejson
as json
5 from decimal
import Decimal
as D
, ROUND_DOWN
8 from json
import JSONDecodeError
9 from simplejson
.errors
import JSONDecodeError
as SimpleJSONDecodeError
12 __all__
= ["Portfolio", "BalanceStore", "ReportStore", "TradeStore"]
15 def __init__(self
, market
, verbose_print
=True, no_http_dup
=False):
17 self
.verbose_print
= verbose_print
21 self
.redis_status
= []
23 self
.no_http_dup
= no_http_dup
26 def merge(self
, other_report
):
27 self
.logs
+= other_report
.logs
28 self
.logs
.sort(key
=lambda x
: x
["date"])
30 self
.print_logs
+= other_report
.print_logs
31 self
.print_logs
.sort(key
=lambda x
: x
[0])
33 def print_log(self
, message
):
34 now
= datetime
.datetime
.now()
35 message
= "{:%Y-%m-%d %H:%M:%S}: {}".format(now
, str(message
))
36 self
.print_logs
.append([now
, message
])
37 if self
.verbose_print
:
40 def add_log(self
, hash_
):
41 hash_
["date"] = datetime
.datetime
.now()
42 if self
.market
is not None:
43 hash_
["user_id"] = self
.market
.user_id
44 hash_
["market_id"] = self
.market
.market_id
46 hash_
["user_id"] = None
47 hash_
["market_id"] = None
48 self
.logs
.append(hash_
)
51 def add_redis_status(self
, hash_
):
52 self
.redis_status
.append(hash_
)
56 def default_json_serial(obj
):
57 if isinstance(obj
, (datetime
.datetime
, datetime
.date
)):
58 return obj
.isoformat()
62 return json
.dumps(self
.logs
, default
=self
.default_json_serial
, indent
=" ")
64 def to_json_array(self
):
65 for log
in (x
.copy() for x
in self
.logs
):
69 json
.dumps(log
, default
=self
.default_json_serial
, indent
=" ")
72 def to_json_redis(self
):
73 for log
in (x
.copy() for x
in self
.redis_status
):
76 json
.dumps(log
, default
=self
.default_json_serial
)
79 def set_verbose(self
, verbose_print
):
80 self
.verbose_print
= verbose_print
82 def log_stage(self
, stage
, **kwargs
):
85 return inspect
.getsource(element
).strip()
86 elif hasattr(element
, "as_json"):
87 return element
.as_json()
91 args
= { k: as_json(v) for k, v in kwargs.items() }
92 args_str
= ["{}={}".format(k
, v
) for k
, v
in args
.items()]
93 self
.print_log("-" * (len(stage
) + 8))
94 self
.print_log("[Stage] {} {}".format(stage
, ", ".join(args_str
)))
102 def log_balances(self
, tag
=None, checkpoint
=None, tickers
=None,
103 ticker_currency
=None, compute_value
=None, type=None):
104 self
.print_log("[Balance]")
105 for currency
, balance
in self
.market
.balances
.all
.items():
106 self
.print_log("\t{}".format(balance
))
111 "checkpoint": checkpoint
,
112 "balances": self
.market
.balances
.as_json()
115 if tickers
is not None:
116 log
["tickers"] = self
._ticker
_hash
(tickers
, ticker_currency
,
119 self
.add_log(log
.copy())
120 self
.add_redis_status(log
)
122 def log_tickers(self
, amounts
, other_currency
,
123 compute_value
, type):
124 log
= self
._ticker
_hash
(amounts
, other_currency
, compute_value
,
126 log
["type"] = "tickers"
130 def _ticker_hash(self
, amounts
, other_currency
, compute_value
, type):
133 if callable(compute_value
):
134 compute_value
= inspect
.getsource(compute_value
).strip()
136 for currency
, amount
in amounts
.items():
137 values
[currency
] = amount
.as_json()["value"]
138 rates
[currency
] = amount
.rate
140 "compute_value": compute_value
,
141 "balance_type": type,
142 "currency": other_currency
,
145 "total": sum(amounts
.values()).as_json()["value"]
148 def log_dispatch(self
, amount
, amounts
, liquidity
, repartition
):
151 "liquidity": liquidity
,
152 "repartition_ratio": repartition
,
153 "total_amount": amount
.as_json(),
154 "repartition": { k: v.as_json()["value"] for k, v in amounts.items() }
157 def log_trades(self
, matching_and_trades
, only
):
159 for matching
, trade
in matching_and_trades
:
160 trade_json
= trade
.as_json()
161 trade_json
["skipped"] = not matching
162 trades
.append(trade_json
)
167 "debug": self
.market
.debug
,
171 def log_orders(self
, orders
, tick
=None, only
=None, compute_value
=None):
172 if callable(compute_value
):
173 compute_value
= inspect
.getsource(compute_value
).strip()
174 self
.print_log("[Orders]")
175 self
.market
.trades
.print_all_with_order(ind
="\t")
179 "compute_value": compute_value
,
181 "orders": [order
.as_json() for order
in orders
if order
is not None]
184 def log_order(self
, order
, tick
, finished
=False, update
=None,
185 new_order
=None, compute_value
=None):
186 if callable(compute_value
):
187 compute_value
= inspect
.getsource(compute_value
).strip()
189 self
.print_log("[Order] Finished {}".format(order
))
190 elif update
== "waiting":
191 self
.print_log("[Order] {}, tick {}, waiting".format(order
, tick
))
192 elif update
== "adjusting":
193 self
.print_log("[Order] {}, tick {}, cancelling and adjusting to {}".format(order
, tick
, new_order
))
194 elif update
== "market_fallback":
195 self
.print_log("[Order] {}, tick {}, fallbacking to market value".format(order
, tick
))
196 elif update
== "market_adjust":
197 self
.print_log("[Order] {}, tick {}, market value, cancelling and adjusting to {}".format(order
, tick
, new_order
))
203 "order": order
.as_json(),
204 "compute_value": compute_value
,
205 "new_order": new_order
.as_json() if new_order
is not None else None
208 def log_move_balances(self
, needed
, moving
):
210 "type": "move_balances",
211 "debug": self
.market
.debug
,
212 "needed": { k: v.as_json()["value"] if isinstance(v, portfolio.Amount) else v for k, v in needed.items() }
,
213 "moving": { k: v.as_json()["value"] if isinstance(v, portfolio.Amount) else v for k, v in moving.items() }
,
216 def log_http_request(self
, method
, url
, body
, headers
, response
):
217 if isinstance(response
, Exception):
219 "type": "http_request",
226 "error": response
.__class
__.__name
__,
227 "error_message": str(response
),
229 self
.last_http
= None
230 elif self
.no_http_dup
and \
231 self
.last_http
is not None and \
232 self
.last_http
["url"] == url
and \
233 self
.last_http
["method"] == method
and \
234 self
.last_http
["response"] == response
.text
:
236 "type": "http_request",
241 "status": response
.status_code
,
242 "duration": response
.elapsed
.total_seconds(),
244 "response_same_as": self
.last_http
["date"]
247 self
.last_http
= self
.add_log({
248 "type": "http_request",
253 "status": response
.status_code
,
254 "duration": response
.elapsed
.total_seconds(),
255 "response": response
.text
,
256 "response_same_as": None,
259 def log_error(self
, action
, message
=None, exception
=None):
260 self
.print_log("[Error] {}".format(action
))
261 if exception
is not None:
262 self
.print_log(str("\t{}: {}".format(exception
.__class
__.__name
__, exception
)))
263 if message
is not None:
264 self
.print_log("\t{}".format(message
))
269 "exception_class": exception
.__class
__.__name
__ if exception
is not None else None,
270 "exception_message": str(exception
) if exception
is not None else None,
274 def log_debug_action(self
, action
):
275 self
.print_log("[Debug] {}".format(action
))
278 "type": "debug_action",
282 def log_market(self
, args
):
285 "commit": "$Format:%H$",
290 def __init__(self
, market
):
294 def currencies(self
):
295 return self
.all
.keys()
297 def in_currency(self
, other_currency
, compute_value
="average", type="total"):
299 for currency
, balance
in self
.all
.items():
300 other_currency_amount
= getattr(balance
, type)\
301 .in_currency(other_currency
, self
.market
, compute_value
=compute_value
)
302 amounts
[currency
] = other_currency_amount
303 self
.market
.report
.log_tickers(amounts
, other_currency
,
307 def fetch_balances(self
, tag
=None, add_portfolio
=False, liquidity
="medium",
308 checkpoint
=None, log_tickers
=False, add_usdt
=False,
309 ticker_currency
="BTC", ticker_compute_value
="average", ticker_type
="total"):
310 all_balances
= self
.market
.ccxt
.fetch_all_balances()
311 for currency
, balance
in all_balances
.items():
312 if balance
["exchange_total"] != 0 or balance
["margin_total"] != 0 or \
313 currency
in self
.all
:
314 self
.all
[currency
] = portfolio
.Balance(currency
, balance
)
316 for currency
in Portfolio
.repartition(from_cache
=True, liquidity
=liquidity
):
317 self
.all
.setdefault(currency
, portfolio
.Balance(currency
, {}))
319 self
.all
.setdefault("USDT", portfolio
.Balance("USDT", {}))
321 tickers
= self
.in_currency(ticker_currency
, compute_value
=ticker_compute_value
, type=ticker_type
)
322 self
.market
.report
.log_balances(tag
=tag
, checkpoint
=checkpoint
,
323 tickers
=tickers
, ticker_currency
=ticker_currency
,
324 compute_value
=ticker_compute_value
, type=ticker_type
)
326 self
.market
.report
.log_balances(tag
=tag
, checkpoint
=checkpoint
)
328 def available_balances_for_repartition(self
,
329 compute_value
="average", base_currency
="BTC",
330 liquidity
="medium", repartition
=None):
331 if repartition
is None:
332 repartition
= Portfolio
.repartition(liquidity
=liquidity
)
333 base_currency_balance
= self
.all
.get(base_currency
)
335 if base_currency_balance
is None:
336 total_base_value
= portfolio
.Amount(base_currency
, 0)
338 total_base_value
= base_currency_balance
.exchange_free
+ \
339 base_currency_balance
.margin_available
- \
340 base_currency_balance
.margin_in_position
342 amount_in_position
= {}
344 # Compute balances already in the target position
345 for currency
, (ptt
, trade_type
) in repartition
.items():
346 amount_in_position
[currency
] = portfolio
.Amount(base_currency
, 0)
347 balance
= self
.all
.get(currency
)
348 if currency
!= base_currency
and balance
is not None:
349 if trade_type
== "short":
350 amount
= balance
.margin_borrowed
352 amount
= balance
.exchange_free
+ balance
.exchange_used
353 amount_in_position
[currency
] = amount
.in_currency(base_currency
,
354 self
.market
, compute_value
=compute_value
)
355 total_base_value
+= amount_in_position
[currency
]
357 # recursively delete more-than-filled positions from the wanted
362 sum_ratio
= sum([v
[0] for k
, v
in repartition
.items()])
363 current_base_value
= total_base_value
364 for currency
, (ptt
, trade_type
) in repartition
.copy().items():
365 if amount_in_position
[currency
] > current_base_value
* ptt
/ sum_ratio
:
367 del(repartition
[currency
])
368 total_base_value
-= amount_in_position
[currency
]
369 return repartition
, total_base_value
, amount_in_position
371 def dispatch_assets(self
, amount
, liquidity
="medium", repartition
=None):
372 if repartition
is None:
373 repartition
= Portfolio
.repartition(liquidity
=liquidity
)
374 sum_ratio
= sum([v
[0] for k
, v
in repartition
.items()])
376 for currency
, (ptt
, trade_type
) in repartition
.items():
377 amounts
[currency
] = ptt
* amount
/ sum_ratio
378 if trade_type
== "short":
379 amounts
[currency
] = - amounts
[currency
]
380 self
.all
.setdefault(currency
, portfolio
.Balance(currency
, {}))
381 self
.market
.report
.log_dispatch(amount
, amounts
, liquidity
, repartition
)
385 return { k: v.as_json() for k, v in self.all.items() }
388 def __init__(self
, market
):
394 return list(filter(lambda t
: t
.pending
, self
.all
))
396 def compute_trades(self
, values_in_base
, new_repartition
, only
=None):
398 base_currency
= sum(values_in_base
.values()).currency
399 for currency
in self
.market
.balances
.currencies():
400 if currency
== base_currency
:
402 value_from
= values_in_base
.get(currency
, portfolio
.Amount(base_currency
, 0))
403 value_to
= new_repartition
.get(currency
, portfolio
.Amount(base_currency
, 0))
405 if value_from
.value
* value_to
.value
< 0:
406 computed_trades
.append(self
.trade_if_matching(
407 value_from
, portfolio
.Amount(base_currency
, 0),
408 currency
, only
=only
))
409 computed_trades
.append(self
.trade_if_matching(
410 portfolio
.Amount(base_currency
, 0), value_to
,
411 currency
, only
=only
))
413 computed_trades
.append(self
.trade_if_matching(
414 value_from
, value_to
,
415 currency
, only
=only
))
416 for matching
, trade
in computed_trades
:
418 self
.all
.append(trade
)
419 self
.market
.report
.log_trades(computed_trades
, only
)
421 def trade_if_matching(self
, value_from
, value_to
, currency
,
423 trade
= portfolio
.Trade(value_from
, value_to
, currency
,
425 matching
= only
is None or trade
.action
== only
426 return [matching
, trade
]
428 def prepare_orders(self
, only
=None, compute_value
="default"):
430 for trade
in self
.pending
:
431 if only
is None or trade
.action
== only
:
432 orders
.append(trade
.prepare_order(compute_value
=compute_value
))
433 self
.market
.report
.log_orders(orders
, only
, compute_value
)
435 def close_trades(self
):
436 for trade
in self
.all
:
439 def print_all_with_order(self
, ind
=""):
440 for trade
in self
.all
:
441 trade
.print_with_order(ind
=ind
)
443 def run_orders(self
):
444 orders
= self
.all_orders(state
="pending")
447 self
.market
.report
.log_stage("run_orders")
448 self
.market
.report
.log_orders(orders
)
450 def all_orders(self
, state
=None):
451 all_orders
= sum(map(lambda v
: v
.orders
, self
.all
), [])
455 return list(filter(lambda o
: o
.status
== state
, all_orders
))
457 def update_all_orders_status(self
):
458 for order
in self
.all_orders(state
="open"):
462 def __enter__(self
, *args
):
464 def __exit__(self
, *args
):
468 def __init__(self
, value
):
469 self
.lock
= NoopLock()
472 def start_lock(self
):
474 self
.lock
= threading
.Lock()
476 def set(self
, value
):
480 def get(self
, key
=None):
482 if key
is not None and isinstance(self
.val
, dict):
483 return self
.val
.get(key
)
487 def __getattr__(self
, key
):
489 return getattr(self
.val
, key
)
492 URL
= "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json"
493 data
= LockedVar(None)
494 liquidities
= LockedVar({})
495 last_date
= LockedVar(None)
496 report
= LockedVar(ReportStore(None, no_http_dup
=True))
499 worker_started
= False
502 poll_started_at
= None
505 def next_wait_time(cls
):
506 now
= datetime
.datetime
.now()
507 if cls
.poll_started_at
is None:
508 cls
.poll_started_at
= now
509 delta
= now
- cls
.poll_started_at
511 if delta
< datetime
.timedelta(minutes
=30):
513 elif delta
< datetime
.timedelta(hours
=1):
515 elif delta
< datetime
.timedelta(hours
=4):
517 elif delta
< datetime
.timedelta(days
=1):
520 raise "Too long waiting"
523 def start_worker(cls
):
526 cls
.worker
= threading
.Thread(name
="portfolio", daemon
=True,
527 target
=cls
.wait_for_notification
)
528 cls
.worker_notify
= threading
.Event()
529 cls
.callback
= threading
.Event()
531 cls
.last_date
.start_lock()
532 cls
.liquidities
.start_lock()
533 cls
.report
.start_lock()
535 cls
.worker_tag
= "[Worker] "
536 cls
.worker_started
= True
540 def is_worker_thread(cls
):
541 if cls
.worker
is None:
545 return cls
.worker
== threading
.current_thread()
548 def wait_for_notification(cls
):
549 if not cls
.is_worker_thread():
550 raise RuntimeError("This method needs to be ran with the worker")
551 while cls
.worker_started
:
552 cls
.worker_notify
.wait()
553 if cls
.worker_started
:
554 cls
.worker_notify
.clear()
555 cls
.report
.print_log("[Worker] Fetching cryptoportfolio")
556 cls
.get_cryptoportfolio(refetch
=True)
558 time
.sleep(cls
.next_wait_time())
561 def stop_worker(cls
):
562 cls
.worker_started
= False
563 cls
.worker_notify
.set()
566 def notify_and_wait(cls
):
568 cls
.worker_notify
.set()
572 def wait_for_recent(cls
, delta
=4):
573 cls
.get_cryptoportfolio()
574 while cls
.last_date
.get() is None or datetime
.datetime
.now() - cls
.last_date
.get() > datetime
.timedelta(delta
):
575 if cls
.worker
is None:
576 time
.sleep(cls
.next_wait_time())
577 cls
.report
.print_log("Attempt to fetch up-to-date cryptoportfolio")
578 cls
.get_cryptoportfolio(refetch
=True)
581 def repartition(cls
, liquidity
="medium", from_cache
=False):
583 cls
.retrieve_cryptoportfolio()
584 cls
.get_cryptoportfolio()
585 liquidities
= cls
.liquidities
.get(liquidity
)
586 if liquidities
is not None and cls
.last_date
.get() in liquidities
:
587 return liquidities
[cls
.last_date
.get()].copy()
590 def get_cryptoportfolio(cls
, refetch
=False):
591 if cls
.data
.get() is not None and not refetch
:
593 if cls
.worker
is not None and not cls
.is_worker_thread():
594 cls
.notify_and_wait()
597 r
= requests
.get(cls
.URL
)
598 cls
.report
.log_http_request(r
.request
.method
,
599 r
.request
.url
, r
.request
.body
, r
.request
.headers
, r
)
600 except Exception as e
:
601 cls
.report
.log_error("{}get_cryptoportfolio".format(cls
.worker_tag
), exception
=e
)
604 cls
.data
.set(r
.json(parse_int
=D
, parse_float
=D
))
605 cls
.parse_cryptoportfolio()
606 cls
.store_cryptoportfolio()
607 except (AssertionError, JSONDecodeError
, SimpleJSONDecodeError
):
609 cls
.last_date
.set(None)
610 cls
.liquidities
.set({})
613 def retrieve_cryptoportfolio(cls
):
614 if dbs
.redis_connected():
615 repartition
= dbs
.redis
.get("/cryptoportfolio/repartition/latest")
616 date
= dbs
.redis
.get("/cryptoportfolio/repartition/date")
617 if date
is not None and repartition
is not None:
618 date
= datetime
.datetime
.strptime(date
.decode(), "%Y-%m-%d")
619 repartition
= json
.loads(repartition
, parse_int
=D
, parse_float
=D
)
620 repartition
= { k: { date: v }
for k
, v
in repartition
.items() }
623 cls
.last_date
.set(date
)
624 cls
.liquidities
.set(repartition
)
627 def store_cryptoportfolio(cls
):
628 if dbs
.redis_connected():
630 for liquidity
, repartitions
in cls
.liquidities
.items():
631 hash_
[liquidity
] = repartitions
[cls
.last_date
.get()]
632 dump
= json
.dumps(hash_
)
633 key
= "/cryptoportfolio/repartition/latest"
634 dbs
.redis
.set(key
, dump
)
635 key
= "/cryptoportfolio/repartition/date"
636 dbs
.redis
.set(key
, cls
.last_date
.date().isoformat())
639 def parse_cryptoportfolio(cls
):
640 def filter_weights(weight_hash
):
641 if weight_hash
[1][0] == 0:
643 if weight_hash
[0] == "_row":
647 def clean_weights(i
):
648 def clean_weights_(h
):
649 if h
[0].endswith("s"):
650 return [h
[0][0:-1], (h
[1][i
], "short")]
652 return [h
[0], (h
[1][i
], "long")]
653 return clean_weights_
655 def parse_weights(portfolio_hash
):
656 if "weights" not in portfolio_hash
:
658 weights_hash
= portfolio_hash
["weights"]
660 for i
in range(len(weights_hash
["_row"])):
661 date
= datetime
.datetime
.strptime(weights_hash
["_row"][i
], "%Y-%m-%d")
662 weights
[date
] = dict(filter(
664 map(clean_weights(i
), weights_hash
.items())))
667 high_liquidity
= parse_weights(cls
.data
.get("portfolio_1"))
668 medium_liquidity
= parse_weights(cls
.data
.get("portfolio_2"))
670 assert len(high_liquidity
) > 0
671 assert len(medium_liquidity
) > 0
672 cls
.liquidities
.set({
673 "medium": medium_liquidity
,
674 "high": high_liquidity
,
676 cls
.last_date
.set(max(
677 max(medium_liquidity
.keys(), default
=datetime
.datetime(1, 1, 1)),
678 max(high_liquidity
.keys(), default
=datetime
.datetime(1, 1, 1))