- 注释掉 OSS预签名 URL 缓存刷新任务的定时执行代码 - 在 hero/hero.go 中增加对 Redis缓存和英雄数据集的非空校验 - 修改 OSS预签名 URL 生成逻辑,自动替换为 CDN 域名
299 lines
7.2 KiB
Go
299 lines
7.2 KiB
Go
package cron
|
||
|
||
import (
|
||
"context"
|
||
"epic/internal/dao"
|
||
"epic/internal/model/entity"
|
||
"epic/internal/service"
|
||
"epic/internal/util"
|
||
"github.com/gogf/gf/v2/errors/gerror"
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/gcron"
|
||
"github.com/gogf/gf/v2/os/gtime"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
type Logic struct {
|
||
cron *gcron.Cron
|
||
jobs map[string]*gcron.Entry
|
||
jobsMux sync.RWMutex
|
||
sync *ThirdPartyDataSync
|
||
}
|
||
|
||
func New() *Logic {
|
||
return &Logic{
|
||
cron: gcron.New(),
|
||
jobs: make(map[string]*gcron.Entry),
|
||
sync: NewThirdPartyDataSync(),
|
||
}
|
||
}
|
||
|
||
func init() {
|
||
service.SetCron(New())
|
||
}
|
||
|
||
// StartAllJobs 启动所有定时任务
|
||
func (l *Logic) StartAllJobs(ctx context.Context) error {
|
||
g.Log().Info(ctx, "Starting all cron jobs...")
|
||
|
||
// 启动定时任务调度器
|
||
l.cron.Start()
|
||
|
||
// 注册默认的定时任务
|
||
if err := l.registerDefaultJobs(ctx); err != nil {
|
||
return err
|
||
}
|
||
|
||
g.Log().Info(ctx, "All cron jobs started successfully")
|
||
return nil
|
||
}
|
||
|
||
// StopAllJobs 停止所有定时任务
|
||
func (l *Logic) StopAllJobs(ctx context.Context) error {
|
||
g.Log().Info(ctx, "Stopping all cron jobs...")
|
||
|
||
l.jobsMux.Lock()
|
||
defer l.jobsMux.Unlock()
|
||
|
||
// 停止所有任务
|
||
for name, entry := range l.jobs {
|
||
l.cron.Remove(entry.Name)
|
||
delete(l.jobs, name)
|
||
g.Log().Infof(ctx, "Stopped job: %s", name)
|
||
}
|
||
|
||
// 停止调度器
|
||
l.cron.Stop()
|
||
|
||
g.Log().Info(ctx, "All cron jobs stopped successfully")
|
||
return nil
|
||
}
|
||
|
||
// AddJob 添加定时任务
|
||
func (l *Logic) AddJob(ctx context.Context, name, cron string, job func()) error {
|
||
l.jobsMux.Lock()
|
||
defer l.jobsMux.Unlock()
|
||
|
||
// 检查任务是否已存在
|
||
if _, exists := l.jobs[name]; exists {
|
||
return gerror.New("job already exists: " + name)
|
||
}
|
||
|
||
// 添加任务到调度器
|
||
entry, err := l.cron.Add(ctx, cron, func(ctx context.Context) {
|
||
startTime := gtime.Now()
|
||
g.Log().Infof(ctx, "Starting job: %s at %s", name, startTime.String())
|
||
job()
|
||
endTime := gtime.Now()
|
||
duration := endTime.Sub(startTime)
|
||
g.Log().Infof(ctx, "Completed job: %s, duration: %v", name, duration)
|
||
}, name)
|
||
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 保存任务引用
|
||
l.jobs[name] = entry
|
||
g.Log().Infof(ctx, "Added job: %s with cron: %s", name, cron)
|
||
|
||
return nil
|
||
}
|
||
|
||
// RemoveJob 移除定时任务
|
||
func (l *Logic) RemoveJob(ctx context.Context, name string) error {
|
||
l.jobsMux.Lock()
|
||
defer l.jobsMux.Unlock()
|
||
|
||
entry, exists := l.jobs[name]
|
||
if !exists {
|
||
return gerror.New("job not found: " + name)
|
||
}
|
||
|
||
// 从调度器中移除任务
|
||
l.cron.Remove(entry.Name)
|
||
delete(l.jobs, name)
|
||
|
||
g.Log().Infof(ctx, "Removed job: %s", name)
|
||
return nil
|
||
}
|
||
|
||
// GetJobStatus 获取任务状态
|
||
func (l *Logic) GetJobStatus(ctx context.Context, name string) (bool, error) {
|
||
l.jobsMux.RLock()
|
||
defer l.jobsMux.RUnlock()
|
||
|
||
_, exists := l.jobs[name]
|
||
return exists, nil
|
||
}
|
||
|
||
// registerDefaultJobs 注册默认的定时任务
|
||
func (l *Logic) registerDefaultJobs(ctx context.Context) error {
|
||
// 每小时执行一次数据同步任务
|
||
if err := l.AddJob(ctx, "data_sync_hourly", "0 0 * * * *", func() {
|
||
l.syncDataFromThirdParty(ctx)
|
||
}); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 每天凌晨2点执行数据清理任务
|
||
if err := l.AddJob(ctx, "data_cleanup_daily", "0 0 2 * * *", func() {
|
||
l.cleanupOldData(ctx)
|
||
}); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 每5分钟执行一次健康检查任务
|
||
if err := l.AddJob(ctx, "health_check", "0 0/5 * * * *", func() {
|
||
l.healthCheck(ctx)
|
||
}); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 每30分钟执行一次缓存刷新任务
|
||
if err := l.AddJob(ctx, "cache_refresh", "0 0/5 * * * *", func() {
|
||
l.refreshCache(ctx)
|
||
}); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 每天凌晨3点执行英雄数据同步
|
||
if err := l.AddJob(ctx, "hero_sync_daily", "0 0 3 * * *", func() {
|
||
l.syncHeroData(ctx)
|
||
}); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 每天凌晨4点执行神器数据同步
|
||
if err := l.AddJob(ctx, "artifact_sync_daily", "0 0 4 * * *", func() {
|
||
l.syncArtifactData(ctx)
|
||
}); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 每30分钟执行一次OSS预签名URL缓存刷新任务
|
||
//if err := l.AddJob(ctx, "oss_presignurl_refresh", "0 0/30 * * * *", func() {
|
||
// l.refreshOssPresignUrlCacheJob(ctx)
|
||
//}); err != nil {
|
||
// return err
|
||
//}
|
||
|
||
return nil
|
||
}
|
||
|
||
// syncDataFromThirdParty 从第三方网站同步数据
|
||
func (l *Logic) syncDataFromThirdParty(ctx context.Context) {
|
||
g.Log().Info(ctx, "Starting data sync from third party...")
|
||
|
||
// 使用第三方数据同步器
|
||
if err := l.sync.SyncAllData(ctx); err != nil {
|
||
g.Log().Error(ctx, "Data sync failed:", err)
|
||
return
|
||
}
|
||
|
||
g.Log().Info(ctx, "Data sync completed")
|
||
}
|
||
|
||
// syncHeroData 同步英雄数据
|
||
func (l *Logic) syncHeroData(ctx context.Context) {
|
||
g.Log().Info(ctx, "Starting hero data sync...")
|
||
|
||
if err := l.sync.SyncHeroData(ctx); err != nil {
|
||
g.Log().Error(ctx, "Hero data sync failed:", err)
|
||
return
|
||
}
|
||
|
||
g.Log().Info(ctx, "Hero data sync completed")
|
||
}
|
||
|
||
// syncArtifactData 同步神器数据
|
||
func (l *Logic) syncArtifactData(ctx context.Context) {
|
||
g.Log().Info(ctx, "Starting artifact data sync...")
|
||
|
||
if err := l.sync.SyncArtifactData(ctx); err != nil {
|
||
g.Log().Error(ctx, "Artifact data sync failed:", err)
|
||
return
|
||
}
|
||
|
||
g.Log().Info(ctx, "Artifact data sync completed")
|
||
}
|
||
|
||
// cleanupOldData 清理旧数据
|
||
func (l *Logic) cleanupOldData(ctx context.Context) {
|
||
g.Log().Info(ctx, "Starting data cleanup...")
|
||
|
||
// TODO: 实现数据清理逻辑
|
||
// 1. 删除过期的缓存数据
|
||
// 2. 清理过期的日志记录
|
||
// 3. 归档历史数据
|
||
|
||
g.Log().Info(ctx, "Data cleanup completed")
|
||
}
|
||
|
||
// healthCheck 健康检查
|
||
func (l *Logic) healthCheck(ctx context.Context) {
|
||
g.Log().Debug(ctx, "Performing health check...")
|
||
|
||
// TODO: 实现健康检查逻辑
|
||
// 1. 检查数据库连接
|
||
// 2. 检查Redis连接
|
||
// 3. 检查第三方API可用性
|
||
// 4. 记录系统状态
|
||
|
||
g.Log().Debug(ctx, "Health check completed")
|
||
}
|
||
|
||
// refreshCache 刷新缓存
|
||
func (l *Logic) refreshCache(ctx context.Context) {
|
||
g.Log().Info(ctx, "Starting cache refresh...")
|
||
|
||
// TODO: 实现缓存刷新逻辑
|
||
// 1. 刷新英雄数据缓存
|
||
// 2. 刷新神器数据缓存
|
||
// 3. 刷新其他业务缓存
|
||
|
||
g.Log().Info(ctx, "Cache refresh completed")
|
||
}
|
||
|
||
// 刷新OSS图片预签名URL缓存的定时任务
|
||
func (l *Logic) refreshOssPresignUrlCacheJob(ctx context.Context) {
|
||
g.Log().Info(ctx, "Starting OSS presigned URL cache refresh...")
|
||
|
||
// 1. 从数据库读取所有英雄图片地址
|
||
var dbHeroes []*entity.EpicHeroInfo
|
||
err := dao.EpicHeroInfo.Ctx(ctx).Scan(&dbHeroes)
|
||
if err != nil {
|
||
g.Log().Error(ctx, "Failed to query hero info for OSS presign refresh:", err)
|
||
return
|
||
}
|
||
|
||
// 2. 提取OSS图片key(去掉域名和bucket前缀)
|
||
var keys []string
|
||
for _, hero := range dbHeroes {
|
||
headImgUrl := hero.HeadImgUrl
|
||
if headImgUrl == "" {
|
||
continue
|
||
}
|
||
// 只处理以http开头的图片地址
|
||
// 例:https://s3.bitiful.net/bucket/epic/hero/xxx.png 或 https://bfoss.htoop.cn/epic/hero/xxx.png
|
||
// 目标key: epic/hero/xxx.png
|
||
key := ""
|
||
if idx := strings.Index(headImgUrl, "/epic/hero/"); idx != -1 {
|
||
key = headImgUrl[idx+1:]
|
||
}
|
||
if key != "" {
|
||
keys = append(keys, key)
|
||
}
|
||
}
|
||
|
||
expire := 1 * time.Hour // 预签名URL有效期
|
||
err = util.RefreshOssPresignedUrlCache(ctx, keys, expire)
|
||
if err != nil {
|
||
g.Log().Error(ctx, "OSS presigned URL cache refresh failed:", err)
|
||
} else {
|
||
g.Log().Info(ctx, "OSS presigned URL cache refresh completed")
|
||
}
|
||
}
|