aboutsummaryrefslogtreecommitdiff
path: root/market.py
diff options
context:
space:
mode:
authorIsmaël Bouya <ismael.bouya@normalesup.org>2018-03-09 19:18:02 +0100
committerIsmaël Bouya <ismael.bouya@normalesup.org>2018-03-09 19:18:02 +0100
commit34eb08f759a440af0376727664d9422041dfbd18 (patch)
treed1a94be893451a4e182f3e75e9afb01749172bb4 /market.py
parentf9226903cb53a9b303a26de562e321159349f8df (diff)
parentdc1ca9a306f09886c6c57f8d426c59a9d084b2b3 (diff)
downloadTrader-34eb08f759a440af0376727664d9422041dfbd18.tar.gz
Trader-34eb08f759a440af0376727664d9422041dfbd18.tar.zst
Trader-34eb08f759a440af0376727664d9422041dfbd18.zip
Merge branch 'immae/parallelize' into dev
Fixes https://git.immae.eu/mantisbt/view.php?id=51
Diffstat (limited to 'market.py')
-rw-r--r--market.py239
1 files changed, 235 insertions, 4 deletions
diff --git a/market.py b/market.py
index 3381d1e..8672c59 100644
--- a/market.py
+++ b/market.py
@@ -3,6 +3,8 @@ import ccxt_wrapper as ccxt
3import time 3import time
4from store import * 4from store import *
5from cachetools.func import ttl_cache 5from cachetools.func import ttl_cache
6from datetime import datetime
7import portfolio
6 8
7class Market: 9class Market:
8 debug = False 10 debug = False
@@ -11,17 +13,21 @@ class Market:
11 trades = None 13 trades = None
12 balances = None 14 balances = None
13 15
14 def __init__(self, ccxt_instance, debug=False): 16 def __init__(self, ccxt_instance, debug=False, user_id=None, report_path=None):
15 self.debug = debug 17 self.debug = debug
16 self.ccxt = ccxt_instance 18 self.ccxt = ccxt_instance
17 self.ccxt._market = self 19 self.ccxt._market = self
18 self.report = ReportStore(self) 20 self.report = ReportStore(self)
19 self.trades = TradeStore(self) 21 self.trades = TradeStore(self)
20 self.balances = BalanceStore(self) 22 self.balances = BalanceStore(self)
23 self.processor = Processor(self)
24
25 self.user_id = user_id
26 self.report_path = report_path
21 27
22 @classmethod 28 @classmethod
23 def from_config(cls, config, debug=False): 29 def from_config(cls, config, debug=False, user_id=None, report_path=None):
24 config["apiKey"] = config.pop("key") 30 config["apiKey"] = config.pop("key", None)
25 31
26 ccxt_instance = ccxt.poloniexE(config) 32 ccxt_instance = ccxt.poloniexE(config)
27 33
@@ -37,7 +43,35 @@ class Market:
37 ccxt_instance.session.request = request_wrap.__get__(ccxt_instance.session, 43 ccxt_instance.session.request = request_wrap.__get__(ccxt_instance.session,
38 ccxt_instance.session.__class__) 44 ccxt_instance.session.__class__)
39 45
40 return cls(ccxt_instance, debug=debug) 46 return cls(ccxt_instance, debug=debug, user_id=user_id, report_path=report_path)
47
48 def store_report(self):
49 self.report.merge(Portfolio.report)
50 try:
51 if self.report_path is not None:
52 report_file = "{}/{}_{}.json".format(self.report_path, datetime.now().isoformat(), self.user_id)
53 with open(report_file, "w") as f:
54 f.write(self.report.to_json())
55 except Exception as e:
56 print("impossible to store report file: {}; {}".format(e.__class__.__name__, e))
57
58 def process(self, actions, before=False, after=False):
59 try:
60 if len(actions or []) == 0:
61 if before:
62 self.processor.process("sell_all", steps="before")
63 if after:
64 self.processor.process("sell_all", steps="after")
65 else:
66 for action in actions:
67 if hasattr(self, action):
68 getattr(self, action)()
69 else:
70 self.report.log_error("market_process", message="Unknown action {}".format(action))
71 except Exception as e:
72 self.report.log_error("market_process", exception=e)
73 finally:
74 self.store_report()
41 75
42 def move_balances(self): 76 def move_balances(self):
43 needed_in_margin = {} 77 needed_in_margin = {}
@@ -143,3 +177,200 @@ class Market:
143 liquidity=liquidity, repartition=repartition) 177 liquidity=liquidity, repartition=repartition)
144 self.trades.compute_trades(values_in_base, new_repartition, only=only) 178 self.trades.compute_trades(values_in_base, new_repartition, only=only)
145 179
180 # Helpers
181 def print_orders(self, base_currency="BTC"):
182 self.report.log_stage("print_orders")
183 self.balances.fetch_balances(tag="print_orders")
184 self.prepare_trades(base_currency=base_currency, compute_value="average")
185 self.trades.prepare_orders(compute_value="average")
186
187 def print_balances(self, base_currency="BTC"):
188 self.report.log_stage("print_balances")
189 self.balances.fetch_balances()
190 if base_currency is not None:
191 self.report.print_log("total:")
192 self.report.print_log(sum(self.balances.in_currency(base_currency).values()))
193
194class Processor:
195 scenarios = {
196 "wait_for_cryptoportfolio": [
197 {
198 "name": "wait",
199 "number": 1,
200 "before": False,
201 "after": True,
202 "wait_for_recent": {},
203 },
204 ],
205 "print_orders": [
206 {
207 "name": "wait",
208 "number": 1,
209 "before": False,
210 "after": True,
211 "wait_for_recent": {},
212 },
213 {
214 "name": "make_orders",
215 "number": 2,
216 "before": False,
217 "after": True,
218 "fetch_balances": ["begin"],
219 "prepare_trades": { "compute_value": "average" },
220 "prepare_orders": { "compute_value": "average" },
221 },
222 ],
223 "sell_needed": [
224 {
225 "name": "wait",
226 "number": 0,
227 "before": False,
228 "after": True,
229 "wait_for_recent": {},
230 },
231 {
232 "name": "sell",
233 "number": 1,
234 "before": False,
235 "after": True,
236 "fetch_balances": ["begin", "end"],
237 "prepare_trades": {},
238 "prepare_orders": { "only": "dispose", "compute_value": "average" },
239 "run_orders": {},
240 "follow_orders": {},
241 "close_trades": {},
242 },
243 {
244 "name": "buy",
245 "number": 2,
246 "before": False,
247 "after": True,
248 "fetch_balances": ["begin", "end"],
249 "prepare_trades": { "only": "acquire" },
250 "prepare_orders": { "only": "acquire", "compute_value": "average" },
251 "move_balances": {},
252 "run_orders": {},
253 "follow_orders": {},
254 "close_trades": {},
255 },
256 ],
257 "sell_all": [
258 {
259 "name": "all_sell",
260 "number": 1,
261 "before": True,
262 "after": False,
263 "fetch_balances": ["begin", "end"],
264 "prepare_trades": { "repartition": { "base_currency": (1, "long") } },
265 "prepare_orders": { "compute_value": "average" },
266 "run_orders": {},
267 "follow_orders": {},
268 "close_trades": {},
269 },
270 {
271 "name": "wait",
272 "number": 2,
273 "before": False,
274 "after": True,
275 "wait_for_recent": {},
276 },
277 {
278 "name": "all_buy",
279 "number": 3,
280 "before": False,
281 "after": True,
282 "fetch_balances": ["begin", "end"],
283 "prepare_trades": {},
284 "prepare_orders": { "compute_value": "average" },
285 "move_balances": {},
286 "run_orders": {},
287 "follow_orders": {},
288 "close_trades": {},
289 },
290 ]
291 }
292
293 ordered_actions = [
294 "wait_for_recent", "prepare_trades", "prepare_orders",
295 "move_balances", "run_orders", "follow_orders",
296 "close_trades"]
297
298 def __init__(self, market):
299 self.market = market
300
301 def select_steps(self, scenario, step):
302 if step == "all":
303 return scenario
304 elif step == "before" or step == "after":
305 return list(filter(lambda x: step in x and x[step], scenario))
306 elif type(step) == int:
307 return [scenario[step-1]]
308 elif type(step) == str:
309 return list(filter(lambda x: x["name"] == step, scenario))
310 else:
311 raise TypeError("Unknown step {}".format(step))
312
313 def process(self, scenario_name, steps="all", **kwargs):
314 scenario = self.scenarios[scenario_name]
315 selected_steps = []
316
317 if type(steps) == str or type(steps) == int:
318 selected_steps += self.select_steps(scenario, steps)
319 else:
320 for step in steps:
321 selected_steps += self.select_steps(scenario, step)
322 for step in selected_steps:
323 self.process_step(scenario_name, step, kwargs)
324
325 def process_step(self, scenario_name, step, kwargs):
326 process_name = "process_{}__{}_{}".format(scenario_name, step["number"], step["name"])
327 self.market.report.log_stage("{}_begin".format(process_name))
328 if "begin" in step.get("fetch_balances", []):
329 self.market.balances.fetch_balances(tag="{}_begin".format(process_name))
330
331 for action in self.ordered_actions:
332 if action in step:
333 self.run_action(action, step[action], kwargs)
334
335 if "end" in step.get("fetch_balances", []):
336 self.market.balances.fetch_balances(tag="{}_end".format(process_name))
337 self.market.report.log_stage("{}_end".format(process_name))
338
339 def method_arguments(self, action):
340 import inspect
341
342 if action == "wait_for_recent":
343 method = Portfolio.wait_for_recent
344 elif action == "prepare_trades":
345 method = self.market.prepare_trades
346 elif action == "prepare_orders":
347 method = self.market.trades.prepare_orders
348 elif action == "move_balances":
349 method = self.market.move_balances
350 elif action == "run_orders":
351 method = self.market.trades.run_orders
352 elif action == "follow_orders":
353 method = self.market.follow_orders
354 elif action == "close_trades":
355 method = self.market.trades.close_trades
356
357 signature = inspect.getfullargspec(method)
358 defaults = signature.defaults or []
359 kwargs = signature.args[-len(defaults):]
360
361 return [method, kwargs]
362
363 def parse_args(self, action, default_args, kwargs):
364 method, allowed_arguments = self.method_arguments(action)
365 args = {k: v for k, v in {**default_args, **kwargs}.items() if k in allowed_arguments }
366
367 if "repartition" in args and "base_currency" in args["repartition"]:
368 r = args["repartition"]
369 r[args.get("base_currency", "BTC")] = r.pop("base_currency")
370
371 return method, args
372
373 def run_action(self, action, default_args, kwargs):
374 method, args = self.parse_args(action, default_args, kwargs)
375
376 method(**args)