aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIsmaël Bouya <ismael.bouya@normalesup.org>2018-03-09 15:37:10 +0100
committerIsmaël Bouya <ismael.bouya@normalesup.org>2018-03-09 19:12:57 +0100
commitdc1ca9a306f09886c6c57f8d426c59a9d084b2b3 (patch)
treed1a94be893451a4e182f3e75e9afb01749172bb4
parent81d1db5117b61c2bb9f114cfc59f015e97c6eb9b (diff)
downloadTrader-dc1ca9a306f09886c6c57f8d426c59a9d084b2b3.tar.gz
Trader-dc1ca9a306f09886c6c57f8d426c59a9d084b2b3.tar.zst
Trader-dc1ca9a306f09886c6c57f8d426c59a9d084b2b3.zip
Add parallelization
-rw-r--r--main.py13
-rw-r--r--store.py128
-rw-r--r--test.py418
3 files changed, 431 insertions, 128 deletions
diff --git a/main.py b/main.py
index 37f485d..856d449 100644
--- a/main.py
+++ b/main.py
@@ -117,6 +117,8 @@ def parse_args(argv):
117 parser.add_argument("--action", 117 parser.add_argument("--action",
118 action='append', 118 action='append',
119 help="Do a different action than trading (add several times to chain)") 119 help="Do a different action than trading (add several times to chain)")
120 parser.add_argument("--parallel", action='store_true', default=True, dest="parallel")
121 parser.add_argument("--no-parallel", action='store_false', dest="parallel")
120 122
121 args = parser.parse_args(argv) 123 args = parser.parse_args(argv)
122 124
@@ -139,8 +141,15 @@ def main(argv):
139 141
140 pg_config, report_path = parse_config(args.config) 142 pg_config, report_path = parse_config(args.config)
141 143
142 for market_config, user_id in fetch_markets(pg_config, args.user): 144 if args.parallel:
143 process(market_config, user_id, report_path, args) 145 import threading
146 market.Portfolio.start_worker()
147
148 for market_config, user_id in fetch_markets(pg_config, args.user):
149 threading.Thread(target=process, args=[market_config, user_id, report_path, args]).start()
150 else:
151 for market_config, user_id in fetch_markets(pg_config, args.user):
152 process(market_config, user_id, report_path, args)
144 153
145if __name__ == '__main__': # pragma: no cover 154if __name__ == '__main__': # pragma: no cover
146 main(sys.argv[1:]) 155 main(sys.argv[1:])
diff --git a/store.py b/store.py
index c8cdc42..f655be5 100644
--- a/store.py
+++ b/store.py
@@ -309,30 +309,110 @@ class TradeStore:
309 for order in self.all_orders(state="open"): 309 for order in self.all_orders(state="open"):
310 order.get_status() 310 order.get_status()
311 311
312class NoopLock:
313 def __enter__(self, *args):
314 pass
315 def __exit__(self, *args):
316 pass
317
318class LockedVar:
319 def __init__(self, value):
320 self.lock = NoopLock()
321 self.val = value
322
323 def start_lock(self):
324 import threading
325 self.lock = threading.Lock()
326
327 def set(self, value):
328 with self.lock:
329 self.val = value
330
331 def get(self, key=None):
332 with self.lock:
333 if key is not None and isinstance(self.val, dict):
334 return self.val.get(key)
335 else:
336 return self.val
337
338 def __getattr__(self, key):
339 with self.lock:
340 return getattr(self.val, key)
341
312class Portfolio: 342class Portfolio:
313 URL = "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json" 343 URL = "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json"
314 liquidities = {} 344 data = LockedVar(None)
315 data = None 345 liquidities = LockedVar({})
316 last_date = None 346 last_date = LockedVar(None)
317 report = ReportStore(None) 347 report = LockedVar(ReportStore(None))
348 worker = None
349 worker_started = False
350 worker_notify = None
351 callback = None
352
353 @classmethod
354 def start_worker(cls, poll=30):
355 import threading
356
357 cls.worker = threading.Thread(name="portfolio", daemon=True,
358 target=cls.wait_for_notification, kwargs={"poll": poll})
359 cls.worker_notify = threading.Event()
360 cls.callback = threading.Event()
361
362 cls.last_date.start_lock()
363 cls.liquidities.start_lock()
364 cls.report.start_lock()
365
366 cls.worker_started = True
367 cls.worker.start()
368
369 @classmethod
370 def is_worker_thread(cls):
371 if cls.worker is None:
372 return False
373 else:
374 import threading
375 return cls.worker == threading.current_thread()
376
377 @classmethod
378 def wait_for_notification(cls, poll=30):
379 if not cls.is_worker_thread():
380 raise RuntimeError("This method needs to be ran with the worker")
381 while cls.worker_started:
382 cls.worker_notify.wait()
383 cls.worker_notify.clear()
384 cls.report.print_log("Fetching cryptoportfolio")
385 cls.get_cryptoportfolio(refetch=True)
386 cls.callback.set()
387 time.sleep(poll)
318 388
319 @classmethod 389 @classmethod
320 def wait_for_recent(cls, delta=4): 390 def notify_and_wait(cls):
391 cls.callback.clear()
392 cls.worker_notify.set()
393 cls.callback.wait()
394
395 @classmethod
396 def wait_for_recent(cls, delta=4, poll=30):
321 cls.get_cryptoportfolio() 397 cls.get_cryptoportfolio()
322 while cls.last_date is None or datetime.now() - cls.last_date > timedelta(delta): 398 while cls.last_date.get() is None or datetime.now() - cls.last_date.get() > timedelta(delta):
323 time.sleep(30) 399 if cls.worker is None:
324 cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio") 400 time.sleep(poll)
401 cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio")
325 cls.get_cryptoportfolio(refetch=True) 402 cls.get_cryptoportfolio(refetch=True)
326 403
327 @classmethod 404 @classmethod
328 def repartition(cls, liquidity="medium"): 405 def repartition(cls, liquidity="medium"):
329 cls.get_cryptoportfolio() 406 cls.get_cryptoportfolio()
330 liquidities = cls.liquidities[liquidity] 407 liquidities = cls.liquidities.get(liquidity)
331 return liquidities[cls.last_date] 408 return liquidities[cls.last_date.get()]
332 409
333 @classmethod 410 @classmethod
334 def get_cryptoportfolio(cls, refetch=False): 411 def get_cryptoportfolio(cls, refetch=False):
335 if cls.data is not None and not refetch: 412 if cls.data.get() is not None and not refetch:
413 return
414 if cls.worker is not None and not cls.is_worker_thread():
415 cls.notify_and_wait()
336 return 416 return
337 try: 417 try:
338 r = requests.get(cls.URL) 418 r = requests.get(cls.URL)
@@ -342,11 +422,12 @@ class Portfolio:
342 cls.report.log_error("get_cryptoportfolio", exception=e) 422 cls.report.log_error("get_cryptoportfolio", exception=e)
343 return 423 return
344 try: 424 try:
345 cls.data = r.json(parse_int=D, parse_float=D) 425 cls.data.set(r.json(parse_int=D, parse_float=D))
346 cls.parse_cryptoportfolio() 426 cls.parse_cryptoportfolio()
347 except (JSONDecodeError, SimpleJSONDecodeError): 427 except (JSONDecodeError, SimpleJSONDecodeError):
348 cls.data = None 428 cls.data.set(None)
349 cls.liquidities = {} 429 cls.last_date.set(None)
430 cls.liquidities.set({})
350 431
351 @classmethod 432 @classmethod
352 def parse_cryptoportfolio(cls): 433 def parse_cryptoportfolio(cls):
@@ -366,6 +447,8 @@ class Portfolio:
366 return clean_weights_ 447 return clean_weights_
367 448
368 def parse_weights(portfolio_hash): 449 def parse_weights(portfolio_hash):
450 if "weights" not in portfolio_hash:
451 return {}
369 weights_hash = portfolio_hash["weights"] 452 weights_hash = portfolio_hash["weights"]
370 weights = {} 453 weights = {}
371 for i in range(len(weights_hash["_row"])): 454 for i in range(len(weights_hash["_row"])):
@@ -375,13 +458,16 @@ class Portfolio:
375 map(clean_weights(i), weights_hash.items()))) 458 map(clean_weights(i), weights_hash.items())))
376 return weights 459 return weights
377 460
378 high_liquidity = parse_weights(cls.data["portfolio_1"]) 461 high_liquidity = parse_weights(cls.data.get("portfolio_1"))
379 medium_liquidity = parse_weights(cls.data["portfolio_2"]) 462 medium_liquidity = parse_weights(cls.data.get("portfolio_2"))
380 463
381 cls.liquidities = { 464 cls.liquidities.set({
382 "medium": medium_liquidity, 465 "medium": medium_liquidity,
383 "high": high_liquidity, 466 "high": high_liquidity,
384 } 467 })
385 cls.last_date = max(max(medium_liquidity.keys()), max(high_liquidity.keys())) 468 cls.last_date.set(max(
469 max(medium_liquidity.keys(), default=datetime(1, 1, 1)),
470 max(high_liquidity.keys(), default=datetime(1, 1, 1))
471 ))
386 472
387 473
diff --git a/test.py b/test.py
index c0e6a8a..f61e739 100644
--- a/test.py
+++ b/test.py
@@ -7,6 +7,7 @@ from unittest import mock
7import requests 7import requests
8import requests_mock 8import requests_mock
9from io import StringIO 9from io import StringIO
10import threading
10import portfolio, market, main, store 11import portfolio, market, main, store
11 12
12limits = ["acceptance", "unit"] 13limits = ["acceptance", "unit"]
@@ -33,10 +34,14 @@ class WebMockTestCase(unittest.TestCase):
33 34
34 self.patchers = [ 35 self.patchers = [
35 mock.patch.multiple(market.Portfolio, 36 mock.patch.multiple(market.Portfolio,
36 last_date=None, 37 data=store.LockedVar(None),
37 data=None, 38 liquidities=store.LockedVar({}),
38 liquidities={}, 39 last_date=store.LockedVar(None),
39 report=mock.Mock()), 40 report=mock.Mock(),
41 worker=None,
42 worker_notify=None,
43 worker_started=False,
44 callback=None),
40 mock.patch.multiple(portfolio.Computation, 45 mock.patch.multiple(portfolio.Computation,
41 computations=portfolio.Computation.computations), 46 computations=portfolio.Computation.computations),
42 ] 47 ]
@@ -442,6 +447,99 @@ class poloniexETest(unittest.TestCase):
442 create_order.assert_called_once_with("symbol", "type", "side", "amount", price="price", params="params") 447 create_order.assert_called_once_with("symbol", "type", "side", "amount", price="price", params="params")
443 448
444@unittest.skipUnless("unit" in limits, "Unit skipped") 449@unittest.skipUnless("unit" in limits, "Unit skipped")
450class NoopLockTest(unittest.TestCase):
451 def test_with(self):
452 noop_lock = store.NoopLock()
453 with noop_lock:
454 self.assertTrue(True)
455
456@unittest.skipUnless("unit" in limits, "Unit skipped")
457class LockedVar(unittest.TestCase):
458
459 def test_values(self):
460 locked_var = store.LockedVar("Foo")
461 self.assertIsInstance(locked_var.lock, store.NoopLock)
462 self.assertEqual("Foo", locked_var.val)
463
464 def test_get(self):
465 with self.subTest(desc="Normal case"):
466 locked_var = store.LockedVar("Foo")
467 self.assertEqual("Foo", locked_var.get())
468 with self.subTest(desc="Dict"):
469 locked_var = store.LockedVar({"foo": "bar"})
470 self.assertEqual({"foo": "bar"}, locked_var.get())
471 self.assertEqual("bar", locked_var.get("foo"))
472 self.assertIsNone(locked_var.get("other"))
473
474 def test_set(self):
475 locked_var = store.LockedVar("Foo")
476 locked_var.set("Bar")
477 self.assertEqual("Bar", locked_var.get())
478
479 def test__getattr(self):
480 dummy = type('Dummy', (object,), {})()
481 dummy.attribute = "Hey"
482
483 locked_var = store.LockedVar(dummy)
484 self.assertEqual("Hey", locked_var.attribute)
485 with self.assertRaises(AttributeError):
486 locked_var.other
487
488 def test_start_lock(self):
489 locked_var = store.LockedVar("Foo")
490 locked_var.start_lock()
491 self.assertEqual("lock", locked_var.lock.__class__.__name__)
492
493 thread1 = threading.Thread(target=locked_var.set, args=["Bar1"])
494 thread2 = threading.Thread(target=locked_var.set, args=["Bar2"])
495 thread3 = threading.Thread(target=locked_var.set, args=["Bar3"])
496
497 with locked_var.lock:
498 thread1.start()
499 thread2.start()
500 thread3.start()
501
502 self.assertEqual("Foo", locked_var.val)
503 thread1.join()
504 thread2.join()
505 thread3.join()
506 self.assertEqual("Bar", locked_var.get()[0:3])
507
508 def test_wait_for_notification(self):
509 with self.assertRaises(RuntimeError):
510 store.Portfolio.wait_for_notification()
511
512 with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\
513 mock.patch.object(store.Portfolio, "report") as report,\
514 mock.patch.object(store.time, "sleep") as sleep:
515 store.Portfolio.start_worker(poll=3)
516
517 store.Portfolio.worker_notify.set()
518
519 store.Portfolio.callback.wait()
520
521 report.print_log.assert_called_once_with("Fetching cryptoportfolio")
522 get.assert_called_once_with(refetch=True)
523 sleep.assert_called_once_with(3)
524 self.assertFalse(store.Portfolio.worker_notify.is_set())
525 self.assertTrue(store.Portfolio.worker.is_alive())
526
527 store.Portfolio.callback.clear()
528 store.Portfolio.worker_started = False
529 store.Portfolio.worker_notify.set()
530 store.Portfolio.callback.wait()
531
532 self.assertFalse(store.Portfolio.worker.is_alive())
533
534 def test_notify_and_wait(self):
535 with mock.patch.object(store.Portfolio, "callback") as callback,\
536 mock.patch.object(store.Portfolio, "worker_notify") as worker_notify:
537 store.Portfolio.notify_and_wait()
538 callback.clear.assert_called_once_with()
539 worker_notify.set.assert_called_once_with()
540 callback.wait.assert_called_once_with()
541
542@unittest.skipUnless("unit" in limits, "Unit skipped")
445class PortfolioTest(WebMockTestCase): 543class PortfolioTest(WebMockTestCase):
446 def setUp(self): 544 def setUp(self):
447 super(PortfolioTest, self).setUp() 545 super(PortfolioTest, self).setUp()
@@ -453,86 +551,131 @@ class PortfolioTest(WebMockTestCase):
453 551
454 @mock.patch.object(market.Portfolio, "parse_cryptoportfolio") 552 @mock.patch.object(market.Portfolio, "parse_cryptoportfolio")
455 def test_get_cryptoportfolio(self, parse_cryptoportfolio): 553 def test_get_cryptoportfolio(self, parse_cryptoportfolio):
456 self.wm.get(market.Portfolio.URL, [ 554 with self.subTest(parallel=False):
457 {"text":'{ "foo": "bar" }', "status_code": 200}, 555 self.wm.get(market.Portfolio.URL, [
458 {"text": "System Error", "status_code": 500}, 556 {"text":'{ "foo": "bar" }', "status_code": 200},
459 {"exc": requests.exceptions.ConnectTimeout}, 557 {"text": "System Error", "status_code": 500},
460 ]) 558 {"exc": requests.exceptions.ConnectTimeout},
461 market.Portfolio.get_cryptoportfolio() 559 ])
462 self.assertIn("foo", market.Portfolio.data) 560 market.Portfolio.get_cryptoportfolio()
463 self.assertEqual("bar", market.Portfolio.data["foo"]) 561 self.assertIn("foo", market.Portfolio.data.get())
464 self.assertTrue(self.wm.called) 562 self.assertEqual("bar", market.Portfolio.data.get()["foo"])
465 self.assertEqual(1, self.wm.call_count) 563 self.assertTrue(self.wm.called)
466 market.Portfolio.report.log_error.assert_not_called() 564 self.assertEqual(1, self.wm.call_count)
467 market.Portfolio.report.log_http_request.assert_called_once() 565 market.Portfolio.report.log_error.assert_not_called()
468 parse_cryptoportfolio.assert_called_once_with() 566 market.Portfolio.report.log_http_request.assert_called_once()
469 market.Portfolio.report.log_http_request.reset_mock() 567 parse_cryptoportfolio.assert_called_once_with()
470 parse_cryptoportfolio.reset_mock() 568 market.Portfolio.report.log_http_request.reset_mock()
471 market.Portfolio.data = None 569 parse_cryptoportfolio.reset_mock()
472 570 market.Portfolio.data = store.LockedVar(None)
473 market.Portfolio.get_cryptoportfolio() 571
474 self.assertIsNone(market.Portfolio.data) 572 market.Portfolio.get_cryptoportfolio()
475 self.assertEqual(2, self.wm.call_count) 573 self.assertIsNone(market.Portfolio.data.get())
476 parse_cryptoportfolio.assert_not_called() 574 self.assertEqual(2, self.wm.call_count)
477 market.Portfolio.report.log_error.assert_not_called() 575 parse_cryptoportfolio.assert_not_called()
478 market.Portfolio.report.log_http_request.assert_called_once() 576 market.Portfolio.report.log_error.assert_not_called()
479 market.Portfolio.report.log_http_request.reset_mock() 577 market.Portfolio.report.log_http_request.assert_called_once()
480 parse_cryptoportfolio.reset_mock() 578 market.Portfolio.report.log_http_request.reset_mock()
481 579 parse_cryptoportfolio.reset_mock()
482 market.Portfolio.data = "Foo" 580
483 market.Portfolio.get_cryptoportfolio() 581 market.Portfolio.data = store.LockedVar("Foo")
484 self.assertEqual(2, self.wm.call_count) 582 market.Portfolio.get_cryptoportfolio()
485 parse_cryptoportfolio.assert_not_called() 583 self.assertEqual(2, self.wm.call_count)
486 584 parse_cryptoportfolio.assert_not_called()
487 market.Portfolio.get_cryptoportfolio(refetch=True) 585
488 self.assertEqual("Foo", market.Portfolio.data) 586 market.Portfolio.get_cryptoportfolio(refetch=True)
489 self.assertEqual(3, self.wm.call_count) 587 self.assertEqual("Foo", market.Portfolio.data.get())
490 market.Portfolio.report.log_error.assert_called_once_with("get_cryptoportfolio", 588 self.assertEqual(3, self.wm.call_count)
491 exception=mock.ANY) 589 market.Portfolio.report.log_error.assert_called_once_with("get_cryptoportfolio",
492 market.Portfolio.report.log_http_request.assert_not_called() 590 exception=mock.ANY)
591 market.Portfolio.report.log_http_request.assert_not_called()
592 with self.subTest(parallel=True):
593 with mock.patch.object(market.Portfolio, "is_worker_thread") as is_worker,\
594 mock.patch.object(market.Portfolio, "notify_and_wait") as notify:
595 with self.subTest(worker=True):
596 market.Portfolio.data = store.LockedVar(None)
597 market.Portfolio.worker = mock.Mock()
598 is_worker.return_value = True
599 self.wm.get(market.Portfolio.URL, [
600 {"text":'{ "foo": "bar" }', "status_code": 200},
601 ])
602 market.Portfolio.get_cryptoportfolio()
603 self.assertIn("foo", market.Portfolio.data.get())
604 parse_cryptoportfolio.reset_mock()
605 with self.subTest(worker=False):
606 market.Portfolio.data = store.LockedVar(None)
607 market.Portfolio.worker = mock.Mock()
608 is_worker.return_value = False
609 market.Portfolio.get_cryptoportfolio()
610 notify.assert_called_once_with()
611 parse_cryptoportfolio.assert_not_called()
493 612
494 def test_parse_cryptoportfolio(self): 613 def test_parse_cryptoportfolio(self):
495 market.Portfolio.data = store.json.loads(self.json_response, parse_int=D, 614 with self.subTest(description="Normal case"):
496 parse_float=D) 615 market.Portfolio.data = store.LockedVar(store.json.loads(
497 market.Portfolio.parse_cryptoportfolio() 616 self.json_response, parse_int=D, parse_float=D))
498 617 market.Portfolio.parse_cryptoportfolio()
499 self.assertListEqual( 618
500 ["medium", "high"], 619 self.assertListEqual(
501 list(market.Portfolio.liquidities.keys())) 620 ["medium", "high"],
502 621 list(market.Portfolio.liquidities.get().keys()))
503 liquidities = market.Portfolio.liquidities 622
504 self.assertEqual(10, len(liquidities["medium"].keys())) 623 liquidities = market.Portfolio.liquidities.get()
505 self.assertEqual(10, len(liquidities["high"].keys())) 624 self.assertEqual(10, len(liquidities["medium"].keys()))
506 625 self.assertEqual(10, len(liquidities["high"].keys()))
507 expected = { 626
508 'BTC': (D("0.2857"), "long"), 627 expected = {
509 'DGB': (D("0.1015"), "long"), 628 'BTC': (D("0.2857"), "long"),
510 'DOGE': (D("0.1805"), "long"), 629 'DGB': (D("0.1015"), "long"),
511 'SC': (D("0.0623"), "long"), 630 'DOGE': (D("0.1805"), "long"),
512 'ZEC': (D("0.3701"), "long"), 631 'SC': (D("0.0623"), "long"),
513 } 632 'ZEC': (D("0.3701"), "long"),
514 date = portfolio.datetime(2018, 1, 8) 633 }
515 self.assertDictEqual(expected, liquidities["high"][date]) 634 date = portfolio.datetime(2018, 1, 8)
516 635 self.assertDictEqual(expected, liquidities["high"][date])
517 expected = { 636
518 'BTC': (D("1.1102e-16"), "long"), 637 expected = {
519 'ETC': (D("0.1"), "long"), 638 'BTC': (D("1.1102e-16"), "long"),
520 'FCT': (D("0.1"), "long"), 639 'ETC': (D("0.1"), "long"),
521 'GAS': (D("0.1"), "long"), 640 'FCT': (D("0.1"), "long"),
522 'NAV': (D("0.1"), "long"), 641 'GAS': (D("0.1"), "long"),
523 'OMG': (D("0.1"), "long"), 642 'NAV': (D("0.1"), "long"),
524 'OMNI': (D("0.1"), "long"), 643 'OMG': (D("0.1"), "long"),
525 'PPC': (D("0.1"), "long"), 644 'OMNI': (D("0.1"), "long"),
526 'RIC': (D("0.1"), "long"), 645 'PPC': (D("0.1"), "long"),
527 'VIA': (D("0.1"), "long"), 646 'RIC': (D("0.1"), "long"),
528 'XCP': (D("0.1"), "long"), 647 'VIA': (D("0.1"), "long"),
529 } 648 'XCP': (D("0.1"), "long"),
530 self.assertDictEqual(expected, liquidities["medium"][date]) 649 }
531 self.assertEqual(portfolio.datetime(2018, 1, 15), market.Portfolio.last_date) 650 self.assertDictEqual(expected, liquidities["medium"][date])
651 self.assertEqual(portfolio.datetime(2018, 1, 15), market.Portfolio.last_date.get())
652
653 with self.subTest(description="Missing weight"):
654 data = store.json.loads(self.json_response, parse_int=D, parse_float=D)
655 del(data["portfolio_2"]["weights"])
656 market.Portfolio.data = store.LockedVar(data)
657
658 market.Portfolio.parse_cryptoportfolio()
659 self.assertListEqual(
660 ["medium", "high"],
661 list(market.Portfolio.liquidities.get().keys()))
662 self.assertEqual({}, market.Portfolio.liquidities.get("medium"))
663
664 with self.subTest(description="All missing weights"):
665 data = store.json.loads(self.json_response, parse_int=D, parse_float=D)
666 del(data["portfolio_1"]["weights"])
667 del(data["portfolio_2"]["weights"])
668 market.Portfolio.data = store.LockedVar(data)
669
670 market.Portfolio.parse_cryptoportfolio()
671 self.assertEqual({}, market.Portfolio.liquidities.get("medium"))
672 self.assertEqual({}, market.Portfolio.liquidities.get("high"))
673 self.assertEqual(datetime.datetime(1,1,1), market.Portfolio.last_date.get())
674
532 675
533 @mock.patch.object(market.Portfolio, "get_cryptoportfolio") 676 @mock.patch.object(market.Portfolio, "get_cryptoportfolio")
534 def test_repartition(self, get_cryptoportfolio): 677 def test_repartition(self, get_cryptoportfolio):
535 market.Portfolio.liquidities = { 678 market.Portfolio.liquidities = store.LockedVar({
536 "medium": { 679 "medium": {
537 "2018-03-01": "medium_2018-03-01", 680 "2018-03-01": "medium_2018-03-01",
538 "2018-03-08": "medium_2018-03-08", 681 "2018-03-08": "medium_2018-03-08",
@@ -541,8 +684,8 @@ class PortfolioTest(WebMockTestCase):
541 "2018-03-01": "high_2018-03-01", 684 "2018-03-01": "high_2018-03-01",
542 "2018-03-08": "high_2018-03-08", 685 "2018-03-08": "high_2018-03-08",
543 } 686 }
544 } 687 })
545 market.Portfolio.last_date = "2018-03-08" 688 market.Portfolio.last_date = store.LockedVar("2018-03-08")
546 689
547 self.assertEqual("medium_2018-03-08", market.Portfolio.repartition()) 690 self.assertEqual("medium_2018-03-08", market.Portfolio.repartition())
548 get_cryptoportfolio.assert_called_once_with() 691 get_cryptoportfolio.assert_called_once_with()
@@ -559,9 +702,9 @@ class PortfolioTest(WebMockTestCase):
559 else: 702 else:
560 self.assertFalse(refetch) 703 self.assertFalse(refetch)
561 self.call_count += 1 704 self.call_count += 1
562 market.Portfolio.last_date = store.datetime.now()\ 705 market.Portfolio.last_date = store.LockedVar(store.datetime.now()\
563 - store.timedelta(10)\ 706 - store.timedelta(10)\
564 + store.timedelta(self.call_count) 707 + store.timedelta(self.call_count))
565 get_cryptoportfolio.side_effect = _get 708 get_cryptoportfolio.side_effect = _get
566 709
567 market.Portfolio.wait_for_recent() 710 market.Portfolio.wait_for_recent()
@@ -572,7 +715,7 @@ class PortfolioTest(WebMockTestCase):
572 715
573 sleep.reset_mock() 716 sleep.reset_mock()
574 get_cryptoportfolio.reset_mock() 717 get_cryptoportfolio.reset_mock()
575 market.Portfolio.last_date = None 718 market.Portfolio.last_date = store.LockedVar(None)
576 self.call_count = 0 719 self.call_count = 0
577 market.Portfolio.wait_for_recent(delta=15) 720 market.Portfolio.wait_for_recent(delta=15)
578 sleep.assert_not_called() 721 sleep.assert_not_called()
@@ -580,13 +723,45 @@ class PortfolioTest(WebMockTestCase):
580 723
581 sleep.reset_mock() 724 sleep.reset_mock()
582 get_cryptoportfolio.reset_mock() 725 get_cryptoportfolio.reset_mock()
583 market.Portfolio.last_date = None 726 market.Portfolio.last_date = store.LockedVar(None)
584 self.call_count = 0 727 self.call_count = 0
585 market.Portfolio.wait_for_recent(delta=1) 728 market.Portfolio.wait_for_recent(delta=1)
586 sleep.assert_called_with(30) 729 sleep.assert_called_with(30)
587 self.assertEqual(9, sleep.call_count) 730 self.assertEqual(9, sleep.call_count)
588 self.assertEqual(10, get_cryptoportfolio.call_count) 731 self.assertEqual(10, get_cryptoportfolio.call_count)
589 732
733 def test_is_worker_thread(self):
734 with self.subTest(worker=None):
735 self.assertFalse(store.Portfolio.is_worker_thread())
736
737 with self.subTest(worker="not self"),\
738 mock.patch("threading.current_thread") as current_thread:
739 current = mock.Mock()
740 current_thread.return_value = current
741 store.Portfolio.worker = mock.Mock()
742 self.assertFalse(store.Portfolio.is_worker_thread())
743
744 with self.subTest(worker="self"),\
745 mock.patch("threading.current_thread") as current_thread:
746 current = mock.Mock()
747 current_thread.return_value = current
748 store.Portfolio.worker = current
749 self.assertTrue(store.Portfolio.is_worker_thread())
750
751 def test_start_worker(self):
752 with mock.patch.object(store.Portfolio, "wait_for_notification") as notification:
753 store.Portfolio.start_worker()
754 notification.assert_called_once_with(poll=30)
755
756 self.assertEqual("lock", store.Portfolio.last_date.lock.__class__.__name__)
757 self.assertEqual("lock", store.Portfolio.liquidities.lock.__class__.__name__)
758 store.Portfolio.report.start_lock.assert_called_once_with()
759
760 self.assertIsNotNone(store.Portfolio.worker)
761 self.assertIsNotNone(store.Portfolio.worker_notify)
762 self.assertIsNotNone(store.Portfolio.callback)
763 self.assertTrue(store.Portfolio.worker_started)
764
590@unittest.skipUnless("unit" in limits, "Unit skipped") 765@unittest.skipUnless("unit" in limits, "Unit skipped")
591class AmountTest(WebMockTestCase): 766class AmountTest(WebMockTestCase):
592 def test_values(self): 767 def test_values(self):
@@ -3362,31 +3537,64 @@ class MainTest(WebMockTestCase):
3362 self.assertEqual("Exception: boo\n", stdout_mock.getvalue()) 3537 self.assertEqual("Exception: boo\n", stdout_mock.getvalue())
3363 3538
3364 def test_main(self): 3539 def test_main(self):
3365 with mock.patch("main.parse_args") as parse_args,\ 3540 with self.subTest(parallel=False):
3366 mock.patch("main.parse_config") as parse_config,\ 3541 with mock.patch("main.parse_args") as parse_args,\
3367 mock.patch("main.fetch_markets") as fetch_markets,\ 3542 mock.patch("main.parse_config") as parse_config,\
3368 mock.patch("main.process") as process: 3543 mock.patch("main.fetch_markets") as fetch_markets,\
3544 mock.patch("main.process") as process:
3369 3545
3370 args_mock = mock.Mock() 3546 args_mock = mock.Mock()
3371 args_mock.config = "config" 3547 args_mock.parallel = False
3372 args_mock.user = "user" 3548 args_mock.config = "config"
3373 parse_args.return_value = args_mock 3549 args_mock.user = "user"
3550 parse_args.return_value = args_mock
3374 3551
3375 parse_config.return_value = ["pg_config", "report_path"] 3552 parse_config.return_value = ["pg_config", "report_path"]
3376 3553
3377 fetch_markets.return_value = [["config1", 1], ["config2", 2]] 3554 fetch_markets.return_value = [["config1", 1], ["config2", 2]]
3378 3555
3379 main.main(["Foo", "Bar"]) 3556 main.main(["Foo", "Bar"])
3380 3557
3381 parse_args.assert_called_with(["Foo", "Bar"]) 3558 parse_args.assert_called_with(["Foo", "Bar"])
3382 parse_config.assert_called_with("config") 3559 parse_config.assert_called_with("config")
3383 fetch_markets.assert_called_with("pg_config", "user") 3560 fetch_markets.assert_called_with("pg_config", "user")
3384 3561
3385 self.assertEqual(2, process.call_count) 3562 self.assertEqual(2, process.call_count)
3386 process.assert_has_calls([ 3563 process.assert_has_calls([
3387 mock.call("config1", 1, "report_path", args_mock), 3564 mock.call("config1", 1, "report_path", args_mock),
3388 mock.call("config2", 2, "report_path", args_mock), 3565 mock.call("config2", 2, "report_path", args_mock),
3389 ]) 3566 ])
3567 with self.subTest(parallel=True):
3568 with mock.patch("main.parse_args") as parse_args,\
3569 mock.patch("main.parse_config") as parse_config,\
3570 mock.patch("main.fetch_markets") as fetch_markets,\
3571 mock.patch("main.process") as process,\
3572 mock.patch("store.Portfolio.start_worker") as start:
3573
3574 args_mock = mock.Mock()
3575 args_mock.parallel = True
3576 args_mock.config = "config"
3577 args_mock.user = "user"
3578 parse_args.return_value = args_mock
3579
3580 parse_config.return_value = ["pg_config", "report_path"]
3581
3582 fetch_markets.return_value = [["config1", 1], ["config2", 2]]
3583
3584 main.main(["Foo", "Bar"])
3585
3586 parse_args.assert_called_with(["Foo", "Bar"])
3587 parse_config.assert_called_with("config")
3588 fetch_markets.assert_called_with("pg_config", "user")
3589
3590 start.assert_called_once_with()
3591 self.assertEqual(2, process.call_count)
3592 process.assert_has_calls([
3593 mock.call.__bool__(),
3594 mock.call("config1", 1, "report_path", args_mock),
3595 mock.call.__bool__(),
3596 mock.call("config2", 2, "report_path", args_mock),
3597 ])
3390 3598
3391 @mock.patch.object(main.sys, "exit") 3599 @mock.patch.object(main.sys, "exit")
3392 @mock.patch("main.configparser") 3600 @mock.patch("main.configparser")
@@ -3551,7 +3759,7 @@ class ProcessorTest(WebMockTestCase):
3551 3759
3552 method, arguments = processor.method_arguments("wait_for_recent") 3760 method, arguments = processor.method_arguments("wait_for_recent")
3553 self.assertEqual(market.Portfolio.wait_for_recent, method) 3761 self.assertEqual(market.Portfolio.wait_for_recent, method)
3554 self.assertEqual(["delta"], arguments) 3762 self.assertEqual(["delta", "poll"], arguments)
3555 3763
3556 method, arguments = processor.method_arguments("prepare_trades") 3764 method, arguments = processor.method_arguments("prepare_trades")
3557 self.assertEqual(m.prepare_trades, method) 3765 self.assertEqual(m.prepare_trades, method)