C# Task 多任务:C# 扩展TaskScheduler实现独立线程池,支持多任务批量处理,互不干扰,无缝兼容Task

C# Task 多任务:C# 扩展TaskScheduler实现独立线程池,支持多任务批量处理,互不干扰,无缝兼容Task,第1张

概述为什么编写TaskSchedulerEx类? 因为.NET默认线程池只有一个线程池,如果某个批量任务一直占着大量线程,甚至耗尽默认线程池,则会严重影响应用程序域中其它任务或批量任务的性能。 特点: 1

    为什么编写TaskSchedulerEx类?

    因为.NET默认线程池只有一个线程池,如果某个批量任务一直占着大量线程,甚至耗尽默认线程池,则会严重影响应用程序域中其它任务或批量任务的性能。

     特点:

    1、使用独立线程池,线程池中线程分为核心线程和辅助线程,辅助线程会动态增加和释放,且总线程数不大于参数_maxThreadCount

    2、无缝兼容Task,使用上和Task一样,可以用它来实现异步,参见:C# async await 异步执行方法封装 替代 BackgroundWorker

    3、队列中尚未执行的任务可以取消

    4、通过扩展类TaskHelper实现任务分组

    5、和SmartThreadPool对比,优点是无缝兼容Task类,和Task类使用没有区别,因为它本身就是对Task、TaskScheduler的扩展,所以Task类的ContinueWith、WaitAll等方法它都支持,以及兼容async、await异步编程

    6、代码量相当精简,TaskSchedulerEx类只有260多行代码

    7、池中的线程数量会根据负载自动增减,支持,但没有SmartThreadPool智能,为了性能,使用了比较笨的方式实现,不知道大家有没有既智能,性能又高的方案,我有一个思路,在定时器中计算每个任务执行平均耗时,然后使用公式(线程数 = cpu核心数 * ( 本地计算时间 + 等待时间 ) / 本地计算时间)来计算最佳线程数,然后按最佳线程数来动态创建线程,但这个计算过程可能会牺牲性能

     对比SmartThreadPool:

    TaskSchedulerEx类代码(使用Semaphore实现):

