- 在“本地代码”选择输入需要订阅的行情,如rb1905.SHFE;
- 然后点击后边“K线记录”或者“Tick记录”中的“添加”选项,会把记录特定品种任务添加到data_recorder_setting.json上,并且显示到“K线记录列表”或者“Tick记录列表”中,如图。
- 通过queue.put()与queue.get()异步完成收录行情信息任务。
- 先创建tick_recordings字典;
- 调用接口的suscribe()函数订阅行情;3 )保存该tick_recordings字典到json文件上;
- 推送行情记录事件。
- def subscribe(self, contract: ContractData):
- """"""
- req = SubscribeRequest(
- symbol=contract.symbol,
- exchange=contract.exchange
- )
- self.main_engine.subscribe(req, contract.gateway_name)
- 该json文件用于存放行情记录的任务,当每次启动行情模块后,会调用load_setting()函数来得到tick_recordings和bar_recordings字典,进而开始记录的任务。
- 创建行情记录列表tick_symbols和bar_symbols,并且缓存在data字典里;
- 创建evnte对象,其类型是EVENT_RECORDER_UPDATE, 内容是data字典;
- 调用event_engine的put()函数推送event事件。
- def put_event(self):
- """"""
- tick_symbols = list(self.tick_recordings.keys())
- tick_symbols.sort()
-
- bar_symbols = list(self.bar_recordings.keys())
- bar_symbols.sort()
-
- data = {
- "tick": tick_symbols,
- "bar": bar_symbols
- }
-
- event = Event(
- data
- )
- self.event_engine.put(event)
- EVENT_CONTRACT事件,调用的是process_contract_event()函数: 从tick_recordings和bar_recordings字典获取需要订阅的合约品种;然后使用subscribe()函数进行订阅行情。
- EVENT_TICK事件,调用的是process_tick_event()函数:从tick_recordings和bar_recordings字典获取需要订阅的合约品种;然后使用record_tick()和record_bar()函数,把行情记录任务推送到queue队列中等待执行。
- def run(self):
- """"""
- while self.active:
- try:
- task = self.queue.get(timeout=1)
- task_type, data = task
-
- if task_type == "tick":
- database_manager.save_tick_data([data])
- elif task_type == "bar":
- database_manager.save_bar_data([data])
-
- except Empty: