4 import simplejson
as json
5 from decimal
import Decimal
as D
, ROUND_DOWN
8 from json
import JSONDecodeError
9 from simplejson
.errors
import JSONDecodeError
as SimpleJSONDecodeError
# Names exported by `from <this module> import *`.
__all__ = [
    "Portfolio",
    "BalanceStore",
    "ReportStore",
    "TradeStore",
]
15 def __init__(self
, market
, verbose_print
=True, no_http_dup
=False):
17 self
.verbose_print
= verbose_print
21 self
.redis_status
= []
23 self
.no_http_dup
= no_http_dup
def merge(self, other_report):
    """Fold another report's entries into this one, in place.

    The entries of ``other_report.logs`` and ``other_report.print_logs``
    are appended to this report's lists of the same name, and each list
    is then re-sorted: ``logs`` by each entry's ``"date"`` value,
    ``print_logs`` by each entry's first element. ``other_report`` is
    not modified.
    """
    def by_date(entry):
        return entry["date"]

    def by_first_item(entry):
        return entry[0]

    # Structured log entries.
    self.logs.extend(other_report.logs)
    self.logs.sort(key=by_date)

    # Printed log entries.
    self.print_logs.extend(other_report.print_logs)
    self.print_logs.sort(key=by_first_item)
33 def print_log(self
, message
):
34 now
= datetime
.datetime
.now()
35 message
= "{:%Y-%m-%d %H:%M:%S}: {}".format(now
, str(message
))
36 self
.print_logs
.append([now
, message
])
37 if self
.verbose_print
:
40 def add_log(self
, hash_
):
41 hash_
["date"] = datetime
.datetime
.now()
42 if self
.market
is not None:
43 hash_
["user_id"] = self
.market
.user_id
44 hash_
["market_id"] = self
.market
.market_id
46 hash_
["user_id"] = None
47 hash_
["market_id"] = None
48 self
.logs
.append(hash_
)
51 def add_redis_status(self
, hash_
):
52 self
.redis_status
.append(hash_
)
56 def default_json_serial(obj
):
57 if isinstance(obj
, (datetime
.datetime
, datetime
.date
)):
58 return obj
.isoformat()
62 return json
.dumps(self
.logs
, default
=self
.default_json_serial
, indent
=" ")
64 def to_json_array(self
):
65 for log
in (x
.copy() for x
in self
.logs
):
69 json
.dumps(log
, default
=self
.default_json_serial
, indent
=" ")
72 def to_json_redis(self
):
73 for log
in (x
.copy() for x
in self
.redis_status
):
76 json
.dumps(log
, default
=self
.default_json_serial
)
def set_verbose(self, verbose_print):
    """Store ``verbose_print`` on the instance as ``self.verbose_print``.

    ``print_log`` tests this attribute after recording a message.
    """
    self.verbose_print = verbose_print
82 def log_stage(self
, stage
, **kwargs
):
85 return inspect
.getsource(element
).strip()
86 elif hasattr(element
, "as_json"):
87 return element
.as_json()
91 args
= { k: as_json(v) for k, v in kwargs.items() }
92 args_str
= ["{}={}".format(k
, v
) for k
, v
in args
.items()]
93 self
.print_log("-" * (len(stage
) + 8))
94 self
.print_log("[Stage] {} {}".format(stage
, ", ".join(args_str
)))
102 def log_balances(self
, tag
=None, checkpoint
=None, tickers
=None,
103 ticker_currency
=None, compute_value
=None, type=None):
104 self
.print_log("[Balance]")
105 for currency
, balance
in self
.market
.balances
.all
.items():
106 self
.print_log("\t{}".format(balance
))
111 "checkpoint": checkpoint
,
112 "balances": self
.market
.balances
.as_json()
115 if tickers
is not None:
116 log
["tickers"] = self
._ticker
_hash
(tickers
, ticker_currency
,
119 self
.add_log(log
.copy())
120 self
.add_redis_status(log
)
122 def log_tickers(self
, amounts
, other_currency
,
123 compute_value
, type):
124 log
= self
._ticker
_hash
(amounts
, other_currency
, compute_value
,
126 log
["type"] = "tickers"
130 def _ticker_hash(self
, amounts
, other_currency
, compute_value
, type):
133 if callable(compute_value
):
134 compute_value
= inspect
.getsource(compute_value
).strip()
136 for currency
, amount
in amounts
.items():
137 values
[currency
] = amount
.as_json()["value"]
138 rates
[currency
] = amount
.rate
140 "compute_value": compute_value
,
141 "balance_type": type,
142 "currency": other_currency
,
145 "total": sum(amounts
.values()).as_json()["value"]
148 def log_dispatch(self
, amount
, amounts
, liquidity
, repartition
):
151 "liquidity": liquidity
,
152 "repartition_ratio": repartition
,
153 "total_amount": amount
.as_json(),
154 "repartition": { k: v.as_json()["value"] for k, v in amounts.items() }
157 def log_trades(self
, matching_and_trades
, only
):
159 for matching
, trade
in matching_and_trades
:
160 trade_json
= trade
.as_json()
161 trade_json
["skipped"] = not matching
162 trades
.append(trade_json
)
167 "debug": self
.market
.debug
,
171 def log_orders(self
, orders
, tick
=None, only
=None, compute_value
=None):
172 if callable(compute_value
):
173 compute_value
= inspect
.getsource(compute_value
).strip()
174 self
.print_log("[Orders]")
175 self
.market
.trades
.print_all_with_order(ind
="\t")
179 "compute_value": compute_value
,
181 "orders": [order
.as_json() for order
in orders
if order
is not None]
184 def log_order(self
, order
, tick
, finished
=False, update
=None,
185 new_order
=None, compute_value
=None):
186 if callable(compute_value
):
187 compute_value
= inspect
.getsource(compute_value
).strip()
189 self
.print_log("[Order] Finished {}".format(order
))
190 elif update
== "waiting":
191 self
.print_log("[Order] {}, tick {}, waiting".format(order
, tick
))
192 elif update
== "adjusting":
193 self
.print_log("[Order] {}, tick {}, cancelling and adjusting to {}".format(order
, tick
, new_order
))
194 elif update
== "market_fallback":
195 self
.print_log("[Order] {}, tick {}, fallbacking to market value".format(order
, tick
))
196 elif update
== "market_adjust":
197 self
.print_log("[Order] {}, tick {}, market value, cancelling and adjusting to {}".format(order
, tick
, new_order
))
203 "order": order
.as_json(),
204 "compute_value": compute_value
,
205 "new_order": new_order
.as_json() if new_order
is not None else None
208 def log_move_balances(self
, needed
, moving
):
210 "type": "move_balances",
211 "debug": self
.market
.debug
,
212 "needed": { k: v.as_json()["value"] if isinstance(v, portfolio.Amount) else v for k, v in needed.items() }
,
213 "moving": { k: v.as_json()["value"] if isinstance(v, portfolio.Amount) else v for k, v in moving.items() }
,
216 def log_http_request(self
, method
, url
, body
, headers
, response
):
217 if isinstance(response
, Exception):
219 "type": "http_request",
226 "error": response
.__class
__.__name
__,
227 "error_message": str(response
),
229 self
.last_http
= None
230 elif self
.no_http_dup
and \
231 self
.last_http
is not None and \
232 self
.last_http
["url"] == url
and \
233 self
.last_http
["method"] == method
and \
234 self
.last_http
["response"] == response
.text
:
236 "type": "http_request",
241 "status": response
.status_code
,
242 "duration": response
.elapsed
.total_seconds(),
244 "response_same_as": self
.last_http
["date"]
247 self
.last_http
= self
.add_log({
248 "type": "http_request",
253 "status": response
.status_code
,
254 "duration": response
.elapsed
.total_seconds(),
255 "response": response
.text
,
256 "response_same_as": None,
259 def log_error(self
, action
, message
=None, exception
=None):
260 self
.print_log("[Error] {}".format(action
))
261 if exception
is not None:
262 self
.print_log(str("\t{}: {}".format(exception
.__class
__.__name
__, exception
)))
263 if message
is not None:
264 self
.print_log("\t{}".format(message
))
269 "exception_class": exception
.__class
__.__name
__ if exception
is not None else None,
270 "exception_message": str(exception
) if exception
is not None else None,
274 def log_debug_action(self
, action
):
275 self
.print_log("[Debug] {}".format(action
))
278 "type": "debug_action",
282 def log_market(self
, args
):
285 "commit": "$Format:%H$",
290 def __init__(self
, market
):
def currencies(self):
    """Return the keys view of ``self.all``.

    NOTE(review): ``self.all`` is presumably keyed by currency code
    (``fetch_balances`` assigns ``self.all[currency]``) -- confirm.
    """
    holdings = self.all
    return holdings.keys()
297 def in_currency(self
, other_currency
, compute_value
="average", type="total"):
299 for currency
, balance
in self
.all
.items():
300 other_currency_amount
= getattr(balance
, type)\
301 .in_currency(other_currency
, self
.market
, compute_value
=compute_value
)
302 amounts
[currency
] = other_currency_amount
303 self
.market
.report
.log_tickers(amounts
, other_currency
,
307 def fetch_balances(self
, tag
=None, add_portfolio
=False,
308 checkpoint
=None, log_tickers
=False, add_usdt
=False,
309 ticker_currency
="BTC", ticker_compute_value
="average", ticker_type
="total"):
310 all_balances
= self
.market
.ccxt
.fetch_all_balances()
311 for currency
, balance
in all_balances
.items():
312 if balance
["exchange_total"] != 0 or balance
["margin_total"] != 0 or \
313 currency
in self
.all
:
314 self
.all
[currency
] = portfolio
.Balance(currency
, balance
)
316 for currency
in Portfolio
.repartition(from_cache
=True):
317 self
.all
.setdefault(currency
, portfolio
.Balance(currency
, {}))
319 self
.all
.setdefault("USDT", portfolio
.Balance("USDT", {}))
321 tickers
= self
.in_currency(ticker_currency
, compute_value
=ticker_compute_value
, type=ticker_type
)
322 self
.market
.report
.log_balances(tag
=tag
, checkpoint
=checkpoint
,
323 tickers
=tickers
, ticker_currency
=ticker_currency
,
324 compute_value
=ticker_compute_value
, type=ticker_type
)
326 self
.market
.report
.log_balances(tag
=tag
, checkpoint
=checkpoint
)
328 def dispatch_assets(self
, amount
, liquidity
="medium", repartition
=None):
329 if repartition
is None:
330 repartition
= Portfolio
.repartition(liquidity
=liquidity
)
331 sum_ratio
= sum([v
[0] for k
, v
in repartition
.items()])
333 for currency
, (ptt
, trade_type
) in repartition
.items():
334 amounts
[currency
] = ptt
* amount
/ sum_ratio
335 if trade_type
== "short":
336 amounts
[currency
] = - amounts
[currency
]
337 self
.all
.setdefault(currency
, portfolio
.Balance(currency
, {}))
338 self
.market
.report
.log_dispatch(amount
, amounts
, liquidity
, repartition
)
342 return { k: v.as_json() for k, v in self.all.items() }
345 def __init__(self
, market
):
351 return list(filter(lambda t
: t
.pending
, self
.all
))
353 def compute_trades(self
, values_in_base
, new_repartition
, only
=None):
355 base_currency
= sum(values_in_base
.values()).currency
356 for currency
in self
.market
.balances
.currencies():
357 if currency
== base_currency
:
359 value_from
= values_in_base
.get(currency
, portfolio
.Amount(base_currency
, 0))
360 value_to
= new_repartition
.get(currency
, portfolio
.Amount(base_currency
, 0))
362 if value_from
.value
* value_to
.value
< 0:
363 computed_trades
.append(self
.trade_if_matching(
364 value_from
, portfolio
.Amount(base_currency
, 0),
365 currency
, only
=only
))
366 computed_trades
.append(self
.trade_if_matching(
367 portfolio
.Amount(base_currency
, 0), value_to
,
368 currency
, only
=only
))
370 computed_trades
.append(self
.trade_if_matching(
371 value_from
, value_to
,
372 currency
, only
=only
))
373 for matching
, trade
in computed_trades
:
375 self
.all
.append(trade
)
376 self
.market
.report
.log_trades(computed_trades
, only
)
378 def trade_if_matching(self
, value_from
, value_to
, currency
,
380 trade
= portfolio
.Trade(value_from
, value_to
, currency
,
382 matching
= only
is None or trade
.action
== only
383 return [matching
, trade
]
385 def prepare_orders(self
, only
=None, compute_value
="default"):
387 for trade
in self
.pending
:
388 if only
is None or trade
.action
== only
:
389 orders
.append(trade
.prepare_order(compute_value
=compute_value
))
390 self
.market
.report
.log_orders(orders
, only
, compute_value
)
392 def close_trades(self
):
393 for trade
in self
.all
:
def print_all_with_order(self, ind=""):
    """Call ``print_with_order(ind=ind)`` on every item of ``self.all``.

    ``ind`` is forwarded unchanged to each call; ``log_orders`` passes
    a tab character here.
    """
    for entry in self.all:
        entry.print_with_order(ind=ind)
400 def run_orders(self
):
401 orders
= self
.all_orders(state
="pending")
404 self
.market
.report
.log_stage("run_orders")
405 self
.market
.report
.log_orders(orders
)
407 def all_orders(self
, state
=None):
408 all_orders
= sum(map(lambda v
: v
.orders
, self
.all
), [])
412 return list(filter(lambda o
: o
.status
== state
, all_orders
))
414 def update_all_orders_status(self
):
415 for order
in self
.all_orders(state
="open"):
419 def __enter__(self
, *args
):
421 def __exit__(self
, *args
):
425 def __init__(self
, value
):
426 self
.lock
= NoopLock()
429 def start_lock(self
):
431 self
.lock
= threading
.Lock()
433 def set(self
, value
):
437 def get(self
, key
=None):
439 if key
is not None and isinstance(self
.val
, dict):
440 return self
.val
.get(key
)
444 def __getattr__(self
, key
):
446 return getattr(self
.val
, key
)
449 URL
= "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json"
450 data
= LockedVar(None)
451 liquidities
= LockedVar({})
452 last_date
= LockedVar(None)
453 report
= LockedVar(ReportStore(None, no_http_dup
=True))
456 worker_started
= False
461 def start_worker(cls
, poll
=30):
464 cls
.worker
= threading
.Thread(name
="portfolio", daemon
=True,
465 target
=cls
.wait_for_notification
, kwargs
={"poll": poll}
)
466 cls
.worker_notify
= threading
.Event()
467 cls
.callback
= threading
.Event()
469 cls
.last_date
.start_lock()
470 cls
.liquidities
.start_lock()
471 cls
.report
.start_lock()
473 cls
.worker_tag
= "[Worker] "
474 cls
.worker_started
= True
478 def is_worker_thread(cls
):
479 if cls
.worker
is None:
483 return cls
.worker
== threading
.current_thread()
486 def wait_for_notification(cls
, poll
=30):
487 if not cls
.is_worker_thread():
488 raise RuntimeError("This method needs to be ran with the worker")
489 while cls
.worker_started
:
490 cls
.worker_notify
.wait()
491 if cls
.worker_started
:
492 cls
.worker_notify
.clear()
493 cls
.report
.print_log("[Worker] Fetching cryptoportfolio")
494 cls
.get_cryptoportfolio(refetch
=True)
def stop_worker(cls):
    """Clear ``worker_started`` and set the ``worker_notify`` event.

    The flag is cleared before the event is set, so the loop in
    ``wait_for_notification`` (which blocks on ``worker_notify.wait()``
    and re-tests ``worker_started``) sees the flag as false once woken.
    """
    cls.worker_started = False
    wake_up = cls.worker_notify
    wake_up.set()
504 def notify_and_wait(cls
):
506 cls
.worker_notify
.set()
510 def wait_for_recent(cls
, delta
=4, poll
=30):
511 cls
.get_cryptoportfolio()
512 while cls
.last_date
.get() is None or datetime
.datetime
.now() - cls
.last_date
.get() > datetime
.timedelta(delta
):
513 if cls
.worker
is None:
515 cls
.report
.print_log("Attempt to fetch up-to-date cryptoportfolio")
516 cls
.get_cryptoportfolio(refetch
=True)
519 def repartition(cls
, liquidity
="medium", from_cache
=False):
521 cls
.retrieve_cryptoportfolio()
522 cls
.get_cryptoportfolio()
523 liquidities
= cls
.liquidities
.get(liquidity
)
524 return liquidities
[cls
.last_date
.get()]
527 def get_cryptoportfolio(cls
, refetch
=False):
528 if cls
.data
.get() is not None and not refetch
:
530 if cls
.worker
is not None and not cls
.is_worker_thread():
531 cls
.notify_and_wait()
534 r
= requests
.get(cls
.URL
)
535 cls
.report
.log_http_request(r
.request
.method
,
536 r
.request
.url
, r
.request
.body
, r
.request
.headers
, r
)
537 except Exception as e
:
538 cls
.report
.log_error("{}get_cryptoportfolio".format(cls
.worker_tag
), exception
=e
)
541 cls
.data
.set(r
.json(parse_int
=D
, parse_float
=D
))
542 cls
.parse_cryptoportfolio()
543 cls
.store_cryptoportfolio()
544 except (JSONDecodeError
, SimpleJSONDecodeError
):
546 cls
.last_date
.set(None)
547 cls
.liquidities
.set({})
550 def retrieve_cryptoportfolio(cls
):
551 if dbs
.redis_connected():
552 repartition
= dbs
.redis
.get("/cryptoportfolio/repartition/latest")
553 date
= dbs
.redis
.get("/cryptoportfolio/repartition/date")
554 if date
is not None and repartition
is not None:
555 date
= datetime
.datetime
.strptime(date
.decode(), "%Y-%m-%d")
556 repartition
= json
.loads(repartition
, parse_int
=D
, parse_float
=D
)
557 repartition
= { k: { date: v }
for k
, v
in repartition
.items() }
560 cls
.last_date
.set(date
)
561 cls
.liquidities
.set(repartition
)
564 def store_cryptoportfolio(cls
):
565 if dbs
.redis_connected():
567 for liquidity
, repartitions
in cls
.liquidities
.items():
568 hash_
[liquidity
] = repartitions
[cls
.last_date
.get()]
569 dump
= json
.dumps(hash_
)
570 key
= "/cryptoportfolio/repartition/latest"
571 dbs
.redis
.set(key
, dump
)
572 key
= "/cryptoportfolio/repartition/date"
573 dbs
.redis
.set(key
, cls
.last_date
.date().isoformat())
576 def parse_cryptoportfolio(cls
):
577 def filter_weights(weight_hash
):
578 if weight_hash
[1][0] == 0:
580 if weight_hash
[0] == "_row":
584 def clean_weights(i
):
585 def clean_weights_(h
):
586 if h
[0].endswith("s"):
587 return [h
[0][0:-1], (h
[1][i
], "short")]
589 return [h
[0], (h
[1][i
], "long")]
590 return clean_weights_
592 def parse_weights(portfolio_hash
):
593 if "weights" not in portfolio_hash
:
595 weights_hash
= portfolio_hash
["weights"]
597 for i
in range(len(weights_hash
["_row"])):
598 date
= datetime
.datetime
.strptime(weights_hash
["_row"][i
], "%Y-%m-%d")
599 weights
[date
] = dict(filter(
601 map(clean_weights(i
), weights_hash
.items())))
604 high_liquidity
= parse_weights(cls
.data
.get("portfolio_1"))
605 medium_liquidity
= parse_weights(cls
.data
.get("portfolio_2"))
607 cls
.liquidities
.set({
608 "medium": medium_liquidity
,
609 "high": high_liquidity
,
611 cls
.last_date
.set(max(
612 max(medium_liquidity
.keys(), default
=datetime
.datetime(1, 1, 1)),
613 max(high_liquidity
.keys(), default
=datetime
.datetime(1, 1, 1))