X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=market.py;h=e16641c476865bc3977ceaa8f30ccce5296925ab;hb=882d55e99489d9131b5171f23e505b0dfd1c8738;hp=c9629c9762d9384e09385fc824b9f74551955147;hpb=9bde69bfc1dcd17a92ac2ee5abfcda5b30034d93;p=perso%2FImmae%2FProjets%2FCryptomonnaies%2FCryptoportfolio%2FTrader.git diff --git a/market.py b/market.py index c9629c9..e16641c 100644 --- a/market.py +++ b/market.py @@ -1,9 +1,11 @@ -from ccxt import ExchangeError, NotSupported +from ccxt import ExchangeError, NotSupported, RequestTimeout, InvalidNonce import ccxt_wrapper as ccxt import time +import psycopg2 from store import * from cachetools.func import ttl_cache from datetime import datetime +from retry import retry import portfolio class Market: @@ -13,48 +15,64 @@ class Market: trades = None balances = None - def __init__(self, ccxt_instance, debug=False, user_id=None, report_path=None): - self.debug = debug + def __init__(self, ccxt_instance, args, **kwargs): + self.args = args + self.debug = args.debug self.ccxt = ccxt_instance self.ccxt._market = self - self.report = ReportStore(self) + self.report = ReportStore(self, verbose_print=(not args.quiet)) self.trades = TradeStore(self) self.balances = BalanceStore(self) self.processor = Processor(self) - self.user_id = user_id - self.report_path = report_path + for key in ["user_id", "market_id", "pg_config"]: + setattr(self, key, kwargs.get(key, None)) + + self.report.log_market(self.args, self.user_id, self.market_id) @classmethod - def from_config(cls, config, debug=False, user_id=None, report_path=None): + def from_config(cls, config, args, **kwargs): config["apiKey"] = config.pop("key", None) ccxt_instance = ccxt.poloniexE(config) - # For requests logging - ccxt_instance.session.origin_request = ccxt_instance.session.request - ccxt_instance.session._parent = ccxt_instance - - def request_wrap(self, *args, **kwargs): - r = self.origin_request(*args, **kwargs) - self._parent._market.report.log_http_request(args[0], - args[1], kwargs["data"], kwargs["headers"], r) - return r - ccxt_instance.session.request = request_wrap.__get__(ccxt_instance.session, - ccxt_instance.session.__class__) - - return cls(ccxt_instance, debug=debug, user_id=user_id, report_path=report_path) + return cls(ccxt_instance, args, **kwargs) def store_report(self): self.report.merge(Portfolio.report) + date = datetime.now() + if self.args.report_path is not None: + self.store_file_report(date) + if self.pg_config is not None and self.args.report_db: + self.store_database_report(date) + + def store_file_report(self, date): try: - if self.report_path is not None: - report_file = "{}/{}_{}.json".format(self.report_path, datetime.now().isoformat(), self.user_id) - with open(report_file, "w") as f: - f.write(self.report.to_json()) + report_file = "{}/{}_{}".format(self.args.report_path, date.isoformat(), self.user_id) + with open(report_file + ".json", "w") as f: + f.write(self.report.to_json()) + with open(report_file + ".log", "w") as f: + f.write("\n".join(map(lambda x: x[1], self.report.print_logs))) except Exception as e: print("impossible to store report file: {}; {}".format(e.__class__.__name__, e)) + def store_database_report(self, date): + try: + report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;' + line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);' + connection = psycopg2.connect(**self.pg_config) + cursor = connection.cursor() + cursor.execute(report_query, (date, self.market_id, self.debug)) + report_id = cursor.fetchone()[0] + for date, type_, payload in self.report.to_json_array(): + cursor.execute(line_query, (date, report_id, type_, payload)) + + connection.commit() + cursor.close() + connection.close() + except Exception as e: + print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e)) + def process(self, actions, before=False, after=False): try: if len(actions or []) == 0: @@ -73,6 +91,7 @@ class Market: finally: self.store_report() + @retry((RequestTimeout, InvalidNonce), tries=5) def move_balances(self): needed_in_margin = {} moving_to_margin = {} @@ -87,13 +106,21 @@ class Market: current_balance = self.balances.all[currency].margin_available moving_to_margin[currency] = (needed - current_balance) delta = moving_to_margin[currency].value + action = "Moving {} from exchange to margin".format(moving_to_margin[currency]) + if self.debug and delta != 0: - self.report.log_debug_action("Moving {} from exchange to margin".format(moving_to_margin[currency])) + self.report.log_debug_action(action) continue - if delta > 0: - self.ccxt.transfer_balance(currency, delta, "exchange", "margin") - elif delta < 0: - self.ccxt.transfer_balance(currency, -delta, "margin", "exchange") + try: + if delta > 0: + self.ccxt.transfer_balance(currency, delta, "exchange", "margin") + elif delta < 0: + self.ccxt.transfer_balance(currency, -delta, "margin", "exchange") + except (RequestTimeout, InvalidNonce) as e: + self.report.log_error(action, message="Retrying", exception=e) + self.report.log_move_balances(needed_in_margin, moving_to_margin) + self.balances.fetch_balances() + raise e self.report.log_move_balances(needed_in_margin, moving_to_margin) self.balances.fetch_balances() @@ -156,10 +183,17 @@ class Market: self.report.log_stage("follow_orders_tick_{}".format(tick)) self.report.log_orders(open_orders, tick=tick) for order in open_orders: - if order.get_status() != "open": + status = order.get_status() + if status != "open": self.report.log_order(order, tick, finished=True) else: order.trade.update_order(order, tick) + if status == "error_disappeared": + self.report.log_error("follow_orders", + message="{} disappeared, recreating it".format(order)) + order.trade.prepare_order( + compute_value=order.trade.tick_actions_recreate(tick)) + self.report.log_stage("follow_orders_end") def prepare_trades(self, base_currency="BTC", liquidity="medium", @@ -193,6 +227,33 @@ class Market: class Processor: scenarios = { + "wait_for_cryptoportfolio": [ + { + "name": "wait", + "number": 1, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + ], + "print_orders": [ + { + "name": "wait", + "number": 1, + "before": False, + "after": True, + "wait_for_recent": {}, + }, + { + "name": "make_orders", + "number": 2, + "before": False, + "after": True, + "fetch_balances": ["begin"], + "prepare_trades": { "compute_value": "average" }, + "prepare_orders": { "compute_value": "average" }, + }, + ], "sell_needed": [ { "name": "wait",