feat(cron): 实现定时任务管理功能
- 新增 cron模块,支持定时任务管理- 实现了任务列表获取、任务添加、任务移除和任务状态获取等接口 - 添加了默认任务,包括数据同步、数据清理、健康检查和缓存刷新等 - 实现了优雅关闭功能,确保在服务停止时正确停止所有任务 - 添加了定时任务相关文档和使用指南
This commit is contained in:
246
internal/logic/cron/cron.go
Normal file
246
internal/logic/cron/cron.go
Normal file
@@ -0,0 +1,246 @@
|
||||
package cron
|
||||
|
||||
import (
|
||||
"context"
|
||||
"epic/internal/service"
|
||||
"github.com/gogf/gf/v2/errors/gerror"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/gcron"
|
||||
"github.com/gogf/gf/v2/os/gtime"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Logic struct {
|
||||
cron *gcron.Cron
|
||||
jobs map[string]*gcron.Entry
|
||||
jobsMux sync.RWMutex
|
||||
sync *ThirdPartyDataSync
|
||||
}
|
||||
|
||||
func New() *Logic {
|
||||
return &Logic{
|
||||
cron: gcron.New(),
|
||||
jobs: make(map[string]*gcron.Entry),
|
||||
sync: NewThirdPartyDataSync(),
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
service.SetCron(New())
|
||||
}
|
||||
|
||||
// StartAllJobs 启动所有定时任务
|
||||
func (l *Logic) StartAllJobs(ctx context.Context) error {
|
||||
g.Log().Info(ctx, "Starting all cron jobs...")
|
||||
|
||||
// 启动定时任务调度器
|
||||
l.cron.Start()
|
||||
|
||||
// 注册默认的定时任务
|
||||
if err := l.registerDefaultJobs(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
g.Log().Info(ctx, "All cron jobs started successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopAllJobs 停止所有定时任务
|
||||
func (l *Logic) StopAllJobs(ctx context.Context) error {
|
||||
g.Log().Info(ctx, "Stopping all cron jobs...")
|
||||
|
||||
l.jobsMux.Lock()
|
||||
defer l.jobsMux.Unlock()
|
||||
|
||||
// 停止所有任务
|
||||
for name, entry := range l.jobs {
|
||||
l.cron.Remove(entry.Name)
|
||||
delete(l.jobs, name)
|
||||
g.Log().Infof(ctx, "Stopped job: %s", name)
|
||||
}
|
||||
|
||||
// 停止调度器
|
||||
l.cron.Stop()
|
||||
|
||||
g.Log().Info(ctx, "All cron jobs stopped successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddJob 添加定时任务
|
||||
func (l *Logic) AddJob(ctx context.Context, name, cron string, job func()) error {
|
||||
l.jobsMux.Lock()
|
||||
defer l.jobsMux.Unlock()
|
||||
|
||||
// 检查任务是否已存在
|
||||
if _, exists := l.jobs[name]; exists {
|
||||
return gerror.New("job already exists: " + name)
|
||||
}
|
||||
|
||||
// 添加任务到调度器
|
||||
entry, err := l.cron.Add(ctx, cron, func(ctx context.Context) {
|
||||
startTime := gtime.Now()
|
||||
g.Log().Infof(ctx, "Starting job: %s at %s", name, startTime.String())
|
||||
job()
|
||||
endTime := gtime.Now()
|
||||
duration := endTime.Sub(startTime)
|
||||
g.Log().Infof(ctx, "Completed job: %s, duration: %v", name, duration)
|
||||
}, name)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 保存任务引用
|
||||
l.jobs[name] = entry
|
||||
g.Log().Infof(ctx, "Added job: %s with cron: %s", name, cron)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveJob 移除定时任务
|
||||
func (l *Logic) RemoveJob(ctx context.Context, name string) error {
|
||||
l.jobsMux.Lock()
|
||||
defer l.jobsMux.Unlock()
|
||||
|
||||
entry, exists := l.jobs[name]
|
||||
if !exists {
|
||||
return gerror.New("job not found: " + name)
|
||||
}
|
||||
|
||||
// 从调度器中移除任务
|
||||
l.cron.Remove(entry.Name)
|
||||
delete(l.jobs, name)
|
||||
|
||||
g.Log().Infof(ctx, "Removed job: %s", name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetJobStatus 获取任务状态
|
||||
func (l *Logic) GetJobStatus(ctx context.Context, name string) (bool, error) {
|
||||
l.jobsMux.RLock()
|
||||
defer l.jobsMux.RUnlock()
|
||||
|
||||
_, exists := l.jobs[name]
|
||||
return exists, nil
|
||||
}
|
||||
|
||||
// registerDefaultJobs 注册默认的定时任务
|
||||
func (l *Logic) registerDefaultJobs(ctx context.Context) error {
|
||||
// 每小时执行一次数据同步任务
|
||||
if err := l.AddJob(ctx, "data_sync_hourly", "0 0 * * * *", func() {
|
||||
l.syncDataFromThirdParty(ctx)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 每天凌晨2点执行数据清理任务
|
||||
if err := l.AddJob(ctx, "data_cleanup_daily", "0 0 2 * * *", func() {
|
||||
l.cleanupOldData(ctx)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 每5分钟执行一次健康检查任务
|
||||
if err := l.AddJob(ctx, "health_check", "0 0/5 * * * *", func() {
|
||||
l.healthCheck(ctx)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 每30分钟执行一次缓存刷新任务
|
||||
if err := l.AddJob(ctx, "cache_refresh", "0 0/5 * * * *", func() {
|
||||
l.refreshCache(ctx)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 每天凌晨3点执行英雄数据同步
|
||||
if err := l.AddJob(ctx, "hero_sync_daily", "0 0 3 * * *", func() {
|
||||
l.syncHeroData(ctx)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 每天凌晨4点执行神器数据同步
|
||||
if err := l.AddJob(ctx, "artifact_sync_daily", "0 0 4 * * *", func() {
|
||||
l.syncArtifactData(ctx)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// syncDataFromThirdParty 从第三方网站同步数据
|
||||
func (l *Logic) syncDataFromThirdParty(ctx context.Context) {
|
||||
g.Log().Info(ctx, "Starting data sync from third party...")
|
||||
|
||||
// 使用第三方数据同步器
|
||||
if err := l.sync.SyncAllData(ctx); err != nil {
|
||||
g.Log().Error(ctx, "Data sync failed:", err)
|
||||
return
|
||||
}
|
||||
|
||||
g.Log().Info(ctx, "Data sync completed")
|
||||
}
|
||||
|
||||
// syncHeroData 同步英雄数据
|
||||
func (l *Logic) syncHeroData(ctx context.Context) {
|
||||
g.Log().Info(ctx, "Starting hero data sync...")
|
||||
|
||||
if err := l.sync.SyncHeroData(ctx); err != nil {
|
||||
g.Log().Error(ctx, "Hero data sync failed:", err)
|
||||
return
|
||||
}
|
||||
|
||||
g.Log().Info(ctx, "Hero data sync completed")
|
||||
}
|
||||
|
||||
// syncArtifactData 同步神器数据
|
||||
func (l *Logic) syncArtifactData(ctx context.Context) {
|
||||
g.Log().Info(ctx, "Starting artifact data sync...")
|
||||
|
||||
if err := l.sync.SyncArtifactData(ctx); err != nil {
|
||||
g.Log().Error(ctx, "Artifact data sync failed:", err)
|
||||
return
|
||||
}
|
||||
|
||||
g.Log().Info(ctx, "Artifact data sync completed")
|
||||
}
|
||||
|
||||
// cleanupOldData 清理旧数据
|
||||
func (l *Logic) cleanupOldData(ctx context.Context) {
|
||||
g.Log().Info(ctx, "Starting data cleanup...")
|
||||
|
||||
// TODO: 实现数据清理逻辑
|
||||
// 1. 删除过期的缓存数据
|
||||
// 2. 清理过期的日志记录
|
||||
// 3. 归档历史数据
|
||||
|
||||
g.Log().Info(ctx, "Data cleanup completed")
|
||||
}
|
||||
|
||||
// healthCheck 健康检查
|
||||
func (l *Logic) healthCheck(ctx context.Context) {
|
||||
g.Log().Debug(ctx, "Performing health check...")
|
||||
|
||||
// TODO: 实现健康检查逻辑
|
||||
// 1. 检查数据库连接
|
||||
// 2. 检查Redis连接
|
||||
// 3. 检查第三方API可用性
|
||||
// 4. 记录系统状态
|
||||
|
||||
g.Log().Debug(ctx, "Health check completed")
|
||||
}
|
||||
|
||||
// refreshCache 刷新缓存
|
||||
func (l *Logic) refreshCache(ctx context.Context) {
|
||||
g.Log().Info(ctx, "Starting cache refresh...")
|
||||
|
||||
// TODO: 实现缓存刷新逻辑
|
||||
// 1. 刷新英雄数据缓存
|
||||
// 2. 刷新神器数据缓存
|
||||
// 3. 刷新其他业务缓存
|
||||
|
||||
g.Log().Info(ctx, "Cache refresh completed")
|
||||
}
|
||||
Reference in New Issue
Block a user