""" Tushare高级数据服务 充分利用5000+积分,获取财务数据、资金流向、新闻公告等 """ import tushare as ts import pandas as pd from typing import Optional, List, Dict, Any from datetime import datetime, timedelta from app.config import get_settings from app.utils.logger import logger from app.utils.validators import normalize_stock_code class TushareAdvancedService: """Tushare高级数据服务类(需要5000+积分)""" def __init__(self): """初始化Tushare服务""" settings = get_settings() if not settings.tushare_token: logger.warning("Tushare token未配置") self.pro = None else: ts.set_token(settings.tushare_token) self.pro = ts.pro_api() logger.info("Tushare高级服务初始化成功") # ==================== 财务数据 ==================== def get_income_statement( self, stock_code: str, period: str = None, start_date: str = None, end_date: str = None ) -> Optional[Dict[str, Any]]: """ 获取利润表数据 Args: stock_code: 股票代码 period: 报告期(YYYYMMDD),如20231231 start_date: 开始日期 end_date: 结束日期 Returns: 利润表数据 """ if not self.pro: logger.error("Tushare服务未初始化") return None try: ts_code = normalize_stock_code(stock_code) if not ts_code: return None # 默认获取最近4个季度的数据 if not period and not start_date: end_date = datetime.now().strftime('%Y%m%d') start_date = (datetime.now() - timedelta(days=400)).strftime('%Y%m%d') df = self.pro.income( ts_code=ts_code, period=period, start_date=start_date, end_date=end_date, fields='ts_code,ann_date,f_ann_date,end_date,report_type,comp_type,' 'total_revenue,revenue,operating_profit,total_profit,n_income,' 'n_income_attr_p,basic_eps,diluted_eps' ) if df.empty: logger.warning(f"未找到利润表数据: {ts_code}") return None # 转换为字典列表,按日期降序 df = df.sort_values('end_date', ascending=False) return { 'ts_code': ts_code, 'data': df.to_dict('records') } except Exception as e: logger.error(f"获取利润表失败: {e}") return None def get_balance_sheet( self, stock_code: str, period: str = None, start_date: str = None, end_date: str = None ) -> Optional[Dict[str, Any]]: """ 获取资产负债表数据 Args: stock_code: 股票代码 period: 报告期 start_date: 开始日期 end_date: 结束日期 Returns: 资产负债表数据 """ if not self.pro: return None try: ts_code = normalize_stock_code(stock_code) if not ts_code: return None if not period and not start_date: end_date = datetime.now().strftime('%Y%m%d') start_date = (datetime.now() - timedelta(days=400)).strftime('%Y%m%d') df = self.pro.balancesheet( ts_code=ts_code, period=period, start_date=start_date, end_date=end_date, fields='ts_code,ann_date,f_ann_date,end_date,report_type,' 'total_assets,total_liab,total_hldr_eqy_exc_min_int,' 'total_cur_assets,total_cur_liab,money_cap' ) if df.empty: return None df = df.sort_values('end_date', ascending=False) return { 'ts_code': ts_code, 'data': df.to_dict('records') } except Exception as e: logger.error(f"获取资产负债表失败: {e}") return None def get_financial_indicators( self, stock_code: str, period: str = None ) -> Optional[Dict[str, Any]]: """ 获取财务指标数据(ROE、ROA、毛利率等) Args: stock_code: 股票代码 period: 报告期 Returns: 财务指标数据 """ if not self.pro: return None try: ts_code = normalize_stock_code(stock_code) if not ts_code: return None # 获取最近一期的财务指标 df = self.pro.fina_indicator( ts_code=ts_code, period=period, fields='ts_code,end_date,eps,dt_eps,total_revenue_ps,revenue_ps,' 'capital_rese_ps,undist_profit_ps,extra_item,profit_dedt,' 'gross_margin,current_ratio,quick_ratio,roe,roe_waa,' 'roe_dt,roa,npta,roic,debt_to_assets,assets_to_eqt' ) if df.empty: return None # 取最新一期 latest = df.sort_values('end_date', ascending=False).iloc[0] return { 'ts_code': ts_code, 'end_date': latest['end_date'], 'indicators': latest.to_dict() } except Exception as e: logger.error(f"获取财务指标失败: {e}") return None # ==================== 估值数据 ==================== def get_daily_basic( self, stock_code: str, trade_date: str = None ) -> Optional[Dict[str, Any]]: """ 获取每日指标(PE、PB、PS、市值、换手率等) Args: stock_code: 股票代码 trade_date: 交易日期(YYYYMMDD) Returns: 每日指标数据 """ if not self.pro: return None try: ts_code = normalize_stock_code(stock_code) if not ts_code: return None if not trade_date: trade_date = datetime.now().strftime('%Y%m%d') df = self.pro.daily_basic( ts_code=ts_code, trade_date=trade_date, fields='ts_code,trade_date,close,turnover_rate,turnover_rate_f,' 'volume_ratio,pe,pe_ttm,pb,ps,ps_ttm,' 'dv_ratio,dv_ttm,total_share,float_share,free_share,' 'total_mv,circ_mv' ) if df.empty: return None return { 'ts_code': ts_code, 'data': df.iloc[0].to_dict() } except Exception as e: logger.error(f"获取每日指标失败: {e}") return None # ==================== 资金流向 ==================== def get_money_flow( self, stock_code: str, start_date: str = None, end_date: str = None ) -> Optional[Dict[str, Any]]: """ 获取资金流向数据 Args: stock_code: 股票代码 start_date: 开始日期 end_date: 结束日期 Returns: 资金流向数据 """ if not self.pro: return None try: ts_code = normalize_stock_code(stock_code) if not ts_code: return None if not start_date: start_date = (datetime.now() - timedelta(days=30)).strftime('%Y%m%d') if not end_date: end_date = datetime.now().strftime('%Y%m%d') df = self.pro.moneyflow( ts_code=ts_code, start_date=start_date, end_date=end_date, fields='ts_code,trade_date,buy_sm_vol,buy_sm_amount,' 'sell_sm_vol,sell_sm_amount,buy_md_vol,buy_md_amount,' 'sell_md_vol,sell_md_amount,buy_lg_vol,buy_lg_amount,' 'sell_lg_vol,sell_lg_amount,buy_elg_vol,buy_elg_amount,' 'sell_elg_vol,sell_elg_amount,net_mf_vol,net_mf_amount' ) if df.empty: return None df = df.sort_values('trade_date', ascending=False) return { 'ts_code': ts_code, 'data': df.to_dict('records') } except Exception as e: logger.error(f"获取资金流向失败: {e}") return None # ==================== 新闻公告 ==================== def get_news( self, stock_code: str = None, start_date: str = None, end_date: str = None, src: str = None ) -> Optional[List[Dict[str, Any]]]: """ 获取新闻资讯 Args: stock_code: 股票代码(可选) start_date: 开始日期 end_date: 结束日期 src: 新闻来源 Returns: 新闻列表 """ if not self.pro: return None try: ts_code = None if stock_code: ts_code = normalize_stock_code(stock_code) if not start_date: start_date = (datetime.now() - timedelta(days=7)).strftime('%Y%m%d') if not end_date: end_date = datetime.now().strftime('%Y%m%d') # 使用news接口(需要5000积分) try: df = self.pro.query('news', src=src, start_date=start_date, end_date=end_date, fields='datetime,content,title,channels,score' ) except Exception as api_error: # 如果接口不可用(积分不足或接口名称问题),返回None logger.warning(f"新闻接口不可用(可能需要更高积分权限): {api_error}") return None if df is None or df.empty: return None # 如果指定了股票代码,过滤相关新闻 if ts_code: try: # 简单的关键词过滤 stock_info = self.pro.stock_basic(ts_code=ts_code, fields='name,symbol') if not stock_info.empty: name = stock_info.iloc[0]['name'] symbol = stock_info.iloc[0]['symbol'] df = df[ df['title'].str.contains(name, na=False) | df['content'].str.contains(name, na=False) | df['title'].str.contains(symbol, na=False) ] except Exception as filter_error: logger.warning(f"新闻过滤失败: {filter_error}") # 继续返回未过滤的新闻 df = df.sort_values('datetime', ascending=False) return df.head(10).to_dict('records') except Exception as e: logger.warning(f"获取新闻失败: {e}") return None # ==================== 市场特色数据 ==================== def get_margin_detail( self, stock_code: str, start_date: str = None, end_date: str = None ) -> Optional[Dict[str, Any]]: """ 获取融资融券详情 Args: stock_code: 股票代码 start_date: 开始日期 end_date: 结束日期 Returns: 融资融券数据 """ if not self.pro: return None try: ts_code = normalize_stock_code(stock_code) if not ts_code: return None if not start_date: start_date = (datetime.now() - timedelta(days=30)).strftime('%Y%m%d') if not end_date: end_date = datetime.now().strftime('%Y%m%d') df = self.pro.margin_detail( ts_code=ts_code, start_date=start_date, end_date=end_date, fields='ts_code,trade_date,rzye,rqye,rzmre,rqyl,' 'rzche,rqchl,rqmcl,rzrqye' ) if df.empty: return None df = df.sort_values('trade_date', ascending=False) return { 'ts_code': ts_code, 'data': df.to_dict('records') } except Exception as e: logger.error(f"获取融资融券失败: {e}") return None def get_block_trade( self, stock_code: str, start_date: str = None, end_date: str = None ) -> Optional[List[Dict[str, Any]]]: """ 获取大宗交易数据 Args: stock_code: 股票代码 start_date: 开始日期 end_date: 结束日期 Returns: 大宗交易列表 """ if not self.pro: return None try: ts_code = normalize_stock_code(stock_code) if not ts_code: return None if not start_date: start_date = (datetime.now() - timedelta(days=90)).strftime('%Y%m%d') if not end_date: end_date = datetime.now().strftime('%Y%m%d') df = self.pro.block_trade( ts_code=ts_code, start_date=start_date, end_date=end_date, fields='ts_code,trade_date,price,vol,amount,buyer,seller' ) if df.empty: return None df = df.sort_values('trade_date', ascending=False) return df.to_dict('records') except Exception as e: logger.error(f"获取大宗交易失败: {e}") return None def get_top_list( self, trade_date: str = None ) -> Optional[Dict[str, Any]]: """ 获取龙虎榜数据 Args: trade_date: 交易日期 Returns: 龙虎榜数据 """ if not self.pro: return None try: if not trade_date: trade_date = datetime.now().strftime('%Y%m%d') df = self.pro.top_list( trade_date=trade_date, fields='trade_date,ts_code,name,close,pct_change,turnover_rate,' 'amount,l_sell,l_buy,l_amount,net_amount,net_rate,' 'amount_rate,float_values,reason' ) if df.empty: return None return { 'trade_date': trade_date, 'data': df.to_dict('records') } except Exception as e: logger.error(f"获取龙虎榜失败: {e}") return None # ==================== 指数数据 ==================== def get_index_daily( self, ts_code: str, start_date: str = None, end_date: str = None ) -> Optional[List[Dict[str, Any]]]: """ 获取指数日线行情 Args: ts_code: 指数代码(如000001.SH=上证指数) start_date: 开始日期 end_date: 结束日期 Returns: 指数行情数据 """ if not self.pro: return None try: if not start_date: start_date = (datetime.now() - timedelta(days=180)).strftime('%Y%m%d') if not end_date: end_date = datetime.now().strftime('%Y%m%d') df = self.pro.index_daily( ts_code=ts_code, start_date=start_date, end_date=end_date, fields='ts_code,trade_date,close,open,high,low,pre_close,' 'change,pct_chg,vol,amount' ) if df.empty: return None df = df.sort_values('trade_date') return df.to_dict('records') except Exception as e: logger.error(f"获取指数数据失败: {e}") return None # 创建全局实例 tushare_advanced_service = TushareAdvancedService()