self.market.report.log_stage("{}_begin".format(process_name))
if "fetch_balances_begin" in step:
- self.market.balances.fetch_balances(tag="{}_begin".format(process_name),
- **step["fetch_balances_begin"])
+ self.run_action("fetch_balances", step["fetch_balances_begin"],
+ dict(options, tag="{}_begin".format(process_name)))
for action in self.ordered_actions:
if action in step:
self.run_action(action, step[action], options)
if "fetch_balances_end" in step:
- self.market.balances.fetch_balances(tag="{}_end".format(process_name),
- **step["fetch_balances_end"])
+ self.run_action("fetch_balances", step["fetch_balances_end"],
+ dict(options, tag="{}_end".format(process_name)))
self.market.report.log_stage("{}_end".format(process_name))
method = self.market.trades.close_trades
elif action == "print_tickers":
method = self.market.print_tickers
+ elif action == "fetch_balances":
+ method = self.market.balances.fetch_balances
signature = inspect.getfullargspec(method)
defaults = signature.defaults or []
ticker=asset_ticker,
rate=rate)
else:
- raise Exception("This asset is not available in the chosen market")
+ return Amount(other_currency, 0, linked_to=self, ticker=None, rate=0)
def as_json(self):
return {
compute_value, type)
return amounts
- def fetch_balances(self, tag=None, add_portfolio=False,
+ def fetch_balances(self, tag=None, add_portfolio=False, liquidity="medium",
checkpoint=None, log_tickers=False, add_usdt=False,
ticker_currency="BTC", ticker_compute_value="average", ticker_type="total"):
all_balances = self.market.ccxt.fetch_all_balances()
currency in self.all:
self.all[currency] = portfolio.Balance(currency, balance)
if add_portfolio:
- for currency in Portfolio.repartition(from_cache=True):
+ for currency in Portfolio.repartition(from_cache=True, liquidity=liquidity):
self.all.setdefault(currency, portfolio.Balance(currency, {}))
if add_usdt:
self.all.setdefault("USDT", portfolio.Balance("USDT", {}))
elif delta < datetime.timedelta(days=1):
return 60*60
else:
- raise "Too long waiting"
+ raise Exception("Too long waiting")
@classmethod
def start_worker(cls):
cls.report.print_log("[Worker] Fetching cryptoportfolio")
cls.get_cryptoportfolio(refetch=True)
cls.callback.set()
- time.sleep(cls.next_wait_time())
+ try:
+ time.sleep(cls.next_wait_time())
+ except Exception:
+ cls.stop_worker()
@classmethod
def stop_worker(cls):
if cls.data.get() is not None and not refetch:
return
if cls.worker is not None and not cls.is_worker_thread():
+ if not cls.worker_started:
+ raise Exception("Portfolio worker is down and no usable data is present")
cls.notify_and_wait()
return
try:
method, arguments = processor.method_arguments("print_tickers")
self.assertEqual(m.print_tickers, method)
+ method, arguments = processor.method_arguments("fetch_balances")
+ self.assertEqual(m.balances.fetch_balances, method)
+
def test_process_step(self):
processor = market.Processor(self.m)
mock.call("process_foo__2_sell_begin"),
mock.call("process_foo__2_sell_end"),
])
- self.m.balances.fetch_balances.assert_has_calls([
- mock.call(tag="process_foo__2_sell_begin"),
- mock.call(tag="process_foo__2_sell_end"),
- ])
- self.assertEqual(5, run_action.call_count)
+ self.assertEqual(7, run_action.call_count)
run_action.assert_has_calls([
+ mock.call('fetch_balances', {}, {'foo': 'bar', 'tag': 'process_foo__2_sell_begin'}),
mock.call('prepare_trades', {}, {'foo': 'bar'}),
mock.call('prepare_orders', {'only': 'dispose', 'compute_value': 'average'}, {'foo': 'bar'}),
mock.call('run_orders', {}, {'foo': 'bar'}),
mock.call('follow_orders', {}, {'foo': 'bar'}),
mock.call('close_trades', {}, {'foo': 'bar'}),
+ mock.call('fetch_balances', {}, {'foo': 'bar', 'tag': 'process_foo__2_sell_end'}),
])
self.m.reset_mock()
mock.call("process_foo__0_print_balances_begin"),
mock.call("process_foo__0_print_balances_end"),
])
- self.m.balances.fetch_balances.assert_has_calls([
- mock.call(add_portfolio=True, checkpoint='end',
- log_tickers=True,
- add_usdt=True,
- tag='process_foo__0_print_balances_begin')
- ])
- self.assertEqual(0, run_action.call_count)
+ self.assertEqual(1, run_action.call_count)
+ run_action.assert_has_calls([
+ mock.call('fetch_balances',
+ {'checkpoint': 'end', 'log_tickers': True, 'add_usdt': True, 'add_portfolio': True},
+ {'foo': 'bar', 'tag': 'process_foo__0_print_balances_begin'}),
+ ])
self.m.reset_mock()
with mock.patch.object(processor, "run_action") as run_action:
step = processor.scenarios["sell_needed"][1]
processor.process_step("foo", step, {"foo":"bar"})
- self.m.balances.fetch_balances.assert_not_called()
+ self.assertEqual(1, run_action.call_count)
self.m.reset_mock()
with mock.patch.object(processor, "run_action") as run_action:
step = processor.scenarios["print_balances"][0]
processor.process_step("foo", step, {"foo":"bar"})
- self.m.balances.fetch_balances.assert_called_once_with(
- add_portfolio=True, add_usdt=True, log_tickers=True,
- tag='process_foo__1_print_balances_begin')
+ run_action.assert_has_calls([
+ mock.call('fetch_balances',
+ {'log_tickers': True, 'add_usdt': True, 'add_portfolio': True},
+ {'foo': 'bar', 'tag': 'process_foo__1_print_balances_begin'}),
+ ])
def test_parse_args(self):
processor = market.Processor(self.m)
with self.subTest(desc="no ticker for currency"):
self.m.get_ticker.return_value = None
- self.assertRaises(Exception, amount.in_currency, "ETH", self.m)
+ self.assertEqual(portfolio.Amount("ETH", 0), amount.in_currency("ETH", self.m))
with self.subTest(desc="nominal case"):
self.m.get_ticker.return_value = {
with self.subTest(worker=False):
market.Portfolio.data = store.LockedVar(None)
market.Portfolio.worker = mock.Mock()
+ market.Portfolio.worker_started = True
is_worker.return_value = False
market.Portfolio.get_cryptoportfolio()
notify.assert_called_once_with()
parse_cryptoportfolio.assert_not_called()
store_cryptoportfolio.assert_not_called()
+ with self.subTest(worker_started=False):
+ market.Portfolio.data = store.LockedVar(None)
+ market.Portfolio.worker = mock.Mock()
+ market.Portfolio.worker_started = False
+ is_worker.return_value = False
+ with self.assertRaises(Exception):
+ market.Portfolio.get_cryptoportfolio()
def test_parse_cryptoportfolio(self):
with self.subTest(description="Normal case"):
store.Portfolio.worker.join()
self.assertFalse(store.Portfolio.worker.is_alive())
+ with self.subTest("overdue"),\
+ mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\
+ mock.patch.object(store.Portfolio, "report") as report,\
+ mock.patch.object(store.Portfolio, "next_wait_time") as wait,\
+ mock.patch.object(store.time, "sleep") as sleep:
+ wait.side_effect = Exception("Time over")
+ store.Portfolio.start_worker()
+
+ store.Portfolio.worker_notify.set()
+
+ store.Portfolio.callback.wait()
+
+ report.print_log.assert_called_once_with("[Worker] Fetching cryptoportfolio")
+ get.assert_called_once_with(refetch=True)
+ self.assertFalse(store.Portfolio.worker.is_alive())
+
def test_notify_and_wait(self):
with mock.patch.object(store.Portfolio, "callback") as callback,\
mock.patch.object(store.Portfolio, "worker_notify") as worker_notify: