216 lines
4.7 KiB
Python
216 lines
4.7 KiB
Python
"""
|
||
Technical indicator calculation module.

Provides calculation functions for commonly used technical indicators.
|
||
"""
|
||
import pandas as pd
|
||
import numpy as np
|
||
from typing import Tuple
|
||
|
||
|
||
def calculate_ma(data: pd.Series, period: int = 5) -> pd.Series:
    """
    Compute the simple moving average (MA).

    Args:
        data: Price series.
        period: Number of observations in each rolling window.

    Returns:
        Series of rolling means, aligned with ``data``. Positions whose
        window holds fewer than ``period`` observations are NaN (pandas'
        default ``min_periods`` for a fixed-size window).
    """
    window = data.rolling(window=period)
    return window.mean()
|
||
|
||
|
||
def calculate_ema(data: pd.Series, period: int = 12) -> pd.Series:
    """
    Compute the exponential moving average (EMA).

    Args:
        data: Price series.
        period: Span of the exponential window.

    Returns:
        Series of EMA values, aligned with ``data``.
    """
    # adjust=False selects the recursive form of the exponential average.
    smoother = data.ewm(span=period, adjust=False)
    return smoother.mean()
|
||
|
||
|
||
def calculate_macd(
    data: pd.Series,
    fast_period: int = 12,
    slow_period: int = 26,
    signal_period: int = 9
) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    Compute the MACD indicator.

    Args:
        data: Price series.
        fast_period: Span of the fast EMA.
        slow_period: Span of the slow EMA.
        signal_period: Span of the EMA applied to DIF to obtain the signal line.

    Returns:
        Tuple ``(DIF, DEA, MACD histogram)`` where DIF is the fast EMA minus
        the slow EMA, DEA is the EMA of DIF, and the histogram is
        ``2 * (DIF - DEA)``.
    """
    # DIF: gap between the fast and the slow EMA of the price series.
    diff_line = calculate_ema(data, fast_period) - calculate_ema(data, slow_period)

    # DEA: the same EMA recipe (span-based, adjust=False) applied to DIF.
    signal_line = calculate_ema(diff_line, signal_period)

    # Histogram is doubled, as in the original implementation.
    histogram = 2 * (diff_line - signal_line)

    return diff_line, signal_line, histogram
|
||
|
||
|
||
def calculate_rsi(data: pd.Series, period: int = 14) -> pd.Series:
    """
    Compute the Relative Strength Index (RSI).

    Average gains and losses are plain rolling means over ``period``
    observations (not Wilder-style exponential smoothing).

    Args:
        data: Price series.
        period: Number of observations in each rolling window.

    Returns:
        Series of RSI values, aligned with ``data``. A window in which both
        the average gain and the average loss are zero yields NaN (0 / 0).
    """
    change = data.diff()

    # Split each change into its upward and downward part. Anything that is
    # not strictly positive / negative -- including the leading NaN produced
    # by diff() -- is counted as 0.
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)

    avg_gain = ups.rolling(window=period).mean()
    avg_loss = downs.rolling(window=period).mean()

    relative_strength = avg_gain / avg_loss
    return 100 - (100 / (1 + relative_strength))
|
||
|
||
|
||
def calculate_kdj(
    high: pd.Series,
    low: pd.Series,
    close: pd.Series,
    period: int = 9,
    m1: int = 3,
    m2: int = 3
) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    Compute the KDJ indicator.

    Args:
        high: High prices.
        low: Low prices.
        close: Closing prices.
        period: Look-back window for the highest high / lowest low.
        m1: Smoothing parameter for K (used as ``com = m1 - 1``).
        m2: Smoothing parameter for D (used as ``com = m2 - 1``).

    Returns:
        Tuple ``(K, D, J)`` with ``J = 3 * K - 2 * D``.
    """
    lowest = low.rolling(window=period).min()
    highest = high.rolling(window=period).max()

    # RSV: where the close sits inside the recent high-low range, in percent.
    price_range = highest - lowest
    rsv = (close - lowest) / price_range * 100

    # K smooths RSV, D smooths K; both use the recursive (adjust=False) form.
    k_line = rsv.ewm(com=m1 - 1, adjust=False).mean()
    d_line = k_line.ewm(com=m2 - 1, adjust=False).mean()
    j_line = 3 * k_line - 2 * d_line

    return k_line, d_line, j_line
|
||
|
||
|
||
def calculate_boll(
    data: pd.Series,
    period: int = 20,
    std_dev: float = 2.0
) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    Compute Bollinger Bands (BOLL).

    Args:
        data: Price series.
        period: Number of observations in each rolling window.
        std_dev: Multiplier applied to the rolling standard deviation.

    Returns:
        Tuple ``(upper band, middle band, lower band)``. The middle band is
        the rolling mean; the outer bands sit ``std_dev`` rolling standard
        deviations above and below it.
    """
    window = data.rolling(window=period)
    middle = window.mean()

    # Distance from the middle band to either outer band.
    band_width = window.std() * std_dev

    return middle + band_width, middle, middle - band_width
|
||
|
||
|
||
def calculate_adx(
    high: pd.Series,
    low: pd.Series,
    close: pd.Series,
    period: int = 14
) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    Compute the ADX (Average Directional Index) trend-strength indicator.

    ADX measures trend strength, not direction:
    - ADX < 20: ranging / no trend
    - ADX 20-25: transition
    - ADX 25-50: trending
    - ADX > 50: strong trend

    Args:
        high: High prices.
        low: Low prices.
        close: Closing prices.
        period: Look-back period (default 14).

    Returns:
        Tuple ``(adx, plus_di, minus_di)``.
    """
    def _wilder(series: pd.Series) -> pd.Series:
        # Wilder's smoothing expressed as an EWM with alpha = 1 / period.
        return series.ewm(alpha=1/period, adjust=False).mean()

    # True range: the largest of the bar's own range and the two gaps to the
    # previous close. The row-wise max skips the NaNs on the first bar.
    prev_close = close.shift(1)
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1
    ).max(axis=1)

    # Directional movement: only the dominant, strictly positive move counts.
    up_move = high - high.shift(1)
    down_move = low.shift(1) - low
    rising = (up_move > down_move) & (up_move > 0)
    falling = (down_move > up_move) & (down_move > 0)
    plus_dm = pd.Series(np.where(rising, up_move, 0.0), index=high.index)
    minus_dm = pd.Series(np.where(falling, down_move, 0.0), index=high.index)

    atr = _wilder(true_range)
    plus_di = 100 * (_wilder(plus_dm) / atr)
    minus_di = 100 * (_wilder(minus_dm) / atr)

    # A zero DI sum is turned into NaN so DX is undefined there instead of
    # dividing by zero.
    di_total = (plus_di + minus_di).replace(0, np.nan)
    dx = 100 * (plus_di - minus_di).abs() / di_total
    adx = _wilder(dx)

    return adx, plus_di, minus_di
|
||
|
||
|
||
def calculate_volume_ma(volume: pd.Series, period: int = 5) -> pd.Series:
    """
    Compute the simple moving average of trading volume.

    Args:
        volume: Volume series.
        period: Number of observations in each rolling window.

    Returns:
        Series of rolling volume means, aligned with ``volume``. Positions
        whose window holds fewer than ``period`` observations are NaN
        (pandas' default ``min_periods`` for a fixed-size window).
    """
    window = volume.rolling(window=period)
    return window.mean()
|