package cron import ( "context" "epic/internal/dao" "epic/internal/model/entity" "epic/internal/service" "epic/internal/util" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gcron" "github.com/gogf/gf/v2/os/gtime" "strings" "sync" "time" ) type Logic struct { cron *gcron.Cron jobs map[string]*gcron.Entry jobsMux sync.RWMutex sync *ThirdPartyDataSync } func New() *Logic { return &Logic{ cron: gcron.New(), jobs: make(map[string]*gcron.Entry), sync: NewThirdPartyDataSync(), } } func init() { service.SetCron(New()) } // StartAllJobs 启动所有定时任务 func (l *Logic) StartAllJobs(ctx context.Context) error { g.Log().Info(ctx, "Starting all cron jobs...") // 启动定时任务调度器 l.cron.Start() // 注册默认的定时任务 if err := l.registerDefaultJobs(ctx); err != nil { return err } g.Log().Info(ctx, "All cron jobs started successfully") return nil } // StopAllJobs 停止所有定时任务 func (l *Logic) StopAllJobs(ctx context.Context) error { g.Log().Info(ctx, "Stopping all cron jobs...") l.jobsMux.Lock() defer l.jobsMux.Unlock() // 停止所有任务 for name, entry := range l.jobs { l.cron.Remove(entry.Name) delete(l.jobs, name) g.Log().Infof(ctx, "Stopped job: %s", name) } // 停止调度器 l.cron.Stop() g.Log().Info(ctx, "All cron jobs stopped successfully") return nil } // AddJob 添加定时任务 func (l *Logic) AddJob(ctx context.Context, name, cron string, job func()) error { l.jobsMux.Lock() defer l.jobsMux.Unlock() // 检查任务是否已存在 if _, exists := l.jobs[name]; exists { return gerror.New("job already exists: " + name) } // 添加任务到调度器 entry, err := l.cron.Add(ctx, cron, func(ctx context.Context) { startTime := gtime.Now() g.Log().Infof(ctx, "Starting job: %s at %s", name, startTime.String()) job() endTime := gtime.Now() duration := endTime.Sub(startTime) g.Log().Infof(ctx, "Completed job: %s, duration: %v", name, duration) }, name) if err != nil { return err } // 保存任务引用 l.jobs[name] = entry g.Log().Infof(ctx, "Added job: %s with cron: %s", name, cron) return nil } // RemoveJob 移除定时任务 func (l *Logic) RemoveJob(ctx context.Context, name string) error { l.jobsMux.Lock() defer l.jobsMux.Unlock() entry, exists := l.jobs[name] if !exists { return gerror.New("job not found: " + name) } // 从调度器中移除任务 l.cron.Remove(entry.Name) delete(l.jobs, name) g.Log().Infof(ctx, "Removed job: %s", name) return nil } // GetJobStatus 获取任务状态 func (l *Logic) GetJobStatus(ctx context.Context, name string) (bool, error) { l.jobsMux.RLock() defer l.jobsMux.RUnlock() _, exists := l.jobs[name] return exists, nil } // registerDefaultJobs 注册默认的定时任务 func (l *Logic) registerDefaultJobs(ctx context.Context) error { //// 每小时执行一次数据同步任务 //if err := l.AddJob(ctx, "data_sync_hourly", "0 0 * * * *", func() { // l.syncDataFromThirdParty(ctx) //}); err != nil { // return err //} // //// 每天凌晨2点执行数据清理任务 //if err := l.AddJob(ctx, "data_cleanup_daily", "0 0 2 * * *", func() { // l.cleanupOldData(ctx) //}); err != nil { // return err //} // 每5分钟执行一次健康检查任务 if err := l.AddJob(ctx, "health_check", "0 0/5 * * * *", func() { l.healthCheck(ctx) }); err != nil { return err } // 每30分钟执行一次缓存刷新任务 if err := l.AddJob(ctx, "cache_refresh", "0 0/5 * * * *", func() { l.refreshCache(ctx) }); err != nil { return err } // 每天凌晨3点执行英雄数据同步 if err := l.AddJob(ctx, "hero_sync_daily", "0 0 3 * * *", func() { l.syncHeroData(ctx) }); err != nil { return err } // 每天凌晨4点执行神器数据同步 if err := l.AddJob(ctx, "artifact_sync_daily", "0 0 4 * * *", func() { l.syncArtifactData(ctx) }); err != nil { return err } // 每30分钟执行一次OSS预签名URL缓存刷新任务 //if err := l.AddJob(ctx, "oss_presignurl_refresh", "0 0/30 * * * *", func() { // l.refreshOssPresignUrlCacheJob(ctx) //}); err != nil { // return err //} return nil } // syncDataFromThirdParty 从第三方网站同步数据 func (l *Logic) syncDataFromThirdParty(ctx context.Context) { g.Log().Info(ctx, "Starting data sync from third party...") // 使用第三方数据同步器 if err := l.sync.SyncAllData(ctx); err != nil { g.Log().Error(ctx, "Data sync failed:", err) return } g.Log().Info(ctx, "Data sync completed") } // syncHeroData 同步英雄数据 func (l *Logic) syncHeroData(ctx context.Context) { g.Log().Info(ctx, "Starting hero data sync...") if err := l.sync.SyncHeroData(ctx); err != nil { g.Log().Error(ctx, "Hero data sync failed:", err) return } g.Log().Info(ctx, "Hero data sync completed") } //同步神器数据 func (l *Logic) syncArtifactData(ctx context.Context) { g.Log().Info(ctx, "Starting artifact data sync...") if err := l.sync.SyncArtifactData(ctx); err != nil { g.Log().Error(ctx, "Artifact data sync failed:", err) return } g.Log().Info(ctx, "Artifact data sync completed") } // healthCheck 健康检查 func (l *Logic) healthCheck(ctx context.Context) { g.Log().Debug(ctx, "Performing health check...") // TODO: 实现健康检查逻辑 // 1. 检查数据库连接 // 2. 检查Redis连接 // 3. 检查第三方API可用性 // 4. 记录系统状态 g.Log().Debug(ctx, "Health check completed") } // refreshCache 刷新缓存 func (l *Logic) refreshCache(ctx context.Context) { g.Log().Info(ctx, "Starting cache refresh...") // TODO: 实现缓存刷新逻辑 // 1. 刷新英雄数据缓存 // 2. 刷新神器数据缓存 // 3. 刷新其他业务缓存 g.Log().Info(ctx, "Cache refresh completed") } // 刷新OSS图片预签名URL缓存的定时任务 func (l *Logic) refreshOssPresignUrlCacheJob(ctx context.Context) { g.Log().Info(ctx, "Starting OSS presigned URL cache refresh...") // 1. 从数据库读取所有英雄图片地址 var dbHeroes []*entity.EpicHeroInfo err := dao.EpicHeroInfo.Ctx(ctx).Scan(&dbHeroes) if err != nil { g.Log().Error(ctx, "Failed to query hero info for OSS presign refresh:", err) return } // 2. 提取OSS图片key(去掉域名和bucket前缀) var keys []string for _, hero := range dbHeroes { headImgUrl := hero.HeadImgUrl if headImgUrl == "" { continue } // 只处理以http开头的图片地址 // 例:https://s3.bitiful.net/bucket/epic/hero/xxx.png 或 https://bfoss.htoop.cn/epic/hero/xxx.png // 目标key: epic/hero/xxx.png key := "" if idx := strings.Index(headImgUrl, "/epic/hero/"); idx != -1 { key = headImgUrl[idx+1:] } if key != "" { keys = append(keys, key) } } expire := 1 * time.Hour // 预签名URL有效期 err = util.RefreshOssPresignedUrlCache(ctx, keys, expire) if err != nil { g.Log().Error(ctx, "OSS presigned URL cache refresh failed:", err) } else { g.Log().Info(ctx, "OSS presigned URL cache refresh completed") } }