1 from ccxt
import ExchangeError
, NotSupported
, RequestTimeout
, InvalidNonce
2 import ccxt_wrapper
as ccxt
6 from cachetools
.func
import ttl_cache
7 from datetime
import datetime
8 from retry
import retry
18 def __init__(self
, ccxt_instance
, args
, **kwargs
):
20 self
.debug
= args
.debug
21 self
.ccxt
= ccxt_instance
22 self
.ccxt
._market
= self
23 self
.report
= ReportStore(self
, verbose_print
=(not args
.quiet
))
24 self
.trades
= TradeStore(self
)
25 self
.balances
= BalanceStore(self
)
26 self
.processor
= Processor(self
)
28 for key
in ["user_id", "market_id", "pg_config"]:
29 setattr(self
, key
, kwargs
.get(key
, None))
31 self
.report
.log_market(self
.args
, self
.user_id
, self
.market_id
)
34 def from_config(cls
, config
, args
, **kwargs
):
35 config
["apiKey"] = config
.pop("key", None)
37 ccxt_instance
= ccxt
.poloniexE(config
)
39 return cls(ccxt_instance
, args
, **kwargs
)
41 def store_report(self
):
42 self
.report
.merge(Portfolio
.report
)
44 if self
.args
.report_path
is not None:
45 self
.store_file_report(date
)
46 if self
.pg_config
is not None and self
.args
.report_db
:
47 self
.store_database_report(date
)
49 def store_file_report(self
, date
):
51 report_file
= "{}/{}_{}".format(self
.args
.report_path
, date
.isoformat(), self
.user_id
)
52 with open(report_file
+ ".json", "w") as f
:
53 f
.write(self
.report
.to_json())
54 with open(report_file
+ ".log", "w") as f
:
55 f
.write("\n".join(map(lambda x
: x
[1], self
.report
.print_logs
)))
56 except Exception as e
:
57 print("impossible to store report file: {}; {}".format(e
.__class
__.__name
__, e
))
59 def store_database_report(self
, date
):
61 report_query
= 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;'
62 line_query
= 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);'
63 connection
= psycopg2
.connect(**self
.pg_config
)
64 cursor
= connection
.cursor()
65 cursor
.execute(report_query
, (date
, self
.market_id
, self
.debug
))
66 report_id
= cursor
.fetchone()[0]
67 for date
, type_
, payload
in self
.report
.to_json_array():
68 cursor
.execute(line_query
, (date
, report_id
, type_
, payload
))
73 except Exception as e
:
74 print("impossible to store report to database: {}; {}".format(e
.__class
__.__name
__, e
))
76 def process(self
, actions
, before
=False, after
=False):
78 if len(actions
or []) == 0:
80 self
.processor
.process("sell_all", steps
="before")
82 self
.processor
.process("sell_all", steps
="after")
84 for action
in actions
:
85 if hasattr(self
, action
):
86 getattr(self
, action
)()
88 self
.report
.log_error("market_process", message
="Unknown action {}".format(action
))
89 except Exception as e
:
90 self
.report
.log_error("market_process", exception
=e
)
94 @retry((RequestTimeout
, InvalidNonce
), tries
=5)
95 def move_balances(self
):
99 for currency
, balance
in self
.balances
.all
.items():
100 needed_in_margin
[currency
] = balance
.margin_in_position
- balance
.margin_pending_gain
101 for trade
in self
.trades
.pending
:
102 needed_in_margin
.setdefault(trade
.base_currency
, 0)
103 if trade
.trade_type
== "short":
104 needed_in_margin
[trade
.base_currency
] -= trade
.delta
105 for currency
, needed
in needed_in_margin
.items():
106 current_balance
= self
.balances
.all
[currency
].margin_available
107 moving_to_margin
[currency
] = (needed
- current_balance
)
108 delta
= moving_to_margin
[currency
].value
109 action
= "Moving {} from exchange to margin".format(moving_to_margin
[currency
])
111 if self
.debug
and delta
!= 0:
112 self
.report
.log_debug_action(action
)
116 self
.ccxt
.transfer_balance(currency
, delta
, "exchange", "margin")
118 self
.ccxt
.transfer_balance(currency
, -delta
, "margin", "exchange")
119 except (RequestTimeout
, InvalidNonce
) as e
:
120 self
.report
.log_error(action
, message
="Retrying", exception
=e
)
121 self
.report
.log_move_balances(needed_in_margin
, moving_to_margin
)
122 self
.balances
.fetch_balances()
124 self
.report
.log_move_balances(needed_in_margin
, moving_to_margin
)
126 self
.balances
.fetch_balances()
129 def fetch_fees(self
):
130 return self
.ccxt
.fetch_fees()
132 @ttl_cache(maxsize
=20, ttl
=5)
133 def get_tickers(self
, refresh
=False):
135 return self
.ccxt
.fetch_tickers()
139 @ttl_cache(maxsize
=20, ttl
=5)
140 def get_ticker(self
, c1
, c2
, refresh
=False):
144 "average": (1/ticker
["bid"] + 1/ticker
["ask"]) / 2,
147 def augment_ticker(ticker
):
150 "average": (ticker
["bid"] + ticker
["ask"] ) / 2,
154 tickers
= self
.get_tickers()
157 ticker
= augment_ticker(self
.ccxt
.fetch_ticker("{}/{}".format(c1
, c2
)))
158 except ExchangeError
:
160 ticker
= invert(augment_ticker(self
.ccxt
.fetch_ticker("{}/{}".format(c2
, c1
))))
161 except ExchangeError
:
164 if "{}/{}".format(c1
, c2
) in tickers
:
165 ticker
= augment_ticker(tickers
["{}/{}".format(c1
, c2
)])
166 elif "{}/{}".format(c2
, c1
) in tickers
:
167 ticker
= invert(augment_ticker(tickers
["{}/{}".format(c2
, c1
)]))
172 def follow_orders(self
, sleep
=None):
174 sleep
= 7 if self
.debug
else 30
176 self
.report
.log_debug_action("Set follow_orders tick to {}s".format(sleep
))
178 self
.report
.log_stage("follow_orders_begin")
179 while len(self
.trades
.all_orders(state
="open")) > 0:
182 open_orders
= self
.trades
.all_orders(state
="open")
183 self
.report
.log_stage("follow_orders_tick_{}".format(tick
))
184 self
.report
.log_orders(open_orders
, tick
=tick
)
185 for order
in open_orders
:
186 status
= order
.get_status()
188 self
.report
.log_order(order
, tick
, finished
=True)
190 order
.trade
.update_order(order
, tick
)
191 if status
== "error_disappeared":
192 self
.report
.log_error("follow_orders",
193 message
="{} disappeared, recreating it".format(order
))
194 order
.trade
.prepare_order(
195 compute_value
=order
.trade
.tick_actions_recreate(tick
))
197 self
.report
.log_stage("follow_orders_end")
199 def prepare_trades(self
, base_currency
="BTC", liquidity
="medium",
200 compute_value
="average", repartition
=None, only
=None):
202 self
.report
.log_stage("prepare_trades",
203 base_currency
=base_currency
, liquidity
=liquidity
,
204 compute_value
=compute_value
, only
=only
,
205 repartition
=repartition
)
207 values_in_base
= self
.balances
.in_currency(base_currency
,
208 compute_value
=compute_value
)
209 total_base_value
= sum(values_in_base
.values())
210 new_repartition
= self
.balances
.dispatch_assets(total_base_value
,
211 liquidity
=liquidity
, repartition
=repartition
)
212 self
.trades
.compute_trades(values_in_base
, new_repartition
, only
=only
)
215 def print_orders(self
, base_currency
="BTC"):
216 self
.report
.log_stage("print_orders")
217 self
.balances
.fetch_balances(tag
="print_orders")
218 self
.prepare_trades(base_currency
=base_currency
, compute_value
="average")
219 self
.trades
.prepare_orders(compute_value
="average")
221 def print_balances(self
, base_currency
="BTC"):
222 self
.report
.log_stage("print_balances")
223 self
.balances
.fetch_balances()
224 if base_currency
is not None:
225 self
.report
.print_log("total:")
226 self
.report
.print_log(sum(self
.balances
.in_currency(base_currency
).values()))
230 "wait_for_cryptoportfolio": [
236 "wait_for_recent": {},
245 "wait_for_recent": {},
248 "name": "make_orders",
252 "fetch_balances": ["begin"],
253 "prepare_trades": { "compute_value": "average" }
,
254 "prepare_orders": { "compute_value": "average" }
,
263 "wait_for_recent": {},
270 "fetch_balances": ["begin", "end"],
271 "prepare_trades": {},
272 "prepare_orders": { "only": "dispose", "compute_value": "average" }
,
282 "fetch_balances": ["begin", "end"],
283 "prepare_trades": { "only": "acquire" }
,
284 "prepare_orders": { "only": "acquire", "compute_value": "average" }
,
297 "fetch_balances": ["begin", "end"],
298 "prepare_trades": { "repartition": { "base_currency": (1, "long") }
},
299 "prepare_orders": { "compute_value": "average" }
,
309 "wait_for_recent": {},
316 "fetch_balances": ["begin", "end"],
317 "prepare_trades": {},
318 "prepare_orders": { "compute_value": "average" }
,
328 "wait_for_recent", "prepare_trades", "prepare_orders",
329 "move_balances", "run_orders", "follow_orders",
332 def __init__(self
, market
):
335 def select_steps(self
, scenario
, step
):
338 elif step
== "before" or step
== "after":
339 return list(filter(lambda x
: step
in x
and x
[step
], scenario
))
340 elif type(step
) == int:
341 return [scenario
[step
-1]]
342 elif type(step
) == str:
343 return list(filter(lambda x
: x
["name"] == step
, scenario
))
345 raise TypeError("Unknown step {}".format(step
))
347 def process(self
, scenario_name
, steps
="all", **kwargs
):
348 scenario
= self
.scenarios
[scenario_name
]
351 if type(steps
) == str or type(steps
) == int:
352 selected_steps
+= self
.select_steps(scenario
, steps
)
355 selected_steps
+= self
.select_steps(scenario
, step
)
356 for step
in selected_steps
:
357 self
.process_step(scenario_name
, step
, kwargs
)
359 def process_step(self
, scenario_name
, step
, kwargs
):
360 process_name
= "process_{}__{}_{}".format(scenario_name
, step
["number"], step
["name"])
361 self
.market
.report
.log_stage("{}_begin".format(process_name
))
362 if "begin" in step
.get("fetch_balances", []):
363 self
.market
.balances
.fetch_balances(tag
="{}_begin".format(process_name
))
365 for action
in self
.ordered_actions
:
367 self
.run_action(action
, step
[action
], kwargs
)
369 if "end" in step
.get("fetch_balances", []):
370 self
.market
.balances
.fetch_balances(tag
="{}_end".format(process_name
))
371 self
.market
.report
.log_stage("{}_end".format(process_name
))
373 def method_arguments(self
, action
):
376 if action
== "wait_for_recent":
377 method
= Portfolio
.wait_for_recent
378 elif action
== "prepare_trades":
379 method
= self
.market
.prepare_trades
380 elif action
== "prepare_orders":
381 method
= self
.market
.trades
.prepare_orders
382 elif action
== "move_balances":
383 method
= self
.market
.move_balances
384 elif action
== "run_orders":
385 method
= self
.market
.trades
.run_orders
386 elif action
== "follow_orders":
387 method
= self
.market
.follow_orders
388 elif action
== "close_trades":
389 method
= self
.market
.trades
.close_trades
391 signature
= inspect
.getfullargspec(method
)
392 defaults
= signature
.defaults
or []
393 kwargs
= signature
.args
[-len(defaults
):]
395 return [method
, kwargs
]
397 def parse_args(self
, action
, default_args
, kwargs
):
398 method
, allowed_arguments
= self
.method_arguments(action
)
399 args
= {k: v for k, v in {**default_args, **kwargs}
.items() if k
in allowed_arguments
}
401 if "repartition" in args
and "base_currency" in args
["repartition"]:
402 r
= args
["repartition"]
403 r
[args
.get("base_currency", "BTC")] = r
.pop("base_currency")
407 def run_action(self
, action
, default_args
, kwargs
):
408 method
, args
= self
.parse_args(action
, default_args
, kwargs
)