2011年4月22日 星期五

[Java] ThreadPoolExecutor

一個任務通過execute(Runnable)方法被加入到執行緒池,任務就是一個Runnable類型的物件,任務的執行方法就是Runnable類型物件的run()方法。

當一個任務通過execute(Runnable)方法欲加入到執行緒池時:

  • 如果此時執行緒池中的數量小於corePoolSize,即使執行緒池中的執行緒都處於空閒狀態,也要新增新的執行緒來處理被加入的任務。
  • 如果此時執行緒池中的數量等於 corePoolSize,但是緩衝佇列 workQueue未滿,那麼任務被放入緩衝佇列。
  • 如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量小於maximumPoolSize,建新的執行緒來處理被添加的任務。
  • 如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量等於maximumPoolSize,那麼通過 handler所指定的策略來處理此任務。

也就是說處理任務的優先順序為:
  • 核心執行緒corePoolSize、任務佇列workQueue、最大執行緒maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
  • 當執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒閒置時間超過keepAliveTime,執行緒將被終止。這樣,執行緒池可以動態的調整池中的執行緒數。
unit可選的參數為java.util.concurrent.TimeUnit中的幾個靜態屬性:
  • NANOSECONDS
  • MICROSECONDS
  • MILLISECONDS
  • SECONDS。

workQueue常用的是:
  • java.util.concurrent.ArrayBlockingQueue

handler有四個選擇:
  • ThreadPoolExecutor.AbortPolicy():丟出java.util.concurrent.RejectedExecutionException異常
  • ThreadPoolExecutor.CallerRunsPolicy():重試加入當前的任務,他會自動重複調用execute()方法
  • ThreadPoolExecutor.DiscardOldestPolicy():放棄最舊的任務
  • ThreadPoolExecutor.DiscardPolicy():放棄當前的任務

Constructor: Creates a new ThreadPoolExecutor with the given initial parameters and default thread factory and handler.

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)


核心執行緒池大小(corePoolSize)
  • 池中所保存的執行緒數,包括空閒執行緒。
最大執行緒池大小(maximumPoolSize)
  • maximumPoolSize - 池中允許的最大執行緒數。
工作存活時間(KeepAliveTime)
  • keepAliveTime - 當執行緒數大於核心時,此為終止前多餘的空閒執行緒等待新任務的最長時間。
工作存活時間(unit)
  • unit - keepAliveTime 參數的時間單位。
工作佇列(workQueue)
  • workQueue - 執行前用於保持任務的佇列。此佇列僅保持由 execute 方法提交的 Runnable 任務。
import java.util.concurrent.*;
import java.util.*;
 
class MyThreadPoolExecutor
{
    int poolSize = 2;
 
    int maxPoolSize = 2;
 
    long keepAliveTime = 10;
 
    ThreadPoolExecutor threadPool = null;
 
    final ArrayBlockingQueue queue = new ArrayBlockingQueue(
            5);
 
    public MyThreadPoolExecutor()
    {
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,
                keepAliveTime, TimeUnit.SECONDS, queue);
 
    }
 
    public void runTask(Runnable task)
    {
        // System.out.println("Task count.."+threadPool.getTaskCount() );
        // System.out.println("Queue Size before assigning the
        // task.."+queue.size() );
        threadPool.execute(task);
        // System.out.println("Queue Size after assigning the
        // task.."+queue.size() );
        // System.out.println("Pool Size after assigning the
        // task.."+threadPool.getActiveCount() );
        // System.out.println("Task count.."+threadPool.getTaskCount() );
        System.out.println("Task count.." + queue.size());
 
    }
 
    public void shutDown()
    {
        threadPool.shutdown();
    }
 
    public static void main(String args[])
    {
        MyThreadPoolExecutor mtpe = new MyThreadPoolExecutor();
        // start first one
        mtpe.runTask(new Runnable()
        {
            public void run()
            {
                for (int i = 0; i < 10; i++)
                {
                    try
                    {
                        System.out.println("First Task");
                        Thread.sleep(1000);
                    } catch (InterruptedException ie)
                    {
                    }
                }
            }
        });
        // start second one
        /*
         * try{ Thread.sleep(500); }catch(InterruptedException
         * ie){}
         */
        mtpe.runTask(new Runnable()
        {
            public void run()
            {
                for (int i = 0; i < 10; i++)
                {
                    try
                    {
                        System.out.println("Second Task");
                        Thread.sleep(1000);
                    } catch (InterruptedException ie)
                    {
                    }
                }
            }
        });
        // start third one
        /*
         * try{ Thread.sleep(500); }catch(InterruptedException
         * ie){}
         */
        mtpe.runTask(new Runnable()
        {
            public void run()
            {
                for (int i = 0; i < 10; i++)
                {
                    try
                    {
                        System.out.println("Third Task");
                        Thread.sleep(1000);
                    } catch (InterruptedException ie)
                    {
                    }
                }
            }
        });
        // start fourth one
        /*
         * try{ Thread.sleep(500); }catch(InterruptedException
         * ie){}
         */
        mtpe.runTask(new Runnable()
        {
            public void run()
            {
                for (int i = 0; i < 10; i++)
                {
                    try
                    {
                        System.out.println("Fourth Task");
                        Thread.sleep(1000);
                    } catch (InterruptedException ie)
                    {
                    }
                }
            }
        });
        // start fifth one
        /*
         * try{ Thread.sleep(500); }catch(InterruptedException
         * ie){}
         */
        mtpe.runTask(new Runnable()
        {
            public void run()
            {
                for (int i = 0; i < 10; i++)
                {
                    try
                    {
                        System.out.println("Fifth Task");
                        Thread.sleep(1000);
                    } catch (InterruptedException ie)
                    {
                    }
                }
            }
        });
        // start Sixth one
        /*
         * try{ Thread.sleep(500); }catch(InterruptedException
         * ie){}
         */
        mtpe.runTask(new Runnable()
        {
            public void run()
            {
                for (int i = 0; i < 10; i++)
                {
                    try
                    {
                        System.out.println("Sixth Task");
                        Thread.sleep(1000);
                    } catch (InterruptedException ie)
                    {
                    }
                }
            }
        });
        mtpe.shutDown();
    }
 
}
Reference:

沒有留言:

張貼留言