]> git.immae.eu Git - perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git/commitdiff
Merge branch 'store_reports' into dev
authorIsmaël Bouya <ismael.bouya@normalesup.org>
Sat, 24 Mar 2018 09:41:28 +0000 (10:41 +0100)
committerIsmaël Bouya <ismael.bouya@normalesup.org>
Sat, 24 Mar 2018 09:41:28 +0000 (10:41 +0100)
Add reports storing in database.

See https://git.immae.eu/mantisbt/view.php?id=56 for the expected schema

main.py
market.py
store.py
tasks/import_reports_to_database.py [new file with mode: 0644]
test.py

diff --git a/main.py b/main.py
index 856d449fc16cf94263c81ad45b11bcb406a12ce0..446219247cc2f8c9211032d1c03a9f1d96986a40 100644 (file)
--- a/main.py
+++ b/main.py
@@ -62,17 +62,20 @@ def make_order(market, value, currency, action="acquire",
 
 def get_user_market(config_path, user_id, debug=False):
     pg_config, report_path = parse_config(config_path)
-    market_config = list(fetch_markets(pg_config, str(user_id)))[0][0]
-    return market.Market.from_config(market_config, debug=debug)
+    market_id, market_config, user_id = list(fetch_markets(pg_config, str(user_id)))[0]
+    args = type('Args', (object,), { "debug": debug, "quiet": False })()
+    return market.Market.from_config(market_config, args,
+            pg_config=pg_config, market_id=market_id,
+            user_id=user_id, report_path=report_path)
 
 def fetch_markets(pg_config, user):
     connection = psycopg2.connect(**pg_config)
     cursor = connection.cursor()
 
     if user is None:
-        cursor.execute("SELECT config,user_id FROM market_configs")
+        cursor.execute("SELECT id,config,user_id FROM market_configs")
     else:
-        cursor.execute("SELECT config,user_id FROM market_configs WHERE user_id = %s", user)
+        cursor.execute("SELECT id,config,user_id FROM market_configs WHERE user_id = %s", user)
 
     for row in cursor:
         yield row
@@ -109,6 +112,9 @@ def parse_args(argv):
     parser.add_argument("--after",
             default=False, action='store_const', const=True,
             help="Run the steps after the cryptoportfolio update")
+    parser.add_argument("--quiet",
+            default=False, action='store_const', const=True,
+            help="Don't print messages")
     parser.add_argument("--debug",
             default=False, action='store_const', const=True,
             help="Run in debug mode")
@@ -128,10 +134,12 @@ def parse_args(argv):
 
     return args
 
-def process(market_config, user_id, report_path, args):
+def process(market_config, market_id, user_id, args, report_path, pg_config):
     try:
         market.Market\
-                .from_config(market_config, debug=args.debug, user_id=user_id, report_path=report_path)\
+                .from_config(market_config, args,
+                        pg_config=pg_config, market_id=market_id,
+                        user_id=user_id, report_path=report_path)\
                 .process(args.action, before=args.before, after=args.after)
     except Exception as e:
         print("{}: {}".format(e.__class__.__name__, e))
@@ -145,11 +153,13 @@ def main(argv):
         import threading
         market.Portfolio.start_worker()
 
-        for market_config, user_id in fetch_markets(pg_config, args.user):
-            threading.Thread(target=process, args=[market_config, user_id, report_path, args]).start()
+        def process_(*args):
+            threading.Thread(target=process, args=args).start()
     else:
-        for market_config, user_id in fetch_markets(pg_config, args.user):
-            process(market_config, user_id, report_path, args)
+        process_ = process
+
+    for market_id, market_config, user_id in fetch_markets(pg_config, args.user):
+        process_(market_config, market_id, user_id, args, report_path, pg_config)
 
 if __name__ == '__main__': # pragma: no cover
     main(sys.argv[1:])
index 2ddebfac3795ebc51167676255e16e4113a834a5..496ec45843319f2145f955e811ea49e1842e4761 100644 (file)
--- a/market.py
+++ b/market.py
@@ -1,6 +1,7 @@
 from ccxt import ExchangeError, NotSupported
 import ccxt_wrapper as ccxt
 import time
+import psycopg2
 from store import *
 from cachetools.func import ttl_cache
 from datetime import datetime
@@ -13,20 +14,21 @@ class Market:
     trades = None
     balances = None
 
-    def __init__(self, ccxt_instance, debug=False, user_id=None, report_path=None):
-        self.debug = debug
+    def __init__(self, ccxt_instance, args, **kwargs):
+        self.args = args
+        self.debug = args.debug
         self.ccxt = ccxt_instance
         self.ccxt._market = self
-        self.report = ReportStore(self)
+        self.report = ReportStore(self, verbose_print=(not args.quiet))
         self.trades = TradeStore(self)
         self.balances = BalanceStore(self)
         self.processor = Processor(self)
 
-        self.user_id = user_id
-        self.report_path = report_path
+        for key in ["user_id", "market_id", "report_path", "pg_config"]:
+            setattr(self, key, kwargs.get(key, None))
 
     @classmethod
-    def from_config(cls, config, debug=False, user_id=None, report_path=None):
+    def from_config(cls, config, args, **kwargs):
         config["apiKey"] = config.pop("key", None)
 
         ccxt_instance = ccxt.poloniexE(config)
@@ -43,20 +45,43 @@ class Market:
         ccxt_instance.session.request = request_wrap.__get__(ccxt_instance.session,
                 ccxt_instance.session.__class__)
 
-        return cls(ccxt_instance, debug=debug, user_id=user_id, report_path=report_path)
+        return cls(ccxt_instance, args, **kwargs)
 
     def store_report(self):
         self.report.merge(Portfolio.report)
+        date = datetime.now()
+        if self.report_path is not None:
+            self.store_file_report(date)
+        if self.pg_config is not None:
+            self.store_database_report(date)
+
+    def store_file_report(self, date):
         try:
-            if self.report_path is not None:
-                report_file = "{}/{}_{}".format(self.report_path, datetime.now().isoformat(), self.user_id)
-                with open(report_file + ".json", "w") as f:
-                    f.write(self.report.to_json())
-                with open(report_file + ".log", "w") as f:
-                    f.write("\n".join(map(lambda x: x[1], self.report.print_logs)))
+            report_file = "{}/{}_{}".format(self.report_path, date.isoformat(), self.user_id)
+            with open(report_file + ".json", "w") as f:
+                f.write(self.report.to_json())
+            with open(report_file + ".log", "w") as f:
+                f.write("\n".join(map(lambda x: x[1], self.report.print_logs)))
         except Exception as e:
             print("impossible to store report file: {}; {}".format(e.__class__.__name__, e))
 
+    def store_database_report(self, date):
+        try:
+            report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;'
+            line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);'
+            connection = psycopg2.connect(**self.pg_config)
+            cursor = connection.cursor()
+            cursor.execute(report_query, (date, self.market_id, self.debug))
+            report_id = cursor.fetchone()[0]
+            for date, type_, payload in self.report.to_json_array():
+                cursor.execute(line_query, (date, report_id, type_, payload))
+
+            connection.commit()
+            cursor.close()
+            connection.close()
+        except Exception as e:
+            print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e))
+
     def process(self, actions, before=False, after=False):
         try:
             if len(actions or []) == 0:
