]> git.immae.eu Git - perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git/blob - market.py
Merge branch 'store_reports' into dev
[perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git] / market.py
1 from ccxt import ExchangeError, NotSupported
2 import ccxt_wrapper as ccxt
3 import time
4 import psycopg2
5 from store import *
6 from cachetools.func import ttl_cache
7 from datetime import datetime
8 import portfolio
9
class Market:
    """Entry point tying an exchange client to the report, trade and balance stores.

    A Market owns a ccxt exchange instance together with a ReportStore,
    a TradeStore, a BalanceStore and a Processor, and exposes the high
    level actions (process, move_balances, follow_orders, ...) built on
    top of them.
    """

    # Class-level defaults; all of them are overwritten per instance in
    # __init__.
    debug = False
    ccxt = None
    report = None
    trades = None
    balances = None

    def __init__(self, ccxt_instance, args, **kwargs):
        """Wire the stores and the processor around ``ccxt_instance``.

        ``args`` must expose ``debug`` and ``quiet`` attributes.
        The optional keyword arguments ``user_id``, ``market_id``,
        ``report_path`` and ``pg_config`` are stored as attributes and
        default to None when absent; other keyword arguments are ignored.
        """
        self.args = args
        self.debug = args.debug
        self.ccxt = ccxt_instance
        # Back-reference so that code holding only the ccxt instance can
        # reach this market (used by the request logger in from_config).
        self.ccxt._market = self
        self.report = ReportStore(self, verbose_print=(not args.quiet))
        self.trades = TradeStore(self)
        self.balances = BalanceStore(self)
        self.processor = Processor(self)

        for key in ["user_id", "market_id", "report_path", "pg_config"]:
            setattr(self, key, kwargs.get(key, None))

    @classmethod
    def from_config(cls, config, args, **kwargs):
        """Build a Market from an exchange configuration dict.

        The "key" entry of ``config`` is moved to "apiKey" (``config`` is
        modified in place), a ``ccxt.poloniexE`` instance is created from
        it, and its HTTP session is wrapped so that every request is
        logged in the market report.
        """
        config["apiKey"] = config.pop("key", None)

        ccxt_instance = ccxt.poloniexE(config)

        # For requests logging
        ccxt_instance.session.origin_request = ccxt_instance.session.request
        ccxt_instance.session._parent = ccxt_instance

        def request_wrap(self, *args, **kwargs):
            # ``self`` is the session here. The wrapper relies on the
            # first two positional arguments being present and on "data"
            # and "headers" being passed as keyword arguments.
            r = self.origin_request(*args, **kwargs)
            self._parent._market.report.log_http_request(args[0],
                    args[1], kwargs["data"], kwargs["headers"], r)
            return r
        # Bind the wrapper to the session so it behaves like a method.
        ccxt_instance.session.request = request_wrap.__get__(ccxt_instance.session,
                ccxt_instance.session.__class__)

        return cls(ccxt_instance, args, **kwargs)

    def store_report(self):
        """Merge the Portfolio report in and persist the result.

        The report is written to files when ``report_path`` is set and to
        the database when ``pg_config`` is set; both use the same
        timestamp.
        """
        self.report.merge(Portfolio.report)
        date = datetime.now()
        if self.report_path is not None:
            self.store_file_report(date)
        if self.pg_config is not None:
            self.store_database_report(date)

    def store_file_report(self, date):
        """Write the report as ``<report_path>/<iso date>_<user_id>.json``
        and the printed logs as the matching ``.log`` file.

        Best effort: any exception is printed and swallowed.
        """
        try:
            report_file = "{}/{}_{}".format(self.report_path, date.isoformat(), self.user_id)
            with open(report_file + ".json", "w") as f:
                f.write(self.report.to_json())
            with open(report_file + ".log", "w") as f:
                f.write("\n".join(map(lambda x: x[1], self.report.print_logs)))
        except Exception as e:
            print("impossible to store report file: {}; {}".format(e.__class__.__name__, e))

    def store_database_report(self, date):
        """Insert the report and its lines into the PostgreSQL database.

        One row is inserted in ``reports`` and one row per entry of
        ``self.report.to_json_array()`` in ``report_lines``, all within a
        single commit.

        Best effort: any exception is printed and swallowed. The cursor
        and the connection are closed whether or not the insertion
        succeeded.
        """
        report_query = 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;'
        line_query = 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);'
        try:
            connection = psycopg2.connect(**self.pg_config)
            try:
                cursor = connection.cursor()
                try:
                    cursor.execute(report_query, (date, self.market_id, self.debug))
                    report_id = cursor.fetchone()[0]
                    for line_date, type_, payload in self.report.to_json_array():
                        cursor.execute(line_query, (line_date, report_id, type_, payload))

                    connection.commit()
                finally:
                    cursor.close()
            finally:
                # Closing without a commit leaves nothing half-written
                # when one of the statements above failed.
                connection.close()
        except Exception as e:
            print("impossible to store report to database: {}; {}".format(e.__class__.__name__, e))

    def process(self, actions, before=False, after=False):
        """Run the requested actions, then store the report.

        When ``actions`` is empty or None, the "sell_all" scenario is run
        for the "before" and/or "after" steps according to the flags.
        Otherwise each action is looked up as a method of this object and
        called without argument; unknown names are logged as errors.

        Exceptions are logged in the report instead of being propagated,
        and the report is stored in every case.
        """
        try:
            if len(actions or []) == 0:
                if before:
                    self.processor.process("sell_all", steps="before")
                if after:
                    self.processor.process("sell_all", steps="after")
            else:
                for action in actions:
                    if hasattr(self, action):
                        getattr(self, action)()
                    else:
                        self.report.log_error("market_process", message="Unknown action {}".format(action))
        except Exception as e:
            self.report.log_error("market_process", exception=e)
        finally:
            self.store_report()

    def move_balances(self):
        """Transfer funds between the exchange and margin accounts.

        For each currency, the amount needed in margin is
        ``margin_in_position - margin_pending_gain``, reduced by the delta
        of every pending "short" trade having that base currency. The
        difference with ``margin_available`` is then transferred in the
        appropriate direction. In debug mode the transfer is only logged.
        Balances are fetched again at the end.
        """
        needed_in_margin = {}
        moving_to_margin = {}

        for currency, balance in self.balances.all.items():
            needed_in_margin[currency] = balance.margin_in_position - balance.margin_pending_gain
        for trade in self.trades.pending:
            needed_in_margin.setdefault(trade.base_currency, 0)
            if trade.trade_type == "short":
                needed_in_margin[trade.base_currency] -= trade.delta
        for currency, needed in needed_in_margin.items():
            current_balance = self.balances.all[currency].margin_available
            moving_to_margin[currency] = (needed - current_balance)
            delta = moving_to_margin[currency].value
            if self.debug and delta != 0:
                # Debug mode: record what would have been done, do not
                # touch the accounts.
                self.report.log_debug_action("Moving {} from exchange to margin".format(moving_to_margin[currency]))
                continue
            if delta > 0:
                self.ccxt.transfer_balance(currency, delta, "exchange", "margin")
            elif delta < 0:
                self.ccxt.transfer_balance(currency, -delta, "margin", "exchange")
        self.report.log_move_balances(needed_in_margin, moving_to_margin)

        self.balances.fetch_balances()

    @ttl_cache(ttl=3600)
    def fetch_fees(self):
        """Return the fees reported by the exchange (cached with a 3600s TTL)."""
        return self.ccxt.fetch_fees()

    @ttl_cache(maxsize=20, ttl=5)
    def get_tickers(self, refresh=False):
        """Return all tickers from the exchange, or None when the exchange
        does not support fetching them at once (cached with a 5s TTL).

        ``refresh`` is not read by the body.
        NOTE(review): it presumably only serves to obtain a distinct cache
        entry - confirm against the callers.
        """
        try:
            return self.ccxt.fetch_tickers()
        except NotSupported:
            return None

    @ttl_cache(maxsize=20, ttl=5)
    def get_ticker(self, c1, c2, refresh=False):
        """Return the ticker for the pair c1/c2, or None when not found.

        The ticker is augmented with an "average" (mean of bid and ask)
        and an "inverted" flag. When only c2/c1 exists, the result is a
        dict with "inverted" set to True, the average of the inverted
        bid and ask, and the augmented c2/c1 ticker under "original".

        ``refresh`` is not read by the body.
        """
        def invert(ticker):
            return {
                    "inverted": True,
                    "average": (1/ticker["bid"] + 1/ticker["ask"]) / 2,
                    "original": ticker,
                    }
        def augment_ticker(ticker):
            # Updates the given dict in place and returns it.
            ticker.update({
                "inverted": False,
                "average": (ticker["bid"] + ticker["ask"] ) / 2,
                })
            return ticker

        tickers = self.get_tickers()
        if tickers is None:
            # No bulk fetch available: ask for the pair, then for the
            # reversed pair.
            try:
                ticker = augment_ticker(self.ccxt.fetch_ticker("{}/{}".format(c1, c2)))
            except ExchangeError:
                try:
                    ticker = invert(augment_ticker(self.ccxt.fetch_ticker("{}/{}".format(c2, c1))))
                except ExchangeError:
                    ticker = None
        else:
            if "{}/{}".format(c1, c2) in tickers:
                ticker = augment_ticker(tickers["{}/{}".format(c1, c2)])
            elif "{}/{}".format(c2, c1) in tickers:
                ticker = invert(augment_ticker(tickers["{}/{}".format(c2, c1)]))
            else:
                ticker = None
        return ticker

    def follow_orders(self, sleep=None):
        """Poll the open orders until none is left.

        ``sleep`` is the delay in seconds between two ticks; when None it
        is 7 in debug mode and 30 otherwise. At each tick, orders that
        are no longer open are logged as finished and the other ones are
        handed to their trade's ``update_order``.
        """
        if sleep is None:
            sleep = 7 if self.debug else 30
            if self.debug:
                self.report.log_debug_action("Set follow_orders tick to {}s".format(sleep))
        tick = 0
        self.report.log_stage("follow_orders_begin")
        while len(self.trades.all_orders(state="open")) > 0:
            time.sleep(sleep)
            tick += 1
            open_orders = self.trades.all_orders(state="open")
            self.report.log_stage("follow_orders_tick_{}".format(tick))
            self.report.log_orders(open_orders, tick=tick)
            for order in open_orders:
                if order.get_status() != "open":
                    self.report.log_order(order, tick, finished=True)
                else:
                    order.trade.update_order(order, tick)
        self.report.log_stage("follow_orders_end")

    def prepare_trades(self, base_currency="BTC", liquidity="medium",
            compute_value="average", repartition=None, only=None):
        """Compute the trades leading from the current balances to the
        wanted repartition.

        Balances are valued in ``base_currency``, their total is
        dispatched with ``balances.dispatch_assets`` and the result is
        handed to ``trades.compute_trades``.
        """

        self.report.log_stage("prepare_trades",
                base_currency=base_currency, liquidity=liquidity,
                compute_value=compute_value, only=only,
                repartition=repartition)

        values_in_base = self.balances.in_currency(base_currency,
                compute_value=compute_value)
        total_base_value = sum(values_in_base.values())
        new_repartition = self.balances.dispatch_assets(total_base_value,
                liquidity=liquidity, repartition=repartition)
        self.trades.compute_trades(values_in_base, new_repartition, only=only)

    # Helpers
    def print_orders(self, base_currency="BTC"):
        """Fetch balances, then prepare trades and orders without running them."""
        self.report.log_stage("print_orders")
        self.balances.fetch_balances(tag="print_orders")
        self.prepare_trades(base_currency=base_currency, compute_value="average")
        self.trades.prepare_orders(compute_value="average")

    def print_balances(self, base_currency="BTC"):
        """Fetch balances and, unless ``base_currency`` is None, print
        their total value in that currency."""
        self.report.log_stage("print_balances")
        self.balances.fetch_balances()
        if base_currency is not None:
            self.report.print_log("total:")
            self.report.print_log(sum(self.balances.in_currency(base_currency).values()))
220
class Processor:
    """Runs named scenarios, i.e. ordered lists of steps, on a market.

    Each step is a dict with a "name", a "number", the "before" and
    "after" flags used for step selection, an optional "fetch_balances"
    list ("begin" and/or "end") and, for every action to run, an entry
    mapping the action name to its default keyword arguments.
    """

    scenarios = {
            "wait_for_cryptoportfolio": [
                {
                    "name": "wait",
                    "number": 1,
                    "before": False,
                    "after": True,
                    "wait_for_recent": {},
                    },
                ],
            "print_orders": [
                {
                    "name": "wait",
                    "number": 1,
                    "before": False,
                    "after": True,
                    "wait_for_recent": {},
                    },
                {
                    "name": "make_orders",
                    "number": 2,
                    "before": False,
                    "after": True,
                    "fetch_balances": ["begin"],
                    "prepare_trades": { "compute_value": "average" },
                    "prepare_orders": { "compute_value": "average" },
                    },
                ],
            "sell_needed": [
                {
                    "name": "wait",
                    "number": 0,
                    "before": False,
                    "after": True,
                    "wait_for_recent": {},
                    },
                {
                    "name": "sell",
                    "number": 1,
                    "before": False,
                    "after": True,
                    "fetch_balances": ["begin", "end"],
                    "prepare_trades": {},
                    "prepare_orders": { "only": "dispose", "compute_value": "average" },
                    "run_orders": {},
                    "follow_orders": {},
                    "close_trades": {},
                    },
                {
                    "name": "buy",
                    "number": 2,
                    "before": False,
                    "after": True,
                    "fetch_balances": ["begin", "end"],
                    "prepare_trades": { "only": "acquire" },
                    "prepare_orders": { "only": "acquire", "compute_value": "average" },
                    "move_balances": {},
                    "run_orders": {},
                    "follow_orders": {},
                    "close_trades": {},
                    },
                ],
            "sell_all": [
                {
                    "name": "all_sell",
                    "number": 1,
                    "before": True,
                    "after": False,
                    "fetch_balances": ["begin", "end"],
                    # "base_currency" is a placeholder replaced by the
                    # actual base currency in parse_args.
                    "prepare_trades": { "repartition": { "base_currency": (1, "long") } },
                    "prepare_orders": { "compute_value": "average" },
                    "run_orders": {},
                    "follow_orders": {},
                    "close_trades": {},
                    },
                {
                    "name": "wait",
                    "number": 2,
                    "before": False,
                    "after": True,
                    "wait_for_recent": {},
                    },
                {
                    "name": "all_buy",
                    "number": 3,
                    "before": False,
                    "after": True,
                    "fetch_balances": ["begin", "end"],
                    "prepare_trades": {},
                    "prepare_orders": { "compute_value": "average" },
                    "move_balances": {},
                    "run_orders": {},
                    "follow_orders": {},
                    "close_trades": {},
                    },
                ]
            }

    # Order in which the actions of a step are run, whatever the order of
    # the keys in the step dict.
    ordered_actions = [
            "wait_for_recent", "prepare_trades", "prepare_orders",
            "move_balances", "run_orders", "follow_orders",
            "close_trades"]

    def __init__(self, market):
        self.market = market

    def select_steps(self, scenario, step):
        """Return the list of steps of ``scenario`` matching ``step``.

        ``step`` may be:
        - "all": the scenario itself is returned;
        - "before" or "after": steps whose flag of that name is true;
        - an int: the step at that 1-based position in the list;
        - any other str: steps having that "name".

        Raises TypeError for any other type (booleans included).
        """
        if step == "all":
            return scenario
        elif step == "before" or step == "after":
            return [x for x in scenario if step in x and x[step]]
        elif isinstance(step, int) and not isinstance(step, bool):
            return [scenario[step-1]]
        elif isinstance(step, str):
            return [x for x in scenario if x["name"] == step]
        else:
            raise TypeError("Unknown step {}".format(step))

    def process(self, scenario_name, steps="all", **kwargs):
        """Run the selected steps of the scenario named ``scenario_name``.

        ``steps`` is either a single selector or an iterable of selectors
        as accepted by select_steps. ``kwargs`` override the default
        arguments of the actions that accept them.

        Raises KeyError when the scenario does not exist.
        """
        scenario = self.scenarios[scenario_name]
        selected_steps = []

        if isinstance(steps, (str, int)):
            selected_steps += self.select_steps(scenario, steps)
        else:
            for step in steps:
                selected_steps += self.select_steps(scenario, step)
        for step in selected_steps:
            self.process_step(scenario_name, step, kwargs)

    def process_step(self, scenario_name, step, kwargs):
        """Run every action of ``step``, in the order of ordered_actions.

        Stages are logged at the beginning and at the end, and balances
        are fetched at the "begin" and/or "end" of the step when listed
        in its "fetch_balances" entry.
        """
        process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"])
        self.market.report.log_stage("{}_begin".format(process_name))
        if "begin" in step.get("fetch_balances", []):
            self.market.balances.fetch_balances(tag="{}_begin".format(process_name))

        for action in self.ordered_actions:
            if action in step:
                self.run_action(action, step[action], kwargs)

        if "end" in step.get("fetch_balances", []):
            self.market.balances.fetch_balances(tag="{}_end".format(process_name))
        self.market.report.log_stage("{}_end".format(process_name))

    def method_arguments(self, action):
        """Return ``[method, names]`` for ``action``.

        ``method`` is the callable implementing the action and ``names``
        the list of its arguments having a default value, i.e. the ones
        that may be given as keyword overrides.

        Raises ValueError when the action is unknown.
        """
        import inspect

        # Resolved lazily so that only the requested attribute is looked
        # up.
        resolvers = {
                "wait_for_recent": lambda: Portfolio.wait_for_recent,
                "prepare_trades": lambda: self.market.prepare_trades,
                "prepare_orders": lambda: self.market.trades.prepare_orders,
                "move_balances": lambda: self.market.move_balances,
                "run_orders": lambda: self.market.trades.run_orders,
                "follow_orders": lambda: self.market.follow_orders,
                "close_trades": lambda: self.market.trades.close_trades,
                }
        if action not in resolvers:
            raise ValueError("Unknown action {}".format(action))
        method = resolvers[action]()

        signature = inspect.getfullargspec(method)
        defaults = signature.defaults or []
        # Not args[-len(defaults):]: with no default at all that slice is
        # args[-0:], which is the whole list instead of an empty one.
        kwargs = signature.args[len(signature.args) - len(defaults):]

        return [method, kwargs]

    def parse_args(self, action, default_args, kwargs):
        """Return ``(method, args)`` where ``args`` merges ``default_args``
        and ``kwargs`` (the latter taking precedence), restricted to the
        arguments accepted by the method.

        A "base_currency" key in the "repartition" argument is replaced
        by the value of the "base_currency" argument ("BTC" when absent).
        Neither ``default_args`` nor ``kwargs`` is modified.
        """
        method, allowed_arguments = self.method_arguments(action)
        args = {k: v for k, v in {**default_args, **kwargs}.items() if k in allowed_arguments }

        repartition = args.get("repartition")
        if repartition and "base_currency" in repartition:
            # Work on a copy: the dict usually comes straight from the
            # class-level scenarios table, and renaming the key in place
            # would lose the placeholder for every later run.
            r = dict(repartition)
            r[args.get("base_currency", "BTC")] = r.pop("base_currency")
            args["repartition"] = r

        return method, args

    def run_action(self, action, default_args, kwargs):
        """Call the method implementing ``action`` with its parsed arguments."""
        method, args = self.parse_args(action, default_args, kwargs)

        method(**args)
403 method(**args)