为什么编写TaskSchedulerEx类?
因为.NET默认线程池只有一个线程池,如果某个批量任务一直占着大量线程,甚至耗尽默认线程池,则会严重影响应用程序域中其它任务或批量任务的性能。
特点:
1、使用独立线程池,线程池中线程分为核心线程和辅助线程,辅助线程会动态增加和释放,且总线程数不大于参数_maxThreadCount
2、无缝兼容Task,使用上和Task一样,可以用它来实现异步,参见:C# async await 异步执行方法封装 替代 BackgroundWorker
3、队列中尚未执行的任务可以取消
4、通过扩展类TaskHelper实现任务分组
5、和SmartThreadPool对比,优点是无缝兼容Task类,和Task类使用没有区别,因为它本身就是对Task、TaskScheduler的扩展,所以Task类的ContinueWith、WaitAll等方法它都支持,以及兼容async、await异步编程
6、代码量相当精简,TaskSchedulerEx类只有260多行代码
7、池中的线程数量会根据负载自动增减,支持,但没有SmartThreadPool智能,为了性能,使用了比较笨的方式实现,不知道大家有没有既智能,性能又高的方案,我有一个思路,在定时器中计算每个任务执行平均耗时,然后使用公式(线程数 = cpu核心数 * ( 本地计算时间 + 等待时间 ) / 本地计算时间)来计算最佳线程数,然后按最佳线程数来动态创建线程,但这个计算过程可能会牺牲性能
对比SmartThreadPool:
TaskSchedulerEx类代码(使用Semaphore实现):
using System; System.Collections.Concurrent; System.Collections.Generic; System.linq; System.Runtime.InteropServices; System.Text; System.Threading; System.Threading.Tasks;namespace Utils{ /// <summary> /// TaskScheduler扩展 每个实例都是独立线程池 </summary> public class TaskSchedulerEx : TaskScheduler,Idisposable { #region 外部方法 [Dllimport("kernel32.dll",EntryPoint = SetProcessWorkingSetSize")] static extern int SetProcessWorkingSetSize(IntPtr process,int minSize,1)">int maxSize); #endregion #region 变量属性事件 private ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>(); private int _coreThreadCount = 0; int _maxThreadCount = int _auxiliaryThreadTimeOut = 20000; //辅助线程释放时间 int _activeThreadCount = private System.Timers.Timer _timer; object _lockCreateTimer = new objectbool _run = trueprivate Semaphore _sem = nullint _semmaxCount = int.MaxValue; 可以同时授予的信号量的最大请求数 int _semCount = 0; 可用信号量请求数 int _runcount = 正在执行的和等待执行的任务数量 <summary> 活跃线程数 </summary> ActiveThreadCount { get { return _activeThreadCount; } } 核心线程数 CoreThreadCount { _coreThreadCount; } } 最大线程数 MaxThreadCount { _maxThreadCount; } } #region 构造函数 TaskScheduler扩展 每个实例都是独立线程池 </summary> <param name="coreThreadCount">核心线程数(大于或等于0,不宜过大)(如果是一次性使用,则设置为0比较合适)</param> <param name="maxThreadCount">最大线程数</param> public TaskSchedulerEx(int coreThreadCount = 10,1)">int maxThreadCount = 20) { _sem = new Semaphore(,_semmaxCount); _maxThreadCount = maxThreadCount; CreateCoreThreads(coreThreadCount); } #region overrIDe GetScheduledTasks protected overrIDe IEnumerable<Task> GetScheduledTasks() { _tasks; } #region overrIDe TryExecuteTaskInline overrIDe bool TryExecuteTaskInline(Task task,1)">bool taskwasprevIoUslyQueued) { return false; } #region overrIDe QueueTask voID QueueTask(Task task) { _tasks.Enqueue(task); while (_semCount >= _semmaxCount) 信号量已满,等待 { Thread.Sleep(1); } _sem.Release(); Interlocked.Increment(ref _semCount); Interlocked.Increment( _runcount); if (_activeThreadCount < _maxThreadCount && _activeThreadCount < _runcount) { CreateThread(); } } #region 资源释放 资源释放 队列中尚未执行的任务不再执行 dispose() { _run = ; if (_timer != ) { _timer.Stop(); _timer.dispose(); _timer = ; } while (_activeThreadCount > ) { _sem.Release(); Interlocked.Increment( _semCount); } } #region 创建核心线程池 创建核心线程池 voID CreateCoreThreads(int? coreThreadCount = ) { if (coreThreadCount != null) _coreThreadCount = coreThreadCount.Value; for (int i = 0; i < _coreThreadCount; i++) { Interlocked.Increment( _activeThreadCount); Thread thread = ; thread = new Thread(new ThreadStart(() => { Task task; while (_run) { if (_tasks.TryDequeue(out task)) { TryExecuteTask(task); Interlocked.Decrement( _runcount); } else { _sem.WaitOne(); Interlocked.Decrement( _semCount); } } Interlocked.Decrement( _activeThreadCount); if (_activeThreadCount == ) { GC.Collect(); GC.WaitForPendingFinalizers(); if (Environment.Osversion.Platform == PlatformID.Win32NT) { SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle,-1,-); } } })); thread.IsBackground = ; thread.Start(); } } #region 创建辅助线程 创建辅助线程 CreateThread() { Interlocked.Increment( _activeThreadCount); Thread thread = ; thread = { Task task; DateTime dt = DateTime.Now; while (_run && DateTime.Now.Subtract(dt).TotalMilliseconds < _auxiliaryThreadTimeOut) { task)) { TryExecuteTask(task); Interlocked.Decrement( _runcount); dt = DateTime.Now; } { _sem.WaitOne(_auxiliaryThreadTimeOut); Interlocked.Decrement( _semCount); } } Interlocked.Decrement( _activeThreadCount); if (_activeThreadCount == _coreThreadCount) { GC.Collect(); GC.WaitForPendingFinalizers(); PlatformID.Win32NT) { SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle,1)">); } } })); thread.IsBackground = ; thread.Start(); } #region 全部取消 全部取消 取消队列中尚未执行的任务 CancelAll() { Task tempTask; while (_tasks.TryDequeue( tempTask)) { Interlocked.Decrement( _runcount); } } #endregion }}VIEw Code
TaskSchedulerEx类代码(使用autoresetEvent实现):
private autoresetEvent _evt = new autoresetEvent(); ) { _maxThreadCount = QueueTask(Task task) { CreateTimer(); _tasks.Enqueue(task); _evt.Set(); } ) { _evt.Set(); } } task)) { TryExecuteTask(task); } { _evt.WaitOne(); } } Interlocked.Decrement( task)) { TryExecuteTask(task); dt = { _evt.WaitOne(_auxiliaryThreadTimeOut); } } Interlocked.Decrement(#region 创建定时器 CreateTimer() { if (_timer == null) _timer不为空时,跳过,不走lock,提升性能 { if (_activeThreadCount >= _coreThreadCount && _activeThreadCount < _maxThreadCount) 活跃线程数达到最大线程数时,跳过,不走lock,提升性能 { lock (_lockCreateTimer) { ) { _timer = new System.Timers.Timer(); _timer.Interval = _coreThreadCount == 0 ? 1 : 500; _timer.Elapsed += (s,e) => { if (_activeThreadCount >= _coreThreadCount && _activeThreadCount < _maxThreadCount) { if (_tasks.Count > ) { if (_timer.Interval != 20) _timer.Interval = ; CreateThread(); } { 500) _timer.Interval = ; } } { ) { _timer.Stop(); _timer.dispose(); _timer = ; } } }; _timer.Start(); } } } } } tempTask)) { } } }}VIEw Code
RunHelper类代码:
线程工具类 RunHelper { #region 变量属性事件 #region 线程中执行 线程中执行 static Task Run(this TaskScheduler scheduler,Action<object> doWork,1)">object arg = null,Action<Exception> errorAction = return Task.Factory.StartNew((obj) => { try { doWork(obj); } catch (Exception ex) { if (errorAction != ) errorAction(ex); LogUtil.Error(ex,ThreadUtil.Run错误); } },arg,CancellationToken.None,TaskCreationoptions.None,scheduler); } return Task.Factory.StartNew(() => { doWork(); } static Task<T> Run<T>(object,T> doWork,1)">return Task.Factory.StartNew<T>((obj) => { doWork(obj); } ); default(T); } },Func<T> doWork,1)">return Task.Factory.StartNew<T>(() => doWork(); } async Task<T> RunAsync<T>(await Task.Factory.StartNew<T>((obj) =>await Task.Factory.StartNew<T>(() => }}VIEw Code
TaskHelper扩展类:
Task帮助类基类 TaskHelper { #region 变量 处理器数 int _processorCount = Environment.ProcessorCount; #region UI任务 static TaskScheduler _UITask; UI任务(2-4个线程) TaskScheduler UITask { getif (_UITask == null) _UITask = new TaskSchedulerEx(2,4); _UITask; } } #region 菜单任务 TaskScheduler _MenuTask; 菜单任务(2-4个线程) TaskScheduler MenuTask { if (_MenuTask == null) _MenuTask = _MenuTask; } } #region 计算任务 TaskScheduler _CalcTask; 计算任务(线程数:处理器数*2) TaskScheduler CalcTask { if (_CalcTask == null) _CalcTask = new limitedTaskScheduler(_processorCount * 2 _CalcTask; } } #region 网络请求 TaskScheduler _RequestTask; 网络请求(8-32个线程) TaskScheduler RequestTask { if (_RequestTask == null) _RequestTask = 8,1)">32 _RequestTask; } } #region 数据库任务 TaskScheduler _DBTask; 数据库任务(8-32个线程) TaskScheduler DBTask { if (_DBTask == null) _DBTask = _DBTask; } } #region IO任务 TaskScheduler _IOTask; IO任务(8-32个线程) TaskScheduler IOTask { if (_IOTask == null) _IOTask = _IOTask; } } #region 首页任务 TaskScheduler _MainPageTask; 首页任务(8-32个线程) TaskScheduler MainPageTask { if (_MainPageTask == null) _MainPageTask = _MainPageTask; } } #region 图片加载任务 TaskScheduler _LoadImageTask; 图片加载任务(8-32个线程) TaskScheduler LoadImageTask { if (_LoadImageTask == null) _LoadImageTask = _LoadImageTask; } } #region 浏览器任务 TaskScheduler _browserTask; 浏览器任务(2-4个线程) TaskScheduler browserTask { if (_browserTask == null) _browserTask = _browserTask; } } }}VIEw Code
Form1.cs测试代码:
System.ComponentModel; System.Data; System.Drawing; System.Management; System.Reflection; System.Threading.Tasks; System.windows.Forms; Utils; test{ partial Form1 : Form { private TaskSchedulerEx _taskSchedulerEx = private TaskSchedulerEx _taskSchedulerExSmall = private TaskSchedulerEx _task = ; public Form1() { InitializeComponent(); _taskSchedulerEx = 50,1)">); _taskSchedulerExSmall = 5,1)">50); _task = 10); } voID Form1_Load( sender,EventArgs e) { } 模拟大量网络请求任务 voID button1_Click(200000,1)">1000,1)">); } 模拟cpu密集型任务 voID button2_Click(100000,1)">2000,1)">voID button3_Click(100,1)">voID button4_Click( 模拟任务 <param name="scheduler">scheduler<param name="taskCount">任务数量<param name="logCount">每隔多少条数据打一个日志<param name="delay">模拟延迟或耗时(毫秒)voID DoTask(TaskSchedulerEx scheduler,1)">int taskCount,1)">int logCount,1)"> delay) { _task.Run(() => { Log(开始); DateTime dt = DateTime.Now; List<Task> taskList = new List<Task>(); 1; i <= taskCount; i++) { Task task = scheduler.Run((obj) => { var k = ()obj; Thread.Sleep(delay); 模拟延迟或耗时 if (k % logCount == ) { Log(最大线程数:" + scheduler.MaxThreadCount + 核心线程数:" + scheduler.CoreThreadCount + 活跃线程数:" + scheduler.ActiveThreadCount.ToString().Padleft(4,1)">' ') + 处理数/总数:" + k + / " + taskCount); } },i,(ex) => { Log(ex.Message); }); taskList.Add(task); } Task.WaitAll(taskList.ToArray()); double d = DateTime.Now.Subtract(dt).TotalSeconds; Log(完成,耗时:" + d + 秒); }); } voID Form1_FormClosed(if (_taskSchedulerEx != ) { _taskSchedulerEx.dispose(); 释放资源 _taskSchedulerEx = ; } } }}VIEw Code
测试截图:
总结
以上是内存溢出为你收集整理的C# Task 多任务:C# 扩展TaskScheduler实现独立线程池,支持多任务批量处理,互不干扰,无缝兼容Task全部内容,希望文章能够帮你解决C# Task 多任务:C# 扩展TaskScheduler实现独立线程池,支持多任务批量处理,互不干扰,无缝兼容Task所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)