少女祈祷中...

1.并发框架Executor

  • JDK 5开始提供Executor FrameWork (java.util.concurrent.*)
    • 分离任务的创建和执行者的创建
    • 线程重复利用(new线程代价很大)

共享线程池

  • 预设好的多个Thread,可弹性增加
  • 多次执行很多很小的任务
  • 任务创建和执行过程解耦
  • 程序员无需关心线程池执行任务过程

主要类

  • Executors.newCachedThreadPool/newFixedThreadPool 创建线程池
  • ExecutorService 线程池服务
  • Callable 具体的逻辑对象(线程类)
    • (Runnable的run方法没有返回值,而Callable的call方法可以有返回值)
  • Future 返回结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//创建可变线程池
private ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
//创建固定线程池
private ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(5);

//关闭线程池
executor.shutdown();

//执行任务,无返回值
executor.execute(task);
//执行任务,有返回值
Future<Integer> result=executor.submit(task);
/*
public class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return null;
}
}
*/

//线程池基础信息
System.out.println(executor.getPoolSize());
System.out.println(executor.getActiveCount());
System.out.println(executor.getCompletedTaskCount());

//Future返回类基本操作
result.isDone() //判断任务是否完成
int sum=result.get(); //获得结果

2.并发框架Fork-join

  • Java 7 提供另一种并行框架:分解、治理、合并(分治编程)
  • 适合用于整体任务量不好确定的场合(最小任务可确定)

主要类

  • ForkJoinPool 任务池
  • RecursiveAction
  • RecursiveTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//创建执行线程池
ForkJoinPool pool = new ForkJoinPool();
ForkJoinPool pool = new ForkJoinPool(4); //固定线程池

//创建任务
Task task = new Task(1, 10000000);
/*
public class Task extends RecursiveTask<Integer> {
@Override
protected Integer compute() {
//任务足够小,则直接执行
//TODO
//任务大于阈值,分裂为2个任务
invokeAll(subTask1,subTask2);
Integer sum1 = subTask1.join();
Integer sum2 = subTask2.join();
// 结果合并
sum = sum1 + sum2;
return sum;
}
}
*/

//提交任务
ForkJoinTask<Integer> result = pool.submit(task);

//输出结果
System.out.println(result.get().toString());

3.并发数据结构

  • 常用的数据结构是线程不安全的

    • ArrayList, HashMap, HashSet 非同步的
    • 多个线程同时读写,可能会抛出异常或数据错误
  • 传统Vector,Hashtable等同步集合性能过差

  • 并发数据结构:数据添加和删除

    • 阻塞式集合:当集合为空或者满时,等待
    • 非阻塞式集合:当集合为空或者满时,不等待,返回null或异常
  • List

    • Vector 同步安全,写多读少
    • ArrayList 不安全
    • Collections.synchronizedList(List list) 基于synchronized,效率差
    • CopyOnWriteArrayList 读多写少,基于复制机制,非阻塞
  • Set

    • HashSet 不安全
    • Collections.synchronizedSet(Set set) 基于synchronized,效率差
    • CopyOnWriteArraySet (基于CopyOnWriteArrayList实现) 读多写少,非阻塞
  • Map

    • Hashtable 同步安全,写多读少
    • HashMap 不安全
    • Collections.synchronizedMap(Map map) 基于synchronized,效率差
    • ConcurrentHashMap 读多写少,非阻塞
  • Queue & Deque (队列,JDK 1.5 提出)

    • ConcurrentLinkedQueue 非阻塞
    • ArrayBlockingQueue/LinkedBlockingQueue 阻塞

4.并发协作控制

Lock

  • Lock也可以实现同步的效果
    • 实现更复杂的临界区结构
    • tryLock方法可以预判锁是否空闲
    • 允许分离读写的操作,多个读,一个写
    • 性能更好
  • ReentrantLock类,可重入的互斥锁
  • ReentrantReadWriteLock类,可重入的读写锁
  • lock和unlock函数
1
2
3
4
5
6
7
8
9
10
//构造锁
private static final ReentrantLock queueLock = new ReentrantLock(); //可重入锁
private static final ReentrantReadWriteLock orderLock = new ReentrantReadWriteLock(); //可重入读写锁

//基本函数
queueLock.tryLock()//尝试加锁(包含lock),返回boolean
queueLock.lock() //加锁
queueLock.unlock()//解锁
orderLock.writeLock().lock()/unlock() //写加锁/解锁(写锁只能一个线程拥有)
orderLock.readLock().lock()/unlock() //读加锁/解锁(读锁多个线程共享)

