311 lines
11 KiB
C#
311 lines
11 KiB
C#
using Microsoft.Practices.Unity;
|
||
using MyCode.Project.Domain.Config;
|
||
using MyCode.Project.Domain.Model;
|
||
using MyCode.Project.Domain.Repositories;
|
||
using MyCode.Project.Infrastructure.Common;
|
||
using MyCode.Project.Infrastructure.Enumeration;
|
||
using MyCode.Project.Infrastructure.Exceptions;
|
||
using MyCode.Project.Infrastructure.Extensions;
|
||
using MyCode.Project.Infrastructure.UnityExtensions;
|
||
using MyCode.Project.Repositories.Common;
|
||
using System;
|
||
using System.Collections.Generic;
|
||
using System.Linq;
|
||
using System.Reflection;
|
||
using System.Text;
|
||
using System.Threading;
|
||
using System.Threading.Tasks;
|
||
|
||
namespace MyCode.Project.Services.Implementation
|
||
{
|
||
public class WorkProcessService : ServiceBase, IWorkProcessService
|
||
{
|
||
#region 初始化
|
||
private readonly ISysWorkprocessRepository _sysWorkprocessRepository;
|
||
private readonly ISysWorkprocessHistoryRepository _sysWorkprocessHistoryRepository;
|
||
|
||
public WorkProcessService(ISysWorkprocessRepository sysWorkprocessRepository,
|
||
ISysWorkprocessHistoryRepository sysWorkprocessHistoryRepository)
|
||
{
|
||
_sysWorkprocessRepository = sysWorkprocessRepository;
|
||
_sysWorkprocessHistoryRepository = sysWorkprocessHistoryRepository;
|
||
}
|
||
#endregion
|
||
|
||
|
||
#region SelectInitWorkProcess(返回前几十条数据)
|
||
public List<SysWorkprocess> SelectInitWorkProcess(int top)
|
||
{
|
||
var result = _sysWorkprocessRepository.Queryable()
|
||
.Where(p => p.Status == (int)WorkProcessStatus.Init)
|
||
.Take(top)
|
||
.OrderBy(p => p.Priority)
|
||
.OrderBy(p => p.EditTime).ToList();
|
||
|
||
return result;
|
||
}
|
||
#endregion
|
||
|
||
#region Add(批量添加)
|
||
public void Add(List<SysWorkprocess> workProcess)
|
||
{
|
||
_sysWorkprocessRepository.Add(workProcess);
|
||
}
|
||
#endregion
|
||
|
||
#region RestratStopProcess(重新启用所有暂停了的调度,这里不需要事务,因为修改失败也不影响)
|
||
public void RestratStopProcess()
|
||
{
|
||
var list = _sysWorkprocessRepository.SelectList(p => p.Status == (int)WorkProcessStatus.Stop);
|
||
list.ForEach(x => {
|
||
x.Status = (int)WorkProcessStatus.Running;
|
||
x.EditTime = DateTime.Now;
|
||
_sysWorkprocessRepository.Update(x);
|
||
|
||
});
|
||
}
|
||
#endregion
|
||
|
||
#region RestartStopProcess(重新启用某个暂停了的调度)
|
||
[TransactionCallHandler]
|
||
public void RestartStopProcess(long workprocessId)
|
||
{
|
||
|
||
var workprocess = _sysWorkprocessRepository.SelectFirst(p => p.Id == workprocessId);
|
||
|
||
if (workprocess.Status != (int)WorkProcessStatus.Stop) { throw new BaseException("当前进程状态不是停止"); }
|
||
|
||
workprocess.Status = (int)WorkProcessStatus.Init;
|
||
workprocess.EditTime = DateTime.Now;
|
||
_sysWorkprocessRepository.Update(workprocess);
|
||
|
||
}
|
||
#endregion
|
||
|
||
#region ProcessHandleExpire(进行执行超时)
|
||
/// <summary>
|
||
/// 进程执行超时
|
||
/// </summary>
|
||
public void ProcessHandleExpire()
|
||
{
|
||
var time = DateTime.Now.AddMinutes(-10);
|
||
|
||
var count = _sysWorkprocessRepository.Count(p => p.Status == (int)WorkProcessStatus.Running && p.EditTime <= time);
|
||
|
||
if (count > 0) { DingDingHelper.SendMsg($"有超时的进程任务产生,条数:{count}"); }
|
||
}
|
||
#endregion
|
||
|
||
#region 清空历史数据
|
||
[TransactionCallHandler]
|
||
public void ClearHistoryTask()
|
||
{
|
||
var historyDate = DateTime.Now.AddDays(-7);
|
||
|
||
var historyData = _sysWorkprocessRepository.SelectList(p => p.CreateTime < historyDate);
|
||
|
||
if (historyData == null || historyData.Count == 0) { return; }
|
||
|
||
_sysWorkprocessRepository.Add(historyData, "sys_workprocess_history");
|
||
|
||
_sysWorkprocessRepository.DeleteByIds(historyData.Select(p => p.Id).ToArray());
|
||
}
|
||
#endregion
|
||
|
||
#region ExecuteSingle(执行单个)
|
||
public void ExecuteSingle(SysWorkprocess process)
|
||
{
|
||
|
||
var type = UnityHelper.GetUnityContainer().Resolve(Type.GetType(process.TypePath));
|
||
|
||
MethodInfo method = type.GetType().GetMethod(process.MethodName);
|
||
|
||
if (!string.IsNullOrEmpty(process.ParameterInfo))
|
||
{
|
||
method.Invoke(type, new object[] { process.ParameterInfo });
|
||
}
|
||
else
|
||
{
|
||
method.Invoke(type, new object[] { });
|
||
}
|
||
|
||
process.Status = (int)WorkProcessStatus.Finished;
|
||
process.EditTime = DateTime.Now;
|
||
_sysWorkprocessRepository.Update(process);
|
||
|
||
}
|
||
#endregion
|
||
|
||
#region Execute(调度执行)
|
||
|
||
public void Execute()
|
||
{
|
||
|
||
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
|
||
|
||
var list = SelectInitWorkProcess(10);
|
||
|
||
//先将这20个任务改成运行中
|
||
if (list != null && list.Count > 0)
|
||
{
|
||
DateTime now = DateTime.Now;
|
||
var updateList = list.Select(p => new SysWorkprocess { Id = p.Id, Status = (int)WorkProcessStatus.Running, EditTime = now });
|
||
|
||
_sysWorkprocessRepository.Update(updateList.ToList(), it => new { it.Status, it.EditTime });
|
||
}
|
||
|
||
foreach (var process in list)
|
||
{
|
||
//这里开启事务,同时才开启
|
||
client.Ado.BeginTran();
|
||
|
||
#region 执行一个任务
|
||
try
|
||
{
|
||
ExecuteSingle(process);
|
||
|
||
client.Ado.CommitTran();
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
client.Ado.RollbackTran();
|
||
|
||
while (ex.InnerException != null)
|
||
{
|
||
ex = ex.InnerException;
|
||
}
|
||
|
||
process.Status = (int)WorkProcessStatus.Stop;
|
||
process.EditTime = DateTime.Now;
|
||
if (ex is BaseException)
|
||
{
|
||
process.ExceptionInfo = ex.Message;
|
||
}
|
||
else
|
||
{
|
||
process.ExceptionInfo = ex.ToString();
|
||
}
|
||
|
||
_sysWorkprocessRepository.Update(process);
|
||
|
||
DingDingHelper.SendMsg(process.ExceptionInfo);
|
||
}
|
||
finally
|
||
{
|
||
client.Ado.Context.Queues.Clear();
|
||
}
|
||
#endregion
|
||
|
||
}
|
||
|
||
}
|
||
#endregion
|
||
|
||
#region Add(添加调度)
|
||
|
||
/// <summary>
|
||
/// 添加调度
|
||
/// </summary>
|
||
/// <typeparam name="T">执行类</typeparam>
|
||
/// <param name="merchantId">商家ID</param>
|
||
/// <param name="methodName">方法名</param>
|
||
/// <param name="remark">备注</param>
|
||
/// <param name="entity">参数信息</param>
|
||
/// <param name="funcType">执行类型</param>
|
||
public void Add<T>(string methodName,long companyId, object entity = null, string remark = "", FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
|
||
{
|
||
Add(typeof(T), companyId,methodName, remark, entity, funcType, priority);
|
||
}
|
||
#endregion
|
||
|
||
#region Add(添加调度)
|
||
/// <summary>
|
||
/// 添加调度
|
||
/// </summary>
|
||
/// <param name="merchantId">商家ID</param>
|
||
/// <param name="type">执行类</param>
|
||
/// <param name="methodName">方法名</param>
|
||
/// <param name="remark">备注</param>
|
||
/// <param name="entity">参数信息</param>
|
||
/// <param name="funcType">执行类型</param>
|
||
public void Add(Type type,long companyId, string methodName, string remark = "", object entity = null, FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
|
||
{
|
||
string typePath = string.Format("{0}, {1}", type.FullName, type.Assembly.GetName().Name);
|
||
string paramInfo = entity == null
|
||
? ""
|
||
: entity is string || entity is Guid || entity is int || entity is long || entity is decimal ||
|
||
entity is float || entity is double
|
||
? entity.ToString()
|
||
: entity.ToJson();
|
||
Add(typePath,companyId, methodName, remark, paramInfo, funcType, priority);
|
||
}
|
||
#endregion
|
||
|
||
#region AddQueue(添加进程调度)
|
||
public void AddQueue<T>(string methodName, long companyId,object entity = null,FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
|
||
{
|
||
var type = typeof(T);
|
||
|
||
string typePath = string.Format("{0}, {1}", type.FullName, type.Assembly.GetName().Name);
|
||
|
||
string paramInfo = entity == null
|
||
? ""
|
||
: entity is string || entity is Guid || entity is int || entity is long || entity is decimal ||
|
||
entity is float || entity is double
|
||
? entity.ToString()
|
||
: entity.ToJson();
|
||
|
||
var workProcess = new SysWorkprocess()
|
||
{
|
||
Id = IdHelper.GetNewId(),
|
||
FuncType = (byte)funcType,
|
||
TypePath = typePath,
|
||
MethodName = methodName,
|
||
ParameterInfo = paramInfo,
|
||
Status = (int)WorkProcessStatus.Init,
|
||
EditTime = DateTime.Now,
|
||
Priority = (byte)priority,
|
||
CreateTime = DateTime.Now,
|
||
CompanyId = companyId
|
||
|
||
};
|
||
|
||
_sysWorkprocessRepository.AddQueue(workProcess);
|
||
}
|
||
#endregion
|
||
|
||
#region Add(添加调度)
|
||
/// <summary>
|
||
/// 添加调度任务
|
||
/// </summary>
|
||
/// <param name="merchantId">商家ID</param>
|
||
/// <param name="typePath">类型路径,如:Lxm.IServices.IWorkProcessService, Lxm.Services</param>
|
||
/// <param name="methodName">方法名</param>
|
||
/// <param name="remark">备注</param>
|
||
/// <param name="paramInfo">参数信息</param>
|
||
/// <param name="funcType">执行类型</param>
|
||
public void Add(string typePath,long companyId, string methodName, string remark = "", string paramInfo = "",
|
||
FuncType funcType = FuncType.Function,
|
||
Priority priority = Priority.Low)
|
||
{
|
||
|
||
var entity = new SysWorkprocess()
|
||
{
|
||
Id = IdHelper.GetNewId(),
|
||
FuncType = (byte)funcType,
|
||
TypePath = typePath,
|
||
MethodName = methodName,
|
||
ParameterInfo = paramInfo,
|
||
Status = (int)WorkProcessStatus.Init,
|
||
Remark = remark,
|
||
EditTime = DateTime.Now,
|
||
Priority = (byte)priority,
|
||
CreateTime = DateTime.Now,
|
||
CompanyId = companyId
|
||
};
|
||
_sysWorkprocessRepository.Add(entity);
|
||
}
|
||
#endregion
|
||
}
|
||
}
|