Java: Creating Thread Pools in Java

1. Create a Runnable implementation like the following:

    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            System.out.println("Run");
        }
    };

2. Instantiate a Thread to run this Runnable:

    new Thread(runnable).start();

For more information, see: Java Concurrency Tutorial

2. Using a thread pool

1. Example 1: storing Futures in a list

This is simple and clean, but what if you have several long-running tasks that you want to run in parallel and then wait for all of them to complete? That is somewhat harder to code, and if you also want the return value of every task, it becomes difficult to keep the code clean. As with almost any problem, Java has a solution for you: the Executors class. This simple class lets you create thread pools and thread factories.

A thread pool is represented by an instance of ExecutorService. With an ExecutorService, you can submit tasks that will be completed in the future. Here are the types of thread pool you can create with the Executors class:

§ Single Thread Executor: A thread pool with only one thread, so all submitted tasks are executed sequentially. Method: Executors.newSingleThreadExecutor()

§ Cached Thread Pool: A thread pool that creates as many threads as it needs to execute the tasks in parallel. Idle threads are reused for new tasks. If a thread is not used for 60 seconds, it is terminated and removed from the pool. Method: Executors.newCachedThreadPool()

§ Fixed Thread Pool: A thread pool with a fixed number of threads. If no thread is available for a task, the task is queued until another task ends. Method: Executors.newFixedThreadPool()

§ Scheduled Thread Pool: A thread pool made to schedule future tasks. Method: Executors.newScheduledThreadPool()

§ Single Thread Scheduled Pool: A thread pool with only one thread to schedule future tasks. Method: Executors.newSingleThreadScheduledExecutor()
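
As a rough sketch of the factory methods listed above, the following creates one pool of each kind and uses the single-thread executor to show that its tasks run one after another. The class name PoolKindsDemo, the helper runSequentially, and the pool sizes are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PoolKindsDemo {

    // Runs `count` tasks on a single-thread executor and records the order
    // in which they actually ran. (Hypothetical helper for this sketch.)
    static List<Integer> runSequentially(int count) throws InterruptedException {
        final List<Integer> order =
                Collections.synchronizedList(new ArrayList<Integer>());
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (int i = 0; i < count; i++) {
            final int id = i;
            single.execute(new Runnable() {
                @Override
                public void run() {
                    order.add(id);
                }
            });
        }
        single.shutdown();                              // no new tasks; queued ones still run
        single.awaitTermination(10, TimeUnit.SECONDS);  // wait for the queue to drain
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        // One factory method per remaining pool type; the sizes are arbitrary.
        ExecutorService cached = Executors.newCachedThreadPool();
        ExecutorService fixed = Executors.newFixedThreadPool(4);
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
        ScheduledExecutorService singleScheduled =
                Executors.newSingleThreadScheduledExecutor();

        System.out.println(runSequentially(5));         // [0, 1, 2, 3, 4]

        // Shut pools down when they are no longer needed.
        cached.shutdown();
        fixed.shutdown();
        scheduled.shutdown();
        singleScheduled.shutdown();
    }
}
```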

Once you have a thread pool, you can submit tasks to it using the different submit methods. You can submit a Runnable or a Callable to the thread pool. Each submit method returns a Future representing the future state of the task. If you submitted a Runnable, the Future's get() method returns null once the task has finished.
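
The difference between submitting a Runnable and a Callable might look like this in practice. It is only a minimal sketch: SubmitDemo and its two helper methods are names invented for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo {

    // Submits a Runnable and returns what its Future yields (null on success).
    static Object submitRunnable(ExecutorService pool) throws Exception {
        Future<?> future = pool.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("Run");
            }
        });
        return future.get();   // blocks until the task finishes, then returns null
    }

    // Submits a Callable and returns the value computed by call().
    static Integer submitCallable(ExecutorService pool) throws Exception {
        Future<Integer> future = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() {
                return 6 * 7;
            }
        });
        return future.get();   // blocks until the result is available
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(submitRunnable(pool));   // null
            System.out.println(submitCallable(pool));   // 42
        } finally {
            pool.shutdown();
        }
    }
}
```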

[Figure: callable]
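
Since the original figure is not available here, the following is a guess at what "storing Futures in a list" might look like, not a reconstruction of the author's code. It submits several Callables, keeps their Futures in a list, and then waits for every result. FutureListDemo and squareAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureListDemo {

    // Squares each input on a pool thread and collects the results in input order.
    static List<Integer> squareAll(List<Integer> inputs) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);   // size is arbitrary
        try {
            // Submit every task first so they can run in parallel.
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (final Integer n : inputs) {
                futures.add(pool.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        return n * n;
                    }
                }));
            }
            // Then wait for each one; get() blocks until that task is done.
            List<Integer> results = new ArrayList<Integer>();
            for (Future<Integer> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squareAll(Arrays.asList(1, 2, 3)));   // [1, 4, 9]
    }
}
```

Because the Futures are read back in the order they were submitted, the results line up with the inputs even if the tasks finish in a different order.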

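2. Example 2: storing Futures in a Map
1. Create a thread pool and use it, as shown in the figure below:
[Figure: Future implementation]

2. Define the Callable implementation class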

package com.ttpod.musicSNS.job;
import com.ttpod.musicSNS.service.MusicService;
import com.ttpod.musicSNS.view.dto.Music;
import java.util.concurrent.Callable;
/**
 * User: luowei
 * Date: 12-7-23
 * Time: 上午10:29
 */
public class SetMusicPic implements Callable<Music> {
    MusicService musicService;
    Music music;
    public SetMusicPic() {
    }
    public SetMusicPic(MusicService musicService) {
        this.musicService = musicService;
    }
    public SetMusicPic(MusicService musicService,Music music) {
        this.musicService = musicService;
        this.music = music;
    }
    /**
     * Runs the task.
     */
    @Override
    public Music call() throws Exception {
…………
        String picUrl = this.musicService.getPic(this.music);
        if(picUrl==null){
            picUrl = "";
        }
        this.music.setPicUrl(picUrl);
        return this.music;
    }
}
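
The pool-side code for this example was in a figure that is not reproduced here, so the following is only a sketch of how Futures might be kept in a Map keyed by the item they belong to. LookupPic is a hypothetical stand-in for SetMusicPic (it returns a placeholder string instead of calling MusicService), and FutureMapDemo and lookupAll are likewise invented names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureMapDemo {

    // Hypothetical stand-in for SetMusicPic: pretends to look up a picture URL.
    static class LookupPic implements Callable<String> {
        private final String title;

        LookupPic(String title) {
            this.title = title;
        }

        @Override
        public String call() {
            return "pic://" + title;   // placeholder result, not a real lookup
        }
    }

    // Submits one task per title and returns a title -> picture URL map.
    static Map<String, String> lookupAll(String... titles) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Key each Future by the title it belongs to.
            Map<String, Future<String>> futures =
                    new LinkedHashMap<String, Future<String>>();
            for (String title : titles) {
                futures.put(title, pool.submit(new LookupPic(title)));
            }
            Map<String, String> results = new LinkedHashMap<String, String>();
            for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
                results.put(entry.getKey(), entry.getValue().get());   // blocks until done
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(lookupAll("a", "b"));   // {a=pic://a, b=pic://b}
    }
}
```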

Thread synchronization

synchronized block vs. synchronized method
[Figure: Future implementation]

Locking the same object resource in a synchronized block
[Figure: locking the same object resource]
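
The figures for this part are missing, so here is a small sketch of the idea under stated assumptions: a synchronized method and a synchronized (this) block lock the same object, so the two kinds of increment exclude each other. SyncDemo and runDemo are hypothetical names for this illustration.

```java
public class SyncDemo {
    private int count = 0;

    // Synchronized method: locks `this` for the whole method body.
    public synchronized void incrementMethod() {
        count++;
    }

    // Synchronized block: locks the same object (`this`), but only around
    // the statements that touch shared state.
    public void incrementBlock() {
        synchronized (this) {
            count++;
        }
    }

    public synchronized int get() {
        return count;
    }

    // One thread uses the method, the other uses the block; returns the total.
    static int runDemo(final int perThread) throws InterruptedException {
        final SyncDemo counter = new SyncDemo();
        Thread a = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < perThread; i++) {
                    counter.incrementMethod();
                }
            }
        });
        Thread b = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < perThread; i++) {
                    counter.incrementBlock();
                }
            }
        });
        a.start();
        b.start();
        a.join();
        b.join();
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(10000));   // 20000
    }
}
```

Both forms acquire the monitor of the same object, which is why no increment is lost. A block synchronized on a different object would not exclude the synchronized method.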

Custom thread-safe list and map

Most often I use this to synchronize access to a list or map when I don’t want to block access to all methods of the object.

In the following code, one thread modifying the list will not block waiting for a thread that is modifying the map. If the methods were synchronized on the object, each method would have to wait even though the modifications they make would not conflict.

private final List<Foo> myList = new ArrayList<Foo>();
private final Map<String,Bar> myMap = new HashMap<String,Bar>();

public void put( String s, Bar b ) {
    synchronized( myMap ) {
        myMap.put( s, b );
        // then something that may take a while, like a database access, an RPC, or notifying listeners
    }
}

public boolean hasKey( String s ) {
    synchronized( myMap ) {
        return myMap.containsKey( s );
    }
}

public void add( Foo f ) {
    synchronized( myList ) {
        myList.add( f );
        // then something that may take a while, like a database access, an RPC, or notifying listeners
    }
}

// Assumes Foo implements Comparable<Foo> and that the list is not empty.
public Foo getMedianFoo() {
    Foo med = null;
    synchronized( myList ) {
        Collections.sort( myList );
        med = myList.get( myList.size() / 2 );
    }
    return med;
}

Using Callable
Use a closure (an anonymous class) to implement the task to run, multiplyArray(xs, ys, length)

    // multiplyArray(xs, ys, length) is assumed to be defined elsewhere in this
    // class and to return the product as an Integer[][].
    public static Callable<Integer[][]> getMultiplierCallable(final int[][] xs,
            final int[][] ys, final int length) {
        // The anonymous class captures xs, ys and length, like a closure.
        return new Callable<Integer[][]>() {
            public Integer[][] call() throws Exception {
                return multiplyArray(xs, ys, length);
            }
        };
    }

    public static void main(final String[] args) throws ExecutionException,
            InterruptedException {

        final int[][] xs = {{1, 2}, {3, 4}};
        final int[][] ys = {{1, 2}, {3, 4}};

        final Callable<Integer[][]> callable = getMultiplierCallable(xs, ys, 2);
        final ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            final Future<Integer[][]> result = service.submit(callable);
            final Integer[][] intArray = result.get();   // blocks until the product is ready
            for (final Integer[] element : intArray) {
                System.out.println(Arrays.toString(element));
            }
        } finally {
            service.shutdown();   // otherwise the pool's threads keep the JVM alive
        }
    }

Using FutureTask

I really want to create a subclass of FutureTask that has a default no-arg constructor. In particular, I want my subclass to implement the Callable interface and use itself as the callable. This way, users of MyFutureTask can just subclass MyFutureTask instead of having to implement their own callable and pass it to an instance of FutureTask.

Here is one approach: rather than subclassing FutureTask directly, an abstract class implements Callable and wraps a FutureTask built around itself.

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public abstract class MyFutureClass<V> implements Callable<V> {
    private final FutureTask<V> futureTask;
    public MyFutureClass() {
        futureTask = new FutureTask<V>(this);
    }
    @Override
    public V call() throws Exception {
        return myCall();
    }
    protected abstract V myCall();
    public FutureTask<V> getFutureTask() {
        return futureTask;
    }
}
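
A possible way to use this class is sketched below. To keep the example compilable on its own, it repeats a compact copy of MyFutureClass as a nested class. Greeting and runGreeting are hypothetical names added only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class MyFutureClassDemo {

    // Compact copy of the class above so this example compiles on its own.
    abstract static class MyFutureClass<V> implements Callable<V> {
        private final FutureTask<V> futureTask = new FutureTask<V>(this);

        @Override
        public V call() throws Exception {
            return myCall();
        }

        protected abstract V myCall();

        public FutureTask<V> getFutureTask() {
            return futureTask;
        }
    }

    // Hypothetical subclass: users only implement myCall().
    static class Greeting extends MyFutureClass<String> {
        @Override
        protected String myCall() {
            return "hello";
        }
    }

    // Runs the wrapped FutureTask on a pool and waits for its result.
    static String runGreeting() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Greeting greeting = new Greeting();
            pool.execute(greeting.getFutureTask());   // FutureTask is a Runnable
            return greeting.getFutureTask().get();    // blocks until myCall() returns
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runGreeting());   // hello
    }
}
```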

Example 2:

    private static final ExecutorService THREAD_POOL = Executors.newCachedThreadPool();

    // Runs the task on the pool and waits at most `timeout` for its result.
    private static <T> T timedCall(FutureTask<T> task, long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
        THREAD_POOL.execute(task);
        return task.get(timeout, timeUnit);
    }

    public static void main(String[] args) {
        // Declared outside the try block so the catch block can cancel it.
        FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
            public Integer call() throws Exception {
                for (int i = 0; i < 1000000; i++) {
                    if (Thread.interrupted()) return 1;
                    System.out.println(new java.util.Date());
                    Thread.sleep(1000);
                }
                return 0;
            }
        });
        try {
            int returnCode = timedCall(task, 100, TimeUnit.SECONDS);
            System.out.println("Return code: " + returnCode);
        } catch (Exception e) {
            e.printStackTrace();
            task.cancel(true);   // interrupts the worker thread if it is still running
        } finally {
            THREAD_POOL.shutdown();
        }
    }

Keeping multiple threads in a list
Use something like this:

    public void executeMultiThread(int numThreads)
            throws Exception
    {
        List<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < numThreads; i++)
        {
            Thread t = new Thread(new Runnable()
            {
                public void run()
                {
                    // do your work
                }
            });
            // System.out.println("STARTING: " + t);
            t.start();
            threads.add(t);
        }
        for (int i = 0; i < threads.size(); i++)
        {
            // Big number to wait so this can be debugged
            // System.out.println("JOINING: " + threads.get(i));
            threads.get(i).join(1000000);
        }
    }

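Using the volatile keyword
A variable declared volatile is guaranteed to be visible across threads, but volatile alone does not make compound operations (such as count++) atomic.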

volatile has memory-visibility semantics. Basically, the value of a volatile field becomes visible to all readers (other threads in particular) after a write operation on it completes. Without volatile, readers could see a stale value.

One common example for using volatile is to use a volatile boolean variable as a flag to terminate a thread. If you’ve started a thread, and you want to be able to safely interrupt it from a different thread, you can have the thread periodically check a flag. To stop it, set the flag to true. By making the flag volatile, you can ensure that the thread that is checking it will see it has been set the next time it checks it without having to even use a synchronized block.

public class Foo extends Thread {
  private volatile boolean close = false;
  public void run() {
    while(!close) {
      // do work
    }
  }
  public void close() {
    close = true;
    // interrupt here if needed
  }
}

Notice how there’s no need for synchronization.
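
A small usage sketch of the same pattern follows. Worker mirrors the Foo class above, while VolatileFlagDemo, stopsWhenAsked, and the sleep and join durations are assumptions made for this illustration.

```java
public class VolatileFlagDemo {

    // Same shape as Foo above: a thread that loops until the flag is set.
    static class Worker extends Thread {
        private volatile boolean close = false;

        @Override
        public void run() {
            while (!close) {
                // do work
            }
        }

        public void close() {
            close = true;
        }
    }

    // Starts a worker, lets it run briefly, asks it to stop, and waits for it.
    static boolean stopsWhenAsked() throws InterruptedException {
        Worker worker = new Worker();
        worker.start();
        Thread.sleep(50);      // let the loop spin for a moment
        worker.close();        // the volatile write becomes visible to the worker
        worker.join(5000);     // wait up to 5 seconds for the loop to exit
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(stopsWhenAsked());   // true
    }
}
```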
Sample 2:

package net.jcip.examples;

import static java.util.concurrent.TimeUnit.SECONDS;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.*;

import net.jcip.annotations.*;

/**
 * PrimeGenerator
 * <p/>
 * Using a volatile field to hold cancellation state
 *
 * @author Brian Goetz and Tim Peierls
 */
@ThreadSafe
public class PrimeGenerator implements Runnable {
    private static ExecutorService exec = Executors.newCachedThreadPool();

    @GuardedBy("this") private final List<BigInteger> primes
            = new ArrayList<BigInteger>();
    private volatile boolean cancelled;

    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel() {
        cancelled = true;
    }

    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes);
    }

    static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
        PrimeGenerator generator = new PrimeGenerator();
        exec.execute(generator);
        try {
            SECONDS.sleep(1);
        } finally {
            generator.cancel();
        }
        return generator.get();
    }
}

Source: Java Concurrency – Part 7: Executors and thread pools
See also: Java 线程池学习 (Learning Java Thread Pools)

All rights reserved. When reposting, please credit the source: luowei.github.io.