Semaphore

  • 信号量:本质上是一个计数器
  • 计数器大于0,可以使用,等于0不能使用
  • 可以设置多个并发量,例如限制10个访问
  • Semaphore
    • acquire获取
    • release释放
  • 比Lock更进一步,可以控制多个同时访问关键区
1
2
3
4
5
6
7
8
9
10
//构造信号量
private final Semaphore placeSemaphore = new Semaphore(5);

//获取(信号量-1)
placeSemaphore.acquire();
//尝试获取(包含acquire)
placeSemaphore.tryAcquire()

//释放(信号量+1)
placeSemaphore.release();

Latch

  • 等待锁,是一个同步辅助类
  • 用来同步执行任务的一个或者多个线程
  • 不是用来保护临界区或者共享资源
  • CountDownLatch
    • countDown() 计数减1
    • await() 等待latch变成0
1
2
3
4
5
6
7
8
//构造等待锁
CountDownLatch startSignal = new CountDownLatch(1);

//计数减1
startSignal.countDown();

//等待latch变成0
startSignal.await();

Barrier

  • 集合点,也是一个同步辅助类
  • 允许多个线程在某一个点上进行同步
  • CyclicBarrier
    • 构造函数是需要同步的线程数量
    • await等待其他线程,到达数量后,就放行
1
2
3
4
5
6
7
//当有3个线程在barrier上await,就执行finalResultCalculator中的run方法
CalculateFinalResult finalResultCalculator = new CalculateFinalResult(results);
CyclicBarrier barrier = new CyclicBarrier(3, finalResultCalculator);
//class CalculateFinalResult implements Runnable {}

//等待
barrier.await();

Phaser

  • 允许执行并发多阶段任务,同步辅助类
  • 在每一个阶段结束的位置对线程进行同步,当所有的线程都到达这步,再进行下一步
  • Phaser
    • arrive()
    • arriveAndAwaitAdvance()
1
2
3
4
5
//构造
Phaser phaser = new Phaser(5);

//等到5个线程都到了,才放行
phaser.arriveAndAwaitAdvance();

Exchanger

  • 允许在并发线程中互相交换消息
  • 允许在2个线程中定义同步点,当两个线程都到达同步点,它们交换数据结构
  • Exchanger
    • exchange(), 线程双方互相交互数据
    • 交换数据是双向的
1
2
3
4
5
//构造,交换类型为String
Exchanger<String> exchanger = new Exchanger<String>();

//对外交换null,获得内容放入item中
String item = exchanger.exchange(null);

5.定时任务执行

简单定时器机制

  • 设置计划任务,也就是在指定的时间开始执行某一个任务。
  • TimerTask 封装任务
  • Timer类 定时器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*
class Task extends TimerTask {
public void run() {
//TODO
}
}
*/
//当前时间1秒后,每2秒执行一次
timer.schedule(task, 1000, 2000);

//取消当前的任务
task.cancel();
//取消定时器
timer.cancel();

Executor +定时器机制

  • ScheduledExecutorService
    • 定时任务
    • 周期任务
1
2
3
4
5
6
7
8
9
10
11
12
//固定时间(1s后)启动
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.schedule(new MyTask(),1,TimeUnit.SECONDS);

//周期任务 固定速率 是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕
//如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(new MyTask(),1,3000,TimeUnit.MILLISECONDS);

//周期任务 固定延时 是以上一个任务结束时开始计时,period时间过去后,立即执行
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(new MyTask(),1,3000,TimeUnit.MILLISECONDS);

Quartz

  • Quartz是一个较为完善的任务调度框架
  • 解决程序中Timer零散管理的问题
  • 功能更加强大
    • Timer执行周期任务,如果中间某一次有异常,整个任务终止执行
    • Quartz执行周期任务,如果中间某一次有异常,不影响下次任务执行
    • ……
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

//创建scheduler
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

//定义一个Trigger
Trigger trigger = newTrigger().withIdentity("trigger1", "group1") //定义name/group
.startNow()//一旦加入scheduler,立即生效
.withSchedule(simpleSchedule() //使用SimpleTrigger
.withIntervalInSeconds(2) //每隔2秒执行一次
.repeatForever()) //一直执行
.build();

//定义一个JobDetail
JobDetail job = newJob(HelloJob.class) //定义Job类为HelloQuartz类
.withIdentity("job1", "group1") //定义name/group
.usingJobData("name", "quartz") //定义属性
.build();

/*
public class HelloJob implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
//TODO
}
}
*/

//加入这个调度
scheduler.scheduleJob(job, trigger);

//启动
scheduler.start();

//运行一段时间后关闭
Thread.sleep(10000);
scheduler.shutdown(true);