deliveryman-api/app/tasks/daily_tasks.py
2025-03-11 08:59:27 +08:00

223 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import asyncio
from datetime import datetime, timedelta
from app.models.database import get_db_context
from app.core.scheduler import scheduler
from sqlalchemy import text
import json
import os
from app.core.wecombot import WecomBot
from app.core.config import settings
from app.models.statistics import DailyCommunityOrderStats, DailyOrderStats
from sqlalchemy.exc import IntegrityError
from app.models.community import CommunityDB
logger = logging.getLogger(__name__)
async def daily_community_order_statistics():
"""每日小区订单统计任务
每天早上9点执行统计每个小区昨日的订单量已完成、总订单金额和总支付金额
"""
logger.info(f"开始执行每日小区订单统计任务: {datetime.now()}")
try:
# 计算昨天的日期范围
yesterday = datetime.now() - timedelta(days=1)
yesterday_start = datetime(yesterday.year, yesterday.month, yesterday.day, 0, 0, 0)
yesterday_end = datetime(yesterday.year, yesterday.month, yesterday.day, 23, 59, 59)
yesterday_date = yesterday.date()
yesterday_str = yesterday.strftime("%Y-%m-%d")
with get_db_context() as db:
# 获取所有小区列表
all_communities = db.query(CommunityDB).all()
community_dict = {community.id: community.name for community in all_communities}
# 查询每个小区昨日的订单统计
result = db.execute(
text("""
SELECT
c.id AS community_id,
c.name AS community_name,
COUNT(o.orderid) AS order_count,
COALESCE(SUM(o.original_amount + o.additional_fee_amount), 0) AS total_original_amount,
COALESCE(SUM(o.final_amount), 0) AS total_final_amount
FROM
shipping_orders o
JOIN
communities c ON o.address_community_id = c.id
WHERE
o.create_time BETWEEN :start_time AND :end_time
AND o.status = 'COMPLETED'
GROUP BY
c.id, c.name
ORDER BY
order_count DESC
"""),
{
"start_time": yesterday_start,
"end_time": yesterday_end
}
)
# 转换结果为字典以小区ID为键
community_stats_dict = {}
for row in result:
community_stats_dict[row.community_id] = {
"community_id": row.community_id,
"community_name": row.community_name,
"order_count": row.order_count,
"total_original_amount": float(row.total_original_amount),
"total_final_amount": float(row.total_final_amount)
}
# 确保每个小区都有一条记录
community_stats = []
for community_id, community_name in community_dict.items():
if community_id in community_stats_dict:
# 使用查询结果
community_stats.append(community_stats_dict[community_id])
else:
# 创建零值记录
community_stats.append({
"community_id": community_id,
"community_name": community_name,
"order_count": 0,
"total_original_amount": 0.0,
"total_final_amount": 0.0
})
# 按订单数量降序排序
community_stats.sort(key=lambda x: x["order_count"], reverse=True)
# 计算总计(只计算有订单的小区)
communities_with_orders = [stat for stat in community_stats if stat["order_count"] > 0]
total_order_count = sum(item["order_count"] for item in communities_with_orders)
total_original_amount = sum(item["total_original_amount"] for item in communities_with_orders)
total_final_amount = sum(item["total_final_amount"] for item in communities_with_orders)
total_communities = len(communities_with_orders)
# 保存到数据库 - 总体统计
try:
# 检查是否已存在该日期的记录
existing_stats = db.query(DailyOrderStats).filter(
DailyOrderStats.stats_date == yesterday_date
).first()
if existing_stats:
# 更新现有记录
existing_stats.total_order_count = total_order_count
existing_stats.total_original_amount = total_original_amount
existing_stats.total_final_amount = total_final_amount
existing_stats.total_communities = total_communities
existing_stats.update_time = datetime.now()
else:
# 创建新记录
daily_stats = DailyOrderStats(
stats_date=yesterday_date,
total_order_count=total_order_count,
total_original_amount=total_original_amount,
total_final_amount=total_final_amount,
total_communities=total_communities
)
db.add(daily_stats)
# 保存到数据库 - 小区统计
# 先删除该日期的所有小区统计记录,然后重新插入
db.query(DailyCommunityOrderStats).filter(
DailyCommunityOrderStats.stats_date == yesterday_date
).delete()
# 批量插入小区统计记录
for stat in community_stats:
community_stat = DailyCommunityOrderStats(
stats_date=yesterday_date,
community_id=stat["community_id"],
community_name=stat["community_name"],
order_count=stat["order_count"],
total_original_amount=stat["total_original_amount"],
total_final_amount=stat["total_final_amount"]
)
db.add(community_stat)
db.commit()
logger.info(f"已将{yesterday_str}的订单统计数据保存到数据库,共{len(community_stats)}个小区")
except IntegrityError as e:
db.rollback()
logger.error(f"保存订单统计数据到数据库失败: {str(e)}")
except Exception as e:
db.rollback()
logger.error(f"保存订单统计数据到数据库失败: {str(e)}")
# 生成报告消息
message = f"""## {yesterday_str} 小区订单统计报告
### 总计
- 订单总数: {total_order_count}
- 订单总金额: ¥{total_original_amount:.2f}
- 支付总金额: ¥{total_final_amount:.2f}
- 有订单小区数: {total_communities}
- 总小区数: {len(community_stats)}
### 小区排名 (前5名)
"""
# 添加前5个小区的数据
for i, item in enumerate(communities_with_orders[:5], 1):
message += f"""
{i}. **{item['community_name']}**
- 订单数: {item['order_count']}
- 订单金额: ¥{item['total_original_amount']:.2f}
- 支付金额: ¥{item['total_final_amount']:.2f}
"""
# 发送企业微信消息
if communities_with_orders and settings.URL_WECOMBOT_DAILY_REPORT:
try:
wecom_bot = WecomBot(settings.URL_WECOMBOT_DAILY_REPORT)
await wecom_bot.send_markdown(message)
logger.info("每日小区订单统计报告已发送到企业微信")
except Exception as e:
logger.error(f"发送企业微信消息失败: {str(e)}")
logger.info("每日小区订单统计任务完成")
except Exception as e:
logger.error(f"每日小区订单统计任务失败: {str(e)}")
def run_daily_community_order_statistics():
"""
包装异步函数的同步函数用于APScheduler调度
"""
try:
# 获取或创建事件循环
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 运行异步任务
if loop.is_running():
# 如果循环已经在运行使用create_task
future = asyncio.ensure_future(daily_community_order_statistics())
# 可以添加回调函数处理结果
future.add_done_callback(lambda f: logger.info("每日小区订单统计任务完成"))
else:
# 如果循环没有运行,直接运行到完成
loop.run_until_complete(daily_community_order_statistics())
except Exception as e:
logger.error(f"运行每日小区订单统计任务失败: {str(e)}")
def register_daily_tasks():
"""注册所有每日定时任务"""
# 每天早上9点执行小区订单统计
scheduler.add_cron_job(
run_daily_community_order_statistics, # 使用同步包装函数
hour=3,
minute=0,
job_id="daily_community_order_stats"
)
logger.info("已注册所有每日定时任务")