1 from ccxt
import AuthenticationError
, ExchangeError
, NotSupported
, RequestTimeout
, InvalidNonce
2 import ccxt_wrapper
as ccxt
6 from cachetools
.func
import ttl_cache
7 from datetime
import datetime
9 from retry
import retry
20 def __init__(self
, ccxt_instance
, args
, **kwargs
):
22 self
.debug
= args
.debug
23 self
.ccxt
= ccxt_instance
24 self
.ccxt
._market
= self
25 self
.report
= ReportStore(self
, verbose_print
=(not args
.quiet
))
26 self
.trades
= TradeStore(self
)
27 self
.balances
= BalanceStore(self
)
28 self
.processor
= Processor(self
)
30 self
.options
= kwargs
.get("options", {})
31 for key
in ["user_id", "market_id"]:
32 setattr(self
, key
, kwargs
.get(key
, None))
34 self
.report
.log_market(self
.args
)
37 def from_config(cls
, config
, args
, **kwargs
):
38 config
["apiKey"] = config
.pop("key", None)
40 ccxt_instance
= ccxt
.poloniexE(config
)
42 return cls(ccxt_instance
, args
, **kwargs
)
44 def store_report(self
):
45 self
.report
.merge(Portfolio
.report
)
46 date
= datetime
.datetime
.now()
47 if self
.args
.report_path
is not None:
48 self
.store_file_report(date
)
49 if dbs
.psql_connected() and self
.args
.report_db
:
50 self
.store_database_report(date
)
51 if dbs
.redis_connected() and self
.args
.report_redis
:
52 self
.store_redis_report(date
)
54 def store_file_report(self
, date
):
56 report_file
= "{}/{}_{}".format(self
.args
.report_path
, date
.isoformat(), self
.user_id
)
57 with open(report_file
+ ".json", "w") as f
:
58 f
.write(self
.report
.to_json())
59 with open(report_file
+ ".log", "w") as f
:
60 f
.write("\n".join(map(lambda x
: x
[1], self
.report
.print_logs
)))
61 except Exception as e
:
62 print("impossible to store report file: {}; {}".format(e
.__class
__.__name
__, e
))
64 def store_database_report(self
, date
):
66 report_query
= 'INSERT INTO reports("date", "market_config_id", "debug") VALUES (%s, %s, %s) RETURNING id;'
67 line_query
= 'INSERT INTO report_lines("date", "report_id", "type", "payload") VALUES (%s, %s, %s, %s);'
68 cursor
= dbs
.psql
.cursor()
69 cursor
.execute(report_query
, (date
, self
.market_id
, self
.debug
))
70 report_id
= cursor
.fetchone()[0]
71 for date
, type_
, payload
in self
.report
.to_json_array():
72 cursor
.execute(line_query
, (date
, report_id
, type_
, payload
))
76 except Exception as e
:
77 print("impossible to store report to database: {}; {}".format(e
.__class
__.__name
__, e
))
79 def store_redis_report(self
, date
):
81 for type_
, log
in self
.report
.to_json_redis():
82 key
= "/cryptoportfolio/{}/{}/{}".format(self
.market_id
, date
.isoformat(), type_
)
83 dbs
.redis
.set(key
, log
, ex
=31*24*60*60)
84 key
= "/cryptoportfolio/{}/latest/{}".format(self
.market_id
, type_
)
85 dbs
.redis
.set(key
, log
)
86 key
= "/cryptoportfolio/{}/latest/date".format(self
.market_id
)
87 dbs
.redis
.set(key
, date
.isoformat())
88 except Exception as e
:
89 print("impossible to store report to redis: {}; {}".format(e
.__class
__.__name
__, e
))
91 def process(self
, actions
, before
=False, after
=False):
93 self
.ccxt
.check_required_credentials()
94 for action
in actions
:
95 if bool(before
) is bool(after
):
96 self
.processor
.process(action
, steps
="all", options
=self
.options
)
98 self
.processor
.process(action
, steps
="before", options
=self
.options
)
100 self
.processor
.process(action
, steps
="after", options
=self
.options
)
101 except AuthenticationError
:
102 self
.report
.log_error("market_authentication", message
="Impossible to authenticate to market")
103 except Exception as e
:
105 self
.report
.log_error("market_process", exception
=e
, message
=traceback
.format_exc())
109 @retry((RequestTimeout
, InvalidNonce
), tries
=5)
110 def move_balances(self
):
111 needed_in_margin
= {}
112 moving_to_margin
= {}
114 for currency
, balance
in self
.balances
.all
.items():
115 needed_in_margin
[currency
] = balance
.margin_in_position
- balance
.margin_pending_gain
116 for trade
in self
.trades
.pending
:
117 needed_in_margin
.setdefault(trade
.base_currency
, 0)
118 if trade
.trade_type
== "short":
119 needed_in_margin
[trade
.base_currency
] -= trade
.delta
120 for currency
, needed
in needed_in_margin
.items():
121 current_balance
= self
.balances
.all
[currency
].margin_available
122 moving_to_margin
[currency
] = (needed
- current_balance
)
123 delta
= moving_to_margin
[currency
].value
124 action
= "Moving {} from exchange to margin".format(moving_to_margin
[currency
])
126 if self
.debug
and delta
!= 0:
127 self
.report
.log_debug_action(action
)
131 self
.ccxt
.transfer_balance(currency
, delta
, "exchange", "margin")
133 self
.ccxt
.transfer_balance(currency
, -delta
, "margin", "exchange")
134 except (RequestTimeout
, InvalidNonce
) as e
:
135 self
.report
.log_error(action
, message
="Retrying", exception
=e
)
136 self
.report
.log_move_balances(needed_in_margin
, moving_to_margin
)
137 self
.balances
.fetch_balances()
139 self
.report
.log_move_balances(needed_in_margin
, moving_to_margin
)
141 self
.balances
.fetch_balances()
144 def fetch_fees(self
):
145 return self
.ccxt
.fetch_fees()
147 @ttl_cache(maxsize
=20, ttl
=5)
148 def get_tickers(self
, refresh
=False):
150 return self
.ccxt
.fetch_tickers()
154 @ttl_cache(maxsize
=20, ttl
=5)
155 def get_ticker(self
, c1
, c2
, refresh
=False):
159 "average": (1/ticker
["bid"] + 1/ticker
["ask"]) / 2,
162 def augment_ticker(ticker
):
165 "average": (ticker
["bid"] + ticker
["ask"] ) / 2,
169 tickers
= self
.get_tickers()
172 ticker
= augment_ticker(self
.ccxt
.fetch_ticker("{}/{}".format(c1
, c2
)))
173 except ExchangeError
:
175 ticker
= invert(augment_ticker(self
.ccxt
.fetch_ticker("{}/{}".format(c2
, c1
))))
176 except ExchangeError
:
179 if "{}/{}".format(c1
, c2
) in tickers
:
180 ticker
= augment_ticker(tickers
["{}/{}".format(c1
, c2
)])
181 elif "{}/{}".format(c2
, c1
) in tickers
:
182 ticker
= invert(augment_ticker(tickers
["{}/{}".format(c2
, c1
)]))
187 def follow_orders(self
, sleep
=None):
189 sleep
= 7 if self
.debug
else 30
191 self
.report
.log_debug_action("Set follow_orders tick to {}s".format(sleep
))
193 self
.report
.log_stage("follow_orders_begin")
194 while len(self
.trades
.all_orders(state
="open")) > 0:
197 open_orders
= self
.trades
.all_orders(state
="open")
198 self
.report
.log_stage("follow_orders_tick_{}".format(tick
))
199 self
.report
.log_orders(open_orders
, tick
=tick
)
200 for order
in open_orders
:
201 status
= order
.get_status()
203 self
.report
.log_order(order
, tick
, finished
=True)
205 order
.trade
.update_order(order
, tick
)
206 if status
== "error_disappeared":
207 self
.report
.log_error("follow_orders",
208 message
="{} disappeared, recreating it".format(order
))
209 new_order
= order
.trade
.prepare_order(
210 compute_value
=order
.trade
.tick_actions_recreate(tick
))
211 if new_order
is not None:
213 self
.report
.log_order(order
, tick
, new_order
=new_order
)
215 self
.report
.log_stage("follow_orders_end")
217 def prepare_trades(self
, base_currency
="BTC", liquidity
="medium",
218 compute_value
="average", repartition
=None, only
=None,
219 available_balance_only
=False):
221 self
.report
.log_stage("prepare_trades",
222 base_currency
=base_currency
, liquidity
=liquidity
,
223 compute_value
=compute_value
, only
=only
,
224 repartition
=repartition
, available_balance_only
=available_balance_only
)
226 if available_balance_only
:
227 repartition
, total_base_value
, values_in_base
= self
.balances
.available_balances_for_repartition(
228 base_currency
=base_currency
, liquidity
=liquidity
,
229 repartition
=repartition
, compute_value
=compute_value
)
231 values_in_base
= self
.balances
.in_currency(base_currency
,
232 compute_value
=compute_value
)
233 total_base_value
= sum(values_in_base
.values())
234 new_repartition
= self
.balances
.dispatch_assets(total_base_value
,
235 liquidity
=liquidity
, repartition
=repartition
)
236 if available_balance_only
:
237 for currency
, amount
in values_in_base
.items():
238 if currency
!= base_currency
and currency
not in new_repartition
:
239 new_repartition
[currency
] = amount
241 self
.trades
.compute_trades(values_in_base
, new_repartition
, only
=only
)
243 def print_tickers(self
, base_currency
="BTC"):
244 if base_currency
is not None:
245 self
.report
.print_log("total:")
246 self
.report
.print_log(sum(self
.balances
.in_currency(base_currency
).values()))
250 "wait_for_cryptoportfolio": [
256 "wait_for_recent": {},
261 "name": "print_balances",
263 "fetch_balances_begin": {
266 "add_portfolio": True
268 "print_tickers": { "base_currency": "BTC" }
,
277 "wait_for_recent": {},
280 "name": "make_orders",
284 "fetch_balances_begin": {},
285 "prepare_trades": { "compute_value": "average" }
,
286 "prepare_orders": { "compute_value": "average" }
,
291 "name": "print_balances",
295 "fetch_balances_begin": {
299 "add_portfolio": True
307 "wait_for_recent": {},
314 "fetch_balances_begin": {},
315 "fetch_balances_end": {},
316 "prepare_trades": {},
317 "prepare_orders": { "only": "dispose", "compute_value": "average" }
,
327 "fetch_balances_begin": {},
328 "fetch_balances_end": {
329 "checkpoint": "begin",
333 "prepare_trades": { "only": "acquire", "available_balance_only": True }
,
334 "prepare_orders": { "only": "acquire", "compute_value": "average" }
,
347 "fetch_balances_begin": {
351 "add_portfolio": True
353 "fetch_balances_end": {},
354 "prepare_trades": { "repartition": { "base_currency": (1, "long") }
},
355 "prepare_orders": { "compute_value": "average" }
,
365 "wait_for_recent": {},
372 "fetch_balances_begin": {},
373 "fetch_balances_end": {
374 "checkpoint": "begin",
378 "prepare_trades": { "available_balance_only": True }
,
379 "prepare_orders": { "compute_value": "average" }
,
389 "wait_for_recent", "prepare_trades", "prepare_orders",
390 "move_balances", "run_orders", "follow_orders",
391 "close_trades", "print_tickers"]
393 def __init__(self
, market
):
396 def select_steps(self
, scenario
, step
):
399 elif step
== "before" or step
== "after":
400 return list(filter(lambda x
: x
.get(step
, False), scenario
))
401 elif type(step
) == int:
402 return [scenario
[step
-1]]
403 elif type(step
) == str:
404 return list(filter(lambda x
: x
["name"] == step
, scenario
))
406 raise TypeError("Unknown step {}".format(step
))
408 def can_process(self
, scenario_name
):
409 return scenario_name
in self
.scenarios
411 def process(self
, scenario_name
, steps
="all", options
={}):
412 if not self
.can_process(scenario_name
):
413 raise TypeError("Unknown scenario {}".format(scenario_name
))
414 scenario
= self
.scenarios
[scenario_name
]
417 if type(steps
) == str or type(steps
) == int:
418 selected_steps
+= self
.select_steps(scenario
, steps
)
421 selected_steps
+= self
.select_steps(scenario
, step
)
422 for step
in selected_steps
:
423 self
.process_step(scenario_name
, step
, options
)
425 def process_step(self
, scenario_name
, step
, options
):
426 process_name
= "process_{}__{}_{}".format(scenario_name
, step
["number"], step
["name"])
427 self
.market
.report
.log_stage("{}_begin".format(process_name
))
429 if "fetch_balances_begin" in step
:
430 self
.run_action("fetch_balances", step
["fetch_balances_begin"],
431 dict(options
, tag
="{}_begin".format(process_name
)))
433 for action
in self
.ordered_actions
:
435 self
.run_action(action
, step
[action
], options
)
437 if "fetch_balances_end" in step
:
438 self
.run_action("fetch_balances", step
["fetch_balances_end"],
439 dict(options
, tag
="{}_end".format(process_name
)))
441 self
.market
.report
.log_stage("{}_end".format(process_name
))
443 def method_arguments(self
, action
):
446 if action
== "wait_for_recent":
447 method
= Portfolio
.wait_for_recent
448 elif action
== "prepare_trades":
449 method
= self
.market
.prepare_trades
450 elif action
== "prepare_orders":
451 method
= self
.market
.trades
.prepare_orders
452 elif action
== "move_balances":
453 method
= self
.market
.move_balances
454 elif action
== "run_orders":
455 method
= self
.market
.trades
.run_orders
456 elif action
== "follow_orders":
457 method
= self
.market
.follow_orders
458 elif action
== "close_trades":
459 method
= self
.market
.trades
.close_trades
460 elif action
== "print_tickers":
461 method
= self
.market
.print_tickers
462 elif action
== "fetch_balances":
463 method
= self
.market
.balances
.fetch_balances
465 signature
= inspect
.getfullargspec(method
)
466 defaults
= signature
.defaults
or []
467 kwargs
= signature
.args
[-len(defaults
):]
469 return [method
, kwargs
]
471 def parse_args(self
, action
, default_args
, options
):
472 method
, allowed_arguments
= self
.method_arguments(action
)
473 args
= {k: v for k, v in {**default_args, **options}
.items() if k
in allowed_arguments
}
475 if "repartition" in args
and "base_currency" in args
["repartition"]:
476 r
= args
["repartition"]
477 r
[args
.get("base_currency", "BTC")] = r
.pop("base_currency")
481 def run_action(self
, action
, default_args
, options
):
482 method
, args
= self
.parse_args(action
, default_args
, options
)