Files
epic-go/internal/logic/cron/cron.go
hu xiaotong cecb19e497 feat(cron): 实现定时任务管理功能
- 新增 cron模块,支持定时任务管理- 实现了任务列表获取、任务添加、任务移除和任务状态获取等接口
- 添加了默认任务,包括数据同步、数据清理、健康检查和缓存刷新等
- 实现了优雅关闭功能,确保在服务停止时正确停止所有任务
- 添加了定时任务相关文档和使用指南
2025-06-23 15:19:38 +08:00

247 lines
5.7 KiB
Go

package cron
import (
"context"
"epic/internal/service"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcron"
"github.com/gogf/gf/v2/os/gtime"
"sync"
)
type Logic struct {
cron *gcron.Cron
jobs map[string]*gcron.Entry
jobsMux sync.RWMutex
sync *ThirdPartyDataSync
}
func New() *Logic {
return &Logic{
cron: gcron.New(),
jobs: make(map[string]*gcron.Entry),
sync: NewThirdPartyDataSync(),
}
}
func init() {
service.SetCron(New())
}
// StartAllJobs 启动所有定时任务
func (l *Logic) StartAllJobs(ctx context.Context) error {
g.Log().Info(ctx, "Starting all cron jobs...")
// 启动定时任务调度器
l.cron.Start()
// 注册默认的定时任务
if err := l.registerDefaultJobs(ctx); err != nil {
return err
}
g.Log().Info(ctx, "All cron jobs started successfully")
return nil
}
// StopAllJobs 停止所有定时任务
func (l *Logic) StopAllJobs(ctx context.Context) error {
g.Log().Info(ctx, "Stopping all cron jobs...")
l.jobsMux.Lock()
defer l.jobsMux.Unlock()
// 停止所有任务
for name, entry := range l.jobs {
l.cron.Remove(entry.Name)
delete(l.jobs, name)
g.Log().Infof(ctx, "Stopped job: %s", name)
}
// 停止调度器
l.cron.Stop()
g.Log().Info(ctx, "All cron jobs stopped successfully")
return nil
}
// AddJob 添加定时任务
func (l *Logic) AddJob(ctx context.Context, name, cron string, job func()) error {
l.jobsMux.Lock()
defer l.jobsMux.Unlock()
// 检查任务是否已存在
if _, exists := l.jobs[name]; exists {
return gerror.New("job already exists: " + name)
}
// 添加任务到调度器
entry, err := l.cron.Add(ctx, cron, func(ctx context.Context) {
startTime := gtime.Now()
g.Log().Infof(ctx, "Starting job: %s at %s", name, startTime.String())
job()
endTime := gtime.Now()
duration := endTime.Sub(startTime)
g.Log().Infof(ctx, "Completed job: %s, duration: %v", name, duration)
}, name)
if err != nil {
return err
}
// 保存任务引用
l.jobs[name] = entry
g.Log().Infof(ctx, "Added job: %s with cron: %s", name, cron)
return nil
}
// RemoveJob 移除定时任务
func (l *Logic) RemoveJob(ctx context.Context, name string) error {
l.jobsMux.Lock()
defer l.jobsMux.Unlock()
entry, exists := l.jobs[name]
if !exists {
return gerror.New("job not found: " + name)
}
// 从调度器中移除任务
l.cron.Remove(entry.Name)
delete(l.jobs, name)
g.Log().Infof(ctx, "Removed job: %s", name)
return nil
}
// GetJobStatus 获取任务状态
func (l *Logic) GetJobStatus(ctx context.Context, name string) (bool, error) {
l.jobsMux.RLock()
defer l.jobsMux.RUnlock()
_, exists := l.jobs[name]
return exists, nil
}
// registerDefaultJobs 注册默认的定时任务
func (l *Logic) registerDefaultJobs(ctx context.Context) error {
// 每小时执行一次数据同步任务
if err := l.AddJob(ctx, "data_sync_hourly", "0 0 * * * *", func() {
l.syncDataFromThirdParty(ctx)
}); err != nil {
return err
}
// 每天凌晨2点执行数据清理任务
if err := l.AddJob(ctx, "data_cleanup_daily", "0 0 2 * * *", func() {
l.cleanupOldData(ctx)
}); err != nil {
return err
}
// 每5分钟执行一次健康检查任务
if err := l.AddJob(ctx, "health_check", "0 0/5 * * * *", func() {
l.healthCheck(ctx)
}); err != nil {
return err
}
// 每30分钟执行一次缓存刷新任务
if err := l.AddJob(ctx, "cache_refresh", "0 0/5 * * * *", func() {
l.refreshCache(ctx)
}); err != nil {
return err
}
// 每天凌晨3点执行英雄数据同步
if err := l.AddJob(ctx, "hero_sync_daily", "0 0 3 * * *", func() {
l.syncHeroData(ctx)
}); err != nil {
return err
}
// 每天凌晨4点执行神器数据同步
if err := l.AddJob(ctx, "artifact_sync_daily", "0 0 4 * * *", func() {
l.syncArtifactData(ctx)
}); err != nil {
return err
}
return nil
}
// syncDataFromThirdParty 从第三方网站同步数据
func (l *Logic) syncDataFromThirdParty(ctx context.Context) {
g.Log().Info(ctx, "Starting data sync from third party...")
// 使用第三方数据同步器
if err := l.sync.SyncAllData(ctx); err != nil {
g.Log().Error(ctx, "Data sync failed:", err)
return
}
g.Log().Info(ctx, "Data sync completed")
}
// syncHeroData 同步英雄数据
func (l *Logic) syncHeroData(ctx context.Context) {
g.Log().Info(ctx, "Starting hero data sync...")
if err := l.sync.SyncHeroData(ctx); err != nil {
g.Log().Error(ctx, "Hero data sync failed:", err)
return
}
g.Log().Info(ctx, "Hero data sync completed")
}
// syncArtifactData 同步神器数据
func (l *Logic) syncArtifactData(ctx context.Context) {
g.Log().Info(ctx, "Starting artifact data sync...")
if err := l.sync.SyncArtifactData(ctx); err != nil {
g.Log().Error(ctx, "Artifact data sync failed:", err)
return
}
g.Log().Info(ctx, "Artifact data sync completed")
}
// cleanupOldData 清理旧数据
func (l *Logic) cleanupOldData(ctx context.Context) {
g.Log().Info(ctx, "Starting data cleanup...")
// TODO: 实现数据清理逻辑
// 1. 删除过期的缓存数据
// 2. 清理过期的日志记录
// 3. 归档历史数据
g.Log().Info(ctx, "Data cleanup completed")
}
// healthCheck 健康检查
func (l *Logic) healthCheck(ctx context.Context) {
g.Log().Debug(ctx, "Performing health check...")
// TODO: 实现健康检查逻辑
// 1. 检查数据库连接
// 2. 检查Redis连接
// 3. 检查第三方API可用性
// 4. 记录系统状态
g.Log().Debug(ctx, "Health check completed")
}
// refreshCache 刷新缓存
func (l *Logic) refreshCache(ctx context.Context) {
g.Log().Info(ctx, "Starting cache refresh...")
// TODO: 实现缓存刷新逻辑
// 1. 刷新英雄数据缓存
// 2. 刷新神器数据缓存
// 3. 刷新其他业务缓存
g.Log().Info(ctx, "Cache refresh completed")
}