• 在“本地代码”选择输入需要订阅的行情,如rb1905.SHFE;
    • 然后点击后边“K线记录”或者“Tick记录”中的“添加”选项,会把记录特定品种任务添加到data_recorder_setting.json上,并且显示到“K线记录列表”或者“Tick记录列表”中,如图。
    • 通过queue.put()与queue.get()异步完成收录行情信息任务。
    • 先创建tick_recordings字典;
    • 调用接口的suscribe()函数订阅行情;3 )保存该tick_recordings字典到json文件上;
    • 推送行情记录事件。
    1. def subscribe(self, contract: ContractData):
    2. """"""
    3. req = SubscribeRequest(
    4. symbol=contract.symbol,
    5. exchange=contract.exchange
    6. )
    7. self.main_engine.subscribe(req, contract.gateway_name)
    • 该json文件用于存放行情记录的任务,当每次启动行情模块后,会调用load_setting()函数来得到tick_recordings和bar_recordings字典,进而开始记录的任务。
    • 创建行情记录列表tick_symbols和bar_symbols,并且缓存在data字典里;
    • 创建evnte对象,其类型是EVENT_RECORDER_UPDATE, 内容是data字典;
    • 调用event_engine的put()函数推送event事件。
    1. def put_event(self):
    2. """"""
    3. tick_symbols = list(self.tick_recordings.keys())
    4. tick_symbols.sort()
    5.  
    6. bar_symbols = list(self.bar_recordings.keys())
    7. bar_symbols.sort()
    8.  
    9. data = {
    10. "tick": tick_symbols,
    11. "bar": bar_symbols
    12. }
    13.  
    14. event = Event(
    15. data
    16. )
    17. self.event_engine.put(event)
    • EVENT_CONTRACT事件,调用的是process_contract_event()函数: 从tick_recordings和bar_recordings字典获取需要订阅的合约品种;然后使用subscribe()函数进行订阅行情。
    • EVENT_TICK事件,调用的是process_tick_event()函数:从tick_recordings和bar_recordings字典获取需要订阅的合约品种;然后使用record_tick()和record_bar()函数,把行情记录任务推送到queue队列中等待执行。
    1. def run(self):
    2. """"""
    3. while self.active:
    4. try:
    5. task = self.queue.get(timeout=1)
    6. task_type, data = task
    7.  
    8. if task_type == "tick":
    9. database_manager.save_tick_data([data])
    10. elif task_type == "bar":
    11. database_manager.save_bar_data([data])
    12.  
    13. except Empty: