并发编程进阶
JUC锁机制
Lock接口
java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockDemo {
private final Lock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock(); // 获取锁
try {
count++;
} finally {
lock.unlock(); // 释放锁
}
}
public int getCount() {
return count;
}
}ReentrantLock
java
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo {
private final ReentrantLock lock = new ReentrantLock(true); // 公平锁
public void method() {
// 尝试获取锁
if (lock.tryLock()) {
try {
// 执行任务
} finally {
lock.unlock();
}
} else {
// 获取锁失败的处理
}
// 带超时的尝试
try {
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
// 执行任务
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}ReadWriteLock
java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private int value;
public int read() {
rwLock.readLock().lock();
try {
return value;
} finally {
rwLock.readLock().unlock();
}
}
public void write(int newValue) {
rwLock.writeLock().lock();
try {
value = newValue;
} finally {
rwLock.writeLock().unlock();
}
}
}Condition
java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer<T> {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Object[] items;
private int putIndex, takeIndex, count;
public BoundedBuffer(int capacity) {
items = new Object[capacity];
}
public void put(T item) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await(); // 等待不满
}
items[putIndex] = item;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal(); // 通知不为空
} finally {
lock.unlock();
}
}
@SuppressWarnings("unchecked")
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); // 等待不为空
}
Object item = items[takeIndex];
if (++takeIndex == items.length) takeIndex = 0;
count--;
notFull.signal(); // 通知不满
return (T) item;
} finally {
lock.unlock();
}
}
}原子类
基本原子类
java
import java.util.concurrent.atomic.*;
public class AtomicDemo {
public static void main(String[] args) {
AtomicInteger atomicInt = new AtomicInteger(0);
// 原子操作
atomicInt.incrementAndGet(); // ++i
atomicInt.getAndIncrement(); // i++
atomicInt.decrementAndGet(); // --i
atomicInt.getAndDecrement(); // i--
atomicInt.getAndAdd(10); // 返回旧值并加10
atomicInt.addAndGet(10); // 加10并返回新值
// CAS操作
atomicInt.compareAndSet(0, 10); // 如果是0则设为10
// 原子更新
atomicInt.updateAndGet(x -> x * 2);
AtomicLong atomicLong = new AtomicLong(0);
AtomicBoolean atomicBool = new AtomicBoolean(false);
}
}原子引用
java
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceDemo {
private AtomicReference<User> userRef = new AtomicReference<>();
public void updateUser(User newUser) {
User oldUser;
do {
oldUser = userRef.get();
} while (!userRef.compareAndSet(oldUser, newUser));
}
static class User {
String name;
int age;
}
}原子数组
java
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.getAndIncrement(0); // 原子递增索引0的元素
array.compareAndSet(1, 0, 10); // CAS操作原子字段更新器
java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public class AtomicFieldUpdaterDemo {
private volatile int count;
private static final AtomicIntegerFieldUpdater<AtomicFieldUpdaterDemo> UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AtomicFieldUpdaterDemo.class, "count");
public void increment() {
UPDATER.incrementAndGet(this);
}
}LongAdder
java
import java.util.concurrent.atomic.LongAdder;
public class LongAdderDemo {
public static void main(String[] args) {
LongAdder adder = new LongAdder();
// 高并发下比AtomicLong性能更好
adder.increment();
adder.add(10);
System.out.println(adder.sum()); // 获取值
System.out.println(adder.sumThenReset()); // 获取并重置
}
}并发工具类
CountDownLatch
java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("任务 " + taskId + " 开始");
Thread.sleep(1000);
System.out.println("任务 " + taskId + " 完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 计数减1
}
});
}
latch.await(); // 等待计数归零
System.out.println("所有任务完成");
executor.shutdown();
}
}CyclicBarrier
java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("所有线程到达屏障,开始下一阶段");
});
for (int i = 0; i < parties; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 到达屏障");
barrier.await(); // 等待其他线程
System.out.println(Thread.currentThread().getName() + " 继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}Semaphore
java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
public static void main(String[] args) {
int permits = 3; // 同时允许3个线程访问
Semaphore semaphore = new Semaphore(permits);
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println("任务 " + taskId + " 获取资源");
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
System.out.println("任务 " + taskId + " 释放资源");
semaphore.release(); // 释放许可
}
});
}
executor.shutdown();
}
}Exchanger
java
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
String data = "来自线程A的数据";
System.out.println("线程A发送: " + data);
String received = exchanger.exchange(data);
System.out.println("线程A收到: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
new Thread(() -> {
try {
String data = "来自线程B的数据";
System.out.println("线程B发送: " + data);
String received = exchanger.exchange(data);
System.out.println("线程B收到: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}CompletableFuture
创建CompletableFuture
java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
// 异步执行,无返回值
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("异步任务执行");
}, executor);
// 异步执行,有返回值
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, executor);
// 获取结果
future2.thenAccept(result -> System.out.println("结果: " + result));
executor.shutdown();
}
}链式调用
java
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World") // 同步转换
.thenApplyAsync(String::toUpperCase); // 异步转换
future.thenAccept(System.out::println); // HELLO WORLD组合多个Future
java
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
// 两个都完成后组合
CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
System.out.println(combined.join()); // Hello World
// 任一完成即返回
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);
// 全部完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);异常处理
java
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了");
return "Success";
})
.exceptionally(ex -> "默认值") // 异常时返回默认值
.handle((result, ex) -> { // 统一处理结果和异常
if (ex != null) return "错误: " + ex.getMessage();
return result;
});并发最佳实践
1. 优先使用并发集合
java
// 推荐
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 不推荐
Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());2. 使用线程池管理线程
java
// 推荐
ExecutorService executor = Executors.newFixedThreadPool(10);
// 不推荐
new Thread(() -> { }).start();3. 正确关闭资源
java
ExecutorService executor = Executors.newFixedThreadPool(10);
try {
// 使用线程池
} finally {
executor.shutdown();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
}4. 避免死锁
java
// 按顺序获取锁
public void transfer(Account from, Account to, int amount) {
Account first = from.getId() < to.getId() ? from : to;
Account second = from.getId() < to.getId() ? to : from;
synchronized (first) {
synchronized (second) {
from.debit(amount);
to.credit(amount);
}
}
}5. 使用volatile保证可见性
java
private volatile boolean running = true;
public void stop() {
running = false;
}
public void run() {
while (running) {
// 执行任务
}
}