using Microsoft.Practices.Unity;
using MyCode.Project.Domain.Message.Response.Queue;
using MyCode.Project.Infrastructure.Cache;
using MyCode.Project.Infrastructure.Common;
using MyCode.Project.Infrastructure.Constant;
using MyCode.Project.Infrastructure.Extensions;
using MyCode.Project.Infrastructure.UnityExtensions;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace MyCode.Project.Services.Implementation
{
    /// <summary>
    /// Queue service: pushes <see cref="QueueProcess"/> work items onto the cache-backed list
    /// identified by <c>CacheKey.LxmQueueCacheKey</c>, and pops and executes them via reflection.
    /// </summary>
    public class QueueProcessService : ServiceBase, IQueueProcessService
    {
        #region Initialization

        private readonly IMyCodeCacheService _myCodeCacheService;

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="myCodeCacheService">Cache service that stores the queue.</param>
        public QueueProcessService(IMyCodeCacheService myCodeCacheService)
        {
            _myCodeCacheService = myCodeCacheService;
        }

        #endregion

        #region AddQueue (enqueue, target given as a generic type argument)

        /// <summary>
        /// Adds a work item to the queue; the target type is supplied as a type argument.
        /// </summary>
        /// <typeparam name="T">Type that will be resolved from the container to run the method.</typeparam>
        /// <param name="methodName">Name of the method to invoke.</param>
        /// <param name="expireTime">Point in time after which the item is skipped instead of executed; <c>null</c> means it never expires.</param>
        /// <param name="entity">Optional argument; see the <see cref="Type"/> overload for how it is converted to a string.</param>
        public void AddQueue<T>(string methodName, DateTime? expireTime, object entity = null)
        {
            AddQueue(typeof(T), methodName, expireTime, entity);
        }

        #endregion

        #region AddQueue (enqueue, target given as a Type)

        /// <summary>
        /// Adds a work item to the queue.
        /// </summary>
        /// <param name="type">Type that will be resolved from the container to run the method.</param>
        /// <param name="methodName">Name of the method to invoke.</param>
        /// <param name="expireTime">Point in time after which the item is skipped instead of executed; <c>null</c> means it never expires.</param>
        /// <param name="entity">
        /// Optional argument. <c>null</c> becomes an empty string; string, Guid, int, long, decimal,
        /// float and double values are stored via <c>ToString()</c>; anything else is serialized to JSON.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="type"/> is <c>null</c>.</exception>
        public void AddQueue(Type type, string methodName, DateTime? expireTime, object entity = null)
        {
            if (type == null)
            {
                throw new ArgumentNullException(nameof(type));
            }

            // "FullName, AssemblyName" is the form Type.GetType(string) needs in Execute
            // to locate a type that lives in another assembly.
            string typePath = $"{type.FullName}, {type.Assembly.GetName().Name}";

            AddQueue(typePath, methodName, expireTime, ToParameterString(entity));
        }

        /// <summary>
        /// Converts the optional argument object into the string stored on the queue item.
        /// </summary>
        private static string ToParameterString(object entity)
        {
            if (entity == null)
            {
                return "";
            }

            bool isSimpleValue = entity is string
                || entity is Guid
                || entity is int
                || entity is long
                || entity is decimal
                || entity is float
                || entity is double;

            // NOTE(review): ToString() on decimal/float/double formats with the current culture;
            // confirm that producer and consumer run under the same culture.
            return isSimpleValue ? entity.ToString() : JsonConvert.SerializeObject(entity);
        }

        #endregion

        #region AddQueue (enqueue, target given as a type path)

        /// <summary>
        /// Adds a work item to the queue.
        /// </summary>
        /// <param name="typePath">Type path, e.g. <c>Lxm.IServices.IWorkProcessService, Lxm.Services</c>.</param>
        /// <param name="methodName">Name of the method to invoke.</param>
        /// <param name="expireTime">Point in time after which the item is skipped instead of executed; <c>null</c> means it never expires.</param>
        /// <param name="paramInfo">String argument handed to the method; empty means the method is invoked without arguments.</param>
        public void AddQueue(string typePath, string methodName, DateTime? expireTime, string paramInfo = "")
        {
            var model = new QueueProcess()
            {
                MethodName = methodName,
                Parameter = paramInfo,
                TypePath = typePath,
                CreateTime = DateTime.Now,
                ExpireTime = expireTime
            };

            // TODO (carried over from the original comment): check whether an equal item is
            // already queued. No such check is performed; the item is always pushed.
            _myCodeCacheService.Push(CacheKey.LxmQueueCacheKey, model);
        }

        #endregion

        #region GetLen (queue length)

        /// <summary>
        /// Returns the current length of the queue.
        /// </summary>
        /// <returns>Number of items reported by the cache service for the queue key.</returns>
        public long GetLen()
        {
            return _myCodeCacheService.ListLen(CacheKey.LxmQueueCacheKey);
        }

        #endregion

        #region Execute (run one queued item)

        /// <summary>
        /// Pops one item from the queue and executes it. Does nothing when the queue is empty;
        /// an expired item is logged and skipped. Any failure while resolving the target or
        /// invoking the method is logged and reported through DingDing instead of being rethrown.
        /// </summary>
        /// <remarks>
        /// The original note states that, because of resource-release concerns, each execution
        /// runs on its own thread. No thread is started in this method — presumably the caller
        /// does that; confirm against the scheduler.
        /// </remarks>
        public void Execute()
        {
            var queueLen = _myCodeCacheService.ListLen(CacheKey.LxmQueueCacheKey);
            Console.WriteLine($"目前队列长度:{queueLen}");

            var objProcess = _myCodeCacheService.Popup(CacheKey.LxmQueueCacheKey);
            if (objProcess == null)
            {
                return;
            }

            var process = (QueueProcess)objProcess;

            // An expired item is skipped rather than executed.
            if (process.ExpireTime != null && DateTime.Now > process.ExpireTime)
            {
                var error = $"过期不进行,{process.ToJson()}";
                LogHelper.Error(error);
                return;
            }

            try
            {
                // Resolution and method lookup run inside the guarded region as well: the item
                // has already been removed from the queue, so a bad type path or method name
                // must be reported here rather than escape unlogged.
                InvokeQueuedMethod(process);
            }
            catch (Exception ex)
            {
                LogHelper.Error($"执行方法:{process.TypePath}.{process.MethodName},传参:{process.Parameter}");
                LogHelper.Error(ex);
                DingDingHelper.SendMsg($"调度QueueProcess异常:{ex}");
            }
        }

        /// <summary>
        /// Resolves the target described by <paramref name="process"/> from the Unity container
        /// and invokes the named method, passing the stored parameter string when it is non-empty.
        /// </summary>
        /// <exception cref="InvalidOperationException">The type path or the method name cannot be resolved.</exception>
        private static void InvokeQueuedMethod(QueueProcess process)
        {
            // Type.GetType returns null (it does not throw) when the type cannot be found.
            Type targetType = Type.GetType(process.TypePath);
            if (targetType == null)
            {
                throw new InvalidOperationException($"Queue item type could not be resolved: {process.TypePath}");
            }

            // Resolving through the container means AOP behaviors (transactions, caching)
            // configured for the type also apply to the invoked method.
            var instance = UnityHelper.GetUnityContainer().Resolve(targetType);

            Console.WriteLine($"执行方法:{process.TypePath}.{process.MethodName}");
            Console.WriteLine($"参数:{process.Parameter}");

            // The lookup is done on the runtime type of the resolved instance.
            MethodInfo method = instance.GetType().GetMethod(process.MethodName);
            if (method == null)
            {
                throw new InvalidOperationException(
                    $"Queue item method could not be found: {process.TypePath}.{process.MethodName}");
            }

            // The stored parameter is always handed over as a single string argument.
            object[] arguments = string.IsNullOrEmpty(process.Parameter)
                ? new object[] { }
                : new object[] { process.Parameter };

            method.Invoke(instance, arguments);
        }

        #endregion
    }
}