using Microsoft.Practices.Unity;
using MyCode.Project.Domain.Model;
using MyCode.Project.Domain.Repositories;
using MyCode.Project.Infrastructure.Common;
using MyCode.Project.Infrastructure.Enumeration;
using MyCode.Project.Infrastructure.Exceptions;
using MyCode.Project.Infrastructure.Extensions;
using MyCode.Project.Infrastructure.UnityExtensions;
using MyCode.Project.Repositories.Common;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
namespace MyCode.Project.Services.Implementation
{
/// <summary>
/// Work-scheduling service. Queues deferred method calls as <c>SysWorkProcessV2</c> rows and
/// later executes them, in priority-based batches, by resolving the target class from the
/// Unity container and invoking the target method through reflection.
/// </summary>
public class WorkProcessService : ServiceBase, IWorkProcessService
{
    // Method name that is always queued, even when an identical task already exists.
    private const string DuplicateExemptMethodName = "RunWechatVSKingDee";

    // Audit name written to Creater / Editor for tasks created by this service.
    private const string SchedulerUserName = "系统调度";

    // Completed tasks whose EditTime is older than this many days are purged.
    private const int HistoryRetentionDays = 7;

    // Serializes concurrent calls to Execute() within this process.
    private static readonly object locker = new object();

    private readonly ISysWorkProcessV2Repository _sysWorkProcessV2Repository;

    public WorkProcessService(ISysWorkProcessV2Repository sysWorkProcessV2Repository)
    {
        _sysWorkProcessV2Repository = sysWorkProcessV2Repository;
    }

    #region Add (queue a task)

    /// <summary>
    /// Queues a task whose target class is given by the type parameter.
    /// </summary>
    /// <typeparam name="T">Class (or interface) that declares the method to run.</typeparam>
    /// <param name="merchantId">Merchant ID the task belongs to.</param>
    /// <param name="methodName">Name of the method to invoke.</param>
    /// <param name="remark">Free-text remark stored with the task.</param>
    /// <param name="entity">Optional argument; see the <see cref="Type"/>-based overload for how it is serialized.</param>
    /// <param name="priority">Task priority; selects which Execute* method picks the task up.</param>
    /// <param name="funcType">Execution type.</param>
    public void Add<T>(Guid merchantId, string methodName, string remark = "", object entity = null, int priority = 5, FuncType funcType = FuncType.Method) where T : class
    {
        Add(merchantId, typeof(T), methodName, remark, entity, funcType, priority);
    }

    /// <summary>
    /// Queues a task for the given target type.
    /// </summary>
    /// <param name="merchantId">Merchant ID the task belongs to.</param>
    /// <param name="type">Class (or interface) that declares the method to run.</param>
    /// <param name="methodName">Name of the method to invoke.</param>
    /// <param name="remark">Free-text remark stored with the task.</param>
    /// <param name="entity">
    /// Optional argument. Strings, Guids and the primitive numeric types listed in
    /// <see cref="SerializeParam"/> are stored via <c>SafeString()</c>; anything else is stored as JSON.
    /// </param>
    /// <param name="funcType">Execution type.</param>
    /// <param name="priority">Task priority.</param>
    /// <exception cref="ArgumentNullException"><paramref name="type"/> is null.</exception>
    public void Add(Guid merchantId, Type type, string methodName, string remark = "", object entity = null, FuncType funcType = FuncType.Method, int priority = 5)
    {
        if (type == null)
        {
            throw new ArgumentNullException("type");
        }

        // "<full type name>, <assembly simple name>" is the form ExecuteSingle later feeds to Type.GetType.
        string typePath = string.Format("{0}, {1}", type.FullName, type.Assembly.GetName().Name);
        string paramInfo = SerializeParam(entity);
        Add(merchantId, typePath, methodName, remark, paramInfo, funcType, priority);
    }

