众力资讯网

基于强化学习的量化交易框架 TensorTrade

打开交易图表,堆上十个技术指标,然后对着屏幕发呆不知道下一步怎么操作——这场景对交易员来说太熟悉了。如果把历史数据丢给计

打开交易图表,堆上十个技术指标,然后对着屏幕发呆不知道下一步怎么操作——这场景对交易员来说太熟悉了。如果把历史数据丢给计算机,告诉它“去试错”。赚了有奖励,亏了有惩罚。让它在不断的尝试和失败中学习,最终迭代出一个不说完美、但至少能逻辑自洽的交易策略。

这就是 TensorTrade 的核心逻辑。

TensorTrade 是一个专注于利用 强化学习 (Reinforcement Learning, RL) 构建和训练交易算法的开源 Python 框架。

数据获取与特征工程

这里用 yfinance 抓取数据,配合 pandas_ta 计算技术指标。对数收益率 (Log Returns)、RSI 和 MACD 是几个比较基础的特征输入。

pip install yfinance pandas_taimport yfinance as yf   import pandas_ta as ta   import pandas as pd    # Pick your ticker   TICKER = "TTRD"  # TODO: change this to something real, e.g. "AAPL", "BTC-USD"   TRAIN_START_DATE = "2021-02-09"   TRAIN_END_DATE   = "2021-09-30"   EVAL_START_DATE  = "2021-10-01"   EVAL_END_DATE    = "2021-11-12"    def build_dataset(ticker, start, end, filename):      # 1. Download hourly OHLCV data      df = yf.Ticker(ticker).history(          start=start,          end=end,          interval="60m"      )      # 2. Clean up      df = df.drop(["Dividends", "Stock Splits"], axis=1)      df["Volume"] = df["Volume"].astype(int)      # 3. Add some basic features      df.ta.log_return(append=True, length=16)      df.ta.rsi(append=True, length=14)      df.ta.macd(append=True, fast=12, slow=26)      # 4. Move Datetime from index to column      df = df.reset_index()      # 5. Save      df.to_csv(filename, index=False)      print(f"Saved {filename} with {len(df)} rows")    build_dataset(TICKER, TRAIN_START_DATE, TRAIN_END_DATE, "training.csv")   build_dataset(TICKER, EVAL_START_DATE,  EVAL_END_DATE,  "evaluation.csv")

脚本跑完,目录下会生成 training.csv 和 evaluation.csv。包含了 OHLCV 基础数据和几个预处理好的指标。这些就是训练 RL 模型的数据。

构建 TensorTrade 交互环境

强化学习没法直接使用CSV 文件。所以需要一个标准的交互 环境 (Environment):能够输出当前状态 (State),接收智能体的动作 (Action),并反馈奖励 (Reward)。

TensorTrade 把这个过程模块化了:

Instrument:定义交易标的(如 USD, TTRD)。

Wallet:管理资产余额。

Portfolio:钱包组合。

Stream / DataFeed:处理特征数据流。

reward_scheme / action_scheme:定义怎么操作,以及操作的好坏怎么评分。

pip install tensortrade

下面是一个环境工厂函数 (Environment Factory) 的实现,设计得比较轻量,这样可以方便后续接入 Ray:

import os  import pandas as pd    from tensortrade.feed.core import DataFeed, Stream  from tensortrade.oms.instruments import Instrument  from tensortrade.oms.exchanges import Exchange, ExchangeOptions  from tensortrade.oms.services.execution.simulated import execute_order  from tensortrade.oms.wallets import Wallet, Portfolio  import tensortrade.env.default as default    def create_env(config):      """      Build a TensorTrade environment from a CSV.      config needs:        - csv_filename        - window_size        - reward_window_size        - max_allowed_loss      """      # 1. Read the dataset      dataset = (          pd.read_csv(config["csv_filename"], parse_dates=["Datetime"])          .fillna(method="backfill")          .fillna(method="ffill")      )      # 2. Price stream (we'll trade on Close)      commission = 0.0035  # 0.35%, tweak this to your broker      price = Stream.source(          list(dataset["Close"]), dtype="float"      ).rename("USD-TTRD")      options = ExchangeOptions(commission=commission)      exchange = Exchange("TTSE", service=execute_order, options=options)(price)      # 3. Instruments and wallets      USD = Instrument("USD", 2, "US Dollar")      TTRD = Instrument("TTRD", 2, "TensorTrade Corp")  # just a label      cash_wallet = Wallet(exchange, 1000 * USD)  # start with $1000      asset_wallet = Wallet(exchange, 0 * TTRD)   # start with zero TTRD      portfolio = Portfolio(USD, [cash_wallet, asset_wallet])      # 4. Renderer feed (optional, useful for plotting later)      renderer_feed = DataFeed([          Stream.source(list(dataset["Datetime"])).rename("date"),          Stream.source(list(dataset["Open"]), dtype="float").rename("open"),          Stream.source(list(dataset["High"]), dtype="float").rename("high"),          Stream.source(list(dataset["Low"]), dtype="float").rename("low"),          Stream.source(list(dataset["Close"]), dtype="float").rename("close"),          Stream.source(list(dataset["Volume"]), dtype="float").rename("volume"),      ])      renderer_feed.compile()      # 5. Feature feed for the RL agent      features = []      # Skip Datetime (first column) and stream everything else      for col in dataset.columns[1:]:          s = Stream.source(list(dataset[col]), dtype="float").rename(col)          features.append(s)      feed = DataFeed(features)      feed.compile()      # 6. Reward and action scheme      reward_scheme = default.rewards.SimpleProfit(          window_size=config["reward_window_size"]      )      action_scheme = default.actions.BSH(          cash=cash_wallet,          asset=asset_wallet      )      # 7. Put everything together in an environment      env = default.create(          portfolio=portfolio,          action_scheme=action_scheme,          reward_scheme=reward_scheme,          feed=feed,          renderer=[],          renderer_feed=renderer_feed,          window_size=config["window_size"],          max_allowed_loss=config["max_allowed_loss"]      )      return env