index d875a983a77b5d0fa8cbce0f60c923ac5de62832..b3ada4567e38a28613efd0159dafd33ece105a0d 100644 (file)
--- a/store.py
+++ b/store.py
@@ -36,12 +36,22 @@ class ReportStore:
         hash_["date"] = datetime.now()
         self.logs.append(hash_)
 
+    @staticmethod
+    def default_json_serial(obj):
+        if isinstance(obj, (datetime, date)):
+            return obj.isoformat()
+        return str(obj)
+
     def to_json(self):
-        def default_json_serial(obj):
-            if isinstance(obj, (datetime, date)):
-                return obj.isoformat()
-            return str(obj)
-        return json.dumps(self.logs, default=default_json_serial, indent="  ")
+        return json.dumps(self.logs, default=self.default_json_serial, indent="  ")
+
+    def to_json_array(self):
+        for log in (x.copy() for x in self.logs):
+            yield (
+                    log.pop("date"),
+                    log.pop("type"),
+                    json.dumps(log, default=self.default_json_serial, indent="  ")
+                    )
 
     def set_verbose(self, verbose_print):
         self.verbose_print = verbose_print
diff --git a/tasks/import_reports_to_database.py b/tasks/import_reports_to_database.py
new file mode 100644 (file)
index 0000000..152c762
--- /dev/null
@@ -0,0 +1,50 @@
+import sys
+import os
+import simplejson as json
+from datetime import datetime
+from decimal import Decimal as D
+import psycopg2
+
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+from main import parse_config
+
+config = sys.argv[1]
+reports = sys.argv[2:]
+
+pg_config, report_path = parse_config(config)
+
+connection = psycopg2.connect(**pg_config)
+cursor = connection.cursor()
+
+report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;'
+line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);'
+
+user_id_to_market_id = {
+        2: 1,
+        1: 3,
+        }
+
+for report in reports:
+    with open(report, "rb") as f:
+        json_content = json.load(f, parse_float=D)
+        basename = os.path.basename(report)
+        date, rest = basename.split("_", 1)
+        user_id, rest = rest.split(".", 1)
+
+        date = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f")
+        market_id = user_id_to_market_id[int(user_id)]
+        debug = any("debug" in x and x["debug"] for x in json_content)
+        print(market_id, date, debug)
+        cursor.execute(report_query, (date, market_id, debug))
+        report_id = cursor.fetchone()[0]
+
+        for line in json_content:
+            date = datetime.strptime(line["date"], "%Y-%m-%dT%H:%M:%S.%f")
+            type_ = line["type"]
+            del(line["date"])
+            del(line["type"])
+
+            cursor.execute(line_query, (date, report_id, type_, json.dumps(line)))
+connection.commit()
+cursor.close()
+connection.close()
diff --git a/test.py b/test.py
index 13bd332405526ae0e1af0a9b568bf38446506410..637a3054c4d8449b3fb0963e4180e62b0b837567 100644 (file)
--- a/test.py
+++ b/test.py
@@ -23,6 +23,9 @@ for test_type in limits:
 class WebMockTestCase(unittest.TestCase):
     import time
 
+    def market_args(self, debug=False, quiet=False):
+        return type('Args', (object,), { "debug": debug, "quiet": quiet })()
+
     def setUp(self):
         super(WebMockTestCase, self).setUp()
         self.wm = requests_mock.Mocker()
@@ -1092,7 +1095,7 @@ class MarketTest(WebMockTestCase):
         self.ccxt = mock.Mock(spec=market.ccxt.poloniexE)
 
     def test_values(self):
-        m = market.Market(self.ccxt)
+        m = market.Market(self.ccxt, self.market_args())
 
         self.assertEqual(self.ccxt, m.ccxt)
         self.assertFalse(m.debug)
@@ -1104,19 +1107,27 @@ class MarketTest(WebMockTestCase):
         self.assertEqual(m, m.balances.market)
         self.assertEqual(m, m.ccxt._market)
 
-        m = market.Market(self.ccxt, debug=True)
+        m = market.Market(self.ccxt, self.market_args(debug=True))
         self.assertTrue(m.debug)
 
-        m = market.Market(self.ccxt, debug=False)
+        m = market.Market(self.ccxt, self.market_args(debug=False))
         self.assertFalse(m.debug)
 
+        with mock.patch("market.ReportStore") as report_store:
+            with self.subTest(quiet=False):
+                m = market.Market(self.ccxt, self.market_args(quiet=False))
+                report_store.assert_called_with(m, verbose_print=True)
+            with self.subTest(quiet=True):
+                m = market.Market(self.ccxt, self.market_args(quiet=True))
+                report_store.assert_called_with(m, verbose_print=False)
+
     @mock.patch("market.ccxt")
     def test_from_config(self, ccxt):
         with mock.patch("market.ReportStore"):
             ccxt.poloniexE.return_value = self.ccxt
             self.ccxt.session.request.return_value = "response"
 
-            m = market.Market.from_config({"key": "key", "secred": "secret"})
+            m = market.Market.from_config({"key": "key", "secred": "secret"}, self.market_args())
 
             self.assertEqual(self.ccxt, m.ccxt)
 
@@ -1125,7 +1136,7 @@ class MarketTest(WebMockTestCase):
             m.report.log_http_request.assert_called_with('GET', 'URL', 'data',
                     'headers', 'response')
 
-        m = market.Market.from_config({"key": "key", "secred": "secret"}, debug=True)
+        m = market.Market.from_config({"key": "key", "secred": "secret"}, self.market_args(debug=True))
         self.assertEqual(True, m.debug)
 
     def test_get_tickers(self):
@@ -1134,7 +1145,7 @@ class MarketTest(WebMockTestCase):
                 market.NotSupported
                 ]
 
-        m = market.Market(self.ccxt)
+        m = market.Market(self.ccxt, self.market_args())
         self.assertEqual("tickers", m.get_tickers())
         self.assertEqual("tickers", m.get_tickers())
         self.ccxt.fetch_tickers.assert_called_once()
@@ -1147,7 +1158,7 @@ class MarketTest(WebMockTestCase):
                     "ETH/ETC": { "bid": 1, "ask": 3 },
                     "XVG/ETH": { "bid": 10, "ask": 40 },
                     }
-            m = market.Market(self.ccxt)
+            m = market.Market(self.ccxt, self.market_args())
 
             ticker = m.get_ticker("ETH", "ETC")
             self.assertEqual(1, ticker["bid"])
@@ -1175,7 +1186,7 @@ class MarketTest(WebMockTestCase):
                     market.ExchangeError("foo"),
                     ]
 
-            m = market.Market(self.ccxt)
+            m = market.Market(self.ccxt, self.market_args())
 
             ticker = m.get_ticker("ETH", "ETC")
             self.ccxt.fetch_ticker.assert_called_with("ETH/ETC")
@@ -1195,7 +1206,7 @@ class MarketTest(WebMockTestCase):
             self.assertIsNone(ticker)
 
     def test_fetch_fees(self):
-        m = market.Market(self.ccxt)
+        m = market.Market(self.ccxt, self.market_args())
         self.ccxt.fetch_fees.return_value = "Foo"
         self.assertEqual("Foo", m.fetch_fees())
         self.ccxt.fetch_fees.assert_called_once()
@@ -1222,7 +1233,7 @@ class MarketTest(WebMockTestCase):
         get_ticker.side_effect = _get_ticker
 
         with mock.patch("market.ReportStore"):
-            m = market.Market(self.ccxt)
+            m = market.Market(self.ccxt, self.market_args())
             self.ccxt.fetch_all_balances.return_value = {
                     "USDT": {
                         "exchange_free": D("10000.0"),
@@ -1262,7 +1273,7 @@ class MarketTest(WebMockTestCase):
                 (False, 12), (True, 12)]:
             with self.subTest(sleep=sleep, debug=debug), \
                     mock.patch("market.ReportStore"):
-                m = market.Market(self.ccxt, debug=debug)
+                m = market.Market(self.ccxt, self.market_args(debug=debug))
 
                 order_mock1 = mock.Mock()
                 order_mock2 = mock.Mock()
@@ -1339,7 +1350,7 @@ class MarketTest(WebMockTestCase):
         for debug in [True, False]:
             with self.subTest(debug=debug),\
                     mock.patch("market.ReportStore"):
-                m = market.Market(self.ccxt, debug=debug)
+                m = market.Market(self.ccxt, self.market_args(debug=debug))
 
                 value_from = portfolio.Amount("BTC", "1.0")
                 value_from.linked_to = portfolio.Amount("ETH", "10.0")
@@ -1375,54 +1386,135 @@ class MarketTest(WebMockTestCase):
                     self.ccxt.transfer_balance.assert_any_call("USDT", 100, "exchange", "margin")
                     self.ccxt.transfer_balance.assert_any_call("ETC", 5, "margin", "exchange")
 
-    def test_store_report(self):
-
-        file_open = mock.mock_open()
-        m = market.Market(self.ccxt, user_id=1)
-        with self.subTest(file=None),\
-                mock.patch.object(m, "report") as report,\
-                mock.patch("market.open", file_open):
-            m.store_report()
-            report.merge.assert_called_with(store.Portfolio.report)
-            file_open.assert_not_called()
-
-        report.reset_mock()
+    def test_store_file_report(self):
         file_open = mock.mock_open()
-        m = market.Market(self.ccxt, report_path="present", user_id=1)
+        m = market.Market(self.ccxt, self.market_args(), report_path="present", user_id=1)
         with self.subTest(file="present"),\
                 mock.patch("market.open", file_open),\
                 mock.patch.object(m, "report") as report,\
                 mock.patch.object(market, "datetime") as time_mock:
 
-            time_mock.now.return_value = datetime.datetime(2018, 2, 25)
             report.print_logs = [[time_mock.now(), "Foo"], [time_mock.now(), "Bar"]]
             report.to_json.return_value = "json_content"
 
-            m.store_report()
+            m.store_file_report(datetime.datetime(2018, 2, 25))
 
             file_open.assert_any_call("present/2018-02-25T00:00:00_1.json", "w")
             file_open.assert_any_call("present/2018-02-25T00:00:00_1.log", "w")
             file_open().write.assert_any_call("json_content")
             file_open().write.assert_any_call("Foo\nBar")
             m.report.to_json.assert_called_once_with()
-            report.merge.assert_called_with(store.Portfolio.report)
 
-        report.reset_mock()
-
-        m = market.Market(self.ccxt, report_path="error", user_id=1)
+        m = market.Market(self.ccxt, self.market_args(), report_path="error", user_id=1)
         with self.subTest(file="error"),\
                 mock.patch("market.open") as file_open,\
                 mock.patch.object(m, "report") as report,\
                 mock.patch('sys.stdout', new_callable=StringIO) as stdout_mock:
             file_open.side_effect = FileNotFoundError
 
+            m.store_file_report(datetime.datetime(2018, 2, 25))
+
+            self.assertRegex(stdout_mock.getvalue(), "impossible to store report file: FileNotFoundError;")
+
+    @mock.patch.object(market, "psycopg2")
+    def test_store_database_report(self, psycopg2):
+        connect_mock = mock.Mock()
+        cursor_mock = mock.MagicMock()
+
+        connect_mock.cursor.return_value = cursor_mock
+        psycopg2.connect.return_value = connect_mock
+        m = market.Market(self.ccxt, self.market_args(),
+                pg_config={"config": "pg_config"}, user_id=1)
+        cursor_mock.fetchone.return_value = [42]
+
+        with self.subTest(error=False),\
+                mock.patch.object(m, "report") as report:
+            report.to_json_array.return_value = [
+                    ("date1", "type1", "payload1"),
+                    ("date2", "type2", "payload2"),
+                    ]
+            m.store_database_report(datetime.datetime(2018, 3, 24))
+            connect_mock.assert_has_calls([
+                mock.call.cursor(),
+                mock.call.cursor().execute('INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;', (datetime.datetime(2018, 3, 24), None, False)),
+                mock.call.cursor().fetchone(),
+                mock.call.cursor().execute('INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);', ('date1', 42, 'type1', 'payload1')),
+                mock.call.cursor().execute('INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);', ('date2', 42, 'type2', 'payload2')),
+                mock.call.commit(),
+                mock.call.cursor().close(),
+                mock.call.close()
+                ])
+
+        connect_mock.reset_mock()
+        with self.subTest(error=True),\
+                mock.patch('sys.stdout', new_callable=StringIO) as stdout_mock:
+            psycopg2.connect.side_effect = Exception("Bouh")
+            m.store_database_report(datetime.datetime(2018, 3, 24))
+            self.assertEqual(stdout_mock.getvalue(), "impossible to store report to database: Exception; Bouh\n")
+
+    def test_store_report(self):
+        m = market.Market(self.ccxt, self.market_args(), user_id=1)
+        with self.subTest(file=None, pg_config=None),\
+                mock.patch.object(m, "report") as report,\
+                mock.patch.object(m, "store_database_report") as db_report,\
+                mock.patch.object(m, "store_file_report") as file_report:
+            m.store_report()
+            report.merge.assert_called_with(store.Portfolio.report)
+
+            file_report.assert_not_called()
+            db_report.assert_not_called()
+
+        report.reset_mock()
+        m = market.Market(self.ccxt, self.market_args(), report_path="present", user_id=1)
+        with self.subTest(file="present", pg_config=None),\
+                mock.patch.object(m, "report") as report,\
+                mock.patch.object(m, "store_file_report") as file_report,\
+                mock.patch.object(m, "store_database_report") as db_report,\
+                mock.patch.object(market, "datetime") as time_mock:
+
+            time_mock.now.return_value = datetime.datetime(2018, 2, 25)
+
             m.store_report()
 
             report.merge.assert_called_with(store.Portfolio.report)
-            self.assertRegex(stdout_mock.getvalue(), "impossible to store report file: FileNotFoundError;")
+            file_report.assert_called_once_with(datetime.datetime(2018, 2, 25))
+            db_report.assert_not_called()
+
+        report.reset_mock()
+        m = market.Market(self.ccxt, self.market_args(), pg_config="present", user_id=1)
+        with self.subTest(file=None, pg_config="present"),\
+                mock.patch.object(m, "report") as report,\
+                mock.patch.object(m, "store_file_report") as file_report,\
+                mock.patch.object(m, "store_database_report") as db_report,\
+                mock.patch.object(market, "datetime") as time_mock:
+
+            time_mock.now.return_value = datetime.datetime(2018, 2, 25)
+
+            m.store_report()
+
+            report.merge.assert_called_with(store.Portfolio.report)
+            file_report.assert_not_called()
+            db_report.assert_called_once_with(datetime.datetime(2018, 2, 25))
+
+        report.reset_mock()
+        m = market.Market(self.ccxt, self.market_args(),
+                pg_config="pg_config", report_path="present", user_id=1)
+        with self.subTest(file="present", pg_config="present"),\
+                mock.patch.object(m, "report") as report,\
+                mock.patch.object(m, "store_file_report") as file_report,\
+                mock.patch.object(m, "store_database_report") as db_report,\
+                mock.patch.object(market, "datetime") as time_mock:
+
+            time_mock.now.return_value = datetime.datetime(2018, 2, 25)
+
+            m.store_report()
+
+            report.merge.assert_called_with(store.Portfolio.report)
+            file_report.assert_called_once_with(datetime.datetime(2018, 2, 25))
+            db_report.assert_called_once_with(datetime.datetime(2018, 2, 25))
 
     def test_print_orders(self):
-        m = market.Market(self.ccxt)
+        m = market.Market(self.ccxt, self.market_args())
         with mock.patch.object(m.report, "log_stage") as log_stage,\
                 mock.patch.object(m.balances, "fetch_balances") as fetch_balances,\
                 mock.patch.object(m, "prepare_trades") as prepare_trades,\
@@ -1436,7 +1528,7 @@ class MarketTest(WebMockTestCase):
             prepare_orders.assert_called_with(compute_value="average")
 
     def test_print_balances(self):
-        m = market.Market(self.ccxt)
+        m = market.Market(self.ccxt, self.market_args())
 
         with mock.patch.object(m.balances, "in_currency") as in_currency,\
                 mock.patch.object(m.report, "log_stage") as log_stage,\
@@ -1461,7 +1553,7 @@ class MarketTest(WebMockTestCase):
     @mock.patch("market.ReportStore.log_error")
     @mock.patch("market.Market.store_report")
     def test_process(self, store_report, log_error, process):
-        m = market.Market(self.ccxt)
+        m = market.Market(self.ccxt, self.market_args())
         with self.subTest(before=False, after=False):
             m.process(None)
 
@@ -3039,6 +3131,14 @@ class ReportStoreTest(WebMockTestCase):
             report_store.print_log(portfolio.Amount("BTC", 1))
             self.assertEqual(stdout_mock.getvalue(), "")
 
+    def test_default_json_serial(self):
+        report_store = market.ReportStore(self.m)
+
+        self.assertEqual("2018-02-24T00:00:00",
+                report_store.default_json_serial(portfolio.datetime(2018, 2, 24)))
+        self.assertEqual("1.00000000 BTC",
+                report_store.default_json_serial(portfolio.Amount("BTC", 1)))
+
     def test_to_json(self):
         report_store = market.ReportStore(self.m)
         report_store.logs.append({"foo": "bar"})
@@ -3048,6 +3148,20 @@ class ReportStoreTest(WebMockTestCase):
         report_store.logs.append({"amount": portfolio.Amount("BTC", 1)})
         self.assertEqual('[\n  {\n    "foo": "bar"\n  },\n  {\n    "date": "2018-02-24T00:00:00"\n  },\n  {\n    "amount": "1.00000000 BTC"\n  }\n]', report_store.to_json())
 
+    def test_to_json_array(self):
+        report_store = market.ReportStore(self.m)
+        report_store.logs.append({
+            "date": "date1", "type": "type1", "foo": "bar", "bla": "bla"
+            })
+        report_store.logs.append({
+            "date": "date2", "type": "type2", "foo": "bar", "bla": "bla"
+            })
+        logs = list(report_store.to_json_array())
+
+        self.assertEqual(2, len(logs))
+        self.assertEqual(("date1", "type1", '{\n  "foo": "bar",\n  "bla": "bla"\n}'), logs[0])
+        self.assertEqual(("date2", "type2", '{\n  "foo": "bar",\n  "bla": "bla"\n}'), logs[1])
+
     @mock.patch.object(market.ReportStore, "print_log")
     @mock.patch.object(market.ReportStore, "add_log")
     def test_log_stage(self, add_log, print_log):
@@ -3541,7 +3655,7 @@ class MainTest(WebMockTestCase):
                 mock.patch("main.parse_config") as main_parse_config:
             with self.subTest(debug=False):
                 main_parse_config.return_value = ["pg_config", "report_path"]
-                main_fetch_markets.return_value = [({"key": "market_config"},)]
+                main_fetch_markets.return_value = [(1, {"key": "market_config"}, 3)]
                 m = main.get_user_market("config_path.ini", 1)
 
                 self.assertIsInstance(m, market.Market)
@@ -3549,7 +3663,7 @@ class MainTest(WebMockTestCase):
 
             with self.subTest(debug=True):
                 main_parse_config.return_value = ["pg_config", "report_path"]
-                main_fetch_markets.return_value = [({"key": "market_config"},)]
+                main_fetch_markets.return_value = [(1, {"key": "market_config"}, 3)]
                 m = main.get_user_market("config_path.ini", 1, debug=True)
 
                 self.assertIsInstance(m, market.Market)
@@ -3568,16 +3682,16 @@ class MainTest(WebMockTestCase):
             args_mock.after = "after"
             self.assertEqual("", stdout_mock.getvalue())
 
-            main.process("config", 1, "report_path", args_mock)
+            main.process("config", 3, 1, args_mock, "report_path", "pg_config")
 
             market_mock.from_config.assert_has_calls([
-                mock.call("config", debug="debug", user_id=1, report_path="report_path"),
+                mock.call("config", args_mock, pg_config="pg_config", market_id=3, user_id=1, report_path="report_path"),
                 mock.call().process("action", before="before", after="after"),
                 ])
 
             with self.subTest(exception=True):
                 market_mock.from_config.side_effect = Exception("boo")
-                main.process("config", 1, "report_path", args_mock)
+                main.process(3, "config", 1, "report_path", args_mock, "pg_config")
                 self.assertEqual("Exception: boo\n", stdout_mock.getvalue())
 
     def test_main(self):
@@ -3595,7 +3709,7 @@ class MainTest(WebMockTestCase):
 
                 parse_config.return_value = ["pg_config", "report_path"]
 
-                fetch_markets.return_value = [["config1", 1], ["config2", 2]]
+                fetch_markets.return_value = [[3, "config1", 1], [1, "config2", 2]]
 
                 main.main(["Foo", "Bar"])
 
@@ -3605,8 +3719,8 @@ class MainTest(WebMockTestCase):
 
                 self.assertEqual(2, process.call_count)
                 process.assert_has_calls([
-                    mock.call("config1", 1, "report_path", args_mock),
-                    mock.call("config2", 2, "report_path", args_mock),
+                    mock.call("config1", 3, 1, args_mock, "report_path", "pg_config"),
+                    mock.call("config2", 1, 2, args_mock, "report_path", "pg_config"),
                     ])
         with self.subTest(parallel=True):
             with mock.patch("main.parse_args") as parse_args,\
@@ -3623,7 +3737,7 @@ class MainTest(WebMockTestCase):
 
                 parse_config.return_value = ["pg_config", "report_path"]
 
-                fetch_markets.return_value = [["config1", 1], ["config2", 2]]
+                fetch_markets.return_value = [[3, "config1", 1], [1, "config2", 2]]
 
                 main.main(["Foo", "Bar"])
 
@@ -3635,9 +3749,9 @@ class MainTest(WebMockTestCase):
                 self.assertEqual(2, process.call_count)
                 process.assert_has_calls([
                     mock.call.__bool__(),
-                    mock.call("config1", 1, "report_path", args_mock),
+                    mock.call("config1", 3, 1, args_mock, "report_path", "pg_config"),
                     mock.call.__bool__(),
-                    mock.call("config2", 2, "report_path", args_mock),
+                    mock.call("config2", 1, 2, args_mock, "report_path", "pg_config"),
                     ])
 
     @mock.patch.object(main.sys, "exit")
@@ -3723,7 +3837,7 @@ class MainTest(WebMockTestCase):
             rows = list(main.fetch_markets({"foo": "bar"}, None))
 
             psycopg2.connect.assert_called_once_with(foo="bar")
-            cursor_mock.execute.assert_called_once_with("SELECT config,user_id FROM market_configs")
+            cursor_mock.execute.assert_called_once_with("SELECT id,config,user_id FROM market_configs")
 
             self.assertEqual(["row_1", "row_2"], rows)
 
@@ -3733,7 +3847,7 @@ class MainTest(WebMockTestCase):
             rows = list(main.fetch_markets({"foo": "bar"}, 1))
 
             psycopg2.connect.assert_called_once_with(foo="bar")
-            cursor_mock.execute.assert_called_once_with("SELECT config,user_id FROM market_configs WHERE user_id = %s", 1)
+            cursor_mock.execute.assert_called_once_with("SELECT id,config,user_id FROM market_configs WHERE user_id = %s", 1)
 
             self.assertEqual(["row_1", "row_2"], rows)
 
@@ -3797,7 +3911,7 @@ class ProcessorTest(WebMockTestCase):
 
     def test_method_arguments(self):
         ccxt = mock.Mock(spec=market.ccxt.poloniexE)
-        m = market.Market(ccxt)
+        m = market.Market(ccxt, self.market_args())
 
         processor = market.Processor(m)