1 from ccxt
import ExchangeError
, NotSupported
2 import ccxt_wrapper
as ccxt
5 from cachetools
.func
import ttl_cache
6 from datetime
import datetime
16 def __init__(self
, ccxt_instance
, debug
=False, user_id
=None, report_path
=None):
18 self
.ccxt
= ccxt_instance
19 self
.ccxt
._market
= self
20 self
.report
= ReportStore(self
)
21 self
.trades
= TradeStore(self
)
22 self
.balances
= BalanceStore(self
)
23 self
.processor
= Processor(self
)
25 self
.user_id
= user_id
26 self
.report_path
= report_path
def from_config(cls, config, debug=False, user_id=None, report_path=None):
    """Build an instance from a configuration mapping.

    The ``"key"`` entry of ``config`` is moved to ``"apiKey"`` (``None`` when
    absent) before ``config`` is handed to ``ccxt.poloniexE``. Note that
    ``config`` is modified in place.

    The HTTP session of the created ccxt instance is then wrapped so that
    every request is reported through ``report.log_http_request`` of the
    market attached to that ccxt instance.

    Parameters:
        config: mutable mapping of ccxt options; its ``"key"`` entry is
            renamed to ``"apiKey"``.
        debug, user_id, report_path: forwarded unchanged to ``cls``.

    Returns:
        The object produced by ``cls(ccxt_instance, ...)``.
    """
    config["apiKey"] = config.pop("key", None)

    ccxt_instance = ccxt.poloniexE(config)

    # For requests logging: keep the untouched request method reachable and
    # give the session a way back to the ccxt instance (and so to its market).
    session = ccxt_instance.session
    session.origin_request = session.request
    session._parent = ccxt_instance

    def request_wrap(self, *args, **kwargs):
        """Perform the request, log it, and hand the response back."""
        r = self.origin_request(*args, **kwargs)
        # The method and URL may be given positionally or by keyword, and
        # data/headers may be omitted altogether: never let the logging
        # fail a request that has already been performed.
        method = args[0] if len(args) > 0 else kwargs.get("method")
        url = args[1] if len(args) > 1 else kwargs.get("url")
        self._parent._market.report.log_http_request(
            method, url, kwargs.get("data"), kwargs.get("headers"), r)
        return r

    # Bind the wrapper to this session object only, so that it receives the
    # session as ``self`` without patching the session class.
    session.request = request_wrap.__get__(session, session.__class__)

    return cls(ccxt_instance, debug=debug, user_id=user_id,
               report_path=report_path)
def store_report(self):
    """Merge ``Portfolio.report`` into this report and write it to disk.

    Nothing is written when ``report_path`` is ``None``. Otherwise the report
    is serialised with ``to_json()`` into
    ``<report_path>/<ISO timestamp>_<user_id>.json``.

    Any exception raised while building the file name or writing the file is
    caught and printed; it is not propagated to the caller.
    """
    self.report.merge(Portfolio.report)

    try:
        if self.report_path is None:
            return
        timestamp = datetime.now().isoformat()
        report_file = "{}/{}_{}.json".format(
            self.report_path, timestamp, self.user_id)
        with open(report_file, "w") as handle:
            handle.write(self.report.to_json())
    except Exception as err:
        error_name = err.__class__.__name__
        print("impossible to store report file: {}; {}".format(error_name, err))
58 def process(self
, actions
, before
=False, after
=False):
60 if len(actions
or []) == 0:
62 self
.processor
.process("sell_all", steps
="before")
64 self
.processor
.process("sell_all", steps
="after")
66 for action
in actions
:
67 if hasattr(self
, action
):
68 getattr(self
, action
)()
70 self
.report
.log_error("market_process", message
="Unknown action {}".format(action
))
71 except Exception as e
:
72 self
.report
.log_error("market_process", exception
=e
)
76 def move_balances(self
):
80 for currency
, balance
in self
.balances
.all
.items():
81 needed_in_margin
[currency
] = balance
.margin_in_position
- balance
.margin_pending_gain
82 for trade
in self
.trades
.pending
:
83 needed_in_margin
.setdefault(trade
.base_currency
, 0)
84 if trade
.trade_type
== "short":
85 needed_in_margin
[trade
.base_currency
] -= trade
.delta
86 for currency
, needed
in needed_in_margin
.items():
87 current_balance
= self
.balances
.all
[currency
].margin_available
88 moving_to_margin
[currency
] = (needed
- current_balance
)
89 delta
= moving_to_margin
[currency
].value
90 if self
.debug
and delta
!= 0:
91 self
.report
.log_debug_action("Moving {} from exchange to margin".format(moving_to_margin
[currency
]))
94 self
.ccxt
.transfer_balance(currency
, delta
, "exchange", "margin")
96 self
.ccxt
.transfer_balance(currency
, -delta
, "margin", "exchange")
97 self
.report
.log_move_balances(needed_in_margin
, moving_to_margin
)
99 self
.balances
.fetch_balances()
def fetch_fees(self):
    """Return whatever the underlying ccxt instance's ``fetch_fees()`` gives."""
    fees = self.ccxt.fetch_fees()
    return fees