using System; System.Collections.Concurrent; System.Collections.Generic; System.linq; System.Runtime.InteropServices; System.Text; System.Threading; System.Threading.Tasks;namespace Utils{    /// <summary>    /// TaskScheduler扩展     每个实例都是独立线程池    </summary>    public class TaskSchedulerEx : TaskScheduler,Idisposable    {        #region 外部方法        [Dllimport("kernel32.dll",EntryPoint = SetProcessWorkingSetSize")]        static extern int SetProcessWorkingSetSize(IntPtr process,int minSize,1)">int maxSize);        #endregion        #region 变量属性事件        private ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();        private int _coreThreadCount = 0;        int _maxThreadCount = int _auxiliaryThreadTimeOut = 20000; //辅助线程释放时间        int _activeThreadCount = private System.Timers.Timer _timer;        object _lockCreateTimer = new objectbool _run = trueprivate Semaphore _sem = nullint _semmaxCount = int.MaxValue; 可以同时授予的信号量的最大请求数        int _semCount = 0; 可用信号量请求数        int _runcount = 正在执行的和等待执行的任务数量        <summary>         活跃线程数        </summary>         ActiveThreadCount        {            get { return _activeThreadCount; }        }         核心线程数         CoreThreadCount        {             _coreThreadCount; }        }         最大线程数         MaxThreadCount        {             _maxThreadCount; }        }        #region 构造函数         TaskScheduler扩展         每个实例都是独立线程池        </summary>        <param name="coreThreadCount">核心线程数(大于或等于0,不宜过大)(如果是一次性使用,则设置为0比较合适)</param>        <param name="maxThreadCount">最大线程数</param>        public TaskSchedulerEx(int coreThreadCount = 10,1)">int maxThreadCount = 20)        {            _sem = new Semaphore(,_semmaxCount);            _maxThreadCount = maxThreadCount;            CreateCoreThreads(coreThreadCount);        }        #region overrIDe GetScheduledTasks        protected overrIDe IEnumerable<Task> GetScheduledTasks()        {             _tasks;        }        #region overrIDe TryExecuteTaskInline        overrIDe bool TryExecuteTaskInline(Task task,1)">bool taskwasprevIoUslyQueued)        {            return false;        }        #region overrIDe QueueTask        voID QueueTask(Task task)        {            _tasks.Enqueue(task);            while (_semCount >= _semmaxCount) 信号量已满,等待            {                Thread.Sleep(1);            }            _sem.Release();            Interlocked.Increment(ref _semCount);            Interlocked.Increment( _runcount);            if (_activeThreadCount < _maxThreadCount && _activeThreadCount < _runcount)            {                CreateThread();            }        }        #region 资源释放         资源释放         队列中尚未执行的任务不再执行         dispose()        {            _run = ;            if (_timer != )            {                _timer.Stop();                _timer.dispose();                _timer = ;            }            while (_activeThreadCount > )            {                _sem.Release();                Interlocked.Increment( _semCount);            }        }        #region 创建核心线程池         创建核心线程池        voID CreateCoreThreads(int? coreThreadCount = )        {            if (coreThreadCount != null) _coreThreadCount = coreThreadCount.Value;            for (int i = 0; i < _coreThreadCount; i++)            {                Interlocked.Increment( _activeThreadCount);                Thread thread = ;                thread = new Thread(new ThreadStart(() =>                {                    Task task;                    while (_run)                    {                        if (_tasks.TryDequeue(out task))                        {                            TryExecuteTask(task);                            Interlocked.Decrement( _runcount);                        }                        else                        {                            _sem.WaitOne();                            Interlocked.Decrement( _semCount);                        }                    }                    Interlocked.Decrement( _activeThreadCount);                    if (_activeThreadCount == )                    {                        GC.Collect();                        GC.WaitForPendingFinalizers();                        if (Environment.Osversion.Platform == PlatformID.Win32NT)                        {                            SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle,-1,-);                        }                    }                }));                thread.IsBackground = ;                thread.Start();            }        }        #region 创建辅助线程         创建辅助线程         CreateThread()        {            Interlocked.Increment( _activeThreadCount);            Thread thread = ;            thread =             {                Task task;                DateTime dt = DateTime.Now;                while (_run && DateTime.Now.Subtract(dt).TotalMilliseconds < _auxiliaryThreadTimeOut)                {                     task))                    {                        TryExecuteTask(task);                        Interlocked.Decrement( _runcount);                        dt = DateTime.Now;                    }                                        {                        _sem.WaitOne(_auxiliaryThreadTimeOut);                        Interlocked.Decrement( _semCount);                    }                }                Interlocked.Decrement( _activeThreadCount);                if (_activeThreadCount == _coreThreadCount)                {                    GC.Collect();                    GC.WaitForPendingFinalizers();                     PlatformID.Win32NT)                    {                        SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle,1)">);                    }                }            }));            thread.IsBackground = ;            thread.Start();        }        #region 全部取消         全部取消         取消队列中尚未执行的任务         CancelAll()        {            Task tempTask;            while (_tasks.TryDequeue( tempTask))            {                Interlocked.Decrement( _runcount);            }        }        #endregion    }}
VIEw Code

    TaskSchedulerEx类代码(使用autoresetEvent实现):

private autoresetEvent _evt = new autoresetEvent();        )        {            _maxThreadCount = QueueTask(Task task)        {            CreateTimer();            _tasks.Enqueue(task);            _evt.Set();        }        )            {                _evt.Set();            }        }         task))                        {                            TryExecuteTask(task);                        }                                                {                            _evt.WaitOne();                        }                    }                    Interlocked.Decrement( task))                    {                        TryExecuteTask(task);                        dt =                    {                        _evt.WaitOne(_auxiliaryThreadTimeOut);                    }                }                Interlocked.Decrement(#region 创建定时器         CreateTimer()        {            if (_timer == null) _timer不为空时,跳过,不走lock,提升性能            {                if (_activeThreadCount >= _coreThreadCount && _activeThreadCount < _maxThreadCount) 活跃线程数达到最大线程数时,跳过,不走lock,提升性能                {                    lock (_lockCreateTimer)                    {                        )                        {                            _timer = new System.Timers.Timer();                            _timer.Interval = _coreThreadCount == 0 ? 1 : 500;                            _timer.Elapsed += (s,e) =>                            {                                if (_activeThreadCount >= _coreThreadCount && _activeThreadCount < _maxThreadCount)                                {                                    if (_tasks.Count > )                                    {                                        if (_timer.Interval != 20) _timer.Interval = ;                                        CreateThread();                                    }                                                                        {                                        500) _timer.Interval = ;                                    }                                }                                                                {                                    )                                    {                                        _timer.Stop();                                        _timer.dispose();                                        _timer = ;                                    }                                }                            };                            _timer.Start();                        }                    }                }            }        }         tempTask)) { }        }            }}
VIEw Code

    RunHelper类代码:

 线程工具类     RunHelper    {        #region 变量属性事件        #region 线程中执行         线程中执行        static Task Run(this TaskScheduler scheduler,Action<object> doWork,1)">object arg = null,Action<Exception> errorAction = return Task.Factory.StartNew((obj) =>            {                try                {                    doWork(obj);                }                catch (Exception ex)                {                    if (errorAction != ) errorAction(ex);                    LogUtil.Error(ex,ThreadUtil.Run错误);                }            },arg,CancellationToken.None,TaskCreationoptions.None,scheduler);        }        return Task.Factory.StartNew(() =>                {                    doWork();                }                static Task<T> Run<T>(object,T> doWork,1)">return Task.Factory.StartNew<T>((obj) =>                {                     doWork(obj);                }                );                    default(T);                }            },Func<T> doWork,1)">return Task.Factory.StartNew<T>(() => doWork();                }                async Task<T> RunAsync<T>(await Task.Factory.StartNew<T>((obj) =>await Task.Factory.StartNew<T>(() =>    }}
VIEw Code

    TaskHelper扩展类:

 Task帮助类基类     TaskHelper    {        #region 变量         处理器数        int _processorCount = Environment.ProcessorCount;        #region UI任务        static TaskScheduler _UITask;         UI任务(2-4个线程)         TaskScheduler UITask        {            getif (_UITask == null) _UITask = new TaskSchedulerEx(2,4);                 _UITask;            }        }        #region 菜单任务         TaskScheduler _MenuTask;         菜单任务(2-4个线程)         TaskScheduler MenuTask        {            if (_MenuTask == null) _MenuTask =  _MenuTask;            }        }        #region 计算任务         TaskScheduler _CalcTask;         计算任务(线程数:处理器数*2)         TaskScheduler CalcTask        {            if (_CalcTask == null) _CalcTask = new limitedTaskScheduler(_processorCount * 2 _CalcTask;            }        }        #region 网络请求         TaskScheduler _RequestTask;         网络请求(8-32个线程)         TaskScheduler RequestTask        {            if (_RequestTask == null) _RequestTask = 8,1)">32 _RequestTask;            }        }        #region 数据库任务         TaskScheduler _DBTask;         数据库任务(8-32个线程)         TaskScheduler DBTask        {            if (_DBTask == null) _DBTask =  _DBTask;            }        }        #region IO任务         TaskScheduler _IOTask;         IO任务(8-32个线程)         TaskScheduler IOTask        {            if (_IOTask == null) _IOTask =  _IOTask;            }        }        #region 首页任务         TaskScheduler _MainPageTask;         首页任务(8-32个线程)         TaskScheduler MainPageTask        {            if (_MainPageTask == null) _MainPageTask =  _MainPageTask;            }        }        #region 图片加载任务         TaskScheduler _LoadImageTask;         图片加载任务(8-32个线程)         TaskScheduler LoadImageTask        {            if (_LoadImageTask == null) _LoadImageTask =  _LoadImageTask;            }        }        #region 浏览器任务         TaskScheduler _browserTask;         浏览器任务(2-4个线程)         TaskScheduler browserTask        {            if (_browserTask == null) _browserTask =  _browserTask;            }        }            }}
VIEw Code

    Form1.cs测试代码:

 System.ComponentModel; System.Data; System.Drawing; System.Management; System.Reflection; System.Threading.Tasks; System.windows.Forms; Utils; test{    partial  Form1 : Form    {        private TaskSchedulerEx _taskSchedulerEx = private TaskSchedulerEx _taskSchedulerExSmall = private TaskSchedulerEx _task = ;        public Form1()        {            InitializeComponent();            _taskSchedulerEx = 50,1)">);            _taskSchedulerExSmall = 5,1)">50);            _task = 10);        }        voID Form1_Load( sender,EventArgs e)        {        }         模拟大量网络请求任务        voID button1_Click(200000,1)">1000,1)">);        }         模拟cpu密集型任务        voID button2_Click(100000,1)">2000,1)">voID button3_Click(100,1)">voID button4_Click( 模拟任务        <param name="scheduler">scheduler<param name="taskCount">任务数量<param name="logCount">每隔多少条数据打一个日志<param name="delay">模拟延迟或耗时(毫秒)voID DoTask(TaskSchedulerEx scheduler,1)">int taskCount,1)">int logCount,1)"> delay)        {            _task.Run(() =>            {                Log(开始);                DateTime dt = DateTime.Now;                List<Task> taskList = new List<Task>();                1; i <= taskCount; i++)                {                    Task task = scheduler.Run((obj) =>                    {                        var k = ()obj;                        Thread.Sleep(delay); 模拟延迟或耗时                        if (k % logCount == )                        {                            Log(最大线程数:" + scheduler.MaxThreadCount +  核心线程数:" + scheduler.CoreThreadCount +  活跃线程数:" + scheduler.ActiveThreadCount.ToString().Padleft(4,1)">' ') +  处理数/总数:" + k +  / " + taskCount);                        }                    },i,(ex) =>                    {                        Log(ex.Message);                    });                    taskList.Add(task);                }                Task.WaitAll(taskList.ToArray());                double d = DateTime.Now.Subtract(dt).TotalSeconds;                Log(完成,耗时:" + d + );            });        }        voID Form1_FormClosed(if (_taskSchedulerEx != )            {                _taskSchedulerEx.dispose(); 释放资源                _taskSchedulerEx = ;            }        }    }}
VIEw Code

     测试截图:

 

总结

以上是内存溢出为你收集整理的C# Task 多任务:C# 扩展TaskScheduler实现独立线程池,支持多任务批量处理,互不干扰,无缝兼容Task全部内容,希望文章能够帮你解决C# Task 多任务:C# 扩展TaskScheduler实现独立线程池,支持多任务批量处理,互不干扰,无缝兼容Task所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1212623.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-04
下一篇 2022-06-04

发表评论

登录后才能评论

评论列表(0条)

保存