400选号网站源码,友点cms,做公司网站哪里好,帝国网站建设写在前面 在线程池中存在几个概念#xff1a;核心线程数、最大线程数、任务队列。 核心线程数指的是线程池的基本大小#xff1b;也就是指worker的数量最大线程数指的是#xff0c;同一时刻线程池中线程的数量最大不能超过该值#xff1b;实际上就是指task任务的数量。任务…写在前面 在线程池中存在几个概念核心线程数、最大线程数、任务队列。 核心线程数指的是线程池的基本大小也就是指worker的数量最大线程数指的是同一时刻线程池中线程的数量最大不能超过该值实际上就是指task任务的数量。任务队列是当任务较多时线程池中线程的数量已经达到了核心线程数这时候就是用任务队列来存储我们提交的任务。相当于缓冲作用。
与其他池化技术不同的是线程池是基于生产者-消费者模式来实现的任务的提交方是生产者线程池是消费者 。当我们需要执行某个任务时只需要把任务扔到线程池中即可。 池化技术这里的池化和卷积的池化不一样这里的池化技术简单点来说就是提前保存大量的资源以备不时之需 线程池中执行任务的流程如下图如下。
那么使用线程池可以带来一系列好处
降低资源消耗通过池化技术重复利用已创建的线程降低线程创建和销毁造成的损耗。提高响应速度任务到达时无需等待线程创建即可立即执行。提高线程的可管理性线程是稀缺资源如果无限制创建不仅会消耗系统资源还会因为线程的不合理分布导致资源调度失衡降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。提供更多更强大的功能线程池具备可拓展性允许开发人员向其中增加更多的功能。
任务调度
首先检测线程池运行状态如果不是RUNNING则直接拒绝线程池要保证在RUNNING的状态下执行任务。
如果 taskCount corePoolSize则创建并启动一个线程来执行新提交的任务。 如果 taskCount corePoolSize且线程池内的阻塞队列未满则将任务添加到该阻塞队列中。 如果workerCount maximumPoolSize并且线程池内的阻塞队列已满, 则根据 拒绝策略 来处理该任务, 默认的处理方式是直接抛异常。 常见的拒绝策略有以下几种 AbortPolicy 中止策略丢弃任务并抛出异常。DiscardPolicy 丢弃策略丢弃任务但是不抛出异常。如果线程队列已满则后续提交的任务都会被丢弃且是静默丢弃。DiscardOldestPolicy 弃老策略丢弃队列最前面的任务然后重新提交被拒绝的任务。 简单实现
定义任务Task 并 定义NewTask来新建Task对象
type Task struct {f func() error
}func NewTask(f func() error) *Task {return Task{f: f}
}定义 WorkPool 线程池
type WorkPool struct {TaskQueue chan *Task // Task队列workNum int // 协程池中最大的worker数量shop chan struct{} // 停止工作标识
}创建 WorkPool 的函数
func NewWorkPool(cap int) *WorkPool {if cap 0 {cap 10}return WorkPool{TaskQueue: make(chan *Task),workNum: cap,shop: make(chan struct{}),}
}具体的协程池中的工作节点
func (p *WorkPool) worker(workId int) {for task : range p.TaskQueue {err : task.Execute()if err ! nil {fmt.Println(err)continue}fmt.Printf( work id %d finished \n, workId) // 打印出具体是哪个节点进行工作}
}协程池启动函数
func (p *WorkPool) run() {// 根据work num 去创建worker工作for i : 0; i p.workNum; i {go p.worker(i)}-p.shop
}协程池关闭函数
func (p *WorkPool) close() {p.shop - struct{}{}
}测试一下使用定时器每2秒进行一次投放并且投放超过5个之后开始停止。
func TestWorkPool(t *testing.T) {task : NewTask(func() error {fmt.Print(time.Now())return nil})taskCount : 0ticker : time.NewTicker(2 * time.Second)p : NewWorkPool(3)go func(c *time.Ticker) {for {p.TaskQueue - task-c.CtaskCountif taskCount 5 {p.close()break}}return}(ticker)p.run()
}结果 可以看到结果是每两秒进行一次打印并且worker对象都不一样。
完整代码
package gorountine_poolimport (fmttestingtime
)func TestWorkPool(t *testing.T) {task : NewTask(func() error {fmt.Print(time.Now())return nil})taskCount : 0ticker : time.NewTicker(2 * time.Second)p : NewWorkPool(3)go func(c *time.Ticker) {for {p.TaskQueue - task-c.CtaskCountif taskCount 5 {p.close()break}}return}(ticker)p.run()
}type Task struct {f func() error
}func NewTask(f func() error) *Task {return Task{f: f}
}// Execute 执行业务方法
func (t *Task) Execute() error {return t.f()
}type WorkPool struct {TaskQueue chan *Task // task队列workNum int // 携程池中最大的worker数量shop chan struct{} // 停止标识
}// 创建Pool的函数
func NewWorkPool(cap int) *WorkPool {if cap 0 {cap 10}return WorkPool{TaskQueue: make(chan *Task),workNum: cap,shop: make(chan struct{}),}
}func (p *WorkPool) worker(workId int) {// 具体的工作for task : range p.TaskQueue {err : task.Execute()if err ! nil {fmt.Println(err)continue}fmt.Printf( work id %d finished \n, workId)}
}// 携程池开始工作
func (p *WorkPool) run() {// 根据work num 去创建worker工作for i : 0; i p.workNum; i {go p.worker(i)}-p.shop
}func (p *WorkPool) close() {p.shop - struct{}{}
}参考链接
[1] https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html [2] https://blog.csdn.net/weixin_44688301/article/details/123292211 [3] https://www.bilibili.com/video/BV1Nf4y137na