105 @ttl_cache(maxsize
=20, ttl
=5)
106 def get_tickers(self
, refresh
=False):
108 return self
.ccxt
.fetch_tickers()
112 @ttl_cache(maxsize
=20, ttl
=5)
113 def get_ticker(self
, c1
, c2
, refresh
=False):
117 "average": (1/ticker
["bid"] + 1/ticker
["ask"]) / 2,
120 def augment_ticker(ticker
):
123 "average": (ticker
["bid"] + ticker
["ask"] ) / 2,
127 tickers
= self
.get_tickers()
130 ticker
= augment_ticker(self
.ccxt
.fetch_ticker("{}/{}".format(c1
, c2
)))
131 except ExchangeError
:
133 ticker
= invert(augment_ticker(self
.ccxt
.fetch_ticker("{}/{}".format(c2
, c1
))))
134 except ExchangeError
:
137 if "{}/{}".format(c1
, c2
) in tickers
:
138 ticker
= augment_ticker(tickers
["{}/{}".format(c1
, c2
)])
139 elif "{}/{}".format(c2
, c1
) in tickers
:
140 ticker
= invert(augment_ticker(tickers
["{}/{}".format(c2
, c1
)]))
145 def follow_orders(self
, sleep
=None):
147 sleep
= 7 if self
.debug
else 30
149 self
.report
.log_debug_action("Set follow_orders tick to {}s".format(sleep
))
151 self
.report
.log_stage("follow_orders_begin")
152 while len(self
.trades
.all_orders(state
="open")) > 0:
155 open_orders
= self
.trades
.all_orders(state
="open")
156 self
.report
.log_stage("follow_orders_tick_{}".format(tick
))
157 self
.report
.log_orders(open_orders
, tick
=tick
)
158 for order
in open_orders
:
159 if order
.get_status() != "open":
160 self
.report
.log_order(order
, tick
, finished
=True)
162 order
.trade
.update_order(order
, tick
)
163 self
.report
.log_stage("follow_orders_end")
def prepare_trades(self, base_currency="BTC", liquidity="medium",
        compute_value="average", repartition=None, only=None):
    """Compute the trades needed to reach a new repartition of the assets.

    Steps, in order:
      1. log a ``"prepare_trades"`` stage carrying every argument;
      2. ask the balance store for the current holdings valued in
         ``base_currency`` (using ``compute_value``);
      3. sum those values and let the balance store dispatch the total
         according to ``liquidity`` and ``repartition``;
      4. pass the current and dispatched values to the trade store, restricted
         by ``only``.

    Returns ``None``; the result lives in the trade store.
    """
    stage_details = {
        "base_currency": base_currency,
        "liquidity": liquidity,
        "compute_value": compute_value,
        "only": only,
        "repartition": repartition,
    }
    self.report.log_stage("prepare_trades", **stage_details)

    current_values = self.balances.in_currency(
        base_currency, compute_value=compute_value)
    portfolio_total = sum(current_values.values())
    target_values = self.balances.dispatch_assets(
        portfolio_total, liquidity=liquidity, repartition=repartition)
    self.trades.compute_trades(current_values, target_values, only=only)
def print_orders(self, base_currency="BTC"):
    """Log the stage, refresh balances, then prepare trades and orders.

    Balances are fetched with the tag ``"print_orders"``; trades and orders
    are both prepared with the ``"average"`` compute value. Returns ``None``.
    """
    stage = "print_orders"
    valuation = "average"

    self.report.log_stage(stage)
    self.balances.fetch_balances(tag=stage)
    self.prepare_trades(base_currency=base_currency, compute_value=valuation)
    self.trades.prepare_orders(compute_value=valuation)
def print_balances(self, base_currency="BTC"):
    """Log the stage, refresh balances and report their total value.

    When ``base_currency`` is ``None`` only the stage is logged and the
    balances fetched. Otherwise ``"total:"`` is sent to the report, followed
    by the sum of the balances valued in ``base_currency``. Returns ``None``.
    """
    self.report.log_stage("print_balances")
    self.balances.fetch_balances()

    if base_currency is None:
        return

    report = self.report
    report.print_log("total:")
    converted = self.balances.in_currency(base_currency)
    report.print_log(sum(converted.values()))
196 "wait_for_cryptoportfolio": [
202 "wait_for_recent": {},
211 "wait_for_recent": {},
214 "name": "make_orders",
218 "fetch_balances": ["begin"],
219 "prepare_trades": { "compute_value": "average" }
,
220 "prepare_orders": { "compute_value": "average" }
,
229 "wait_for_recent": {},
236 "fetch_balances": ["begin", "end"],
237 "prepare_trades": {},
238 "prepare_orders": { "only": "dispose", "compute_value": "average" }
,
248 "fetch_balances": ["begin", "end"],
249 "prepare_trades": { "only": "acquire" }
,
250 "prepare_orders": { "only": "acquire", "compute_value": "average" }
,
263 "fetch_balances": ["begin", "end"],
264 "prepare_trades": { "repartition": { "base_currency": (1, "long") }
},
265 "prepare_orders": { "compute_value": "average" }
,
275 "wait_for_recent": {},
282 "fetch_balances": ["begin", "end"],
283 "prepare_trades": {},
284 "prepare_orders": { "compute_value": "average" }
,
294 "wait_for_recent", "prepare_trades", "prepare_orders",
295 "move_balances", "run_orders", "follow_orders",
298 def __init__(self
, market
):
301 def select_steps(self
, scenario
, step
):
304 elif step
== "before" or step
== "after":
305 return list(filter(lambda x
: step
in x
and x
[step
], scenario
))
306 elif type(step
) == int:
307 return [scenario
[step
-1]]
308 elif type(step
) == str:
309 return list(filter(lambda x
: x
["name"] == step
, scenario
))
311 raise TypeError("Unknown step {}".format(step
))
313 def process(self
, scenario_name
, steps
="all", **kwargs
):
314 scenario
= self
.scenarios
[scenario_name
]
317 if type(steps
) == str or type(steps
) == int:
318 selected_steps
+= self
.select_steps(scenario
, steps
)
321 selected_steps
+= self
.select_steps(scenario
, step
)
322 for step
in selected_steps
:
323 self
.process_step(scenario_name
, step
, kwargs
)
325 def process_step(self
, scenario_name
, step
, kwargs
):
326 process_name
= "process_{}__{}_{}".format(scenario_name
, step
["number"], step
["name"])
327 self
.market
.report
.log_stage("{}_begin".format(process_name
))
328 if "begin" in step
.get("fetch_balances", []):
329 self
.market
.balances
.fetch_balances(tag
="{}_begin".format(process_name
))
331 for action
in self
.ordered_actions
:
333 self
.run_action(action
, step
[action
], kwargs
)
335 if "end" in step
.get("fetch_balances", []):
336 self
.market
.balances
.fetch_balances(tag
="{}_end".format(process_name
))
337 self
.market
.report
.log_stage("{}_end".format(process_name
))
def method_arguments(self, action):
    """Resolve an action name to its callable and its optional arguments.

    Parameters:
        action: one of ``"wait_for_recent"``, ``"prepare_trades"``,
            ``"prepare_orders"``, ``"move_balances"``, ``"run_orders"``,
            ``"follow_orders"`` or ``"close_trades"``.

    Returns:
        A two-item list ``[method, names]`` where ``names`` lists the
        positional parameters of ``method`` that have a default value.

    Raises:
        ValueError: if ``action`` is not one of the known names.
    """
    import inspect

    if action == "wait_for_recent":
        method = Portfolio.wait_for_recent
    elif action == "prepare_trades":
        method = self.market.prepare_trades
    elif action == "prepare_orders":
        method = self.market.trades.prepare_orders
    elif action == "move_balances":
        method = self.market.move_balances
    elif action == "run_orders":
        method = self.market.trades.run_orders
    elif action == "follow_orders":
        method = self.market.follow_orders
    elif action == "close_trades":
        method = self.market.trades.close_trades
    else:
        # Without this branch ``method`` would be unbound below.
        raise ValueError("Unknown action {}".format(action))

    signature = inspect.getfullargspec(method)
    defaults = signature.defaults or []
    # Slice from an explicit start index: ``args[-len(defaults):]`` turns into
    # ``args[-0:]`` when there are no defaults and would return every
    # positional name (``self`` included) instead of none.
    first_default = len(signature.args) - len(defaults)
    kwargs = signature.args[first_default:]

    return [method, kwargs]
363 def parse_args(self
, action
, default_args
, kwargs
):
364 method
, allowed_arguments
= self
.method_arguments(action
)
365 args
= {k: v for k, v in {**default_args, **kwargs}
.items() if k
in allowed_arguments
}
367 if "repartition" in args
and "base_currency" in args
["repartition"]:
368 r
= args
["repartition"]
369 r
[args
.get("base_currency", "BTC")] = r
.pop("base_currency")
373 def run_action(self
, action
, default_args
, kwargs
):
374 method
, args
= self
.parse_args(action
, default_args
, kwargs
)