    /// <summary>
    /// Queues a task identified by a type path. Nothing is added when the repository reports
    /// that an identical task already exists, unless the method is the duplicate-exempt one.
    /// </summary>
    /// <param name="merchantId">Merchant ID the task belongs to.</param>
    /// <param name="typePath">Type path, e.g. <c>Lxm.IServices.IWorkProcessService, Lxm.Services</c>.</param>
    /// <param name="methodName">Name of the method to invoke.</param>
    /// <param name="remark">Free-text remark stored with the task.</param>
    /// <param name="paramInfo">Serialized argument, or empty for a parameterless method.</param>
    /// <param name="funcType">Execution type.</param>
    /// <param name="priority">Task priority.</param>
    public void Add(Guid merchantId, string typePath, string methodName, string remark = "", string paramInfo = "",
        FuncType funcType = FuncType.Method, int priority = 5)
    {
        // Test the cheap name exemption first so exempt tasks do not cost a repository lookup.
        bool duplicateCheckApplies = methodName != DuplicateExemptMethodName;
        if (duplicateCheckApplies && _sysWorkProcessV2Repository.Exists(funcType, merchantId, typePath, methodName, paramInfo))
        {
            return;
        }

        // One timestamp so CreateTime and EditTime are identical on a new row.
        DateTime now = DateTime.Now;
        SysWorkProcessV2 entity = new SysWorkProcessV2
        {
            ID = Guid.NewGuid(),
            MerchantID = merchantId,
            FuncType = funcType.Value(),
            FuncClass = typePath,
            FuncMethod = methodName,
            ParamInfo = paramInfo,
            FuncStatus = FuncStatus.Waiting.Value(),
            Remark = remark,
            RetryCount = 0,
            Creater = SchedulerUserName,
            CreateTime = now,
            Editor = SchedulerUserName,
            EditTime = now,
            Priority = priority
        };
        _sysWorkProcessV2Repository.Add(entity);
    }

    /// <summary>
    /// Queues a pre-built list of tasks in one repository call (no duplicate check).
    /// </summary>
    /// <param name="workProcess">Tasks to insert.</param>
    public void Add(List<SysWorkProcessV2> workProcess)
    {
        _sysWorkProcessV2Repository.Add(workProcess);
    }

    // Converts a task argument to the string stored in ParamInfo.
    private static string SerializeParam(object entity)
    {
        if (entity == null)
        {
            return "";
        }

        bool isSimpleValue = entity is string || entity is Guid || entity is int || entity is long
            || entity is decimal || entity is float || entity is double;
        return isSimpleValue ? entity.SafeString() : JsonHelper.ToJson(entity);
    }

    #endregion

    #region Batch selection

    /// <summary>
    /// Returns up to <paramref name="top"/> waiting tasks whose priority is 1 or not set.
    /// </summary>
    /// <param name="top">Maximum number of tasks to return.</param>
    public List<SysWorkProcessV2> SelectInitWorkProcess(int top)
    {
        // NOTE(review): the Take -> OrderBy -> OrderBy call order is kept exactly as written. Whether the
        // limit is applied after ordering, and whether the second OrderBy adds to or replaces the first,
        // depends on the query provider behind Queryable() - confirm. The same applies to every selector below.
        return _sysWorkProcessV2Repository.Queryable()
            .Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && (p.Priority == 1 || p.Priority == null))
            .Take(top)
            .OrderBy(p => p.Priority)
            .OrderBy(p => p.CreateTime)
            .ToList();
    }

