--- /dev/null
+import ccxt
+import time
+# Put your poloniex api key in market.py
+from market import market
+
+# FIXME: Améliorer le bid/ask
+# FIXME: J'essayais d'utiliser plus de bitcoins que j'en avais à disposition
+# FIXME: better compute moves to avoid rounding errors
+
+def static_var(varname, value):
+ def decorate(func):
+ setattr(func, varname, value)
+ return func
+ return decorate
+
+class Portfolio:
+ URL = "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json"
+ liquidities = {}
+ data = None
+
+ @classmethod
+ def repartition_pertenthousand(cls, liquidity="medium"):
+ cls.parse_cryptoportfolio()
+ liquidities = cls.liquidities[liquidity]
+ last_date = sorted(liquidities.keys())[-1]
+ return liquidities[last_date]
+
+ @classmethod
+ def get_cryptoportfolio(cls):
+ import json
+ import urllib3
+ urllib3.disable_warnings()
+ http = urllib3.PoolManager()
+
+ r = http.request("GET", cls.URL)
+ cls.data = json.loads(r.data)
+
+ @classmethod
+ def parse_cryptoportfolio(cls):
+ if cls.data is None:
+ cls.get_cryptoportfolio()
+
+ def filter_weights(weight_hash):
+ if weight_hash[1] == 0:
+ return False
+ if weight_hash[0] == "_row":
+ return False
+ return True
+
+ def clean_weights(i):
+ def clean_weights_(h):
+ if type(h[1][i]) == str:
+ return [h[0], h[1][i]]
+ else:
+ return [h[0], int(h[1][i] * 10000)]
+ return clean_weights_
+
+ def parse_weights(portfolio_hash):
+ # FIXME: we'll need shorts at some point
+ assert all(map(lambda x: x == "long", portfolio_hash["holding"]["direction"]))
+ weights_hash = portfolio_hash["weights"]
+ weights = {}
+ for i in range(len(weights_hash["_row"])):
+ weights[weights_hash["_row"][i]] = dict(filter(
+ filter_weights,
+ map(clean_weights(i), weights_hash.items())))
+ return weights
+
+ high_liquidity = parse_weights(cls.data["portfolio_1"])
+ medium_liquidity = parse_weights(cls.data["portfolio_2"])
+
+ cls.liquidities = {
+ "medium": medium_liquidity,
+ "high": high_liquidity,
+ }
+
+class Amount:
+ MAX_DIGITS = 18
+
+ def __init__(self, currency, value, int_val=None, linked_to=None, ticker=None):
+ self.currency = currency
+ if int_val is None:
+ self._value = int(value * 10**self.MAX_DIGITS)
+ else:
+ self._value = int_val
+ self.linked_to = linked_to
+ self.ticker = ticker
+
+ self.ticker_cache = {}
+ self.ticker_cache_timestamp = time.time()
+
+ @property
+ def value(self):
+ return self._value / 10 ** self.MAX_DIGITS
+
+ def in_currency(self, other_currency, market, action="average"):
+ if other_currency == self.currency:
+ return self
+ asset_ticker = self.get_ticker(other_currency, market)
+ if asset_ticker is not None:
+ return Amount(
+ other_currency,
+ 0,
+ int_val=int(self._value * asset_ticker[action]),
+ linked_to=self,
+ ticker=asset_ticker)
+ else:
+ raise Exception("This asset is not available in the chosen market")
+
+ def get_ticker(self, c2, market, refresh=False):
+ c1 = self.currency
+
+ def invert(ticker):
+ return {
+ "inverted": True,
+ "average": (float(1/ticker["bid"]) + float(1/ticker["ask"]) ) / 2,
+ "notInverted": ticker,
+ }
+ def augment_ticker(ticker):
+ ticker.update({
+ "inverted": False,
+ "average": (ticker["bid"] + ticker["ask"] ) / 2,
+ })
+
+ if time.time() - self.ticker_cache_timestamp > 5:
+ self.ticker_cache = {}
+ self.ticker_cache_timestamp = time.time()
+ elif not refresh:
+ if (c1, c2, market.__class__) in self.ticker_cache:
+ return self.ticker_cache[(c1, c2, market.__class__)]
+ if (c2, c1, market.__class__) in self.ticker_cache:
+ return invert(self.ticker_cache[(c2, c1, market.__class__)])
+
+ try:
+ self.ticker_cache[(c1, c2, market.__class__)] = market.fetch_ticker("{}/{}".format(c1, c2))
+ augment_ticker(self.ticker_cache[(c1, c2, market.__class__)])
+ except ccxt.ExchangeError:
+ try:
+ self.ticker_cache[(c2, c1, market.__class__)] = market.fetch_ticker("{}/{}".format(c2, c1))
+ augment_ticker(self.ticker_cache[(c2, c1, market.__class__)])
+ except ccxt.ExchangeError:
+ self.ticker_cache[(c1, c2, market.__class__)] = None
+ return self.get_ticker(c2, market)
+
+ def __abs__(self):
+ return Amount(self.currency, 0, int_val=abs(self._value))
+
+ def __add__(self, other):
+ if other.currency != self.currency and other._value * self._value != 0:
+ raise Exception("Summing amounts must be done with same currencies")
+ return Amount(self.currency, 0, int_val=self._value + other._value)
+
+ def __radd__(self, other):
+ if other == 0:
+ return self
+ else:
+ return self.__add__(other)
+
+ def __sub__(self, other):
+ if other.currency != self.currency and other._value * self._value != 0:
+ raise Exception("Summing amounts must be done with same currencies")
+ return Amount(self.currency, 0, int_val=self._value - other._value)
+
+ def __int__(self):
+ return self._value
+
+ def __mul__(self, value):
+ if type(value) != int and type(value) != float:
+ raise TypeError("Amount may only be multiplied by numbers")
+ return Amount(self.currency, 0, int_val=(self._value * value))
+
+ def __rmul__(self, value):
+ return self.__mul__(value)
+
+ def __floordiv__(self, value):
+ if type(value) != int:
+ raise TypeError("Amount may only be multiplied by integers")
+ return Amount(self.currency, 0, int_val=(self._value // value))
+
+ def __truediv__(self, value):
+ return self.__floordiv__(value)
+
+ def __lt__(self, other):
+ if self.currency != other.currency:
+ raise Exception("Comparing amounts must be done with same currencies")
+ return self._value < other._value
+
+ def __eq__(self, other):
+ if other == 0:
+ return self._value == 0
+ if self.currency != other.currency:
+ raise Exception("Comparing amounts must be done with same currencies")
+ return self._value == other._value
+
+ def __str__(self):
+ if self.linked_to is None:
+ return "{:.8f} {}".format(self.value, self.currency)
+ else:
+ return "{:.8f} {} [{}]".format(self.value, self.currency, self.linked_to)
+
+ def __repr__(self):
+ if self.linked_to is None:
+ return "Amount({:.8f} {})".format(self.value, self.currency)
+ else:
+ return "Amount({:.8f} {} -> {})".format(self.value, self.currency, repr(self.linked_to))
+
+class Balance:
+ known_balances = {}
+ trades = {}
+
+ def __init__(self, currency, total_value, free_value, used_value):
+ self.currency = currency
+ self.total = Amount(currency, total_value)
+ self.free = Amount(currency, free_value)
+ self.used = Amount(currency, used_value)
+
+ @classmethod
+ def in_currency(cls, other_currency, market, action="average", type="total"):
+ amounts = {}
+ for currency in cls.known_balances:
+ balance = cls.known_balances[currency]
+ other_currency_amount = getattr(balance, type)\
+ .in_currency(other_currency, market, action=action)
+ amounts[currency] = other_currency_amount
+ return amounts
+
+ @classmethod
+ def currencies(cls):
+ return cls.known_balances.keys()
+
+ @classmethod
+ def from_hash(cls, currency, hash_):
+ return cls(currency, hash_["total"], hash_["free"], hash_["used"])
+
+ @classmethod
+ def _fill_balances(cls, hash_):
+ for key in hash_:
+ if key in ["info", "free", "used", "total"]:
+ continue
+ if hash_[key]["total"] > 0:
+ cls.known_balances[key] = cls.from_hash(key, hash_[key])
+
+ @classmethod
+ def fetch_balances(cls, market):
+ cls._fill_balances(market.fetch_balance())
+ return cls.known_balances
+
+ @classmethod
+ def dispatch_assets(cls, amount):
+ repartition_pertenthousand = Portfolio.repartition_pertenthousand()
+ sum_pertenthousand = sum([v for k, v in repartition_pertenthousand.items()])
+ amounts = {}
+ for currency, ptt in repartition_pertenthousand.items():
+ amounts[currency] = ptt * amount / sum_pertenthousand
+ if currency not in cls.known_balances:
+ cls.known_balances[currency] = cls(currency, 0, 0, 0)
+ return amounts
+
+ @classmethod
+ def prepare_trades(cls, market, base_currency="BTC"):
+ cls.fetch_balances(market)
+ values_in_base = cls.in_currency(base_currency, market)
+ total_base_value = sum(values_in_base.values())
+ new_repartition = cls.dispatch_assets(total_base_value)
+ Trade.compute_trades(values_in_base, new_repartition, market=market)
+
+ def __int__(self):
+ return int(self.total)
+
+ def __repr__(self):
+ return "Balance({} [{}/{}/{}])".format(self.currency, str(self.free), str(self.used), str(self.total))
+
+class Trade:
+ trades = {}
+
+ def __init__(self, value_from, value_to, currency, market=None):
+ # We have value_from of currency, and want to finish with value_to of
+ # that currency. value_* may not be in currency's terms
+ self.currency = currency
+ self.value_from = value_from
+ self.value_to = value_to
+ self.orders = []
+ assert self.value_from.currency == self.value_to.currency
+ assert self.value_from.linked_to is not None and self.value_from.linked_to.currency == self.currency
+ self.base_currency = self.value_from.currency
+
+ if market is not None:
+ self.prepare_order(market)
+
+ @classmethod
+ def compute_trades(cls, values_in_base, new_repartition, market=None):
+ base_currency = sum(values_in_base.values()).currency
+ for currency in Balance.currencies():
+ if currency == base_currency:
+ continue
+ cls.trades[currency] = cls(
+ values_in_base.get(currency, Amount(base_currency, 0)),
+ new_repartition.get(currency, Amount(base_currency, 0)),
+ currency,
+ market=market
+ )
+ return cls.trades
+
+ @property
+ def action(self):
+ if self.value_from == self.value_to:
+ return None
+ if self.base_currency == self.currency:
+ return None
+
+ if self.value_from < self.value_to:
+ return "buy"
+ else:
+ return "sell"
+
+ def ticker_action(self, inverted):
+ if self.value_from < self.value_to:
+ return "ask" if not inverted else "bid"
+ else:
+ return "bid" if not inverted else "ask"
+
+ def prepare_order(self, market):
+ if self.action is None:
+ return
+ ticker = self.value_from.ticker
+ inverted = ticker["inverted"]
+
+ if not inverted:
+ value_from = self.value_from.linked_to
+ value_to = self.value_to.in_currency(self.currency, market)
+ delta = abs(value_to - value_from)
+ currency = self.base_currency
+ else:
+ ticker = ticker["notInverted"]
+ delta = abs(self.value_to - self.value_from)
+ currency = self.currency
+
+ rate = ticker[self.ticker_action(inverted)]
+
+ self.orders.append(Order(self.ticker_action(inverted), delta, rate, currency))
+
+ @classmethod
+ def all_orders(cls):
+ return sum(map(lambda v: v.orders, cls.trades.values()), [])
+
+ @classmethod
+ def follow_orders(cls, market):
+ orders = cls.all_orders()
+ finished_orders = []
+ while len(orders) != len(finished_orders):
+ time.sleep(30)
+ for order in orders:
+ if order in finished_orders:
+ continue
+ if order.get_status(market) != "open":
+ finished_orders.append(order)
+ print("finished {}".format(order))
+ print("All orders finished")
+
+ def __repr__(self):
+ return "Trade({} -> {} in {}, {})".format(
+ self.value_from,
+ self.value_to,
+ self.currency,
+ self.action)
+
+class Order:
+ DEBUG = True
+
+ def __init__(self, action, amount, rate, base_currency):
+ self.action = action
+ self.amount = amount
+ self.rate = rate
+ self.base_currency = base_currency
+ self.result = None
+ self.status = "not run"
+
+ def __repr__(self):
+ return "Order({} {} at {} {} [{}])".format(
+ self.action,
+ self.amount,
+ self.rate,
+ self.base_currency,
+ self.status
+ )
+
+ def run(self, market):
+ symbol = "{}/{}".format(self.amount.currency, self.base_currency)
+ amount = self.amount.value
+
+ if self.DEBUG:
+ print("market.create_order('{}', 'limit', '{}', {}, price={})".format(
+ symbol, self.action, amount, self.rate))
+ else:
+ try:
+ self.result = market.create_order(symbol, 'limit', self.action, amount, price=self.rate)
+ self.status = "open"
+ except Exception:
+ pass
+
+ def get_status(self, market):
+ # other states are "closed" and "canceled"
+ if self.status == "open":
+ result = market.fetch_order(self.result['id'])
+ self.status = result["status"]
+ return self.status
+
+@static_var("cache", {})
+def fetch_fees(market):
+ if market.__class__ not in fetch_fees.cache:
+ fetch_fees.cache[market.__class__] = market.fetch_fees()
+ return fetch_fees.cache[market.__class__]
+
+def print_orders(market, base_currency="BTC"):
+ Balance.prepare_trades(market, base_currency=base_currency)
+ for currency, trade in Trade.trades.items():
+ print(trade)
+ for order in trade.orders:
+ print("\t", order, sep="")
+
+def make_orders(market, base_currency="BTC"):
+ Balance.prepare_trades(market, base_currency=base_currency)
+ for currency, trade in Trade.trades.items():
+ print(trade)
+ for order in trade.orders:
+ print("\t", order, sep="")
+ order.run(market)
+
+if __name__ == '__main__':
+ print_orders(market)
+++ /dev/null
-import ccxt
-# Put your poloniex api key in market.py
-from market import market
-
-def static_var(varname, value):
- def decorate(func):
- setattr(func, varname, value)
- return func
- return decorate
-
-max_digits = 18
-
-repartition_pertenthousand = {
- "BTC": 2857,
- "ZEC": 3701,
- "DOGE": 1805,
- "DGB": 1015,
- "SC": 623,
- }
-
-
-def formatted_price(value):
- return round(value / 10**max_digits, 8)
-
-@static_var("cache", {})
-def get_ticker(c1, c2, market):
- def invert(ticker):
- return {
- "inverted": True,
- "bid": float(1/ticker["ask"]),
- "ask": float(1/ticker["bid"]),
- "bidA": float(1/ticker["askA"]),
- "askA": float(1/ticker["bidA"]),
- "bidE": float(1/ticker["askE"]),
- "askE": float(1/ticker["bidE"]),
- }
- def augment_ticker(ticker):
- bid_factor = 1.01
- ask_factor = 0.99
- fees = fetch_fees(market)
- # FIXME: need to do better than just a multiplier
- ticker.update({
- "inverted": False,
- # Adjusted
- "bidA": ticker["bid"] * bid_factor,
- "askA": ticker["ask"] * ask_factor,
- # Expected in the end
- "bidE": ticker["bid"] * bid_factor * (1 - fees["maker"]),
- "askE": ticker["ask"] * ask_factor * (1 - fees["maker"]),
- # fees
- "bidF": ticker["bid"] * bid_factor * fees["maker"],
- "askF": ticker["ask"] * ask_factor * fees["maker"],
- })
-
- if (c1, c2, market.__class__) in get_ticker.cache:
- return get_ticker.cache[(c1, c2, market.__class__)]
- if (c2, c1, market.__class__) in get_ticker.cache:
- return invert(get_ticker.cache[(c2, c1, market.__class__)])
-
- try:
- get_ticker.cache[(c1, c2, market.__class__)] = market.fetch_ticker("{}/{}".format(c1, c2))
- augment_ticker(get_ticker.cache[(c1, c2, market.__class__)])
- except ccxt.ExchangeError:
- try:
- get_ticker.cache[(c2, c1, market.__class__)] = market.fetch_ticker("{}/{}".format(c2, c1))
- augment_ticker(get_ticker.cache[(c2, c1, market.__class__)])
- except ccxt.ExchangeError:
- get_ticker.cache[(c1, c2, market.__class__)] = None
- return get_ticker(c1, c2, market)
-
-def fetch_balances(market):
- balances = {}
- fetched_balance = market.fetch_balance()
- for key, value in fetched_balance["total"].items():
- if value > 0:
- balances[key] = int(value * 10**max_digits)
- return balances
-
-@static_var("cache", {})
-def fetch_fees(market):
- if market.__class__ not in fetch_fees.cache:
- fetch_fees.cache[market.__class__] = market.fetch_fees()
- return fetch_fees.cache[market.__class__]
-
-def assets_value(assets, market, base_currency="BTC"):
- repartition_in_base_currency = {}
- for currency, asset_value in assets.items():
- if currency == base_currency:
- repartition_in_base_currency[currency] = [asset_value, 0]
- else:
- asset_ticker = get_ticker(currency, base_currency, market)
- if asset_ticker is None:
- raise Exception("This asset is not available in the chosen market")
- repartition_in_base_currency[currency] = [
- int(asset_ticker["bidE"] * asset_value),
- int(asset_ticker["bidF"] * asset_value)
- ]
-
- return repartition_in_base_currency
-
-def dispatch_assets(base_currency_value, repartition_pertenthousand, market, base_currency="BTC"):
- sum_pertenthousand = sum([v for k, v in repartition_pertenthousand.items()])
- repartition_in_base_currency = {}
- for currency, ptt in repartition_pertenthousand.items():
- repartition_in_base_currency[currency] = int(ptt * base_currency_value / sum_pertenthousand)
- return repartition_in_base_currency
-
-def compute_moves(current_assets, repartition_pertenthousand, market, no_fees=True, base_currency="BTC"):
- value_in_base = assets_value(current_assets, market, base_currency=base_currency)
- total_base_value = sum([ v[0] for k, v in value_in_base.items()])
-
- new_repartition = dispatch_assets(total_base_value, repartition_pertenthousand, market, base_currency=base_currency)
- mouvements = {}
-
- if no_fees:
- for key in set(value_in_base.keys()).union(set(new_repartition.keys())):
- mouvements[key] = value_in_base.get(key, [0, 0])[0] - new_repartition.get(key, 0)
- else:
- for key in set(value_in_base.keys()).union(set(new_repartition.keys())):
- value, fee = value_in_base.get(key, [0, 0])
- mouvements[key] = [value - new_repartition.get(key, 0), fee]
-
- return mouvements
-
-def compute_order(currency, value, market, base_currency="BTC"):
- if currency == base_currency or value == 0:
- return [None, 0, False]
-
- asset_ticker = get_ticker(currency, base_currency, market)
- if asset_ticker["inverted"]:
- asset_ticker = get_ticker(base_currency, currency, market)
- if value > 0:
- rate = asset_ticker["askA"]
- return ["buy", rate, True]
- else:
- rate = asset_ticker["bidA"]
- return ["sell", rate, True]
- else:
- if value > 0:
- rate = asset_ticker["bidA"]
- return ["sell", rate, False]
- else:
- rate = asset_ticker["askA"]
- return ["buy", rate, False]
-
-def make_order(currency, value, market, base_currency="BTC"):
- action, rate, inverted = compute_order(currency, value, market, base_currency=base_currency)
- amount = formatted_price(abs(value))
- if not inverted:
- symbol = "{}/{}".format(currency, base_currency)
- else:
- symbol = "{}/{}".format(base_currency, currency)
- return market.create_order(symbol, 'limit', action, amount, price=rate)
-
-def make_orders(current_assets, repartition_pertenthousand, market, base_currency="BTC"):
- mouvements = compute_moves(
- current_assets,
- repartition_pertenthousand,
- market,
- base_currency=base_currency)
-
- results = []
- for currency, value in sorted(mouvements.items(), key=lambda x: x[1]):
- # FIXME: wait for sales to finish
- results.append(make_order(currency, value, market, base_currency=base_currency))
- return results
-
-def print_assets(assets, indent="", market=None, base_currency="BTC"):
- if market is not None:
- format_string = "{}{} {} ({} {})"
- else:
- format_string = "{}{} {}"
- base_currency_price = 0
-
- for currency, value in assets.items():
- if market is not None:
- asset_ticker = get_ticker(currency, base_currency, market)
- base_currency_price = asset_ticker["bidE"] * value
- print(format_string.format(
- indent,
- formatted_price(value),
- currency,
- formatted_price(base_currency_price),
- base_currency))
-
-def print_orders(current_assets, repartition_pertenthousand, market, base_currency="BTC"):
- mouvements = compute_moves(
- current_assets,
- repartition_pertenthousand,
- market,
- no_fees=False,
- base_currency=base_currency)
-
- for currency, [value, fee] in mouvements.items():
- action, rate, inverted = compute_order(
- currency,
- value,
- market,
- base_currency=base_currency)
- if action is not None:
- currency_price = int(value / rate)
-
- if not inverted:
- c1, c2 = [base_currency, currency]
- v1, v2 = [value, currency_price]
- else:
- c1, c2 = [currency, base_currency]
- v1, v2 = [currency_price, value]
-
- print("need to {} {} {}'s worth of {}, i.e. {} {} ( + {} {} fee)".format(
- action,
- formatted_price(abs(v1)), c1,
- c2,
- formatted_price(abs(v2)), c2,
- formatted_price(fee), c2))
-
-current_assets = fetch_balances(market)
-print_orders(current_assets, repartition_pertenthousand, market)
--- /dev/null
+import portfolio
+import unittest
+from unittest import mock
+
+class AmountTest(unittest.TestCase):
+ def setUp(self):
+ super(AmountTest, self).setUp()
+
+ def test_values(self):
+ amount = portfolio.Amount("BTC", 0.65)
+ self.assertEqual(0.65, amount.value)
+ self.assertEqual("BTC", amount.currency)
+
+ amount = portfolio.Amount("BTC", 10, int_val=2000000000000000)
+ self.assertEqual(0.002, amount.value)
+
+ def test_in_currency(self):
+ amount = portfolio.Amount("ETC", 10)
+
+ self.assertEqual(amount, amount.in_currency("ETC", None))
+
+ ticker_mock = unittest.mock.Mock()
+ with mock.patch.object(portfolio.Amount, 'get_ticker', new=ticker_mock):
+ ticker_mock.return_value = None
+ portfolio.Amount.get_ticker = ticker_mock
+
+ self.assertRaises(Exception, amount.in_currency, "ETH", None)
+
+ with mock.patch.object(portfolio.Amount, 'get_ticker', new=ticker_mock):
+ ticker_mock.return_value = {
+ "average": 0.3,
+ "foo": "bar",
+ }
+ converted_amount = amount.in_currency("ETH", None)
+
+ self.assertEqual(3.0, converted_amount.value)
+ self.assertEqual("ETH", converted_amount.currency)
+ self.assertEqual(amount, converted_amount.linked_to)
+ self.assertEqual("bar", converted_amount.ticker["foo"])
+
+ @unittest.skip("TODO")
+ def test_get_ticker(self):
+ pass
+
+ def test__abs(self):
+ amount = portfolio.Amount("SC", -120)
+ self.assertEqual(120, abs(amount).value)
+ self.assertEqual("SC", abs(amount).currency)
+
+ amount = portfolio.Amount("SC", 10)
+ self.assertEqual(10, abs(amount).value)
+ self.assertEqual("SC", abs(amount).currency)
+
+ def test__add(self):
+ amount1 = portfolio.Amount("XVG", 12.9)
+ amount2 = portfolio.Amount("XVG", 13.1)
+
+ self.assertEqual(26, (amount1 + amount2).value)
+ self.assertEqual("XVG", (amount1 + amount2).currency)
+
+ amount3 = portfolio.Amount("ETH", 1.6)
+ with self.assertRaises(Exception):
+ amount1 + amount3
+
+ amount4 = portfolio.Amount("ETH", 0.0)
+ self.assertEqual(amount1, amount1 + amount4)
+
+ def test__radd(self):
+ amount = portfolio.Amount("XVG", 12.9)
+
+ self.assertEqual(amount, 0 + amount)
+ with self.assertRaises(Exception):
+ 4 + amount
+
+ def test__sub(self):
+ amount1 = portfolio.Amount("XVG", 13.3)
+ amount2 = portfolio.Amount("XVG", 13.1)
+
+ self.assertEqual(0.2, (amount1 - amount2).value)
+ self.assertEqual("XVG", (amount1 - amount2).currency)
+
+ amount3 = portfolio.Amount("ETH", 1.6)
+ with self.assertRaises(Exception):
+ amount1 - amount3
+
+ amount4 = portfolio.Amount("ETH", 0.0)
+ self.assertEqual(amount1, amount1 - amount4)
+
+ def test__int(self):
+ amount = portfolio.Amount("XMR", 0.1)
+ self.assertEqual(100000000000000000, int(amount))
+
+ def test__mul(self):
+ amount = portfolio.Amount("XEM", 11)
+
+ self.assertEqual(38.5, (amount * 3.5).value)
+ self.assertEqual(33, (amount * 3).value)
+
+ with self.assertRaises(Exception):
+ amount * amount
+
+ def test__rmul(self):
+ amount = portfolio.Amount("XEM", 11)
+
+ self.assertEqual(38.5, (3.5 * amount).value)
+ self.assertEqual(33, (3 * amount).value)
+
+ def test__floordiv(self):
+ amount = portfolio.Amount("XEM", 11)
+
+ self.assertEqual(5.5, (amount // 2).value)
+ with self.assertRaises(TypeError):
+ amount // 2.5
+ self.assertEqual(1571428571428571428, (amount // 7)._value)
+
+ def test__div(self):
+ amount = portfolio.Amount("XEM", 11)
+
+ with self.assertRaises(TypeError):
+ amount / 2.5
+ self.assertEqual(5.5, (amount / 2).value)
+ self.assertEqual(1571428571428571428, (amount / 7)._value)
+
+ def test__lt(self):
+ amount1 = portfolio.Amount("BTD", 11.3)
+ amount2 = portfolio.Amount("BTD", 13.1)
+
+ self.assertTrue(amount1 < amount2)
+ self.assertFalse(amount2 < amount1)
+ self.assertFalse(amount1 < amount1)
+
+ amount3 = portfolio.Amount("BTC", 1.6)
+ with self.assertRaises(Exception):
+ amount1 < amount3
+
+ def test__eq(self):
+ amount1 = portfolio.Amount("BTD", 11.3)
+ amount2 = portfolio.Amount("BTD", 13.1)
+ amount3 = portfolio.Amount("BTD", 11.3)
+
+ self.assertFalse(amount1 == amount2)
+ self.assertFalse(amount2 == amount1)
+ self.assertTrue(amount1 == amount3)
+ self.assertFalse(amount2 == 0)
+
+ amount4 = portfolio.Amount("BTC", 1.6)
+ with self.assertRaises(Exception):
+ amount1 == amount4
+
+ amount5 = portfolio.Amount("BTD", 0)
+ self.assertTrue(amount5 == 0)
+
+ def test__str(self):
+ amount1 = portfolio.Amount("BTX", 32)
+ self.assertEqual("32.00000000 BTX", str(amount1))
+
+ amount2 = portfolio.Amount("USDT", 12000)
+ amount1.linked_to = amount2
+ self.assertEqual("32.00000000 BTX [12000.00000000 USDT]", str(amount1))
+
+ def test__repr(self):
+ amount1 = portfolio.Amount("BTX", 32)
+ self.assertEqual("Amount(32.00000000 BTX)", repr(amount1))
+
+ amount2 = portfolio.Amount("USDT", 12000)
+ amount1.linked_to = amount2
+ self.assertEqual("Amount(32.00000000 BTX -> Amount(12000.00000000 USDT))", repr(amount1))
+
+ amount3 = portfolio.Amount("BTC", 0.1)
+ amount2.linked_to = amount3
+ self.assertEqual("Amount(32.00000000 BTX -> Amount(12000.00000000 USDT -> Amount(0.10000000 BTC)))", repr(amount1))
+
+if __name__ == '__main__':
+ unittest.main()