deliveryman-api/app/tasks/daily_tasks.py
2025-04-06 18:30:18 +08:00

460 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import asyncio
from datetime import datetime, timedelta
from app.models.database import get_db_context
from app.core.scheduler import scheduler
from sqlalchemy import text
import json
import os
from app.core.wecombot import WecomBot
from app.core.config import settings
from app.models.statistics import DailyCommunityOrderStats, DailyOrderStats
from sqlalchemy.exc import IntegrityError
from app.models.community import CommunityDB
from app.models.user import UserDB, UserRole
from app.models.community_set import CommunitySet
from app.models.community_set_mapping import CommunitySetMapping
from app.core.account import AccountManager
from app.models.settlement import SettlementHistoryDB
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
async def daily_community_order_statistics():
"""每日小区订单统计任务
每天早上9点执行统计每个小区昨日的订单量已完成、总订单金额和总支付金额
"""
logger.info(f"开始执行每日小区订单统计任务: {datetime.now()}")
try:
# 计算昨天的日期范围
yesterday = datetime.now() - timedelta(days=1)
yesterday_start = datetime(yesterday.year, yesterday.month, yesterday.day, 0, 0, 0)
yesterday_end = datetime(yesterday.year, yesterday.month, yesterday.day, 23, 59, 59)
yesterday_date = yesterday.date()
yesterday_str = yesterday.strftime("%Y-%m-%d")
with get_db_context() as db:
# 获取所有小区列表
all_communities = db.query(CommunityDB).all()
community_dict = {community.id: community.name for community in all_communities}
# 查询每个小区昨日的订单统计
result = db.execute(
text("""
SELECT
c.id AS community_id,
c.name AS community_name,
COUNT(o.orderid) AS order_count,
COALESCE(SUM(o.original_amount + o.additional_fee_amount), 0) AS total_original_amount,
COALESCE(SUM(o.final_amount), 0) AS total_final_amount
FROM
shipping_orders o
JOIN
communities c ON o.address_community_id = c.id
WHERE
o.completed_time BETWEEN :start_time AND :end_time
AND o.status = 'COMPLETED'
GROUP BY
c.id, c.name
ORDER BY
order_count DESC
"""),
{
"start_time": yesterday_start,
"end_time": yesterday_end
}
)
# 转换结果为字典以小区ID为键
community_stats_dict = {}
for row in result:
community_stats_dict[row.community_id] = {
"community_id": row.community_id,
"community_name": row.community_name,
"order_count": row.order_count,
"total_original_amount": float(row.total_original_amount),
"total_final_amount": float(row.total_final_amount)
}
# 确保每个小区都有一条记录
community_stats = []
for community_id, community_name in community_dict.items():
if community_id in community_stats_dict:
# 使用查询结果
community_stats.append(community_stats_dict[community_id])
else:
# 创建零值记录
community_stats.append({
"community_id": community_id,
"community_name": community_name,
"order_count": 0,
"total_original_amount": 0.0,
"total_final_amount": 0.0
})
# 按订单数量降序排序
community_stats.sort(key=lambda x: x["order_count"], reverse=True)
# 计算总计(只计算有订单的小区)
communities_with_orders = [stat for stat in community_stats if stat["order_count"] > 0]
total_order_count = sum(item["order_count"] for item in communities_with_orders)
total_original_amount = sum(item["total_original_amount"] for item in communities_with_orders)
total_final_amount = sum(item["total_final_amount"] for item in communities_with_orders)
total_communities = len(communities_with_orders)
# 保存到数据库 - 总体统计
try:
# 检查是否已存在该日期的记录
existing_stats = db.query(DailyOrderStats).filter(
DailyOrderStats.stats_date == yesterday_date
).first()
if existing_stats:
# 更新现有记录
existing_stats.total_order_count = total_order_count
existing_stats.total_original_amount = total_original_amount
existing_stats.total_final_amount = total_final_amount
existing_stats.total_communities = total_communities
existing_stats.update_time = datetime.now()
else:
# 创建新记录
daily_stats = DailyOrderStats(
stats_date=yesterday_date,
total_order_count=total_order_count,
total_original_amount=total_original_amount,
total_final_amount=total_final_amount,
total_communities=total_communities
)
db.add(daily_stats)
# 保存到数据库 - 小区统计
# 先删除该日期的所有小区统计记录,然后重新插入
db.query(DailyCommunityOrderStats).filter(
DailyCommunityOrderStats.stats_date == yesterday_date
).delete()
# 批量插入小区统计记录
for stat in community_stats:
community_stat = DailyCommunityOrderStats(
stats_date=yesterday_date,
community_id=stat["community_id"],
community_name=stat["community_name"],
order_count=stat["order_count"],
total_original_amount=stat["total_original_amount"],
total_final_amount=stat["total_final_amount"]
)
db.add(community_stat)
db.commit()
logger.info(f"已将{yesterday_str}的订单统计数据保存到数据库,共{len(community_stats)}个小区")
except IntegrityError as e:
db.rollback()
logger.error(f"保存订单统计数据到数据库失败: {str(e)}")
except Exception as e:
db.rollback()
logger.error(f"保存订单统计数据到数据库失败: {str(e)}")
# 生成报告消息
message = f"""### {yesterday_str} 小区订单统计报告
**环境:{settings.ENV_NAME}**
### 总计
> - 订单总数: {total_order_count}
> - 订单总金额: ¥{total_original_amount:.2f}
> - 支付总金额: ¥{total_final_amount:.2f}
> - 有订单小区数: {total_communities}
> - 总小区数: {len(community_stats)}
### 小区排名 (前5名)
"""
# 添加前5个小区的数据
for i, item in enumerate(communities_with_orders[:5], 1):
message += f"""
> {i}. **{item['community_name']}**
> - 订单数: {item['order_count']}
> - 订单金额: ¥{item['total_original_amount']:.2f}
> - 支付金额: ¥{item['total_final_amount']:.2f}
"""
# 发送企业微信消息
if communities_with_orders and settings.URL_WECOMBOT_DAILY_REPORT:
try:
wecom_bot = WecomBot(settings.URL_WECOMBOT_DAILY_REPORT)
await wecom_bot.send_markdown(message)
logger.info("每日小区订单统计报告已发送到企业微信")
except Exception as e:
logger.error(f"发送企业微信消息失败: {str(e)}")
logger.info("每日小区订单统计任务完成")
except Exception as e:
logger.error(f"每日小区订单统计任务失败: {str(e)}")
def run_daily_community_order_statistics():
"""
包装异步函数的同步函数用于APScheduler调度
"""
try:
# 获取或创建事件循环
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 运行异步任务
if loop.is_running():
# 如果循环已经在运行使用create_task
future = asyncio.ensure_future(daily_community_order_statistics())
# 可以添加回调函数处理结果
future.add_done_callback(lambda f: logger.info("每日小区订单统计任务完成"))
else:
# 如果循环没有运行,直接运行到完成
loop.run_until_complete(daily_community_order_statistics())
except Exception as e:
logger.error(f"运行每日小区订单统计任务失败: {str(e)}")
async def daily_partner_settlement():
"""每日合伙人结算任务"""
logger.info(f"开始执行每日合伙人结算任务: {datetime.now()}")
with get_db_context() as db:
try:
# 获取昨日订单统计数据
yesterday = (datetime.now() - timedelta(days=1)).date()
print(yesterday)
yesterday_stats = db.query(DailyCommunityOrderStats).filter(
DailyCommunityOrderStats.stats_date == yesterday
).all()
if not yesterday_stats:
logger.info(f"昨日没有订单统计数据,跳过结算")
return
# 遍历每个小区的订单统计数据
total_partner_profit = 0
total_admin_profit = 0
total_delivery_profit = 0
total_platform_profit = 0
total_order_count = 0
total_original_amount = 0
total_final_amount = 0
# 初始化沉淀金额总计
total_profit_sediment = 0
settlement_history = []
for stat in yesterday_stats:
community = db.query(CommunityDB).filter(
CommunityDB.id == stat.community_id,
CommunityDB.community_profit_sharing != None
).first()
if not community:
logger.info(f"小区 {stat.community_name} 没有分成比例,跳过结算")
continue
# 检查是否已存在该日期的结算记录
existing_settlement = db.query(SettlementHistoryDB).filter(
SettlementHistoryDB.settle_date == yesterday,
SettlementHistoryDB.community_id == stat.community_id,
).first()
if existing_settlement:
logger.info(f"小区 {stat.community_name} 已存在结算记录,跳过结算")
continue
print(f"日期:{stat.stats_date} 小区: {stat.community_name} 订单数: {stat.order_count} 订单金额: {stat.total_original_amount} 支付金额: {stat.total_final_amount}")
total_order_count += stat.order_count
total_original_amount += stat.total_original_amount
total_final_amount += stat.total_final_amount
# 找到所有的运营商
community_sets = db.query(CommunitySet).filter(
CommunitySet.community_set_mappings.any(
CommunitySetMapping.community_id == stat.community_id
)
).all()
# 提取运营商列表
partner_ids = []
for sets in community_sets:
if sets.user_id and sets.user_id not in partner_ids:
partner_ids.append(sets.user_id)
# 获取分成比例
admin_profit_sharing = community.community_profit_sharing.admin_rate
partner_profit_sharing = community.community_profit_sharing.partner_rate
delivery_profit_sharing = community.community_profit_sharing.delivery_rate
platform_profit_sharing = community.community_profit_sharing.platform_rate
# 计算运营商分成金额
partner_profit = round(stat.total_original_amount * (float(partner_profit_sharing)) / 100, 2)
admin_profit = round(stat.total_original_amount * (float(admin_profit_sharing) / 100), 2)
delivery_profit = round(stat.total_original_amount * (float(delivery_profit_sharing) / 100), 2)
platform_profit = stat.total_original_amount * (float(platform_profit_sharing) / 100)
# 结算明细
settle_details = {
"total_order_amount": stat.total_original_amount,
"total_pay_amount": stat.total_final_amount,
"total_partner_profit": partner_profit,
"total_admin_profit": admin_profit,
"total_delivery_profit": delivery_profit,
"total_platform_profit": platform_profit
}
# 如果没有运营商、服务商,则需要沉淀到平台
community_profit_sediment = 0
# 计算每个运营商的分成金额
if len(partner_ids) > 0:
per_partner_profit = partner_profit / len(partner_ids)
for partner_id in partner_ids:
print(f"运营商({partner_profit_sharing/100}%) {partner_id} 分成金额: {per_partner_profit}")
total_partner_profit += per_partner_profit
settle_details[f"partner_profit_{partner_id}"] = {
"user_id": partner_id,
"profit": per_partner_profit
}
# 更新运营商账户余额
if per_partner_profit > 0:
account_manager = AccountManager(db)
account_manager.change_balance(partner_id, per_partner_profit, f"【运营商】 {stat.community_name} 订单收益")
else:
community_profit_sediment += partner_profit
# 计算服务商分成
print(f"服务商({admin_profit_sharing/100}%) {community.admin_id} 分成金额: {admin_profit}")
if community.admin_id and community.admin_id > 0:
total_admin_profit += admin_profit
settle_details["admin_profit"] = {
"user_id": community.admin_id,
"profit": admin_profit
}
if admin_profit > 0:
account_manager = AccountManager(db)
account_manager.change_balance(community.admin_id, admin_profit, f"【服务商】 {stat.community_name} 订单收益")
else:
community_profit_sediment += admin_profit
# 计算配送员分成
print(f"配送员({delivery_profit_sharing/100}%) 分成金额: {delivery_profit}")
total_delivery_profit += delivery_profit
settle_details["delivery_profit"] = {
"user_id": 0,
"profit": delivery_profit
}
# 计算平台分成
# 计算分成 + 沉淀金额
final_platform_profit = platform_profit + community_profit_sediment
print(f"平台({platform_profit_sharing/100}%) 分成: {platform_profit} + 沉淀金额: {community_profit_sediment}")
total_platform_profit += final_platform_profit
# 累加到总沉淀金额
total_profit_sediment += community_profit_sediment
settle_details["platform_profit"] = {
"user_id": settings.PLATFORM_USER_ID,
"profit": final_platform_profit
}
if final_platform_profit > 0:
account_manager = AccountManager(db)
account_manager.change_balance(settings.PLATFORM_USER_ID, final_platform_profit, f"【平台】 {stat.community_name} 订单收益")
# 保存结算明细
settlement = SettlementHistoryDB(
settle_date=yesterday,
community_id=stat.community_id,
community_name=stat.community_name,
settle_details=settle_details
)
settlement_history.append(settlement)
db.add_all(settlement_history)
db.commit()
# 生成分润报告
message = f"""### {yesterday.strftime("%Y-%m-%d")} 分润报告
**环境:{settings.ENV_NAME}**
### 订单汇总
> - 订单总数: {total_order_count}
> - 订单总金额: {total_original_amount:.2f}
> - 支付总金额: {total_final_amount:.2f}
### 分润汇总
> - 运营商分成: {total_partner_profit:.2f}
> - 服务商分成: {total_admin_profit:.2f}
> - 配送员分成: {total_delivery_profit:.2f}
> - 平台分成: {total_platform_profit:.2f}
> - 沉淀金额: {total_profit_sediment:.2f}
"""
# 发送企业微信消息
try:
wecom_bot = WecomBot(settings.URL_WECOMBOT_DAILY_REPORT)
await wecom_bot.send_markdown(message)
logger.info("每日合伙人结算报告已发送到企业微信")
except Exception as e:
logger.error(f"发送企业微信消息失败: {str(e)}")
raise e
except Exception as e:
db.rollback()
logger.error(f"运行每日合伙人结算任务失败: {str(e)}")
raise e
def run_daily_partner_settlement():
"""
包装异步函数的同步函数用于APScheduler调度
"""
try:
# 获取或创建事件循环
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 运行异步任务
if loop.is_running():
future = asyncio.ensure_future(daily_partner_settlement())
future.add_done_callback(lambda f: logger.info("每日合伙人结算任务完成"))
else:
loop.run_until_complete(daily_partner_settlement())
except Exception as e:
logger.error(f"运行每日合伙人结算任务失败: {str(e)}")
def register_daily_tasks():
"""注册所有每日定时任务"""
# 每天早上9点执行小区订单统计
scheduler.add_cron_job(
run_daily_community_order_statistics, # 使用同步包装函数
hour=0,
minute=10,
job_id="daily_community_order_stats"
)
scheduler.add_cron_job(
run_daily_partner_settlement,
hour=0,
minute=30,
job_id="daily_partner_settlement"
)
logger.info("已注册所有每日定时任务")