4 import simplejson
as json
5 from decimal
import Decimal
as D
, ROUND_DOWN
8 from json
import JSONDecodeError
9 from simplejson
.errors
import JSONDecodeError
as SimpleJSONDecodeError
12 __all__
= ["Portfolio", "BalanceStore", "ReportStore", "TradeStore"]
15 def __init__(self
, market
, verbose_print
=True, no_http_dup
=False):
17 self
.verbose_print
= verbose_print
21 self
.redis_status
= []
23 self
.no_http_dup
= no_http_dup
def merge(self, other_report):
    """Fold another report's entries into this one, in place.

    Both of ``other_report``'s lists are appended to the matching lists on
    ``self`` and the combined lists are then re-sorted:

    * ``logs`` entries are ordered by their ``"date"`` key;
    * ``print_logs`` entries are ordered by their first element.

    ``other_report`` itself is left unmodified. Returns ``None``.
    """
    # Structured log entries: mappings carrying a "date" key.
    self.logs.extend(other_report.logs)
    self.logs.sort(key=lambda entry: entry["date"])

    # Printed messages: sequences whose first item is the sort key.
    self.print_logs.extend(other_report.print_logs)
    self.print_logs.sort(key=lambda line: line[0])
33 def print_log(self
, message
):
34 now
= datetime
.datetime
.now()
35 message
= "{:%Y-%m-%d %H:%M:%S}: {}".format(now
, str(message
))
36 self
.print_logs
.append([now
, message
])
37 if self
.verbose_print
:
40 def add_log(self
, hash_
):
41 hash_
["date"] = datetime
.datetime
.now()
42 if self
.market
is not None:
43 hash_
["user_id"] = self
.market
.user_id
44 hash_
["market_id"] = self
.market
.market_id
46 hash_
["user_id"] = None
47 hash_
["market_id"] = None
48 self
.logs
.append(hash_
)
51 def add_redis_status(self
, hash_
):
52 self
.redis_status
.append(hash_
)
56 def default_json_serial(obj
):
57 if isinstance(obj
, (datetime
.datetime
, datetime
.date
)):
58 return obj
.isoformat()
62 return json
.dumps(self
.logs
, default
=self
.default_json_serial
, indent
=" ")
64 def to_json_array(self
):
65 for log
in (x
.copy() for x
in self
.logs
):
69 json
.dumps(log
, default
=self
.default_json_serial
, indent
=" ")
72 def to_json_redis(self
):
73 for log
in (x
.copy() for x
in self
.redis_status
):
76 json
.dumps(log
, default
=self
.default_json_serial
)
def set_verbose(self, verbose_print):
    """Replace the ``verbose_print`` flag stored on this report.

    ``verbose_print`` is stored as given (no coercion); it is the flag that
    ``print_log`` tests after recording a message.
    """
    setattr(self, "verbose_print", verbose_print)
82 def log_stage(self
, stage
, **kwargs
):
85 return inspect
.getsource(element
).strip()
86 elif hasattr(element
, "as_json"):
87 return element
.as_json()
91 args
= { k: as_json(v) for k, v in kwargs.items() }
92 args_str
= ["{}={}".format(k
, v
) for k
, v
in args
.items()]
93 self
.print_log("-" * (len(stage
) + 8))
94 self
.print_log("[Stage] {} {}".format(stage
, ", ".join(args_str
)))
102 def log_balances(self
, tag
=None, tickers
=None,
103 ticker_currency
=None, compute_value
=None, type=None):
104 self
.print_log("[Balance]")
105 for currency
, balance
in self
.market
.balances
.all
.items():
106 self
.print_log("\t{}".format(balance
))
111 "balances": self
.market
.balances
.as_json()
114 if tickers
is not None:
115 log
["tickers"] = self
._ticker
_hash
(tickers
, ticker_currency
,
118 self
.add_log(log
.copy())
119 self
.add_redis_status(log
)
121 def log_tickers(self
, amounts
, other_currency
,
122 compute_value
, type):
123 log
= self
._ticker
_hash
(amounts
, other_currency
, compute_value
,
125 log
["type"] = "tickers"
129 def _ticker_hash(self
, amounts
, other_currency
, compute_value
, type):
132 if callable(compute_value
):
133 compute_value
= inspect
.getsource(compute_value
).strip()
135 for currency
, amount
in amounts
.items():
136 values
[currency
] = amount
.as_json()["value"]
137 rates
[currency
] = amount
.rate
139 "compute_value": compute_value
,
140 "balance_type": type,
141 "currency": other_currency
,
144 "total": sum(amounts
.values()).as_json()["value"]
147 def log_dispatch(self
, amount
, amounts
, liquidity
, repartition
):
150 "liquidity": liquidity
,
151 "repartition_ratio": repartition
,
152 "total_amount": amount
.as_json(),
153 "repartition": { k: v.as_json()["value"] for k, v in amounts.items() }
156 def log_trades(self
, matching_and_trades
, only
):
158 for matching
, trade
in matching_and_trades
:
159 trade_json
= trade
.as_json()
160 trade_json
["skipped"] = not matching
161 trades
.append(trade_json
)
166 "debug": self
.market
.debug
,
170 def log_orders(self
, orders
, tick
=None, only
=None, compute_value
=None):
171 if callable(compute_value
):
172 compute_value
= inspect
.getsource(compute_value
).strip()
173 self
.print_log("[Orders]")
174 self
.market
.trades
.print_all_with_order(ind
="\t")
178 "compute_value": compute_value
,
180 "orders": [order
.as_json() for order
in orders
if order
is not None]
183 def log_order(self
, order
, tick
, finished
=False, update
=None,
184 new_order
=None, compute_value
=None):
185 if callable(compute_value
):
186 compute_value
= inspect
.getsource(compute_value
).strip()
188 self
.print_log("[Order] Finished {}".format(order
))
189 elif update
== "waiting":
190 self
.print_log("[Order] {}, tick {}, waiting".format(order
, tick
))
191 elif update
== "adjusting":
192 self
.print_log("[Order] {}, tick {}, cancelling and adjusting to {}".format(order
, tick
, new_order
))
193 elif update
== "market_fallback":
194 self
.print_log("[Order] {}, tick {}, fallbacking to market value".format(order
, tick
))
195 elif update
== "market_adjust":
196 self
.print_log("[Order] {}, tick {}, market value, cancelling and adjusting to {}".format(order
, tick
, new_order
))
202 "order": order
.as_json(),
203 "compute_value": compute_value
,
204 "new_order": new_order
.as_json() if new_order
is not None else None
207 def log_move_balances(self
, needed
, moving
):
209 "type": "move_balances",
210 "debug": self
.market
.debug
,
211 "needed": { k: v.as_json()["value"] if isinstance(v, portfolio.Amount) else v for k, v in needed.items() }
,
212 "moving": { k: v.as_json()["value"] if isinstance(v, portfolio.Amount) else v for k, v in moving.items() }
,
215 def log_http_request(self
, method
, url
, body
, headers
, response
):
216 if isinstance(response
, Exception):
218 "type": "http_request",
225 "error": response
.__class
__.__name
__,
226 "error_message": str(response
),
228 self
.last_http
= None
229 elif self
.no_http_dup
and \
230 self
.last_http
is not None and \
231 self
.last_http
["url"] == url
and \
232 self
.last_http
["method"] == method
and \
233 self
.last_http
["response"] == response
.text
:
235 "type": "http_request",
240 "status": response
.status_code
,
241 "duration": response
.elapsed
.total_seconds(),
243 "response_same_as": self
.last_http
["date"]
246 self
.last_http
= self
.add_log({
247 "type": "http_request",
252 "status": response
.status_code
,
253 "duration": response
.elapsed
.total_seconds(),
254 "response": response
.text
,
255 "response_same_as": None,
258 def log_error(self
, action
, message
=None, exception
=None):
259 self
.print_log("[Error] {}".format(action
))
260 if exception
is not None:
261 self
.print_log(str("\t{}: {}".format(exception
.__class
__.__name
__, exception
)))
262 if message
is not None:
263 self
.print_log("\t{}".format(message
))
268 "exception_class": exception
.__class
__.__name
__ if exception
is not None else None,
269 "exception_message": str(exception
) if exception
is not None else None,
273 def log_debug_action(self
, action
):
274 self
.print_log("[Debug] {}".format(action
))
277 "type": "debug_action",
281 def log_market(self
, args
):
284 "commit": "$Format:%H$",
289 def __init__(self
, market
):
def currencies(self):
    """Return the keys view of ``self.all``.

    ``self.all`` is keyed by currency (see ``fetch_balances``), so the
    result iterates over the currencies currently held in the store. It is
    the live ``dict`` view, not a copy: later insertions show up in it.
    """
    balances_by_currency = self.all
    return balances_by_currency.keys()
296 def in_currency(self
, other_currency
, compute_value
="average", type="total"):
298 for currency
, balance
in self
.all
.items():
299 other_currency_amount
= getattr(balance
, type)\
300 .in_currency(other_currency
, self
.market
, compute_value
=compute_value
)
301 amounts
[currency
] = other_currency_amount
302 self
.market
.report
.log_tickers(amounts
, other_currency
,
306 def fetch_balances(self
, tag
=None, add_portfolio
=False, log_tickers
=False,
307 ticker_currency
="BTC", ticker_compute_value
="average", ticker_type
="total"):
308 all_balances
= self
.market
.ccxt
.fetch_all_balances()
309 for currency
, balance
in all_balances
.items():
310 if balance
["exchange_total"] != 0 or balance
["margin_total"] != 0 or \
311 currency
in self
.all
:
312 self
.all
[currency
] = portfolio
.Balance(currency
, balance
)
314 for currency
in Portfolio
.repartition(from_cache
=True):
315 self
.all
.setdefault(currency
, portfolio
.Balance(currency
, {}))
317 tickers
= self
.in_currency(ticker_currency
, compute_value
=ticker_compute_value
, type=ticker_type
)
318 self
.market
.report
.log_balances(tag
=tag
,
319 tickers
=tickers
, ticker_currency
=ticker_currency
,
320 compute_value
=ticker_compute_value
, type=ticker_type
)
322 self
.market
.report
.log_balances(tag
=tag
)
324 def dispatch_assets(self
, amount
, liquidity
="medium", repartition
=None):
325 if repartition
is None:
326 repartition
= Portfolio
.repartition(liquidity
=liquidity
)
327 sum_ratio
= sum([v
[0] for k
, v
in repartition
.items()])
329 for currency
, (ptt
, trade_type
) in repartition
.items():
330 amounts
[currency
] = ptt
* amount
/ sum_ratio
331 if trade_type
== "short":
332 amounts
[currency
] = - amounts
[currency
]
333 self
.all
.setdefault(currency
, portfolio
.Balance(currency
, {}))
334 self
.market
.report
.log_dispatch(amount
, amounts
, liquidity
, repartition
)
338 return { k: v.as_json() for k, v in self.all.items() }
341 def __init__(self
, market
):
347 return list(filter(lambda t
: t
.pending
, self
.all
))
349 def compute_trades(self
, values_in_base
, new_repartition
, only
=None):
351 base_currency
= sum(values_in_base
.values()).currency
352 for currency
in self
.market
.balances
.currencies():
353 if currency
== base_currency
:
355 value_from
= values_in_base
.get(currency
, portfolio
.Amount(base_currency
, 0))
356 value_to
= new_repartition
.get(currency
, portfolio
.Amount(base_currency
, 0))
358 if value_from
.value
* value_to
.value
< 0:
359 computed_trades
.append(self
.trade_if_matching(
360 value_from
, portfolio
.Amount(base_currency
, 0),
361 currency
, only
=only
))
362 computed_trades
.append(self
.trade_if_matching(
363 portfolio
.Amount(base_currency
, 0), value_to
,
364 currency
, only
=only
))
366 computed_trades
.append(self
.trade_if_matching(
367 value_from
, value_to
,
368 currency
, only
=only
))
369 for matching
, trade
in computed_trades
:
371 self
.all
.append(trade
)
372 self
.market
.report
.log_trades(computed_trades
, only
)
374 def trade_if_matching(self
, value_from
, value_to
, currency
,
376 trade
= portfolio
.Trade(value_from
, value_to
, currency
,
378 matching
= only
is None or trade
.action
== only
379 return [matching
, trade
]
381 def prepare_orders(self
, only
=None, compute_value
="default"):
383 for trade
in self
.pending
:
384 if only
is None or trade
.action
== only
:
385 orders
.append(trade
.prepare_order(compute_value
=compute_value
))
386 self
.market
.report
.log_orders(orders
, only
, compute_value
)
388 def close_trades(self
):
389 for trade
in self
.all
:
def print_all_with_order(self, ind=""):
    """Ask every trade in ``self.all`` to print itself with its orders.

    ``ind`` is forwarded unchanged, as a keyword argument, to each trade's
    ``print_with_order``. Trades are visited in the order they are stored.
    Returns ``None``.
    """
    stored_trades = self.all
    for current in stored_trades:
        current.print_with_order(ind=ind)
396 def run_orders(self
):
397 orders
= self
.all_orders(state
="pending")
400 self
.market
.report
.log_stage("run_orders")
401 self
.market
.report
.log_orders(orders
)
403 def all_orders(self
, state
=None):
404 all_orders
= sum(map(lambda v
: v
.orders
, self
.all
), [])
408 return list(filter(lambda o
: o
.status
== state
, all_orders
))
410 def update_all_orders_status(self
):
411 for order
in self
.all_orders(state
="open"):
415 def __enter__(self
, *args
):
417 def __exit__(self
, *args
):
421 def __init__(self
, value
):
422 self
.lock
= NoopLock()
425 def start_lock(self
):
427 self
.lock
= threading
.Lock()
429 def set(self
, value
):
433 def get(self
, key
=None):
435 if key
is not None and isinstance(self
.val
, dict):
436 return self
.val
.get(key
)
440 def __getattr__(self
, key
):
442 return getattr(self
.val
, key
)
445 URL
= "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json"
446 data
= LockedVar(None)
447 liquidities
= LockedVar({})
448 last_date
= LockedVar(None)
449 report
= LockedVar(ReportStore(None, no_http_dup
=True))
452 worker_started
= False
457 def start_worker(cls
, poll
=30):
460 cls
.worker
= threading
.Thread(name
="portfolio", daemon
=True,
461 target
=cls
.wait_for_notification
, kwargs
={"poll": poll}
)
462 cls
.worker_notify
= threading
.Event()
463 cls
.callback
= threading
.Event()
465 cls
.last_date
.start_lock()
466 cls
.liquidities
.start_lock()
467 cls
.report
.start_lock()
469 cls
.worker_tag
= "[Worker] "
470 cls
.worker_started
= True
474 def is_worker_thread(cls
):
475 if cls
.worker
is None:
479 return cls
.worker
== threading
.current_thread()
482 def wait_for_notification(cls
, poll
=30):
483 if not cls
.is_worker_thread():
484 raise RuntimeError("This method needs to be ran with the worker")
485 while cls
.worker_started
:
486 cls
.worker_notify
.wait()
487 if cls
.worker_started
:
488 cls
.worker_notify
.clear()
489 cls
.report
.print_log("[Worker] Fetching cryptoportfolio")
490 cls
.get_cryptoportfolio(refetch
=True)
def stop_worker(cls):
    """Ask the worker loop to terminate.

    Clears ``cls.worker_started`` and then sets the ``cls.worker_notify``
    event. ``wait_for_notification`` loops while ``worker_started`` is true
    and blocks on ``worker_notify.wait()``, so setting the event wakes it up
    and lets it observe the cleared flag.
    """
    # Order matters: the flag must be down before the waiter is woken.
    setattr(cls, "worker_started", False)
    wake_up = cls.worker_notify
    wake_up.set()
500 def notify_and_wait(cls
):
502 cls
.worker_notify
.set()
506 def wait_for_recent(cls
, delta
=4, poll
=30):
507 cls
.get_cryptoportfolio()
508 while cls
.last_date
.get() is None or datetime
.datetime
.now() - cls
.last_date
.get() > datetime
.timedelta(delta
):
509 if cls
.worker
is None:
511 cls
.report
.print_log("Attempt to fetch up-to-date cryptoportfolio")
512 cls
.get_cryptoportfolio(refetch
=True)
515 def repartition(cls
, liquidity
="medium", from_cache
=False):
517 cls
.retrieve_cryptoportfolio()
518 cls
.get_cryptoportfolio()
519 liquidities
= cls
.liquidities
.get(liquidity
)
520 return liquidities
[cls
.last_date
.get()]
523 def get_cryptoportfolio(cls
, refetch
=False):
524 if cls
.data
.get() is not None and not refetch
:
526 if cls
.worker
is not None and not cls
.is_worker_thread():
527 cls
.notify_and_wait()
530 r
= requests
.get(cls
.URL
)
531 cls
.report
.log_http_request(r
.request
.method
,
532 r
.request
.url
, r
.request
.body
, r
.request
.headers
, r
)
533 except Exception as e
:
534 cls
.report
.log_error("{}get_cryptoportfolio".format(cls
.worker_tag
), exception
=e
)
537 cls
.data
.set(r
.json(parse_int
=D
, parse_float
=D
))
538 cls
.parse_cryptoportfolio()
539 cls
.store_cryptoportfolio()
540 except (JSONDecodeError
, SimpleJSONDecodeError
):
542 cls
.last_date
.set(None)
543 cls
.liquidities
.set({})
546 def retrieve_cryptoportfolio(cls
):
547 if dbs
.redis_connected():
548 repartition
= dbs
.redis
.get("/cryptoportfolio/repartition/latest")
549 date
= dbs
.redis
.get("/cryptoportfolio/repartition/date")
550 if date
is not None and repartition
is not None:
551 date
= datetime
.datetime
.strptime(date
.decode(), "%Y-%m-%d")
552 repartition
= json
.loads(repartition
, parse_int
=D
, parse_float
=D
)
553 repartition
= { k: { date: v }
for k
, v
in repartition
.items() }
556 cls
.last_date
.set(date
)
557 cls
.liquidities
.set(repartition
)
560 def store_cryptoportfolio(cls
):
561 if dbs
.redis_connected():
563 for liquidity
, repartitions
in cls
.liquidities
.items():
564 hash_
[liquidity
] = repartitions
[cls
.last_date
.get()]
565 dump
= json
.dumps(hash_
)
566 key
= "/cryptoportfolio/repartition/latest"
567 dbs
.redis
.set(key
, dump
)
568 key
= "/cryptoportfolio/repartition/date"
569 dbs
.redis
.set(key
, cls
.last_date
.date().isoformat())
572 def parse_cryptoportfolio(cls
):
573 def filter_weights(weight_hash
):
574 if weight_hash
[1][0] == 0:
576 if weight_hash
[0] == "_row":
580 def clean_weights(i
):
581 def clean_weights_(h
):
582 if h
[0].endswith("s"):
583 return [h
[0][0:-1], (h
[1][i
], "short")]
585 return [h
[0], (h
[1][i
], "long")]
586 return clean_weights_
588 def parse_weights(portfolio_hash
):
589 if "weights" not in portfolio_hash
:
591 weights_hash
= portfolio_hash
["weights"]
593 for i
in range(len(weights_hash
["_row"])):
594 date
= datetime
.datetime
.strptime(weights_hash
["_row"][i
], "%Y-%m-%d")
595 weights
[date
] = dict(filter(
597 map(clean_weights(i
), weights_hash
.items())))
600 high_liquidity
= parse_weights(cls
.data
.get("portfolio_1"))
601 medium_liquidity
= parse_weights(cls
.data
.get("portfolio_2"))
603 cls
.liquidities
.set({
604 "medium": medium_liquidity
,
605 "high": high_liquidity
,
607 cls
.last_date
.set(max(
608 max(medium_liquidity
.keys(), default
=datetime
.datetime(1, 1, 1)),
609 max(high_liquidity
.keys(), default
=datetime
.datetime(1, 1, 1))