import logging import asyncio from datetime import datetime, timedelta from app.models.database import get_db_context from app.core.scheduler import scheduler from sqlalchemy import text import json import os from app.core.wecombot import WecomBot from app.core.config import settings from app.models.statistics import DailyCommunityOrderStats, DailyOrderStats from sqlalchemy.exc import IntegrityError from app.models.community import CommunityDB from app.models.user import UserDB, UserRole from app.models.community_set import CommunitySet from app.models.community_set_mapping import CommunitySetMapping from app.core.account import AccountManager logger = logging.getLogger(__name__) async def daily_community_order_statistics(): """每日小区订单统计任务 每天早上9点执行,统计每个小区昨日的订单量(已完成)、总订单金额和总支付金额 """ logger.info(f"开始执行每日小区订单统计任务: {datetime.now()}") try: # 计算昨天的日期范围 yesterday = datetime.now() - timedelta(days=1) yesterday_start = datetime(yesterday.year, yesterday.month, yesterday.day, 0, 0, 0) yesterday_end = datetime(yesterday.year, yesterday.month, yesterday.day, 23, 59, 59) yesterday_date = yesterday.date() yesterday_str = yesterday.strftime("%Y-%m-%d") with get_db_context() as db: # 获取所有小区列表 all_communities = db.query(CommunityDB).all() community_dict = {community.id: community.name for community in all_communities} # 查询每个小区昨日的订单统计 result = db.execute( text(""" SELECT c.id AS community_id, c.name AS community_name, COUNT(o.orderid) AS order_count, COALESCE(SUM(o.original_amount + o.additional_fee_amount), 0) AS total_original_amount, COALESCE(SUM(o.final_amount), 0) AS total_final_amount FROM shipping_orders o JOIN communities c ON o.address_community_id = c.id WHERE o.create_time BETWEEN :start_time AND :end_time AND o.status = 'COMPLETED' GROUP BY c.id, c.name ORDER BY order_count DESC """), { "start_time": yesterday_start, "end_time": yesterday_end } ) # 转换结果为字典,以小区ID为键 community_stats_dict = {} for row in result: community_stats_dict[row.community_id] = { "community_id": row.community_id, "community_name": row.community_name, "order_count": row.order_count, "total_original_amount": float(row.total_original_amount), "total_final_amount": float(row.total_final_amount) } # 确保每个小区都有一条记录 community_stats = [] for community_id, community_name in community_dict.items(): if community_id in community_stats_dict: # 使用查询结果 community_stats.append(community_stats_dict[community_id]) else: # 创建零值记录 community_stats.append({ "community_id": community_id, "community_name": community_name, "order_count": 0, "total_original_amount": 0.0, "total_final_amount": 0.0 }) # 按订单数量降序排序 community_stats.sort(key=lambda x: x["order_count"], reverse=True) # 计算总计(只计算有订单的小区) communities_with_orders = [stat for stat in community_stats if stat["order_count"] > 0] total_order_count = sum(item["order_count"] for item in communities_with_orders) total_original_amount = sum(item["total_original_amount"] for item in communities_with_orders) total_final_amount = sum(item["total_final_amount"] for item in communities_with_orders) total_communities = len(communities_with_orders) # 保存到数据库 - 总体统计 try: # 检查是否已存在该日期的记录 existing_stats = db.query(DailyOrderStats).filter( DailyOrderStats.stats_date == yesterday_date ).first() if existing_stats: # 更新现有记录 existing_stats.total_order_count = total_order_count existing_stats.total_original_amount = total_original_amount existing_stats.total_final_amount = total_final_amount existing_stats.total_communities = total_communities existing_stats.update_time = datetime.now() else: # 创建新记录 daily_stats = DailyOrderStats( stats_date=yesterday_date, total_order_count=total_order_count, total_original_amount=total_original_amount, total_final_amount=total_final_amount, total_communities=total_communities ) db.add(daily_stats) # 保存到数据库 - 小区统计 # 先删除该日期的所有小区统计记录,然后重新插入 db.query(DailyCommunityOrderStats).filter( DailyCommunityOrderStats.stats_date == yesterday_date ).delete() # 批量插入小区统计记录 for stat in community_stats: community_stat = DailyCommunityOrderStats( stats_date=yesterday_date, community_id=stat["community_id"], community_name=stat["community_name"], order_count=stat["order_count"], total_original_amount=stat["total_original_amount"], total_final_amount=stat["total_final_amount"] ) db.add(community_stat) db.commit() logger.info(f"已将{yesterday_str}的订单统计数据保存到数据库,共{len(community_stats)}个小区") except IntegrityError as e: db.rollback() logger.error(f"保存订单统计数据到数据库失败: {str(e)}") except Exception as e: db.rollback() logger.error(f"保存订单统计数据到数据库失败: {str(e)}") # 生成报告消息 message = f"""### {yesterday_str} 小区订单统计报告 **环境:{settings.ENV_NAME}** ### 总计 > - 订单总数: {total_order_count} > - 订单总金额: ¥{total_original_amount:.2f} > - 支付总金额: ¥{total_final_amount:.2f} > - 有订单小区数: {total_communities} > - 总小区数: {len(community_stats)} ### 小区排名 (前5名) """ # 添加前5个小区的数据 for i, item in enumerate(communities_with_orders[:5], 1): message += f""" > {i}. **{item['community_name']}** > - 订单数: {item['order_count']} > - 订单金额: ¥{item['total_original_amount']:.2f} > - 支付金额: ¥{item['total_final_amount']:.2f} """ # 发送企业微信消息 if communities_with_orders and settings.URL_WECOMBOT_DAILY_REPORT: try: wecom_bot = WecomBot(settings.URL_WECOMBOT_DAILY_REPORT) await wecom_bot.send_markdown(message) logger.info("每日小区订单统计报告已发送到企业微信") except Exception as e: logger.error(f"发送企业微信消息失败: {str(e)}") logger.info("每日小区订单统计任务完成") except Exception as e: logger.error(f"每日小区订单统计任务失败: {str(e)}") def run_daily_community_order_statistics(): """ 包装异步函数的同步函数,用于APScheduler调度 """ try: # 获取或创建事件循环 try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # 运行异步任务 if loop.is_running(): # 如果循环已经在运行,使用create_task future = asyncio.ensure_future(daily_community_order_statistics()) # 可以添加回调函数处理结果 future.add_done_callback(lambda f: logger.info("每日小区订单统计任务完成")) else: # 如果循环没有运行,直接运行到完成 loop.run_until_complete(daily_community_order_statistics()) except Exception as e: logger.error(f"运行每日小区订单统计任务失败: {str(e)}") async def daily_partner_settlement(): """每日合伙人结算任务""" logger.info(f"开始执行每日合伙人结算任务: {datetime.now()}") with get_db_context() as db: try: # 获取昨日订单统计数据 yesterday = (datetime.now() - timedelta(days=1)).date() print(yesterday) yesterday_stats = db.query(DailyCommunityOrderStats).filter( DailyCommunityOrderStats.stats_date == yesterday ).all() if not yesterday_stats: logger.info(f"昨日没有订单统计数据,跳过结算") return # 遍历每个小区的订单统计数据 total_partner_profit = 0 total_admin_profit = 0 total_delivery_profit = 0 total_platform_profit = 0 total_order_count = 0 total_original_amount = 0 total_final_amount = 0 for stat in yesterday_stats: print(f"日期:{stat.stats_date} 小区: {stat.community_name} 订单数: {stat.order_count} 订单金额: {stat.total_original_amount} 支付金额: {stat.total_final_amount}") total_order_count += stat.order_count total_original_amount += stat.total_original_amount total_final_amount += stat.total_final_amount community = db.query(CommunityDB).filter( CommunityDB.id == stat.community_id ).first() # 找到所有的运营商 community_sets = db.query(CommunitySet).filter( CommunitySet.community_set_mappings.any( CommunitySetMapping.community_id == stat.community_id ) ).all() # 提取运营商列表 partner_ids = [] for sets in community_sets: if sets.user_id and sets.user_id not in partner_ids: partner_ids.append(sets.user_id) # 获取分成比例 admin_profit_sharing = community.community_profit_sharing.admin_rate partner_profit_sharing = community.community_profit_sharing.partner_rate delivery_profit_sharing = community.community_profit_sharing.delivery_rate platform_profit_sharing = community.community_profit_sharing.platform_rate # 计算运营商分成金额 per_partner_profit = stat.total_final_amount * (float(partner_profit_sharing) / len(partner_ids)) / 100 # 计算每个运营商的分成金额 for partner_id in partner_ids: partner_profit = per_partner_profit print(f"运营商 {partner_id} 分成金额: {partner_profit}") total_partner_profit += partner_profit # 更新运营商账户余额 if partner_profit > 0: account_manager = AccountManager(db) account_manager.change_balance(partner_id, partner_profit, f"{stat.community_name} 订单收益") # 计算服务商分成 admin_profit = stat.total_final_amount * (float(admin_profit_sharing) / 100) print(f"服务商分成金额: {admin_profit}") total_admin_profit += admin_profit if admin_profit > 0 and community.admin_id and community.admin_id > 0: account_manager = AccountManager(db) account_manager.change_balance(community.admin_id, admin_profit, f"{stat.community_name} 订单收益") # 计算配送员分成 delivery_profit = stat.total_final_amount * (float(delivery_profit_sharing) / 100) print(f"配送员分成金额: {delivery_profit}") total_delivery_profit += delivery_profit # if delivery_profit > 0: # account_manager = AccountManager(db) # account_manager.change_balance(community.admin_id, delivery_profit, f"{stat.community_name} 订单收益") # 计算平台分成 platform_profit = stat.total_final_amount * (float(platform_profit_sharing) / 100) print(f"平台分成金额: {platform_profit}") total_platform_profit += platform_profit if platform_profit > 0: account_manager = AccountManager(db) account_manager.change_balance(settings.PLATFORM_USER_ID, platform_profit, f"{stat.community_name} 订单收益") # 生成分润报告 message = f"""### {yesterday.strftime("%Y-%m-%d")} 分润报告 **环境:{settings.ENV_NAME}** ### 订单汇总 > - 订单总数: {total_order_count} > - 订单总金额: {total_original_amount:.1f} > - 支付总金额: {total_final_amount:.1f} ### 分润汇总 > - 合伙人分成: {total_partner_profit:.1f} > - 服务商分成: {total_admin_profit:.1f} > - 配送员分成: {total_delivery_profit:.1f} > - 平台分成: {total_platform_profit:.1f} """ # 发送企业微信消息 try: wecom_bot = WecomBot(settings.URL_WECOMBOT_DAILY_REPORT) await wecom_bot.send_markdown(message) logger.info("每日合伙人结算报告已发送到企业微信") except Exception as e: logger.error(f"发送企业微信消息失败: {str(e)}") except Exception as e: db.rollback() logger.error(f"运行每日合伙人结算任务失败: {str(e)}") def run_daily_partner_settlement(): """ 包装异步函数的同步函数,用于APScheduler调度 """ try: # 获取或创建事件循环 try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # 运行异步任务 if loop.is_running(): future = asyncio.ensure_future(daily_partner_settlement()) future.add_done_callback(lambda f: logger.info("每日合伙人结算任务完成")) else: loop.run_until_complete(daily_partner_settlement()) except Exception as e: logger.error(f"运行每日合伙人结算任务失败: {str(e)}") def register_daily_tasks(): """注册所有每日定时任务""" # 每天早上9点执行小区订单统计 scheduler.add_cron_job( run_daily_community_order_statistics, # 使用同步包装函数 hour=0, minute=10, job_id="daily_community_order_stats" ) scheduler.add_cron_job( run_daily_partner_settlement, hour=0, minute=30, job_id="daily_partner_settlement" ) logger.info("已注册所有每日定时任务")