Open a trading chart, pile on ten technical indicators, then stare blankly at the screen with no idea what to do next: a scene every trader knows all too well. Now imagine handing historical data to a computer and telling it to learn by trial and error. Profits earn a reward, losses earn a penalty, and through repeated attempts and failures it eventually iterates toward a trading strategy that may not be perfect, but is at least internally consistent.
This is the core idea behind TensorTrade.
TensorTrade is an open-source Python framework focused on building and training trading algorithms with Reinforcement Learning (RL).

Here we use yfinance to fetch the data and pandas_ta to compute technical indicators. Log Returns, RSI, and MACD serve as a few basic feature inputs.
pip install yfinance pandas_ta

import yfinance as yf
import pandas_ta as ta
import pandas as pd

# Pick your ticker
TICKER = "TTRD"  # TODO: change this to something real, e.g. "AAPL", "BTC-USD"
TRAIN_START_DATE = "2021-02-09"
TRAIN_END_DATE = "2021-09-30"
EVAL_START_DATE = "2021-10-01"
EVAL_END_DATE = "2021-11-12"

def build_dataset(ticker, start, end, filename):
    # 1. Download hourly OHLCV data
    df = yf.Ticker(ticker).history(
        start=start,
        end=end,
        interval="60m"
    )

    # 2. Clean up
    df = df.drop(["Dividends", "Stock Splits"], axis=1)
    df["Volume"] = df["Volume"].astype(int)

    # 3. Add some basic features
    df.ta.log_return(append=True, length=16)
    df.ta.rsi(append=True, length=14)
    df.ta.macd(append=True, fast=12, slow=26)

    # 4. Move Datetime from index to column
    df = df.reset_index()

    # 5. Save
    df.to_csv(filename, index=False)
    print(f"Saved {filename} with {len(df)} rows")

build_dataset(TICKER, TRAIN_START_DATE, TRAIN_END_DATE, "training.csv")
build_dataset(TICKER, EVAL_START_DATE, EVAL_END_DATE, "evaluation.csv")
Once the script finishes, training.csv and evaluation.csv appear in the working directory, containing the OHLCV base data plus a few pre-computed indicators. This is the data the RL model will be trained on.
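Before moving on, it is worth a quick sanity check of what actually landed in the file. A minimal sketch; the indicator column names shown in the comment assume pandas_ta's default naming and may differ slightly between versions:

import pandas as pd

df = pd.read_csv("training.csv", parse_dates=["Datetime"])
print(df.shape)
print(df.columns.tolist())
# Expected to look roughly like:
# ['Datetime', 'Open', 'High', 'Low', 'Close', 'Volume',
#  'LOGRET_16', 'RSI_14', 'MACD_12_26_9', 'MACDh_12_26_9', 'MACDs_12_26_9']
print(df.tail(3))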
Building the TensorTrade Environment
Reinforcement learning cannot work directly with a CSV file. It needs a standard interactive Environment: one that outputs the current State, accepts the agent's Action, and feeds back a Reward.
TensorTrade breaks this process into modules:
Instrument: defines the tradable instruments (e.g. USD, TTRD).
Wallet: manages asset balances.
Portfolio: a collection of wallets.
Stream / DataFeed: handles the feature data stream.
reward_scheme / action_scheme: define what actions can be taken and how good or bad each action is scored.
pip install tensortrade
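Before the full factory function, here is a minimal sketch of how Stream and DataFeed behave on their own (toy values, using the tensortrade.feed.core API):

from tensortrade.feed.core import Stream, DataFeed

close = Stream.source([101.0, 102.5, 101.8], dtype="float").rename("close")
feed = DataFeed([close])
feed.compile()

print(feed.next())  # {'close': 101.0}
print(feed.next())  # {'close': 102.5}

Each call to next() pops the next observation out of the compiled streams, which is exactly what the environment does on every step.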
Below is an environment factory function. It is kept deliberately lightweight so that it can be plugged into Ray later:
import os
import pandas as pd

from tensortrade.feed.core import DataFeed, Stream
from tensortrade.oms.instruments import Instrument
from tensortrade.oms.exchanges import Exchange, ExchangeOptions
from tensortrade.oms.services.execution.simulated import execute_order
from tensortrade.oms.wallets import Wallet, Portfolio
import tensortrade.env.default as default

def create_env(config):
    """
    Build a TensorTrade environment from a CSV.

    config needs:
      - csv_filename
      - window_size
      - reward_window_size
      - max_allowed_loss
    """
    # 1. Read the dataset
    dataset = (
        pd.read_csv(config["csv_filename"], parse_dates=["Datetime"])
        .fillna(method="backfill")
        .fillna(method="ffill")
    )

    # 2. Price stream (we'll trade on Close)
    commission = 0.0035  # 0.35%, tweak this to your broker
    price = Stream.source(
        list(dataset["Close"]), dtype="float"
    ).rename("USD-TTRD")

    options = ExchangeOptions(commission=commission)
    exchange = Exchange("TTSE", service=execute_order, options=options)(price)

    # 3. Instruments and wallets
    USD = Instrument("USD", 2, "US Dollar")
    TTRD = Instrument("TTRD", 2, "TensorTrade Corp")  # just a label

    cash_wallet = Wallet(exchange, 1000 * USD)  # start with $1000
    asset_wallet = Wallet(exchange, 0 * TTRD)   # start with zero TTRD

    portfolio = Portfolio(USD, [cash_wallet, asset_wallet])

    # 4. Renderer feed (optional, useful for plotting later)
    renderer_feed = DataFeed([
        Stream.source(list(dataset["Datetime"])).rename("date"),
        Stream.source(list(dataset["Open"]), dtype="float").rename("open"),
        Stream.source(list(dataset["High"]), dtype="float").rename("high"),
        Stream.source(list(dataset["Low"]), dtype="float").rename("low"),
        Stream.source(list(dataset["Close"]), dtype="float").rename("close"),
        Stream.source(list(dataset["Volume"]), dtype="float").rename("volume"),
    ])
    renderer_feed.compile()

    # 5. Feature feed for the RL agent
    features = []
    # Skip Datetime (first column) and stream everything else
    for col in dataset.columns[1:]:
        s = Stream.source(list(dataset[col]), dtype="float").rename(col)
        features.append(s)

    feed = DataFeed(features)
    feed.compile()

    # 6. Reward and action scheme
    reward_scheme = default.rewards.SimpleProfit(
        window_size=config["reward_window_size"]
    )
    action_scheme = default.actions.BSH(
        cash=cash_wallet,
        asset=asset_wallet
    )

    # 7. Put everything together in an environment
    env = default.create(
        portfolio=portfolio,
        action_scheme=action_scheme,
        reward_scheme=reward_scheme,
        feed=feed,
        renderer=[],
        renderer_feed=renderer_feed,
        window_size=config["window_size"],
        max_allowed_loss=config["max_allowed_loss"]
    )
    return env
With that, the rules of the "game" are set: observe the most recent N bars and indicators (State), decide whether to buy, sell, or hold (Action), and aim to maximize profit over a stretch of time (Reward).
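A quick smoke test helps confirm the wiring before any training starts. This is only a sketch: the exact gym-style reset/step return signature can differ between TensorTrade versions.

config = {
    "csv_filename": "training.csv",
    "window_size": 14,
    "reward_window_size": 7,
    "max_allowed_loss": 0.10,
}

env = create_env(config)

obs = env.reset()
print(obs.shape)  # roughly (window_size, number_of_feature_columns)

# Take one random action and inspect the feedback
obs, reward, done, info = env.step(env.action_space.sample())
print(reward, done)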
Training the Model with Ray RLlib and PPO
With the underlying environment in place, Ray RLlib steps in to handle the core RL logic.
We use the PPO (Proximal Policy Optimization) algorithm, which performs well in both continuous-control and discrete action spaces. To hunt for a better solution, we also run a simple hyperparameter grid search over the network architecture, the learning rate, and the minibatch size.
pip install "ray[rllib]"
The training script is as follows:
import os

import ray
from ray import tune
from ray.tune.registry import register_env

from your_module import create_env  # wherever you defined create_env

# Some hyperparameter grids to try
FC_SIZE = tune.grid_search([
    [256, 256],
    [1024],
    [128, 64, 32],
])
LEARNING_RATE = tune.grid_search([
    0.001,
    0.0005,
    0.00001,
])
MINIBATCH_SIZE = tune.grid_search([
    5,
    10,
    20,
])

cwd = os.getcwd()

# Register our custom environment with RLlib
register_env("MyTrainingEnv", lambda cfg: create_env(cfg))

env_config_training = {
    "window_size": 14,
    "reward_window_size": 7,
    "max_allowed_loss": 0.10,  # cut episodes early if loss > 10%
    "csv_filename": os.path.join(cwd, "training.csv"),
}
env_config_evaluation = {
    "max_allowed_loss": 1.00,
    "csv_filename": os.path.join(cwd, "evaluation.csv"),
}

ray.init(ignore_reinit_error=True)

analysis = tune.run(
    run_or_experiment="PPO",
    name="MyExperiment1",
    metric="episode_reward_mean",
    mode="max",
    stop={
        "training_iteration": 5,  # small for demo, increase in real runs
    },
    config={
        "env": "MyTrainingEnv",
        "env_config": env_config_training,
        "log_level": "WARNING",
        "framework": "torch",  # or "tf"
        "ignore_worker_failures": True,
        "num_workers": 1,
        "num_envs_per_worker": 1,
        "num_gpus": 0,
        "clip_rewards": True,
        "lr": LEARNING_RATE,
        "gamma": 0.50,  # discount factor
        "observation_filter": "MeanStdFilter",
        "model": {
            "fcnet_hiddens": FC_SIZE,
        },
        "sgd_minibatch_size": MINIBATCH_SIZE,
        "evaluation_interval": 1,
        "evaluation_config": {
            "env_config": env_config_evaluation,
            "explore": False,  # no exploration during evaluation
        },
    },
    num_samples=1,
    keep_checkpoints_num=10,
    checkpoint_freq=1,
)
This code essentially runs a "trading-bot tournament". Ray trains multiple PPO agents in parallel across the parameter combinations we defined, tracks their mean episode reward, and saves the best-performing checkpoints for later use.
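Once tune.run finishes, the analysis object can hand back the best checkpoint so the agent can be restored and rolled through the evaluation data. The sketch below assumes a Ray 1.x-style API (ray.rllib.agents.ppo.PPOTrainer, analysis.best_checkpoint, compute_single_action); newer Ray releases renamed parts of this, so adjust to your installed version:

from ray.rllib.agents.ppo import PPOTrainer  # Ray 2.x: ray.rllib.algorithms.ppo.PPO

best_checkpoint = analysis.best_checkpoint  # uses the metric/mode passed to tune.run

agent = PPOTrainer(
    env="MyTrainingEnv",
    config={
        "framework": "torch",
        "num_workers": 0,
        "explore": False,
        # the evaluation env still needs window sizes, so merge the two configs
        "env_config": {**env_config_training, **env_config_evaluation},
    },
)
agent.restore(best_checkpoint)

# Roll the trained policy through the evaluation data once
env = create_env({**env_config_training, **env_config_evaluation})
obs = env.reset()
done, total_reward = False, 0.0
while not done:
    action = agent.compute_single_action(obs)  # older Ray: agent.compute_action(obs)
    obs, reward, done, info = env.step(action)
    total_reward += reward
print(f"Evaluation episode reward: {total_reward:.4f}")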
Custom Reward Scheme (PBR)
The default SimpleProfit reward logic is simple, but often too coarse in practice. Sometimes the reward function needs to be reshaped around a specific trading logic. Take a Position-Based Reward (PBR) scheme as an example:
It maintains the current position state (long or short).
It monitors price changes.
The reward is computed as price change × position direction.
If the price rises while you are long, you get positive feedback; if the price falls while you are short, you also get positive feedback. The opposite cases are penalized.
from tensortrade.env.default.rewards import RewardScheme
from tensortrade.feed.core import DataFeed, Stream

class PBR(RewardScheme):
    """
    Position-Based Reward (PBR)

    Rewards the agent based on price changes and its current position.
    """

    registered_name = "pbr"

    def __init__(self, price: Stream):
        super().__init__()
        self.position = -1  # start flat/short

        # Price differences
        r = Stream.sensor(price, lambda p: p.value, dtype="float").diff()

        # Position stream
        position = Stream.sensor(self, lambda rs: rs.position, dtype="float")

        # Reward = price_change * position
        reward = (r * position).fillna(0).rename("reward")

        self.feed = DataFeed([reward])
        self.feed.compile()

    def on_action(self, action: int):
        # Simple mapping: action 0 = long, everything else = short
        self.position = 1 if action == 0 else -1

    def get_reward(self, portfolio):
        return self.feed.next()["reward"]

    def reset(self):
        self.position = -1
        self.feed.reset()
Hooking it up is simple: just swap out the original reward_scheme inside create_env:
reward_scheme = PBR(price)
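One wiring detail worth noting: in the official TensorTrade Ray example, the reward scheme is attached to the BSH action scheme as a listener so that on_action actually fires on every step. Depending on your TensorTrade version, the hookup inside create_env looks roughly like this:

reward_scheme = PBR(price=price)
action_scheme = default.actions.BSH(
    cash=cash_wallet,
    asset=asset_wallet
).attach(reward_scheme)  # BSH notifies attached listeners of each action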
The benefit of this change is denser feedback. The agent no longer has to wait until it closes a position to learn whether it made money; every single step delivers a signal about whether it is on the right side of the move.

Getting this pipeline to run end to end is only the beginning. To make it genuinely usable, there is still plenty of work to do, for example:
Swap in real data: the TTRD in the code is just a placeholder; replace it with a real instrument (a stock, crypto, or an index).
Feature engineering: RSI and MACD are only a starting point; try ATR, Bollinger Bands, or features from longer timeframes.
Hyperparameter tuning: gamma (the discount factor) and window_size (the observation window) have a huge impact on the strategy's style and are worth sweeping carefully.
Benchmarking: this is the most critical step. Compare the trained RL strategy against Buy & Hold, and even against a random policy (see the sketch after this list). If it cannot beat a random policy, it is time to go back and re-examine everything.
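For the simplest possible baseline, the Buy & Hold return over the evaluation window can be computed straight from the CSV. A minimal sketch that ignores commissions:

import pandas as pd

df = pd.read_csv("evaluation.csv")
buy_and_hold = df["Close"].iloc[-1] / df["Close"].iloc[0] - 1
print(f"Buy & Hold return over the evaluation window: {buy_and_hold:.2%}")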
Finally, remember that this is research only, so do not take it straight to live trading. A model that dominates the training set is the norm; passing out-of-sample tests and paper trading is what actually proves it works.
https://avoid.overfit.cn/post/8c9e08414e514c73ab3aefd694294f79
Author: CodeBun