并发编程基础
线程创建
继承Thread类
java
public class MyThread extends Thread {
/**
 * Prints the current thread's name followed by each counter value 0 through 4,
 * one line per value.
 */
@Override
public void run() {
    int step = 0;
    while (step < 5) {
        String line = Thread.currentThread().getName() + ": " + step;
        System.out.println(line);
        step++;
    }
}
/**
 * Creates one {@code MyThread} and starts it.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // start() launches the thread; run() is then executed on that new thread.
    new MyThread().start();
}
}
实现Runnable接口
java
public class MyRunnable implements Runnable {
/**
 * Prints the executing thread's name and the values 0..4, one per line.
 */
@Override
public void run() {
    for (int n = 0; n != 5; ++n) {
        StringBuilder line = new StringBuilder();
        line.append(Thread.currentThread().getName()).append(": ").append(n);
        System.out.println(line.toString());
    }
}
/**
 * Starts two threads: one backed by a {@code MyRunnable} instance and one
 * backed by a lambda.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Runnable task = new MyRunnable();
    Thread thread = new Thread(task);
    thread.start();

    // Same idea, with the task written inline as a lambda.
    Runnable lambdaTask = () -> System.out.println("Lambda线程");
    Thread thread2 = new Thread(lambdaTask);
    thread2.start();
}
}
实现Callable接口
java
import java.util.concurrent.*;
public class MyCallable implements Callable<Integer> {
/**
 * Adds up the integers from 1 to 100 inclusive.
 *
 * @return the sum of 1..100
 * @throws Exception declared by {@code Callable}; this implementation does not throw
 */
@Override
public Integer call() throws Exception {
    int total = 0;
    int term = 1;
    while (term <= 100) {
        total = total + term;
        term++;
    }
    return total;
}
/**
 * Submits a {@code MyCallable} to a single-thread executor, prints its result,
 * and shuts the executor down.
 *
 * @param args unused
 * @throws Exception if waiting for the result is interrupted or the task fails
 */
public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        Future<Integer> future = executor.submit(new MyCallable());
        // get() blocks until the task has finished and returns its value.
        System.out.println("计算结果: " + future.get());
    } finally {
        // Shut down even when get() throws; otherwise the executor's
        // non-daemon worker thread would keep the JVM from exiting.
        executor.shutdown();
    }
}
}
线程生命周期
线程状态
| 状态 | 说明 |
|---|---|
| NEW | 新建,未启动 |
| RUNNABLE | 可运行,正在运行或等待CPU |
| BLOCKED | 阻塞,等待锁 |
| WAITING | 等待,需要其他线程唤醒 |
| TIMED_WAITING | 超时等待 |
| TERMINATED | 终止,执行完毕 |
java
public class ThreadState {
/**
 * Prints the state of a worker thread at four points: after construction,
 * right after {@code start()}, while it is sleeping, and after it has been joined.
 *
 * <p>The states noted in the trailing comments are the typical observations;
 * the second and third depend on scheduling and timing.
 *
 * @param args unused
 * @throws Exception if the main thread is interrupted while sleeping or joining
 */
public static void main(String[] args) throws Exception {
    Thread thread = new Thread(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt status rather than just printing a stack
            // trace, so code further up can still see the interruption.
            Thread.currentThread().interrupt();
        }
    });
    System.out.println("创建后: " + thread.getState()); // NEW
    thread.start();
    System.out.println("启动后: " + thread.getState()); // usually RUNNABLE
    // Give the worker time to enter sleep() before sampling its state again.
    Thread.sleep(100);
    System.out.println("睡眠中: " + thread.getState()); // TIMED_WAITING
    thread.join();
    System.out.println("结束后: " + thread.getState()); // TERMINATED
}
}
线程同步
synchronized关键字
java
public class SynchronizedDemo {
private int count = 0;
// Synchronized instance method: the whole body runs while holding this
// object's monitor, so the increment of count cannot interleave with
// another synchronized access on the same instance.
public synchronized void increment() {
count++;
}
// Synchronized block: only the statements inside the block are guarded,
// here using `this` as the lock object.
public void decrement() {
synchronized (this) {
count--;
}
}
// Static synchronized method (body intentionally empty; it only shows the form).
public static synchronized void staticMethod() {
// The lock held here is the Class object
}
// Synchronized block inside a static method, locking explicitly on
// SynchronizedDemo.class (body intentionally empty; it only shows the form).
public static void staticBlock() {
synchronized (SynchronizedDemo.class) {
// The lock held here is the Class object
}
}
}
同步示例
java
public class Counter {
private int count = 0;
/**
 * Adds one to the counter while holding this object's monitor.
 */
public synchronized void increment() {
    count = count + 1;
}
/**
 * Returns the current counter value.
 *
 * <p>The read is synchronized on the same monitor as {@code increment()}, so a
 * caller on any thread sees the latest completed increment. Without it, the
 * value would only be guaranteed visible after joining the writer threads.
 *
 * @return the current count
 */
public synchronized int getCount() {
    return count;
}
/**
 * Runs two threads that each call {@code increment()} 10000 times on one shared
 * {@code Counter}, waits for both, and prints the final count.
 *
 * @param args unused
 * @throws InterruptedException if the main thread is interrupted while joining
 */
public static void main(String[] args) throws InterruptedException {
    Counter counter = new Counter();

    // Both threads perform the same work, so they share one task definition.
    Runnable work = () -> {
        int remaining = 10000;
        while (remaining > 0) {
            counter.increment();
            remaining--;
        }
    };

    Thread t1 = new Thread(work);
    Thread t2 = new Thread(work);
    t1.start();
    t2.start();
    t1.join();
    t2.join();

    int total = counter.getCount();
    System.out.println("最终计数: " + total); // 20000
}
}
线程通信
wait/notify
java
public class ProducerConsumer {
private final Object lock = new Object();
private int product = 0;
/**
 * Adds one product, waiting on {@code lock} while the stock is at 10 or more,
 * then wakes every thread waiting on {@code lock}.
 *
 * @throws InterruptedException if interrupted while waiting
 */
public void produce() throws InterruptedException {
    synchronized (lock) {
        // Re-check the condition after every wake-up before proceeding.
        while (true) {
            if (product < 10) {
                break;
            }
            lock.wait(); // wait for a consumer to take something
        }
        product += 1;
        System.out.println("生产: " + product);
        lock.notifyAll(); // wake consumers
    }
}
/**
 * Removes one product, waiting on {@code lock} while the stock is zero or less,
 * then wakes every thread waiting on {@code lock}. The stock level is printed
 * before it is decremented.
 *
 * @throws InterruptedException if interrupted while waiting
 */
public void consume() throws InterruptedException {
    synchronized (lock) {
        while (!(product > 0)) {
            lock.wait(); // wait for a producer to add something
        }
        final int current = product;
        System.out.println("消费: " + current);
        product = current - 1;
        lock.notifyAll(); // wake producers
    }
}
}
生产者消费者示例
java
import java.util.LinkedList;
import java.util.Queue;
public class BoundedBuffer<T> {
private final Queue<T> queue = new LinkedList<>();
private final int capacity;
/**
 * Creates a buffer that holds at most {@code capacity} items.
 *
 * @param capacity maximum number of queued items; must be at least 1
 * @throws IllegalArgumentException if {@code capacity} is less than 1. A
 *         capacity of 0 would make every {@code put} wait forever, and a
 *         negative one would never match the queue size, so the bound would
 *         silently not apply.
 */
public BoundedBuffer(int capacity) {
    if (capacity < 1) {
        throw new IllegalArgumentException("capacity must be >= 1, got " + capacity);
    }
    this.capacity = capacity;
}
/**
 * Appends {@code item}, waiting on the queue's monitor while the queue size
 * equals the capacity, then wakes all threads waiting on the queue.
 *
 * @param item the element to enqueue
 * @throws InterruptedException if interrupted while waiting for space
 */
public void put(T item) throws InterruptedException {
    synchronized (queue) {
        // Re-check the size after each wake-up before inserting.
        while (true) {
            if (queue.size() != capacity) {
                break;
            }
            queue.wait();
        }
        queue.offer(item);
        queue.notifyAll();
    }
}
/**
 * Removes and returns the head of the queue, waiting on the queue's monitor
 * while it is empty, then wakes all threads waiting on the queue.
 *
 * @return the element that was at the head of the queue
 * @throws InterruptedException if interrupted while waiting for an element
 */
public T take() throws InterruptedException {
    synchronized (queue) {
        while (queue.size() == 0) {
            queue.wait();
        }
        final T head = queue.poll();
        queue.notifyAll();
        return head;
    }
}
}
线程池
ExecutorService
java
import java.util.concurrent.*;
public class ThreadPoolDemo {
/**
 * Shows the four {@code Executors} factory methods, submits ten tasks to the
 * fixed-size pool, and shuts every pool down.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Fixed-size thread pool
    ExecutorService fixedPool = Executors.newFixedThreadPool(5);
    // Cached thread pool
    ExecutorService cachedPool = Executors.newCachedThreadPool();
    // Single-thread pool
    ExecutorService singlePool = Executors.newSingleThreadExecutor();
    // Scheduled thread pool
    ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);

    // Submit tasks
    for (int i = 0; i < 10; i++) {
        final int taskId = i; // lambdas can only capture effectively final locals
        fixedPool.submit(() -> {
            System.out.println("任务 " + taskId + " 由 " +
                    Thread.currentThread().getName() + " 执行");
        });
    }

    // Shut down every pool that was created, not only the one that ran
    // tasks: an executor that is never shut down is a resource leak.
    fixedPool.shutdown();
    cachedPool.shutdown();
    singlePool.shutdown();
    scheduledPool.shutdown();
    try {
        // Give queued tasks up to 60 seconds, then cancel whatever is left.
        if (!fixedPool.awaitTermination(60, TimeUnit.SECONDS)) {
            fixedPool.shutdownNow();
        }
    } catch (InterruptedException e) {
        fixedPool.shutdownNow();
        // Preserve the interrupt status for whoever runs after this method.
        Thread.currentThread().interrupt();
    }
}
}
ThreadPoolExecutor
java
import java.util.concurrent.*;
public class CustomThreadPool {
/**
 * Builds a {@code ThreadPoolExecutor} with every constructor argument spelled
 * out, submits one task to it, and shuts it down.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final int corePoolSize = 5;       // core thread count
    final int maximumPoolSize = 10;   // maximum thread count
    final long keepAliveTime = 60L;   // idle-thread keep-alive time
    final TimeUnit unit = TimeUnit.SECONDS; // unit of keepAliveTime
    final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100); // work queue
    final ThreadFactory threadFactory = Executors.defaultThreadFactory();     // thread factory
    final RejectedExecutionHandler handler =
            new ThreadPoolExecutor.CallerRunsPolicy();                        // rejection policy

    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory, handler);

    executor.submit(() -> System.out.println("任务执行"));
    executor.shutdown();
}
}
拒绝策略
| 策略 | 说明 |
|---|---|
| AbortPolicy | 抛出RejectedExecutionException(默认) |
| CallerRunsPolicy | 由调用线程执行任务 |
| DiscardPolicy | 直接丢弃任务 |
| DiscardOldestPolicy | 丢弃队列中最老的任务 |
ThreadLocal
java
public class ThreadLocalDemo {
private static ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
/**
 * Shows that each thread sees its own value of {@code threadLocal}: two worker
 * threads set and print 100 and 200, while the main thread prints the initial value.
 *
 * <p>Each thread that touches the variable removes its own entry when done.
 * {@code remove()} only affects the calling thread, so calling it on the main
 * thread alone would not clear what the workers stored. This matters most when
 * threads are reused (for example in a pool) rather than ending as they do here.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Thread t1 = new Thread(() -> {
        try {
            threadLocal.set(100);
            System.out.println("Thread-0: " + threadLocal.get());
        } finally {
            threadLocal.remove(); // clear this worker's own entry
        }
    });
    Thread t2 = new Thread(() -> {
        try {
            threadLocal.set(200);
            System.out.println("Thread-1: " + threadLocal.get());
        } finally {
            threadLocal.remove(); // clear this worker's own entry
        }
    });
    t1.start();
    t2.start();
    System.out.println("Main: " + threadLocal.get()); // 0
    // Clean up the main thread's entry after use.
    threadLocal.remove();
}
}
线程安全集合
同步包装器
java
// Synchronized wrappers: each call wraps the freshly created collection
// passed to it and returns the wrapper.
List<String> syncList = Collections.synchronizedList(new ArrayList<>());
Set<String> syncSet = Collections.synchronizedSet(new HashSet<>());
Map<String, String> syncMap = Collections.synchronizedMap(new HashMap<>());
并发集合
java
import java.util.concurrent.*;
// Concurrent queue
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
// Blocking queues (the first is created with a capacity of 100, the second without a capacity argument)
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(100);
LinkedBlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>();
// Concurrent map
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// Skip-list map
ConcurrentSkipListMap<String, Integer> skipListMap = new ConcurrentSkipListMap<>();
常用方法
线程控制
java
// Quick reference of common thread-control calls (a listing, not a runnable sequence).
Thread thread = new Thread(() -> {
// Thread task goes here
});
thread.start(); // start the thread
thread.join(); // wait for the thread to finish
thread.join(1000); // wait at most 1 second
thread.interrupt(); // interrupt the thread
thread.isAlive(); // whether the thread is alive
thread.isInterrupted(); // whether the thread has been interrupted
Thread.sleep(1000); // sleep for 1 second
Thread.yield(); // yield the CPU
Thread.currentThread(); // 获取当前线程
线程属性
java
// Quick reference of thread property accessors; the thread here is configured but never started.
Thread thread = new Thread();
thread.setName("MyThread"); // set the name
thread.setPriority(Thread.MAX_PRIORITY); // set the priority
thread.setDaemon(true); // mark as a daemon thread
thread.getId(); // get the ID
thread.getState(); // get the state