    /// <summary>
    /// Returns up to <paramref name="top"/> waiting tasks with priority 2.
    /// </summary>
    /// <param name="top">Maximum number of tasks to return.</param>
    public List<SysWorkProcessV2> SelectPriority2InitWorkProcess(int top)
    {
        return _sysWorkProcessV2Repository.Queryable()
            .Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority == 2)
            .Take(top)
            .OrderBy(p => p.Priority)
            .OrderBy(p => p.CreateTime)
            .ToList();
    }

    /// <summary>
    /// Returns up to <paramref name="top"/> waiting tasks with priority 3, 4 or 5.
    /// </summary>
    /// <param name="top">Maximum number of tasks to return.</param>
    public List<SysWorkProcessV2> SelectOtherInitWorkProcess(int top)
    {
        return _sysWorkProcessV2Repository.Queryable()
            .Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority > 2 && p.Priority <= 5)
            .Take(top)
            .OrderBy(p => p.Priority)
            .OrderBy(p => p.CreateTime)
            .ToList();
    }

    /// <summary>
    /// Returns up to <paramref name="top"/> waiting tasks with priority 6
    /// (described in the original comments as the SMS queue).
    /// </summary>
    /// <param name="top">Maximum number of tasks to return.</param>
    public List<SysWorkProcessV2> SelectPriority6WorkProcess(int top)
    {
        return _sysWorkProcessV2Repository.Queryable()
            .Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority == 6)
            .Take(top)
            .OrderBy(p => p.Priority)
            .OrderBy(p => p.CreateTime)
            .ToList();
    }

    #endregion

    #region Resume paused tasks

    /// <summary>
    /// Resumes every paused task. Not transactional on purpose: a failed update is harmless.
    /// </summary>
    public void RestratStopProcess()
    {
        var list = _sysWorkProcessV2Repository.SelectList(p => p.FuncStatus == (int)WorkProcessStatus.Pause);
        if (list == null || list.Count == 0)
        {
            return; // nothing paused, so skip the bulk update
        }

        // NOTE(review): tasks are moved to Running, while the selectors in this class only pick up
        // Waiting tasks - confirm that something else executes tasks resumed this way.
        DateTime now = DateTime.Now;
        list.ForEach(x =>
        {
            x.FuncStatus = (int)WorkProcessStatus.Running;
            x.EditTime = now;
        });
        _sysWorkProcessV2Repository.Update(list);
    }

    /// <summary>
    /// Resumes a single paused task.
    /// </summary>
    /// <param name="workprocessId">ID of the task to resume.</param>
    /// <exception cref="BaseException">The task does not exist, or it is not currently paused.</exception>
    [TransactionCallHandler]
    public void RestartStopProcess(Guid workprocessId)
    {
        var workprocess = _sysWorkProcessV2Repository.SelectFirst(p => p.ID == workprocessId);
        if (workprocess == null)
        {
            // Previously an unknown ID surfaced as a NullReferenceException.
            throw new BaseException("未找到指定的调度任务");
        }
        if (workprocess.FuncStatus != (int)WorkProcessStatus.Pause)
        {
            throw new BaseException("当前进程状态不是停止");
        }

        workprocess.FuncStatus = (int)WorkProcessStatus.Running;
        workprocess.EditTime = DateTime.Now;
        _sysWorkProcessV2Repository.Update(workprocess);
    }

    #endregion

    #region Purge history

    /// <summary>
    /// Deletes completed tasks whose EditTime is older than the retention window.
    /// </summary>
    public void ClearHistoryTask()
    {
        var historyDate = DateTime.Now.AddDays(-HistoryRetentionDays);
        _sysWorkProcessV2Repository.Delete(p => p.FuncStatus == (int)WorkProcessStatus.Complete && p.EditTime < historyDate);
    }

    #endregion

    #region Execution

    /// <summary>
    /// Runs one task: resolves <c>FuncClass</c> from the Unity container, invokes <c>FuncMethod</c>
    /// (passing <c>ParamInfo</c> as the single string argument when it is non-empty), then marks
    /// the task complete and saves it.
    /// </summary>
    /// <param name="process">Task to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="process"/> is null.</exception>
    /// <exception cref="BaseException">The target class cannot be loaded or has no such method.</exception>
    public void ExecuteSingle(SysWorkProcessV2 process)
    {
        if (process == null)
        {
            throw new ArgumentNullException("process");
        }

        // Type.GetType returns null (it does not throw) when the type path cannot be resolved.
        Type targetType = Type.GetType(process.FuncClass);
        if (targetType == null)
        {
            throw new BaseException(string.Format("无法加载调度执行类:{0}", process.FuncClass));
        }

        object target = UnityHelper.GetUnityContainer().Resolve(targetType);

        // Look the method up on the resolved instance's runtime type, as the original code did.
        MethodInfo method = target.GetType().GetMethod(process.FuncMethod);
        if (method == null)
        {
            throw new BaseException(string.Format("在类 {0} 中未找到方法:{1}", process.FuncClass, process.FuncMethod));
        }

        object[] arguments = string.IsNullOrEmpty(process.ParamInfo)
            ? new object[0]
            : new object[] { process.ParamInfo };
        method.Invoke(target, arguments);

        process.FuncStatus = (int)WorkProcessStatus.Complete;
        process.ExecuteTime = DateTime.Now;
        process.ExceptionInfo = string.Empty;
        _sysWorkProcessV2Repository.Update(process);
    }

    /// <summary>
    /// Executes a batch of up to 5 waiting tasks with priority 1 or unset.
    /// Calls are serialized within the process by a static lock.
    /// </summary>
    public void Execute()
    {
        lock (locker)
        {
            ExecuteBatch(() => SelectInitWorkProcess(5));
        }
    }

    /// <summary>
    /// Executes a batch of up to 10 waiting tasks with priority 2.
    /// </summary>
    public void ExecutePriority2Work()
    {
        ExecuteBatch(() => SelectPriority2InitWorkProcess(10));
    }

    /// <summary>
    /// Executes a batch of up to 20 waiting tasks with priority 3, 4 or 5.
    /// </summary>
    public void ExecuteOther()
    {
        ExecuteBatch(() => SelectOtherInitWorkProcess(20));
    }

    /// <summary>
    /// Executes a batch of up to 20 waiting tasks with priority 6.
    /// </summary>
    public void ExecutePriority6()
    {
        ExecuteBatch(() => SelectPriority6WorkProcess(20));
    }

    // Shared body of the Execute* methods: fetch a batch, flag it as running, then run each task
    // in its own transaction, recording the failure on the task when it throws.
    private void ExecuteBatch(Func<List<SysWorkProcessV2>> selectBatch)
    {
        // NOTE(review): GetService appears to have lost a generic type argument in this copy of the
        // file; the call is kept exactly as it appears in the original source.
        var client = UnityHelper.GetService();
        var batch = selectBatch();
        if (batch == null || batch.Count == 0)
        {
            return; // the original loop would have thrown on a null batch
        }

        // Flag the whole batch as running before any task executes; only FuncStatus is written.
        var runningMarkers = batch
            .Select(p => new SysWorkProcessV2 { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running })
            .ToList();
        _sysWorkProcessV2Repository.Update(runningMarkers, it => new { it.FuncStatus });

        foreach (var process in batch)
        {
            // One transaction per task, so a failing task only rolls back its own work.
            client.Ado.BeginTran();
            try
            {
                ExecuteSingle(process);
                client.Ado.CommitTran();
            }
            catch (Exception ex)
            {
                client.Ado.RollbackTran();
                MarkFailed(process, ex);
            }
        }
    }

    // Stores the innermost exception on the task and marks it as stopped by an exception.
    private void MarkFailed(SysWorkProcessV2 process, Exception error)
    {
        // Walk to the innermost exception; reflection wraps the real failure in outer exceptions.
        Exception root = error;
        while (root.InnerException != null)
        {
            root = root.InnerException;
        }

        DateTime now = DateTime.Now;
        process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
        process.EditTime = now;
        process.ExecuteTime = now;
        // BaseException is recorded by message only; any other exception keeps its full ToString() output.
        process.ExceptionInfo = root is BaseException ? root.Message : root.ToString();
        _sysWorkProcessV2Repository.Update(process);
    }

    #endregion
}
}