worker_started = False
worker_notify = None
callback = None
+ poll_started_at = None
@classmethod
- def start_worker(cls, poll=30):
+ def next_wait_time(cls):
+ now = datetime.datetime.now()
+ if cls.poll_started_at is None:
+ cls.poll_started_at = now
+ delta = now - cls.poll_started_at
+
+ if delta < datetime.timedelta(minutes=30):
+ return 30
+ elif delta < datetime.timedelta(hours=1):
+ return 60
+ elif delta < datetime.timedelta(hours=4):
+ return 5*60
+ elif delta < datetime.timedelta(days=1):
+ return 60*60
+ else:
+ raise "Too long waiting"
+
+ @classmethod
+ def start_worker(cls):
import threading
cls.worker = threading.Thread(name="portfolio", daemon=True,
- target=cls.wait_for_notification, kwargs={"poll": poll})
+ target=cls.wait_for_notification)
cls.worker_notify = threading.Event()
cls.callback = threading.Event()
return cls.worker == threading.current_thread()
@classmethod
- def wait_for_notification(cls, poll=30):
+ def wait_for_notification(cls):
if not cls.is_worker_thread():
raise RuntimeError("This method needs to be ran with the worker")
while cls.worker_started:
cls.report.print_log("[Worker] Fetching cryptoportfolio")
cls.get_cryptoportfolio(refetch=True)
cls.callback.set()
- time.sleep(poll)
+ time.sleep(cls.next_wait_time())
@classmethod
def stop_worker(cls):
cls.callback.wait()
@classmethod
- def wait_for_recent(cls, delta=4, poll=30):
+ def wait_for_recent(cls, delta=4):
cls.get_cryptoportfolio()
while cls.last_date.get() is None or datetime.datetime.now() - cls.last_date.get() > datetime.timedelta(delta):
if cls.worker is None:
- time.sleep(poll)
+ time.sleep(cls.next_wait_time())
cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio")
cls.get_cryptoportfolio(refetch=True)
cls.data.set(r.json(parse_int=D, parse_float=D))
cls.parse_cryptoportfolio()
cls.store_cryptoportfolio()
- except (JSONDecodeError, SimpleJSONDecodeError):
+ except (AssertionError, JSONDecodeError, SimpleJSONDecodeError):
cls.data.set(None)
cls.last_date.set(None)
cls.liquidities.set({})
high_liquidity = parse_weights(cls.data.get("portfolio_1"))
medium_liquidity = parse_weights(cls.data.get("portfolio_2"))
+ assert len(high_liquidity) > 0
+ assert len(medium_liquidity) > 0
cls.liquidities.set({
"medium": medium_liquidity,
"high": high_liquidity,
del(data["portfolio_2"]["weights"])
market.Portfolio.data = store.LockedVar(data)
- market.Portfolio.parse_cryptoportfolio()
- self.assertListEqual(
- ["medium", "high"],
- list(market.Portfolio.liquidities.get().keys()))
- self.assertEqual({}, market.Portfolio.liquidities.get("medium"))
-
- with self.subTest(description="All missing weights"):
- data = store.json.loads(self.json_response, parse_int=D, parse_float=D)
- del(data["portfolio_1"]["weights"])
- del(data["portfolio_2"]["weights"])
- market.Portfolio.data = store.LockedVar(data)
-
- market.Portfolio.parse_cryptoportfolio()
- self.assertEqual({}, market.Portfolio.liquidities.get("medium"))
- self.assertEqual({}, market.Portfolio.liquidities.get("high"))
- self.assertEqual(datetime.datetime(1,1,1), market.Portfolio.last_date.get())
+ with self.assertRaises(AssertionError):
+ market.Portfolio.parse_cryptoportfolio()
@mock.patch.object(store.dbs, "redis_connected")
@mock.patch.object(store.dbs, "redis")
@mock.patch.object(market.time, "sleep")
@mock.patch.object(market.Portfolio, "get_cryptoportfolio")
- def test_wait_for_recent(self, get_cryptoportfolio, sleep):
+ @mock.patch.object(market.Portfolio, "next_wait_time")
+ def test_wait_for_recent(self, next_wait_time, get_cryptoportfolio, sleep):
self.call_count = 0
def _get(refetch=False):
if self.call_count != 0:
- store.datetime.timedelta(10)\
+ store.datetime.timedelta(self.call_count))
get_cryptoportfolio.side_effect = _get
+ next_wait_time.return_value = 30
market.Portfolio.wait_for_recent()
sleep.assert_called_with(30)
def test_start_worker(self):
with mock.patch.object(store.Portfolio, "wait_for_notification") as notification:
store.Portfolio.start_worker()
- notification.assert_called_once_with(poll=30)
+ notification.assert_called_once_with()
self.assertEqual("lock", store.Portfolio.last_date.lock.__class__.__name__)
self.assertEqual("lock", store.Portfolio.liquidities.lock.__class__.__name__)
with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\
mock.patch.object(store.Portfolio, "report") as report,\
mock.patch.object(store.time, "sleep") as sleep:
- store.Portfolio.start_worker(poll=3)
+ store.Portfolio.start_worker()
store.Portfolio.stop_worker()
store.Portfolio.worker.join()
get.assert_not_called()
with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\
mock.patch.object(store.Portfolio, "report") as report,\
+ mock.patch.object(store.Portfolio, "next_wait_time") as wait,\
mock.patch.object(store.time, "sleep") as sleep:
- store.Portfolio.start_worker(poll=3)
+ wait.return_value = 3
+ store.Portfolio.start_worker()
store.Portfolio.worker_notify.set()
worker_notify.set.assert_called_once_with()
callback.wait.assert_called_once_with()
+ def test_next_wait_time(self):
+ with self.subTest("first start"):
+ self.assertEqual(30, store.Portfolio.next_wait_time())
+ self.assertIsNotNone(store.Portfolio.poll_started_at)
+ with self.subTest("25min"):
+ store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(minutes=25)
+ self.assertEqual(30, store.Portfolio.next_wait_time())
+ with self.subTest("35min"):
+ store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(minutes=35)
+ self.assertEqual(60, store.Portfolio.next_wait_time())
+ with self.subTest("1h15"):
+ store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(minutes=75)
+ self.assertEqual(300, store.Portfolio.next_wait_time())
+ with self.subTest("5hours"):
+ store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(hours=5)
+ self.assertEqual(3600, store.Portfolio.next_wait_time())
+ with self.subTest("overdue"), self.assertRaises(Exception):
+ store.Portfolio.poll_started_at = datetime.datetime.now() - datetime.timedelta(hours=25)
+ store.Portfolio.next_wait_time()
+