diff options
author | Ismaël Bouya <ismael.bouya@normalesup.org> | 2018-03-09 15:37:10 +0100 |
---|---|---|
committer | Ismaël Bouya <ismael.bouya@normalesup.org> | 2018-03-09 19:12:57 +0100 |
commit | dc1ca9a306f09886c6c57f8d426c59a9d084b2b3 (patch) | |
tree | d1a94be893451a4e182f3e75e9afb01749172bb4 | |
parent | 81d1db5117b61c2bb9f114cfc59f015e97c6eb9b (diff) | |
download | Trader-dc1ca9a306f09886c6c57f8d426c59a9d084b2b3.tar.gz Trader-dc1ca9a306f09886c6c57f8d426c59a9d084b2b3.tar.zst Trader-dc1ca9a306f09886c6c57f8d426c59a9d084b2b3.zip |
Add parallelization
-rw-r--r-- | main.py | 13 | ||||
-rw-r--r-- | store.py | 128 | ||||
-rw-r--r-- | test.py | 418 |
3 files changed, 431 insertions, 128 deletions
@@ -117,6 +117,8 @@ def parse_args(argv): | |||
117 | parser.add_argument("--action", | 117 | parser.add_argument("--action", |
118 | action='append', | 118 | action='append', |
119 | help="Do a different action than trading (add several times to chain)") | 119 | help="Do a different action than trading (add several times to chain)") |
120 | parser.add_argument("--parallel", action='store_true', default=True, dest="parallel") | ||
121 | parser.add_argument("--no-parallel", action='store_false', dest="parallel") | ||
120 | 122 | ||
121 | args = parser.parse_args(argv) | 123 | args = parser.parse_args(argv) |
122 | 124 | ||
@@ -139,8 +141,15 @@ def main(argv): | |||
139 | 141 | ||
140 | pg_config, report_path = parse_config(args.config) | 142 | pg_config, report_path = parse_config(args.config) |
141 | 143 | ||
142 | for market_config, user_id in fetch_markets(pg_config, args.user): | 144 | if args.parallel: |
143 | process(market_config, user_id, report_path, args) | 145 | import threading |
146 | market.Portfolio.start_worker() | ||
147 | |||
148 | for market_config, user_id in fetch_markets(pg_config, args.user): | ||
149 | threading.Thread(target=process, args=[market_config, user_id, report_path, args]).start() | ||
150 | else: | ||
151 | for market_config, user_id in fetch_markets(pg_config, args.user): | ||
152 | process(market_config, user_id, report_path, args) | ||
144 | 153 | ||
145 | if __name__ == '__main__': # pragma: no cover | 154 | if __name__ == '__main__': # pragma: no cover |
146 | main(sys.argv[1:]) | 155 | main(sys.argv[1:]) |
@@ -309,30 +309,110 @@ class TradeStore: | |||
309 | for order in self.all_orders(state="open"): | 309 | for order in self.all_orders(state="open"): |
310 | order.get_status() | 310 | order.get_status() |
311 | 311 | ||
312 | class NoopLock: | ||
313 | def __enter__(self, *args): | ||
314 | pass | ||
315 | def __exit__(self, *args): | ||
316 | pass | ||
317 | |||
318 | class LockedVar: | ||
319 | def __init__(self, value): | ||
320 | self.lock = NoopLock() | ||
321 | self.val = value | ||
322 | |||
323 | def start_lock(self): | ||
324 | import threading | ||
325 | self.lock = threading.Lock() | ||
326 | |||
327 | def set(self, value): | ||
328 | with self.lock: | ||
329 | self.val = value | ||
330 | |||
331 | def get(self, key=None): | ||
332 | with self.lock: | ||
333 | if key is not None and isinstance(self.val, dict): | ||
334 | return self.val.get(key) | ||
335 | else: | ||
336 | return self.val | ||
337 | |||
338 | def __getattr__(self, key): | ||
339 | with self.lock: | ||
340 | return getattr(self.val, key) | ||
341 | |||
312 | class Portfolio: | 342 | class Portfolio: |
313 | URL = "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json" | 343 | URL = "https://cryptoportfolio.io/wp-content/uploads/portfolio/json/cryptoportfolio.json" |
314 | liquidities = {} | 344 | data = LockedVar(None) |
315 | data = None | 345 | liquidities = LockedVar({}) |
316 | last_date = None | 346 | last_date = LockedVar(None) |
317 | report = ReportStore(None) | 347 | report = LockedVar(ReportStore(None)) |
348 | worker = None | ||
349 | worker_started = False | ||
350 | worker_notify = None | ||
351 | callback = None | ||
352 | |||
353 | @classmethod | ||
354 | def start_worker(cls, poll=30): | ||
355 | import threading | ||
356 | |||
357 | cls.worker = threading.Thread(name="portfolio", daemon=True, | ||
358 | target=cls.wait_for_notification, kwargs={"poll": poll}) | ||
359 | cls.worker_notify = threading.Event() | ||
360 | cls.callback = threading.Event() | ||
361 | |||
362 | cls.last_date.start_lock() | ||
363 | cls.liquidities.start_lock() | ||
364 | cls.report.start_lock() | ||
365 | |||
366 | cls.worker_started = True | ||
367 | cls.worker.start() | ||
368 | |||
369 | @classmethod | ||
370 | def is_worker_thread(cls): | ||
371 | if cls.worker is None: | ||
372 | return False | ||
373 | else: | ||
374 | import threading | ||
375 | return cls.worker == threading.current_thread() | ||
376 | |||
377 | @classmethod | ||
378 | def wait_for_notification(cls, poll=30): | ||
379 | if not cls.is_worker_thread(): | ||
380 | raise RuntimeError("This method needs to be ran with the worker") | ||
381 | while cls.worker_started: | ||
382 | cls.worker_notify.wait() | ||
383 | cls.worker_notify.clear() | ||
384 | cls.report.print_log("Fetching cryptoportfolio") | ||
385 | cls.get_cryptoportfolio(refetch=True) | ||
386 | cls.callback.set() | ||
387 | time.sleep(poll) | ||
318 | 388 | ||
319 | @classmethod | 389 | @classmethod |
320 | def wait_for_recent(cls, delta=4): | 390 | def notify_and_wait(cls): |
391 | cls.callback.clear() | ||
392 | cls.worker_notify.set() | ||
393 | cls.callback.wait() | ||
394 | |||
395 | @classmethod | ||
396 | def wait_for_recent(cls, delta=4, poll=30): | ||
321 | cls.get_cryptoportfolio() | 397 | cls.get_cryptoportfolio() |
322 | while cls.last_date is None or datetime.now() - cls.last_date > timedelta(delta): | 398 | while cls.last_date.get() is None or datetime.now() - cls.last_date.get() > timedelta(delta): |
323 | time.sleep(30) | 399 | if cls.worker is None: |
324 | cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio") | 400 | time.sleep(poll) |
401 | cls.report.print_log("Attempt to fetch up-to-date cryptoportfolio") | ||
325 | cls.get_cryptoportfolio(refetch=True) | 402 | cls.get_cryptoportfolio(refetch=True) |
326 | 403 | ||
327 | @classmethod | 404 | @classmethod |
328 | def repartition(cls, liquidity="medium"): | 405 | def repartition(cls, liquidity="medium"): |
329 | cls.get_cryptoportfolio() | 406 | cls.get_cryptoportfolio() |
330 | liquidities = cls.liquidities[liquidity] | 407 | liquidities = cls.liquidities.get(liquidity) |
331 | return liquidities[cls.last_date] | 408 | return liquidities[cls.last_date.get()] |
332 | 409 | ||
333 | @classmethod | 410 | @classmethod |
334 | def get_cryptoportfolio(cls, refetch=False): | 411 | def get_cryptoportfolio(cls, refetch=False): |
335 | if cls.data is not None and not refetch: | 412 | if cls.data.get() is not None and not refetch: |
413 | return | ||
414 | if cls.worker is not None and not cls.is_worker_thread(): | ||
415 | cls.notify_and_wait() | ||
336 | return | 416 | return |
337 | try: | 417 | try: |
338 | r = requests.get(cls.URL) | 418 | r = requests.get(cls.URL) |
@@ -342,11 +422,12 @@ class Portfolio: | |||
342 | cls.report.log_error("get_cryptoportfolio", exception=e) | 422 | cls.report.log_error("get_cryptoportfolio", exception=e) |
343 | return | 423 | return |
344 | try: | 424 | try: |
345 | cls.data = r.json(parse_int=D, parse_float=D) | 425 | cls.data.set(r.json(parse_int=D, parse_float=D)) |
346 | cls.parse_cryptoportfolio() | 426 | cls.parse_cryptoportfolio() |
347 | except (JSONDecodeError, SimpleJSONDecodeError): | 427 | except (JSONDecodeError, SimpleJSONDecodeError): |
348 | cls.data = None | 428 | cls.data.set(None) |
349 | cls.liquidities = {} | 429 | cls.last_date.set(None) |
430 | cls.liquidities.set({}) | ||
350 | 431 | ||
351 | @classmethod | 432 | @classmethod |
352 | def parse_cryptoportfolio(cls): | 433 | def parse_cryptoportfolio(cls): |
@@ -366,6 +447,8 @@ class Portfolio: | |||
366 | return clean_weights_ | 447 | return clean_weights_ |
367 | 448 | ||
368 | def parse_weights(portfolio_hash): | 449 | def parse_weights(portfolio_hash): |
450 | if "weights" not in portfolio_hash: | ||
451 | return {} | ||
369 | weights_hash = portfolio_hash["weights"] | 452 | weights_hash = portfolio_hash["weights"] |
370 | weights = {} | 453 | weights = {} |
371 | for i in range(len(weights_hash["_row"])): | 454 | for i in range(len(weights_hash["_row"])): |
@@ -375,13 +458,16 @@ class Portfolio: | |||
375 | map(clean_weights(i), weights_hash.items()))) | 458 | map(clean_weights(i), weights_hash.items()))) |
376 | return weights | 459 | return weights |
377 | 460 | ||
378 | high_liquidity = parse_weights(cls.data["portfolio_1"]) | 461 | high_liquidity = parse_weights(cls.data.get("portfolio_1")) |
379 | medium_liquidity = parse_weights(cls.data["portfolio_2"]) | 462 | medium_liquidity = parse_weights(cls.data.get("portfolio_2")) |
380 | 463 | ||
381 | cls.liquidities = { | 464 | cls.liquidities.set({ |
382 | "medium": medium_liquidity, | 465 | "medium": medium_liquidity, |
383 | "high": high_liquidity, | 466 | "high": high_liquidity, |
384 | } | 467 | }) |
385 | cls.last_date = max(max(medium_liquidity.keys()), max(high_liquidity.keys())) | 468 | cls.last_date.set(max( |
469 | max(medium_liquidity.keys(), default=datetime(1, 1, 1)), | ||
470 | max(high_liquidity.keys(), default=datetime(1, 1, 1)) | ||
471 | )) | ||
386 | 472 | ||
387 | 473 | ||
@@ -7,6 +7,7 @@ from unittest import mock | |||
7 | import requests | 7 | import requests |
8 | import requests_mock | 8 | import requests_mock |
9 | from io import StringIO | 9 | from io import StringIO |
10 | import threading | ||
10 | import portfolio, market, main, store | 11 | import portfolio, market, main, store |
11 | 12 | ||
12 | limits = ["acceptance", "unit"] | 13 | limits = ["acceptance", "unit"] |
@@ -33,10 +34,14 @@ class WebMockTestCase(unittest.TestCase): | |||
33 | 34 | ||
34 | self.patchers = [ | 35 | self.patchers = [ |
35 | mock.patch.multiple(market.Portfolio, | 36 | mock.patch.multiple(market.Portfolio, |
36 | last_date=None, | 37 | data=store.LockedVar(None), |
37 | data=None, | 38 | liquidities=store.LockedVar({}), |
38 | liquidities={}, | 39 | last_date=store.LockedVar(None), |
39 | report=mock.Mock()), | 40 | report=mock.Mock(), |
41 | worker=None, | ||
42 | worker_notify=None, | ||
43 | worker_started=False, | ||
44 | callback=None), | ||
40 | mock.patch.multiple(portfolio.Computation, | 45 | mock.patch.multiple(portfolio.Computation, |
41 | computations=portfolio.Computation.computations), | 46 | computations=portfolio.Computation.computations), |
42 | ] | 47 | ] |
@@ -442,6 +447,99 @@ class poloniexETest(unittest.TestCase): | |||
442 | create_order.assert_called_once_with("symbol", "type", "side", "amount", price="price", params="params") | 447 | create_order.assert_called_once_with("symbol", "type", "side", "amount", price="price", params="params") |
443 | 448 | ||
444 | @unittest.skipUnless("unit" in limits, "Unit skipped") | 449 | @unittest.skipUnless("unit" in limits, "Unit skipped") |
450 | class NoopLockTest(unittest.TestCase): | ||
451 | def test_with(self): | ||
452 | noop_lock = store.NoopLock() | ||
453 | with noop_lock: | ||
454 | self.assertTrue(True) | ||
455 | |||
456 | @unittest.skipUnless("unit" in limits, "Unit skipped") | ||
457 | class LockedVar(unittest.TestCase): | ||
458 | |||
459 | def test_values(self): | ||
460 | locked_var = store.LockedVar("Foo") | ||
461 | self.assertIsInstance(locked_var.lock, store.NoopLock) | ||
462 | self.assertEqual("Foo", locked_var.val) | ||
463 | |||
464 | def test_get(self): | ||
465 | with self.subTest(desc="Normal case"): | ||
466 | locked_var = store.LockedVar("Foo") | ||
467 | self.assertEqual("Foo", locked_var.get()) | ||
468 | with self.subTest(desc="Dict"): | ||
469 | locked_var = store.LockedVar({"foo": "bar"}) | ||
470 | self.assertEqual({"foo": "bar"}, locked_var.get()) | ||
471 | self.assertEqual("bar", locked_var.get("foo")) | ||
472 | self.assertIsNone(locked_var.get("other")) | ||
473 | |||
474 | def test_set(self): | ||
475 | locked_var = store.LockedVar("Foo") | ||
476 | locked_var.set("Bar") | ||
477 | self.assertEqual("Bar", locked_var.get()) | ||
478 | |||
479 | def test__getattr(self): | ||
480 | dummy = type('Dummy', (object,), {})() | ||
481 | dummy.attribute = "Hey" | ||
482 | |||
483 | locked_var = store.LockedVar(dummy) | ||
484 | self.assertEqual("Hey", locked_var.attribute) | ||
485 | with self.assertRaises(AttributeError): | ||
486 | locked_var.other | ||
487 | |||
488 | def test_start_lock(self): | ||
489 | locked_var = store.LockedVar("Foo") | ||
490 | locked_var.start_lock() | ||
491 | self.assertEqual("lock", locked_var.lock.__class__.__name__) | ||
492 | |||
493 | thread1 = threading.Thread(target=locked_var.set, args=["Bar1"]) | ||
494 | thread2 = threading.Thread(target=locked_var.set, args=["Bar2"]) | ||
495 | thread3 = threading.Thread(target=locked_var.set, args=["Bar3"]) | ||
496 | |||
497 | with locked_var.lock: | ||
498 | thread1.start() | ||
499 | thread2.start() | ||
500 | thread3.start() | ||
501 | |||
502 | self.assertEqual("Foo", locked_var.val) | ||
503 | thread1.join() | ||
504 | thread2.join() | ||
505 | thread3.join() | ||
506 | self.assertEqual("Bar", locked_var.get()[0:3]) | ||
507 | |||
508 | def test_wait_for_notification(self): | ||
509 | with self.assertRaises(RuntimeError): | ||
510 | store.Portfolio.wait_for_notification() | ||
511 | |||
512 | with mock.patch.object(store.Portfolio, "get_cryptoportfolio") as get,\ | ||
513 | mock.patch.object(store.Portfolio, "report") as report,\ | ||
514 | mock.patch.object(store.time, "sleep") as sleep: | ||
515 | store.Portfolio.start_worker(poll=3) | ||
516 | |||
517 | store.Portfolio.worker_notify.set() | ||
518 | |||
519 | store.Portfolio.callback.wait() | ||
520 | |||
521 | report.print_log.assert_called_once_with("Fetching cryptoportfolio") | ||
522 | get.assert_called_once_with(refetch=True) | ||
523 | sleep.assert_called_once_with(3) | ||
524 | self.assertFalse(store.Portfolio.worker_notify.is_set()) | ||
525 | self.assertTrue(store.Portfolio.worker.is_alive()) | ||
526 | |||
527 | store.Portfolio.callback.clear() | ||
528 | store.Portfolio.worker_started = False | ||
529 | store.Portfolio.worker_notify.set() | ||
530 | store.Portfolio.callback.wait() | ||
531 | |||
532 | self.assertFalse(store.Portfolio.worker.is_alive()) | ||
533 | |||
534 | def test_notify_and_wait(self): | ||
535 | with mock.patch.object(store.Portfolio, "callback") as callback,\ | ||
536 | mock.patch.object(store.Portfolio, "worker_notify") as worker_notify: | ||
537 | store.Portfolio.notify_and_wait() | ||
538 | callback.clear.assert_called_once_with() | ||
539 | worker_notify.set.assert_called_once_with() | ||
540 | callback.wait.assert_called_once_with() | ||
541 | |||
542 | @unittest.skipUnless("unit" in limits, "Unit skipped") | ||
445 | class PortfolioTest(WebMockTestCase): | 543 | class PortfolioTest(WebMockTestCase): |
446 | def setUp(self): | 544 | def setUp(self): |
447 | super(PortfolioTest, self).setUp() | 545 | super(PortfolioTest, self).setUp() |
@@ -453,86 +551,131 @@ class PortfolioTest(WebMockTestCase): | |||
453 | 551 | ||
454 | @mock.patch.object(market.Portfolio, "parse_cryptoportfolio") | 552 | @mock.patch.object(market.Portfolio, "parse_cryptoportfolio") |
455 | def test_get_cryptoportfolio(self, parse_cryptoportfolio): | 553 | def test_get_cryptoportfolio(self, parse_cryptoportfolio): |
456 | self.wm.get(market.Portfolio.URL, [ | 554 | with self.subTest(parallel=False): |
457 | {"text":'{ "foo": "bar" }', "status_code": 200}, | 555 | self.wm.get(market.Portfolio.URL, [ |
458 | {"text": "System Error", "status_code": 500}, | 556 | {"text":'{ "foo": "bar" }', "status_code": 200}, |
459 | {"exc": requests.exceptions.ConnectTimeout}, | 557 | {"text": "System Error", "status_code": 500}, |
460 | ]) | 558 | {"exc": requests.exceptions.ConnectTimeout}, |
461 | market.Portfolio.get_cryptoportfolio() | 559 | ]) |
462 | self.assertIn("foo", market.Portfolio.data) | 560 | market.Portfolio.get_cryptoportfolio() |
463 | self.assertEqual("bar", market.Portfolio.data["foo"]) | 561 | self.assertIn("foo", market.Portfolio.data.get()) |
464 | self.assertTrue(self.wm.called) | 562 | self.assertEqual("bar", market.Portfolio.data.get()["foo"]) |
465 | self.assertEqual(1, self.wm.call_count) | 563 | self.assertTrue(self.wm.called) |
466 | market.Portfolio.report.log_error.assert_not_called() | 564 | self.assertEqual(1, self.wm.call_count) |
467 | market.Portfolio.report.log_http_request.assert_called_once() | 565 | market.Portfolio.report.log_error.assert_not_called() |
468 | parse_cryptoportfolio.assert_called_once_with() | 566 | market.Portfolio.report.log_http_request.assert_called_once() |
469 | market.Portfolio.report.log_http_request.reset_mock() | 567 | parse_cryptoportfolio.assert_called_once_with() |
470 | parse_cryptoportfolio.reset_mock() | 568 | market.Portfolio.report.log_http_request.reset_mock() |
471 | market.Portfolio.data = None | 569 | parse_cryptoportfolio.reset_mock() |
472 | 570 | market.Portfolio.data = store.LockedVar(None) | |
473 | market.Portfolio.get_cryptoportfolio() | 571 | |
474 | self.assertIsNone(market.Portfolio.data) | 572 | market.Portfolio.get_cryptoportfolio() |
475 | self.assertEqual(2, self.wm.call_count) | 573 | self.assertIsNone(market.Portfolio.data.get()) |
476 | parse_cryptoportfolio.assert_not_called() | 574 | self.assertEqual(2, self.wm.call_count) |
477 | market.Portfolio.report.log_error.assert_not_called() | 575 | parse_cryptoportfolio.assert_not_called() |
478 | market.Portfolio.report.log_http_request.assert_called_once() | 576 | market.Portfolio.report.log_error.assert_not_called() |
479 | market.Portfolio.report.log_http_request.reset_mock() | 577 | market.Portfolio.report.log_http_request.assert_called_once() |
480 | parse_cryptoportfolio.reset_mock() | 578 | market.Portfolio.report.log_http_request.reset_mock() |
481 | 579 | parse_cryptoportfolio.reset_mock() | |
482 | market.Portfolio.data = "Foo" | 580 | |
483 | market.Portfolio.get_cryptoportfolio() | 581 | market.Portfolio.data = store.LockedVar("Foo") |
484 | self.assertEqual(2, self.wm.call_count) | 582 | market.Portfolio.get_cryptoportfolio() |
485 | parse_cryptoportfolio.assert_not_called() | 583 | self.assertEqual(2, self.wm.call_count) |
486 | 584 | parse_cryptoportfolio.assert_not_called() | |
487 | market.Portfolio.get_cryptoportfolio(refetch=True) | 585 | |
488 | self.assertEqual("Foo", market.Portfolio.data) | 586 | market.Portfolio.get_cryptoportfolio(refetch=True) |
489 | self.assertEqual(3, self.wm.call_count) | 587 | self.assertEqual("Foo", market.Portfolio.data.get()) |
490 | market.Portfolio.report.log_error.assert_called_once_with("get_cryptoportfolio", | 588 | self.assertEqual(3, self.wm.call_count) |
491 | exception=mock.ANY) | 589 | market.Portfolio.report.log_error.assert_called_once_with("get_cryptoportfolio", |
492 | market.Portfolio.report.log_http_request.assert_not_called() | 590 | exception=mock.ANY) |
591 | market.Portfolio.report.log_http_request.assert_not_called() | ||
592 | with self.subTest(parallel=True): | ||
593 | with mock.patch.object(market.Portfolio, "is_worker_thread") as is_worker,\ | ||
594 | mock.patch.object(market.Portfolio, "notify_and_wait") as notify: | ||
595 | with self.subTest(worker=True): | ||
596 | market.Portfolio.data = store.LockedVar(None) | ||
597 | market.Portfolio.worker = mock.Mock() | ||
598 | is_worker.return_value = True | ||
599 | self.wm.get(market.Portfolio.URL, [ | ||
600 | {"text":'{ "foo": "bar" }', "status_code": 200}, | ||
601 | ]) | ||
602 | market.Portfolio.get_cryptoportfolio() | ||
603 | self.assertIn("foo", market.Portfolio.data.get()) | ||
604 | parse_cryptoportfolio.reset_mock() | ||
605 | with self.subTest(worker=False): | ||
606 | market.Portfolio.data = store.LockedVar(None) | ||
607 | market.Portfolio.worker = mock.Mock() | ||
608 | is_worker.return_value = False | ||
609 | market.Portfolio.get_cryptoportfolio() | ||
610 | notify.assert_called_once_with() | ||
611 | parse_cryptoportfolio.assert_not_called() | ||
493 | 612 | ||
494 | def test_parse_cryptoportfolio(self): | 613 | def test_parse_cryptoportfolio(self): |
495 | market.Portfolio.data = store.json.loads(self.json_response, parse_int=D, | 614 | with self.subTest(description="Normal case"): |
496 | parse_float=D) | 615 | market.Portfolio.data = store.LockedVar(store.json.loads( |
497 | market.Portfolio.parse_cryptoportfolio() | 616 | self.json_response, parse_int=D, parse_float=D)) |
498 | 617 | market.Portfolio.parse_cryptoportfolio() | |
499 | self.assertListEqual( | 618 | |
500 | ["medium", "high"], | 619 | self.assertListEqual( |
501 | list(market.Portfolio.liquidities.keys())) | 620 | ["medium", "high"], |
502 | 621 | list(market.Portfolio.liquidities.get().keys())) | |
503 | liquidities = market.Portfolio.liquidities | 622 | |
504 | self.assertEqual(10, len(liquidities["medium"].keys())) | 623 | liquidities = market.Portfolio.liquidities.get() |
505 | self.assertEqual(10, len(liquidities["high"].keys())) | 624 | self.assertEqual(10, len(liquidities["medium"].keys())) |
506 | 625 | self.assertEqual(10, len(liquidities["high"].keys())) | |
507 | expected = { | 626 | |
508 | 'BTC': (D("0.2857"), "long"), | 627 | expected = { |
509 | 'DGB': (D("0.1015"), "long"), | 628 | 'BTC': (D("0.2857"), "long"), |
510 | 'DOGE': (D("0.1805"), "long"), | 629 | 'DGB': (D("0.1015"), "long"), |
511 | 'SC': (D("0.0623"), "long"), | 630 | 'DOGE': (D("0.1805"), "long"), |
512 | 'ZEC': (D("0.3701"), "long"), | 631 | 'SC': (D("0.0623"), "long"), |
513 | } | 632 | 'ZEC': (D("0.3701"), "long"), |
514 | date = portfolio.datetime(2018, 1, 8) | 633 | } |
515 | self.assertDictEqual(expected, liquidities["high"][date]) | 634 | date = portfolio.datetime(2018, 1, 8) |
516 | 635 | self.assertDictEqual(expected, liquidities["high"][date]) | |
517 | expected = { | 636 | |
518 | 'BTC': (D("1.1102e-16"), "long"), | 637 | expected = { |
519 | 'ETC': (D("0.1"), "long"), | 638 | 'BTC': (D("1.1102e-16"), "long"), |
520 | 'FCT': (D("0.1"), "long"), | 639 | 'ETC': (D("0.1"), "long"), |
521 | 'GAS': (D("0.1"), "long"), | 640 | 'FCT': (D("0.1"), "long"), |
522 | 'NAV': (D("0.1"), "long"), | 641 | 'GAS': (D("0.1"), "long"), |
523 | 'OMG': (D("0.1"), "long"), | 642 | 'NAV': (D("0.1"), "long"), |
524 | 'OMNI': (D("0.1"), "long"), | 643 | 'OMG': (D("0.1"), "long"), |
525 | 'PPC': (D("0.1"), "long"), | 644 | 'OMNI': (D("0.1"), "long"), |
526 | 'RIC': (D("0.1"), "long"), | 645 | 'PPC': (D("0.1"), "long"), |
527 | 'VIA': (D("0.1"), "long"), | 646 | 'RIC': (D("0.1"), "long"), |
528 | 'XCP': (D("0.1"), "long"), | 647 | 'VIA': (D("0.1"), "long"), |
529 | } | 648 | 'XCP': (D("0.1"), "long"), |
530 | self.assertDictEqual(expected, liquidities["medium"][date]) | 649 | } |
531 | self.assertEqual(portfolio.datetime(2018, 1, 15), market.Portfolio.last_date) | 650 | self.assertDictEqual(expected, liquidities["medium"][date]) |
651 | self.assertEqual(portfolio.datetime(2018, 1, 15), market.Portfolio.last_date.get()) | ||
652 | |||
653 | with self.subTest(description="Missing weight"): | ||
654 | data = store.json.loads(self.json_response, parse_int=D, parse_float=D) | ||
655 | del(data["portfolio_2"]["weights"]) | ||
656 | market.Portfolio.data = store.LockedVar(data) | ||
657 | |||
658 | market.Portfolio.parse_cryptoportfolio() | ||
659 | self.assertListEqual( | ||
660 | ["medium", "high"], | ||
661 | list(market.Portfolio.liquidities.get().keys())) | ||
662 | self.assertEqual({}, market.Portfolio.liquidities.get("medium")) | ||
663 | |||
664 | with self.subTest(description="All missing weights"): | ||
665 | data = store.json.loads(self.json_response, parse_int=D, parse_float=D) | ||
666 | del(data["portfolio_1"]["weights"]) | ||
667 | del(data["portfolio_2"]["weights"]) | ||
668 | market.Portfolio.data = store.LockedVar(data) | ||
669 | |||
670 | market.Portfolio.parse_cryptoportfolio() | ||
671 | self.assertEqual({}, market.Portfolio.liquidities.get("medium")) | ||
672 | self.assertEqual({}, market.Portfolio.liquidities.get("high")) | ||
673 | self.assertEqual(datetime.datetime(1,1,1), market.Portfolio.last_date.get()) | ||
674 | |||
532 | 675 | ||
533 | @mock.patch.object(market.Portfolio, "get_cryptoportfolio") | 676 | @mock.patch.object(market.Portfolio, "get_cryptoportfolio") |
534 | def test_repartition(self, get_cryptoportfolio): | 677 | def test_repartition(self, get_cryptoportfolio): |
535 | market.Portfolio.liquidities = { | 678 | market.Portfolio.liquidities = store.LockedVar({ |
536 | "medium": { | 679 | "medium": { |
537 | "2018-03-01": "medium_2018-03-01", | 680 | "2018-03-01": "medium_2018-03-01", |
538 | "2018-03-08": "medium_2018-03-08", | 681 | "2018-03-08": "medium_2018-03-08", |
@@ -541,8 +684,8 @@ class PortfolioTest(WebMockTestCase): | |||
541 | "2018-03-01": "high_2018-03-01", | 684 | "2018-03-01": "high_2018-03-01", |
542 | "2018-03-08": "high_2018-03-08", | 685 | "2018-03-08": "high_2018-03-08", |
543 | } | 686 | } |
544 | } | 687 | }) |
545 | market.Portfolio.last_date = "2018-03-08" | 688 | market.Portfolio.last_date = store.LockedVar("2018-03-08") |
546 | 689 | ||
547 | self.assertEqual("medium_2018-03-08", market.Portfolio.repartition()) | 690 | self.assertEqual("medium_2018-03-08", market.Portfolio.repartition()) |
548 | get_cryptoportfolio.assert_called_once_with() | 691 | get_cryptoportfolio.assert_called_once_with() |
@@ -559,9 +702,9 @@ class PortfolioTest(WebMockTestCase): | |||
559 | else: | 702 | else: |
560 | self.assertFalse(refetch) | 703 | self.assertFalse(refetch) |
561 | self.call_count += 1 | 704 | self.call_count += 1 |
562 | market.Portfolio.last_date = store.datetime.now()\ | 705 | market.Portfolio.last_date = store.LockedVar(store.datetime.now()\ |
563 | - store.timedelta(10)\ | 706 | - store.timedelta(10)\ |
564 | + store.timedelta(self.call_count) | 707 | + store.timedelta(self.call_count)) |
565 | get_cryptoportfolio.side_effect = _get | 708 | get_cryptoportfolio.side_effect = _get |
566 | 709 | ||
567 | market.Portfolio.wait_for_recent() | 710 | market.Portfolio.wait_for_recent() |
@@ -572,7 +715,7 @@ class PortfolioTest(WebMockTestCase): | |||
572 | 715 | ||
573 | sleep.reset_mock() | 716 | sleep.reset_mock() |
574 | get_cryptoportfolio.reset_mock() | 717 | get_cryptoportfolio.reset_mock() |
575 | market.Portfolio.last_date = None | 718 | market.Portfolio.last_date = store.LockedVar(None) |
576 | self.call_count = 0 | 719 | self.call_count = 0 |
577 | market.Portfolio.wait_for_recent(delta=15) | 720 | market.Portfolio.wait_for_recent(delta=15) |
578 | sleep.assert_not_called() | 721 | sleep.assert_not_called() |
@@ -580,13 +723,45 @@ class PortfolioTest(WebMockTestCase): | |||
580 | 723 | ||
581 | sleep.reset_mock() | 724 | sleep.reset_mock() |
582 | get_cryptoportfolio.reset_mock() | 725 | get_cryptoportfolio.reset_mock() |
583 | market.Portfolio.last_date = None | 726 | market.Portfolio.last_date = store.LockedVar(None) |
584 | self.call_count = 0 | 727 | self.call_count = 0 |
585 | market.Portfolio.wait_for_recent(delta=1) | 728 | market.Portfolio.wait_for_recent(delta=1) |
586 | sleep.assert_called_with(30) | 729 | sleep.assert_called_with(30) |
587 | self.assertEqual(9, sleep.call_count) | 730 | self.assertEqual(9, sleep.call_count) |
588 | self.assertEqual(10, get_cryptoportfolio.call_count) | 731 | self.assertEqual(10, get_cryptoportfolio.call_count) |
589 | 732 | ||
733 | def test_is_worker_thread(self): | ||
734 | with self.subTest(worker=None): | ||
735 | self.assertFalse(store.Portfolio.is_worker_thread()) | ||
736 | |||
737 | with self.subTest(worker="not self"),\ | ||
738 | mock.patch("threading.current_thread") as current_thread: | ||
739 | current = mock.Mock() | ||
740 | current_thread.return_value = current | ||
741 | store.Portfolio.worker = mock.Mock() | ||
742 | self.assertFalse(store.Portfolio.is_worker_thread()) | ||
743 | |||
744 | with self.subTest(worker="self"),\ | ||
745 | mock.patch("threading.current_thread") as current_thread: | ||
746 | current = mock.Mock() | ||
747 | current_thread.return_value = current | ||
748 | store.Portfolio.worker = current | ||
749 | self.assertTrue(store.Portfolio.is_worker_thread()) | ||
750 | |||
751 | def test_start_worker(self): | ||
752 | with mock.patch.object(store.Portfolio, "wait_for_notification") as notification: | ||
753 | store.Portfolio.start_worker() | ||
754 | notification.assert_called_once_with(poll=30) | ||
755 | |||
756 | self.assertEqual("lock", store.Portfolio.last_date.lock.__class__.__name__) | ||
757 | self.assertEqual("lock", store.Portfolio.liquidities.lock.__class__.__name__) | ||
758 | store.Portfolio.report.start_lock.assert_called_once_with() | ||
759 | |||
760 | self.assertIsNotNone(store.Portfolio.worker) | ||
761 | self.assertIsNotNone(store.Portfolio.worker_notify) | ||
762 | self.assertIsNotNone(store.Portfolio.callback) | ||
763 | self.assertTrue(store.Portfolio.worker_started) | ||
764 | |||
590 | @unittest.skipUnless("unit" in limits, "Unit skipped") | 765 | @unittest.skipUnless("unit" in limits, "Unit skipped") |
591 | class AmountTest(WebMockTestCase): | 766 | class AmountTest(WebMockTestCase): |
592 | def test_values(self): | 767 | def test_values(self): |
@@ -3362,31 +3537,64 @@ class MainTest(WebMockTestCase): | |||
3362 | self.assertEqual("Exception: boo\n", stdout_mock.getvalue()) | 3537 | self.assertEqual("Exception: boo\n", stdout_mock.getvalue()) |
3363 | 3538 | ||
3364 | def test_main(self): | 3539 | def test_main(self): |
3365 | with mock.patch("main.parse_args") as parse_args,\ | 3540 | with self.subTest(parallel=False): |
3366 | mock.patch("main.parse_config") as parse_config,\ | 3541 | with mock.patch("main.parse_args") as parse_args,\ |
3367 | mock.patch("main.fetch_markets") as fetch_markets,\ | 3542 | mock.patch("main.parse_config") as parse_config,\ |
3368 | mock.patch("main.process") as process: | 3543 | mock.patch("main.fetch_markets") as fetch_markets,\ |
3544 | mock.patch("main.process") as process: | ||
3369 | 3545 | ||
3370 | args_mock = mock.Mock() | 3546 | args_mock = mock.Mock() |
3371 | args_mock.config = "config" | 3547 | args_mock.parallel = False |
3372 | args_mock.user = "user" | 3548 | args_mock.config = "config" |
3373 | parse_args.return_value = args_mock | 3549 | args_mock.user = "user" |
3550 | parse_args.return_value = args_mock | ||
3374 | 3551 | ||
3375 | parse_config.return_value = ["pg_config", "report_path"] | 3552 | parse_config.return_value = ["pg_config", "report_path"] |
3376 | 3553 | ||
3377 | fetch_markets.return_value = [["config1", 1], ["config2", 2]] | 3554 | fetch_markets.return_value = [["config1", 1], ["config2", 2]] |
3378 | 3555 | ||
3379 | main.main(["Foo", "Bar"]) | 3556 | main.main(["Foo", "Bar"]) |
3380 | 3557 | ||
3381 | parse_args.assert_called_with(["Foo", "Bar"]) | 3558 | parse_args.assert_called_with(["Foo", "Bar"]) |
3382 | parse_config.assert_called_with("config") | 3559 | parse_config.assert_called_with("config") |
3383 | fetch_markets.assert_called_with("pg_config", "user") | 3560 | fetch_markets.assert_called_with("pg_config", "user") |
3384 | 3561 | ||
3385 | self.assertEqual(2, process.call_count) | 3562 | self.assertEqual(2, process.call_count) |
3386 | process.assert_has_calls([ | 3563 | process.assert_has_calls([ |
3387 | mock.call("config1", 1, "report_path", args_mock), | 3564 | mock.call("config1", 1, "report_path", args_mock), |
3388 | mock.call("config2", 2, "report_path", args_mock), | 3565 | mock.call("config2", 2, "report_path", args_mock), |
3389 | ]) | 3566 | ]) |
3567 | with self.subTest(parallel=True): | ||
3568 | with mock.patch("main.parse_args") as parse_args,\ | ||
3569 | mock.patch("main.parse_config") as parse_config,\ | ||
3570 | mock.patch("main.fetch_markets") as fetch_markets,\ | ||
3571 | mock.patch("main.process") as process,\ | ||
3572 | mock.patch("store.Portfolio.start_worker") as start: | ||
3573 | |||
3574 | args_mock = mock.Mock() | ||
3575 | args_mock.parallel = True | ||
3576 | args_mock.config = "config" | ||
3577 | args_mock.user = "user" | ||
3578 | parse_args.return_value = args_mock | ||
3579 | |||
3580 | parse_config.return_value = ["pg_config", "report_path"] | ||
3581 | |||
3582 | fetch_markets.return_value = [["config1", 1], ["config2", 2]] | ||
3583 | |||
3584 | main.main(["Foo", "Bar"]) | ||
3585 | |||
3586 | parse_args.assert_called_with(["Foo", "Bar"]) | ||
3587 | parse_config.assert_called_with("config") | ||
3588 | fetch_markets.assert_called_with("pg_config", "user") | ||
3589 | |||
3590 | start.assert_called_once_with() | ||
3591 | self.assertEqual(2, process.call_count) | ||
3592 | process.assert_has_calls([ | ||
3593 | mock.call.__bool__(), | ||
3594 | mock.call("config1", 1, "report_path", args_mock), | ||
3595 | mock.call.__bool__(), | ||
3596 | mock.call("config2", 2, "report_path", args_mock), | ||
3597 | ]) | ||
3390 | 3598 | ||
3391 | @mock.patch.object(main.sys, "exit") | 3599 | @mock.patch.object(main.sys, "exit") |
3392 | @mock.patch("main.configparser") | 3600 | @mock.patch("main.configparser") |
@@ -3551,7 +3759,7 @@ class ProcessorTest(WebMockTestCase): | |||
3551 | 3759 | ||
3552 | method, arguments = processor.method_arguments("wait_for_recent") | 3760 | method, arguments = processor.method_arguments("wait_for_recent") |
3553 | self.assertEqual(market.Portfolio.wait_for_recent, method) | 3761 | self.assertEqual(market.Portfolio.wait_for_recent, method) |
3554 | self.assertEqual(["delta"], arguments) | 3762 | self.assertEqual(["delta", "poll"], arguments) |
3555 | 3763 | ||
3556 | method, arguments = processor.method_arguments("prepare_trades") | 3764 | method, arguments = processor.method_arguments("prepare_trades") |
3557 | self.assertEqual(m.prepare_trades, method) | 3765 | self.assertEqual(m.prepare_trades, method) |