这样“游戏”规则就已经定好了:观察最近 N 根 K 线和指标(State),决定买卖持(Action),目标是让一段时间内的利润最大化(Reward)。

基于 Ray RLlib 与 PPO 算法的模型训练

底层环境搭好,接下来让 Ray RLlib 介入处理 RL 的核心逻辑。

选用 PPO (Proximal Policy Optimization) 算法,这在连续控制和离散动作空间都有不错的表现。为了找到更优解,顺手做一个简单的超参数网格搜索:网络架构、学习率、Minibatch 大小,都跑一遍试试。

pip install "ray[rllib]"

训练脚本如下:

import os  import ray  from ray import tune  from ray.tune.registry import register_env    from your_module import create_env  # wherever you defined create_env    # Some hyperparameter grids to try  FC_SIZE = tune.grid_search([      [256, 256],      [1024],      [128, 64, 32],  ])  LEARNING_RATE = tune.grid_search([      0.001,      0.0005,      0.00001,  ])  MINIBATCH_SIZE = tune.grid_search([      5,      10,      20,  ])  cwd = os.getcwd()  # Register our custom environment with RLlib  register_env("MyTrainingEnv", lambda cfg: create_env(cfg))  env_config_training = {      "window_size": 14,      "reward_window_size": 7,      "max_allowed_loss": 0.10,  # cut episodes early if loss > 10%      "csv_filename": os.path.join(cwd, "training.csv"),  }  env_config_evaluation = {      "max_allowed_loss": 1.00,      "csv_filename": os.path.join(cwd, "evaluation.csv"),  }  ray.init(ignore_reinit_error=True)  analysis = tune.run(      run_or_experiment="PPO",      name="MyExperiment1",      metric="episode_reward_mean",      mode="max",      stop={          "training_iteration": 5,  # small for demo, increase in real runs      },      config={          "env": "MyTrainingEnv",          "env_config": env_config_training,          "log_level": "WARNING",          "framework": "torch",     # or "tf"          "ignore_worker_failures": True,          "num_workers": 1,          "num_envs_per_worker": 1,          "num_gpus": 0,          "clip_rewards": True,          "lr": LEARNING_RATE,          "gamma": 0.50,            # discount factor          "observation_filter": "MeanStdFilter",          "model": {              "fcnet_hiddens": FC_SIZE,          },          "sgd_minibatch_size": MINIBATCH_SIZE,          "evaluation_interval": 1,          "evaluation_config": {              "env_config": env_config_evaluation,              "explore": False,     # no exploration during evaluation          },      },      num_samples=1,      keep_checkpoints_num=10,      checkpoint_freq=1,  )

这段代码本质上是在运行一场“交易机器人锦标赛”。Ray 会根据定义的参数组合并行训练多个 PPO 智能体,追踪它们的平均回合奖励,并保存下表现最好的 Checkpoint 供后续调用。

自定义奖励机制 (PBR)

默认的 SimpleProfit 奖励逻辑很简单,但实战中往往过于粗糙。我们有时需要根据具体的交易逻辑来重塑奖励函数。比如说基于持仓的奖励方案 PBR (Position-Based Reward):

维护当前持仓状态(多头或空头)。

监控价格变动。

奖励计算 = 价格变动 × 持仓方向。

价格涨了你做多,给正反馈;价格跌了你做空,也给正反馈。反之则是惩罚。

from tensortrade.env.default.rewards import RewardScheme   from tensortrade.feed.core import DataFeed, Stream   class PBR(RewardScheme):      """      Position-Based Reward (PBR)      Rewards the agent based on price changes and its current position.      """      registered_name = "pbr"      def __init__(self, price: Stream):          super().__init__()          self.position = -1  # start flat/short          # Price differences          r = Stream.sensor(price, lambda p: p.value, dtype="float").diff()          # Position stream          position = Stream.sensor(self, lambda rs: rs.position, dtype="float")          # Reward = price_change * position          reward = (r * position).fillna(0).rename("reward")          self.feed = DataFeed([reward])          self.feed.compile()      def on_action(self, action: int):          # Simple mapping: action 0 = long, everything else = short          self.position = 1 if action == 0 else -1      def get_reward(self, portfolio):          return self.feed.next()["reward"]      def reset(self):          self.position = -1          self.feed.reset()

接入也很简单,在 create_env 函数里替换掉原来的 reward_scheme 即可:

reward_scheme = PBR(price)

这样改的好处是反馈更密集。智能体不需要等到最后平仓才知道赚没赚,每一个 step 都能收到关于“是否站对了队”的信号。

后续优化方向与建议

这套流程跑通只是个开始,想要真正可用,还有很多工作要做 比如:

数据置换:代码里的 TTRD 只是个占位符,换成真实的标的(股票、Crypto、指数)。

特征工程:RSI 和 MACD 只是抛砖引玉,试试 ATR、布林带,或者引入更长时间周期的特征。

参数调优:gamma(折扣因子)、window_size(观测窗口)对策略风格影响巨大,值得花时间去扫参。

基准测试:这一步最关键。把你训练出来的 RL 策略和 Buy & Hold(买入持有)比一比,甚至和随机策略比一比。如果跑不过随机策略,那就得从头检查了。

最后别忘了,我们只是研究,所以不要直接实盘。模型在训练集上大杀四方是常态,能通过样本外测试和模拟盘 (Paper Trading) 的考验才是真本事。

https://avoid.overfit.cn/post/8c9e08414e514c73ab3aefd694294f79

作者CodeBun