]> git.immae.eu Git - perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git/blob - market.py
Move Portfolio to store and cleanup methods
[perso/Immae/Projets/Cryptomonnaies/Cryptoportfolio/Trader.git] / market.py
1 from ccxt import ExchangeError, NotSupported
2 import ccxt_wrapper as ccxt
3 import time
4 from store import *
5 from cachetools.func import ttl_cache
6 from datetime import datetime
7 import portfolio
8
class Market:
    """Entry point tying an exchange client to the report/trade/balance stores.

    A Market owns one ccxt client instance plus the stores and the
    Processor built around it, and exposes the high-level actions
    (moving balances, following orders, preparing trades, ...).
    """
    # Class-level defaults; every one of them is overwritten per instance
    # in __init__.
    debug = False
    ccxt = None
    report = None
    trades = None
    balances = None

    def __init__(self, ccxt_instance, debug=False, user_id=None, report_path=None):
        """Wire the stores and the processor around `ccxt_instance`.

        :param ccxt_instance: exchange client; it receives a `_market`
            back-reference to this object.
        :param debug: when true, balance transfers are only logged (see
            move_balances) and follow_orders polls faster.
        :param user_id: identifier used in the report file name.
        :param report_path: directory where reports are written; None
            disables report storage.
        """
        self.debug = debug
        self.ccxt = ccxt_instance
        # Back-reference read by the HTTP logging hook set up in from_config.
        self.ccxt._market = self
        self.report = ReportStore(self)
        self.trades = TradeStore(self)
        self.balances = BalanceStore(self)
        self.processor = Processor(self)

        self.user_id = user_id
        self.report_path = report_path

    @classmethod
    def from_config(cls, config, debug=False, user_id=None, report_path=None):
        """Build a Market from a configuration mapping.

        The "key" entry of `config` is handed to ccxt under the name
        "apiKey". The caller's mapping is left untouched.

        :param config: mapping of ccxt options, with the API key stored
            under "key".
        :returns: a new Market whose HTTP requests are logged to its report.
        """
        # Work on a copy: popping "key" from the caller's dict would make a
        # second call with the same config lose its API key.
        config = dict(config)
        config["apiKey"] = config.pop("key", None)

        ccxt_instance = ccxt.poloniexE(config)

        # For requests logging
        ccxt_instance.session.origin_request = ccxt_instance.session.request
        ccxt_instance.session._parent = ccxt_instance

        def request_wrap(self, *args, **kwargs):
            r = self.origin_request(*args, **kwargs)
            # Logging must never break a request that already succeeded, so
            # tolerate method/url passed by keyword and absent data/headers.
            method = args[0] if len(args) > 0 else kwargs.get("method")
            url = args[1] if len(args) > 1 else kwargs.get("url")
            self._parent._market.report.log_http_request(
                method, url, kwargs.get("data"), kwargs.get("headers"), r)
            return r
        # Bind the wrapper as a method of this particular session object.
        ccxt_instance.session.request = request_wrap.__get__(
            ccxt_instance.session, ccxt_instance.session.__class__)

        return cls(ccxt_instance, debug=debug, user_id=user_id, report_path=report_path)

    def store_report(self):
        """Write the JSON report to `report_path`, if one is configured.

        Best effort: any failure is printed and never propagated.
        """
        try:
            if self.report_path is not None:
                report_file = "{}/{}_{}.json".format(self.report_path, datetime.now().isoformat(), self.user_id)
                with open(report_file, "w") as f:
                    f.write(self.report.to_json())
        except Exception as e:
            print("impossible to store report file: {}; {}".format(e.__class__.__name__, e))

    def process(self, actions, before=False, after=False):
        """Run the requested actions, then always store the report.

        :param actions: names of Market methods to call without arguments,
            in order. When empty or None, the "sell_all" scenario is run
            instead, restricted to the steps selected by `before`/`after`.
        :param before: run the "before" steps of "sell_all".
        :param after: run the "after" steps of "sell_all".

        Exceptions are logged to the report rather than raised.
        """
        try:
            if not actions:
                if before:
                    self.processor.process("sell_all", steps="before")
                if after:
                    self.processor.process("sell_all", steps="after")
            else:
                for action in actions:
                    if hasattr(self, action):
                        getattr(self, action)()
                    else:
                        self.report.log_error("market_process", message="Unknown action {}".format(action))
        except Exception as e:
            self.report.log_error("market_process", exception=e)
        finally:
            self.store_report()

    def move_balances(self):
        """Transfer funds between the exchange and margin accounts.

        For every currency, the amount needed in margin is computed from
        the balances and the pending short trades, then compared with what
        is already available in margin. In debug mode non-zero transfers
        are only logged. Balances are fetched again at the end.
        """
        needed_in_margin = {}
        moving_to_margin = {}

        for currency, balance in self.balances.all.items():
            needed_in_margin[currency] = balance.margin_in_position - balance.margin_pending_gain
        for trade in self.trades.pending:
            needed_in_margin.setdefault(trade.base_currency, 0)
            if trade.trade_type == "short":
                needed_in_margin[trade.base_currency] -= trade.delta
        for currency, needed in needed_in_margin.items():
            # NOTE(review): assumes every base currency of a pending trade
            # has an entry in balances.all - confirm against the stores.
            current_balance = self.balances.all[currency].margin_available
            moving_to_margin[currency] = (needed - current_balance)
            delta = moving_to_margin[currency].value
            if self.debug and delta != 0:
                self.report.log_debug_action("Moving {} from exchange to margin".format(moving_to_margin[currency]))
                continue
            if delta > 0:
                self.ccxt.transfer_balance(currency, delta, "exchange", "margin")
            elif delta < 0:
                self.ccxt.transfer_balance(currency, -delta, "margin", "exchange")
        self.report.log_move_balances(needed_in_margin, moving_to_margin)

        self.balances.fetch_balances()

    @ttl_cache(ttl=3600)
    def fetch_fees(self):
        """Return the fees reported by the exchange client (cached, ttl=3600)."""
        return self.ccxt.fetch_fees()

    @ttl_cache(maxsize=20, ttl=5)
    def get_tickers(self, refresh=False):
        """Return all tickers, or None when the client cannot list them.

        :param refresh: not read by the body; it only takes part in the
            arguments seen by the cache decorator.
        """
        try:
            return self.ccxt.fetch_tickers()
        except NotSupported:
            return None

    @ttl_cache(maxsize=20, ttl=5)
    def get_ticker(self, c1, c2, refresh=False):
        """Return the ticker for the pair c1/c2, looking up c2/c1 as fallback.

        The returned ticker carries an "average" of bid and ask and an
        "inverted" flag. When only the reverse pair exists, a new dict is
        returned with "inverted" set, the average of the inverse prices
        and the untouched ticker under "original".

        :param refresh: not read by the body; it only takes part in the
            arguments seen by the cache decorator.
        :returns: the ticker dict, or None when neither pair is available.
        """
        def invert(ticker):
            return {
                "inverted": True,
                "average": (1/ticker["bid"] + 1/ticker["ask"]) / 2,
                "original": ticker,
            }

        def augment_ticker(ticker):
            # Updates the given dict in place and returns it.
            ticker.update({
                "inverted": False,
                "average": (ticker["bid"] + ticker["ask"]) / 2,
            })
            return ticker

        tickers = self.get_tickers()
        if tickers is None:
            # No bulk listing available: query each direction individually.
            try:
                ticker = augment_ticker(self.ccxt.fetch_ticker("{}/{}".format(c1, c2)))
            except ExchangeError:
                try:
                    ticker = invert(augment_ticker(self.ccxt.fetch_ticker("{}/{}".format(c2, c1))))
                except ExchangeError:
                    ticker = None
        else:
            if "{}/{}".format(c1, c2) in tickers:
                ticker = augment_ticker(tickers["{}/{}".format(c1, c2)])
            elif "{}/{}".format(c2, c1) in tickers:
                ticker = invert(augment_ticker(tickers["{}/{}".format(c2, c1)]))
            else:
                ticker = None
        return ticker

    def follow_orders(self, sleep=None):
        """Poll open orders until none is left.

        :param sleep: seconds to wait between two polls; defaults to 7 in
            debug mode and 30 otherwise.
        """
        if sleep is None:
            sleep = 7 if self.debug else 30
            if self.debug:
                self.report.log_debug_action("Set follow_orders tick to {}s".format(sleep))
        tick = 0
        self.report.log_stage("follow_orders_begin")
        while len(self.trades.all_orders(state="open")) > 0:
            time.sleep(sleep)
            tick += 1
            open_orders = self.trades.all_orders(state="open")
            self.report.log_stage("follow_orders_tick_{}".format(tick))
            self.report.log_orders(open_orders, tick=tick)
            for order in open_orders:
                if order.get_status() != "open":
                    self.report.log_order(order, tick, finished=True)
                else:
                    # Still open: let the owning trade decide what to do.
                    order.trade.update_order(order, tick)
        self.report.log_stage("follow_orders_end")

    def prepare_trades(self, base_currency="BTC", liquidity="medium",
            compute_value="average", repartition=None, only=None):
        """Compute the trades leading from current balances to a repartition.

        The current balances are valued in `base_currency`, their total is
        dispatched by the balance store according to `liquidity` and
        `repartition`, and the trade store computes the resulting trades.
        All parameters are forwarded to the stores; `only` is passed to
        the trade store's compute_trades.
        """

        self.report.log_stage("prepare_trades",
                base_currency=base_currency, liquidity=liquidity,
                compute_value=compute_value, only=only,
                repartition=repartition)

        values_in_base = self.balances.in_currency(base_currency,
                compute_value=compute_value)
        total_base_value = sum(values_in_base.values())
        new_repartition = self.balances.dispatch_assets(total_base_value,
                liquidity=liquidity, repartition=repartition)
        self.trades.compute_trades(values_in_base, new_repartition, only=only)

    # Helpers
    def print_orders(self, base_currency="BTC"):
        """Fetch balances, then prepare trades and orders with "average" values."""
        self.report.log_stage("print_orders")
        self.balances.fetch_balances(tag="print_orders")
        self.prepare_trades(base_currency=base_currency, compute_value="average")
        self.trades.prepare_orders(compute_value="average")

    def print_balances(self, base_currency="BTC"):
        """Fetch balances and, unless `base_currency` is None, log their total."""
        self.report.log_stage("print_balances")
        self.balances.fetch_balances()
        if base_currency is not None:
            self.report.print_log("total:")
            self.report.print_log(sum(self.balances.in_currency(base_currency).values()))
192
class Processor:
    """Run predefined scenarios (ordered lists of steps) against a Market.

    Each step is a dict with:
      - "name" and "number": used to build the stage names logged to the
        report ("name" can also select a step);
      - "before" / "after": flags used to select steps by phase;
      - "fetch_balances": optional list containing "begin" and/or "end";
      - one key per action to run, mapped to that action's default
        keyword arguments.
    """
    scenarios = {
        "sell_needed": [
            {
                "name": "wait",
                "number": 0,
                "before": False,
                "after": True,
                "wait_for_recent": {},
            },
            {
                "name": "sell",
                "number": 1,
                "before": False,
                "after": True,
                "fetch_balances": ["begin", "end"],
                "prepare_trades": {},
                "prepare_orders": { "only": "dispose", "compute_value": "average" },
                "run_orders": {},
                "follow_orders": {},
                "close_trades": {},
            },
            {
                "name": "buy",
                "number": 2,
                "before": False,
                "after": True,
                "fetch_balances": ["begin", "end"],
                "prepare_trades": { "only": "acquire" },
                "prepare_orders": { "only": "acquire", "compute_value": "average" },
                "move_balances": {},
                "run_orders": {},
                "follow_orders": {},
                "close_trades": {},
            },
        ],
        "sell_all": [
            {
                "name": "all_sell",
                "number": 1,
                "before": True,
                "after": False,
                "fetch_balances": ["begin", "end"],
                # "base_currency" is a placeholder key, replaced by the
                # actual base currency in parse_args.
                "prepare_trades": { "repartition": { "base_currency": (1, "long") } },
                "prepare_orders": { "compute_value": "average" },
                "run_orders": {},
                "follow_orders": {},
                "close_trades": {},
            },
            {
                "name": "wait",
                "number": 2,
                "before": False,
                "after": True,
                "wait_for_recent": {},
            },
            {
                "name": "all_buy",
                "number": 3,
                "before": False,
                "after": True,
                "fetch_balances": ["begin", "end"],
                "prepare_trades": {},
                "prepare_orders": { "compute_value": "average" },
                "move_balances": {},
                "run_orders": {},
                "follow_orders": {},
                "close_trades": {},
            },
        ]
    }

    # Actions of a step always run in this order, whatever the order of the
    # keys in the step dict.
    ordered_actions = [
        "wait_for_recent", "prepare_trades", "prepare_orders",
        "move_balances", "run_orders", "follow_orders",
        "close_trades"]

    def __init__(self, market):
        """Keep a reference to the Market the actions operate on."""
        self.market = market

    def select_steps(self, scenario, step):
        """Return the steps of `scenario` matching the selector `step`.

        :param scenario: list of step dicts.
        :param step: "all", a phase ("before" or "after"), a step name, or
            an int giving the 1-based position in `scenario` (this is the
            position in the list, not the step's "number" field; 0 and
            negative values follow Python negative indexing).
        :returns: list of selected step dicts.
        :raises TypeError: if `step` is neither a str nor an int.
        :raises IndexError: if an int `step` is out of range.
        """
        if step == "all":
            return scenario
        elif step == "before" or step == "after":
            return [x for x in scenario if step in x and x[step]]
        elif type(step) is int:
            return [scenario[step-1]]
        elif type(step) is str:
            return [x for x in scenario if x["name"] == step]
        else:
            raise TypeError("Unknown step {}".format(step))

    def process(self, scenario_name, steps="all", **kwargs):
        """Run the selected steps of a scenario.

        :param scenario_name: key into `scenarios`.
        :param steps: a single selector (see select_steps) or an iterable
            of selectors; the selections are concatenated in order.
        :param kwargs: overrides for the default arguments of the actions;
            each action only receives the ones it accepts.
        :raises KeyError: if `scenario_name` is unknown.
        """
        scenario = self.scenarios[scenario_name]
        selected_steps = []

        if type(steps) in (str, int):
            selected_steps += self.select_steps(scenario, steps)
        else:
            for step in steps:
                selected_steps += self.select_steps(scenario, step)
        for step in selected_steps:
            self.process_step(scenario_name, step, kwargs)

    def process_step(self, scenario_name, step, kwargs):
        """Run one step: optional balance fetches around its ordered actions."""
        process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"])
        self.market.report.log_stage("{}_begin".format(process_name))
        if "begin" in step.get("fetch_balances", []):
            self.market.balances.fetch_balances(tag="{}_begin".format(process_name))

        for action in self.ordered_actions:
            if action in step:
                self.run_action(action, step[action], kwargs)

        if "end" in step.get("fetch_balances", []):
            self.market.balances.fetch_balances(tag="{}_end".format(process_name))
        self.market.report.log_stage("{}_end".format(process_name))

    def method_arguments(self, action):
        """Resolve an action name to its callable and accepted keywords.

        :param action: one of the names listed in `ordered_actions`.
        :returns: `[method, names]` where `names` lists the positional
            parameters of `method` that have a default value.
        :raises TypeError: if `action` is not a known action.
        """
        import inspect

        if action == "wait_for_recent":
            method = Portfolio.wait_for_recent
        elif action == "prepare_trades":
            method = self.market.prepare_trades
        elif action == "prepare_orders":
            method = self.market.trades.prepare_orders
        elif action == "move_balances":
            method = self.market.move_balances
        elif action == "run_orders":
            method = self.market.trades.run_orders
        elif action == "follow_orders":
            method = self.market.follow_orders
        elif action == "close_trades":
            method = self.market.trades.close_trades
        else:
            raise TypeError("Unknown action {}".format(action))

        signature = inspect.getfullargspec(method)
        defaults = signature.defaults or ()
        # Slice from an explicit start index: `args[-len(defaults):]` would
        # be `args[-0:]`, i.e. every argument, when there are no defaults.
        kwargs = signature.args[len(signature.args) - len(defaults):]

        return [method, kwargs]

    def parse_args(self, action, default_args, kwargs):
        """Build the keyword arguments to call `action` with.

        `kwargs` override `default_args`; entries the method does not
        accept are dropped. In a "repartition" argument, the placeholder
        key "base_currency" is replaced by the actual base currency
        (the "base_currency" argument, "BTC" if absent).

        :returns: `(method, args)`.
        """
        method, allowed_arguments = self.method_arguments(action)
        args = {k: v for k, v in {**default_args, **kwargs}.items() if k in allowed_arguments}

        repartition = args.get("repartition")
        if repartition and "base_currency" in repartition:
            # Substitute on a copy: the dict usually is the class-level
            # scenario template, and rewriting it in place would freeze the
            # first base currency for every later run.
            repartition = dict(repartition)
            repartition[args.get("base_currency", "BTC")] = repartition.pop("base_currency")
            args["repartition"] = repartition

        return method, args

    def run_action(self, action, default_args, kwargs):
        """Call the method behind `action` with its resolved arguments."""
        method, args = self.parse_args(action, default_args, kwargs)

        method(**args)