Skip to content

并发编程进阶

JUC锁机制

Lock接口

java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Minimal counter guarded by an explicit {@link Lock}.
 *
 * <p>Both the write path and the read path take the same lock, so a value
 * returned by {@link #getCount()} reflects every increment that finished
 * before the read acquired the lock.
 */
public class LockDemo {
    private final Lock lock = new ReentrantLock();
    private int count = 0;

    /** Adds one to the counter while holding the lock. */
    public void increment() {
        lock.lock();  // acquire the lock
        try {
            count++;
        } finally {
            lock.unlock();  // always release, even if the guarded code throws
        }
    }

    /**
     * Returns the current counter value.
     *
     * <p>The read is performed under the same lock as the writes; an unguarded
     * read of the plain {@code int} field would have no visibility guarantee
     * with respect to increments made by other threads.
     *
     * @return the number of completed {@link #increment()} calls
     */
    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

ReentrantLock

java
import java.util.concurrent.locks.ReentrantLock;

/**
 * Shows the two non-blocking ways of acquiring a {@link ReentrantLock}:
 * an immediate attempt and an attempt bounded by a timeout.
 */
public class ReentrantLockDemo {
    // Passing true selects the fair ordering policy.
    private final ReentrantLock lock = new ReentrantLock(true);

    /**
     * Runs both acquisition styles back to back. Each successful acquisition
     * is paired with an unlock in a {@code finally} block.
     */
    public void method() {
        // Immediate attempt: returns at once whether or not the lock was obtained.
        // Note that the untimed tryLock() barges ahead of waiting threads even on a fair lock.
        if (lock.tryLock()) {
            try {
                // perform the task
            } finally {
                lock.unlock();
            }
        } else {
            // handle the failure to obtain the lock
        }

        // Timed attempt: waits up to one second for the lock.
        try {
            // TimeUnit is written fully qualified because this snippet's import
            // list only brings in ReentrantLock.
            if (lock.tryLock(1, java.util.concurrent.TimeUnit.SECONDS)) {
                try {
                    // perform the task
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}

ReadWriteLock

java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Single {@code int} cell protected by a {@link ReadWriteLock}: writers take
 * the write lock, readers take the read lock.
 */
public class ReadWriteLockDemo {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private int value;

    /**
     * Replaces the stored value while holding the write lock.
     *
     * @param newValue the value to store
     */
    public void write(int newValue) {
        rwLock.writeLock().lock();
        try {
            value = newValue;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    /**
     * Reads the stored value while holding the read lock.
     *
     * @return the value most recently stored by {@link #write(int)}, or 0 if none
     */
    public int read() {
        rwLock.readLock().lock();
        try {
            final int snapshot = value;
            return snapshot;
        } finally {
            rwLock.readLock().unlock();
        }
    }
}

Condition

java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Fixed-capacity FIFO buffer backed by a circular array, coordinated with one
 * {@link Lock} and two {@link Condition}s.
 *
 * @param <T> element type
 */
public class BoundedBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    private final Object[] items;
    private int putIndex, takeIndex, count;

    /**
     * Creates an empty buffer.
     *
     * @param capacity maximum number of elements held at once; must be positive
     * @throws IllegalArgumentException if {@code capacity} is zero or negative
     *         (a zero-capacity buffer would make every {@code put} block forever)
     */
    public BoundedBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        items = new Object[capacity];
    }

    /**
     * Appends an element, blocking while the buffer is full.
     *
     * @param item the element to add
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            // Loop rather than if: the condition is re-checked after every wake-up.
            while (count == items.length) {
                notFull.await();  // wait until there is room
            }
            items[putIndex] = item;
            if (++putIndex == items.length) {
                putIndex = 0;  // wrap around
            }
            count++;
            notEmpty.signal();  // wake one waiting taker
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, blocking while the buffer is empty.
     *
     * @return the element that has been in the buffer longest
     * @throws InterruptedException if interrupted while waiting for an element
     */
    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();  // wait until there is something to take
            }
            Object item = items[takeIndex];
            // Clear the slot so the buffer does not keep the element reachable
            // after it has been handed to the caller.
            items[takeIndex] = null;
            if (++takeIndex == items.length) {
                takeIndex = 0;  // wrap around
            }
            count--;
            notFull.signal();  // wake one waiting putter
            // Safe: only put(T) stores into items.
            return (T) item;
        } finally {
            lock.unlock();
        }
    }
}

原子类

基本原子类

java
import java.util.concurrent.atomic.*;

/**
 * Tour of the basic atomic classes. The trailing comments track the value held
 * by the {@code AtomicInteger} after each call.
 */
public class AtomicDemo {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);

        // Atomic arithmetic
        counter.incrementAndGet();    // like ++i  -> value 1
        counter.getAndIncrement();    // like i++  -> value 2
        counter.decrementAndGet();    // like --i  -> value 1
        counter.getAndDecrement();    // like i--  -> value 0
        counter.getAndAdd(10);        // returns the old value, then adds 10 -> value 10
        counter.addAndGet(10);        // adds 10 and returns the new value -> value 20

        // Compare-and-set: stores 10 only if the current value is 0.
        // The value is 20 at this point, so this particular call leaves it unchanged.
        counter.compareAndSet(0, 10);

        // Atomic update through a function -> value 40
        counter.updateAndGet(current -> current * 2);

        // The long and boolean counterparts are constructed the same way.
        AtomicLong longCounter = new AtomicLong(0);
        AtomicBoolean flag = new AtomicBoolean(false);
    }
}

原子引用

java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Demonstrates replacing an object reference with a compare-and-set retry loop.
 */
public class AtomicReferenceDemo {
    private AtomicReference<User> userRef = new AtomicReference<>();

    /**
     * Installs {@code newUser} as the current user, retrying until the
     * compare-and-set succeeds.
     *
     * @param newUser the reference to store
     */
    public void updateUser(User newUser) {
        while (true) {
            User observed = userRef.get();
            // Succeeds only if nobody replaced the reference since it was read.
            if (userRef.compareAndSet(observed, newUser)) {
                return;
            }
        }
    }

    /** Simple mutable holder used as the referenced value. */
    static class User {
        String name;
        int age;
    }
}

原子数组

java
// An int array of length 10 whose elements can each be updated atomically.
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.getAndIncrement(0);  // atomically increment the element at index 0
array.compareAndSet(1, 0, 10);  // CAS: set the element at index 1 to 10 only if it currently holds 0

原子字段更新器

java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
 * Atomically increments a plain {@code volatile int} field through an
 * {@link AtomicIntegerFieldUpdater} instead of wrapping it in an atomic object.
 */
public class AtomicFieldUpdaterDemo {
    // One shared updater, bound by name to the "count" field declared below.
    private static final AtomicIntegerFieldUpdater<AtomicFieldUpdaterDemo> COUNT_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(AtomicFieldUpdaterDemo.class, "count");

    // Declared volatile because the updater is created for this field.
    private volatile int count;

    /** Atomically adds one to {@code count} on this instance. */
    public void increment() {
        COUNT_UPDATER.incrementAndGet(this);
    }
}

LongAdder

java
import java.util.concurrent.atomic.LongAdder;

/**
 * Shows the basic {@link LongAdder} operations: add, read the sum, and read
 * the sum while resetting it.
 */
public class LongAdderDemo {
    public static void main(String[] args) {
        // Performs better than AtomicLong under heavy contention.
        LongAdder total = new LongAdder();
        total.increment();
        total.add(10);

        // Read the current sum.
        long snapshot = total.sum();
        System.out.println(snapshot);

        // Read the sum and reset the adder in the same call.
        long drained = total.sumThenReset();
        System.out.println(drained);
    }
}

并发工具类

CountDownLatch

java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Starts several pool tasks and blocks the main thread on a
 * {@link CountDownLatch} until every task has counted down.
 */
public class CountDownLatchDemo {
    /**
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch latch = new CountDownLatch(threadCount);
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);

        try {
            for (int i = 0; i < threadCount; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    try {
                        System.out.println("任务 " + taskId + " 开始");
                        Thread.sleep(1000);
                        System.out.println("任务 " + taskId + " 完成");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        // Count down in finally so an interrupted task still releases the waiter.
                        latch.countDown();
                    }
                });
            }

            latch.await();  // block until the count reaches zero
            System.out.println("所有任务完成");
        } finally {
            // Shut down even when await() is interrupted; otherwise the pool's
            // worker threads would stay alive after main exits.
            executor.shutdown();
        }
    }
}

CyclicBarrier

java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * Three threads meet at a {@link CyclicBarrier}; the barrier action runs once
 * all of them have arrived, after which every thread continues.
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int parties = 3;
        // The second argument is the barrier action, run once per trip of the barrier.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("所有线程到达屏障,开始下一阶段");
        });

        for (int i = 0; i < parties; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 到达屏障");
                    barrier.await();  // wait for the other parties
                    System.out.println(Thread.currentThread().getName() + " 继续执行");
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of discarding it.
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party was interrupted or timed out while waiting.
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore

java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Ten pool tasks compete for three {@link Semaphore} permits, so at most three
 * of them hold the simulated resource at any moment.
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        int permits = 3;  // at most 3 threads may hold the resource at once
        Semaphore semaphore = new Semaphore(permits);
        ExecutorService executor = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                // Acquire outside the try/finally that releases: if acquire() is
                // interrupted no permit was taken, and releasing anyway would
                // raise the permit count above its initial value.
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    System.out.println("任务 " + taskId + " 获取资源");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println("任务 " + taskId + " 释放资源");
                    semaphore.release();  // give back the permit obtained above
                }
            });
        }

        // Already-submitted tasks still run; no new tasks are accepted.
        executor.shutdown();
    }
}

Exchanger

java
import java.util.concurrent.Exchanger;

/**
 * Two threads swap one string each through a shared {@link Exchanger}.
 */
public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        startParty(exchanger, "线程A发送: ", "来自线程A的数据", "线程A收到: ");
        startParty(exchanger, "线程B发送: ", "来自线程B的数据", "线程B收到: ");
    }

    /**
     * Starts a thread that prints its outgoing data, offers it to the
     * exchanger, and prints whatever the other party handed over.
     */
    private static void startParty(Exchanger<String> exchanger,
                                   String sendPrefix,
                                   String outgoing,
                                   String receivePrefix) {
        Runnable party = () -> {
            try {
                System.out.println(sendPrefix + outgoing);
                // Blocks until the other party also calls exchange().
                String incoming = exchanger.exchange(outgoing);
                System.out.println(receivePrefix + incoming);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        new Thread(party).start();
    }
}

CompletableFuture

创建CompletableFuture

java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Creates two {@link CompletableFuture}s on an explicit executor: one that
 * only runs an action and one that produces a value.
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(5);

        // Asynchronous action with no result.
        CompletableFuture<Void> action =
            CompletableFuture.runAsync(() -> System.out.println("异步任务执行"), pool);

        // Asynchronous computation that yields a value.
        CompletableFuture<String> greeting =
            CompletableFuture.supplyAsync(() -> "Hello", pool);

        // Consume the value once it is available.
        greeting.thenAccept(value -> System.out.println("结果: " + value));

        // Tasks submitted above still run; the pool just stops accepting new ones.
        pool.shutdown();
    }
}

链式调用

java
// Three-stage pipeline: each stage receives the result of the stage before it.
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World")      // synchronous transformation
    .thenApplyAsync(String::toUpperCase);  // asynchronous transformation

// Print the final value once the pipeline completes.
future.thenAccept(System.out::println);  // HELLO WORLD

组合多个Future

java
// Two independent asynchronous computations.
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

// Combine the two results once both have completed
CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
System.out.println(combined.join());  // Hello World

// Completes as soon as any one of the inputs completes
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);

// Completes when all of the inputs have completed; carries no value of its own
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);

异常处理

java
// A supplier that always fails, followed by the two recovery hooks.
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        // Wrapping the throw in if (true) keeps the return below from being
        // rejected by the compiler as an unreachable statement.
        if (true) throw new RuntimeException("出错了");
        return "Success";
    })
    .exceptionally(ex -> "默认值")  // on failure, substitute a default value
    // handle() sees both the result and the exception of the previous stage.
    // In this chain exceptionally() has already recovered, so ex is null here
    // and the default value is passed through.
    .handle((result, ex) -> {       // process result and exception in one place
        if (ex != null) return "错误: " + ex.getMessage();
        return result;
    });

并发最佳实践

1. 优先使用并发集合

java
// Recommended
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

// Not recommended
// NOTE: this is an alternative to the declaration above, not a companion to it;
// both use the name `map`, so only one of them can appear in a given scope.
Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());

2. 使用线程池管理线程

java
// Recommended: submit work to a pool with a fixed number (10) of worker threads
ExecutorService executor = Executors.newFixedThreadPool(10);

// Not recommended: create and start a dedicated thread by hand
new Thread(() -> { }).start();

3. 正确关闭资源

java
ExecutorService executor = Executors.newFixedThreadPool(10);
try {
    // use the thread pool
} finally {
    // Stop accepting new tasks; tasks already submitted keep running.
    executor.shutdown();
    try {
        // Give running tasks up to 60 seconds, then cancel whatever is left.
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    } catch (InterruptedException e) {
        // awaitTermination throws a checked exception; handle it here so the
        // cleanup works inside finally without the enclosing method declaring it.
        executor.shutdownNow();
        // Preserve the interrupt for the caller.
        Thread.currentThread().interrupt();
    }
}

4. 避免死锁

java
/**
 * Moves {@code amount} from one account to another while holding both
 * accounts' monitors.
 *
 * <p>The monitors are always taken in ascending id order, regardless of the
 * transfer direction, so two opposite transfers between the same pair of
 * accounts request the locks in the same order.
 * NOTE(review): this relies on distinct accounts having distinct ids — confirm.
 *
 * @param from   account to debit
 * @param to     account to credit
 * @param amount amount to move
 */
public void transfer(Account from, Account to, int amount) {
    Account first;
    Account second;
    if (from.getId() < to.getId()) {
        first = from;
        second = to;
    } else {
        first = to;
        second = from;
    }

    synchronized (first) {
        synchronized (second) {
            from.debit(amount);
            to.credit(amount);
        }
    }
}

5. 使用volatile保证可见性

java
// Loop flag shared between threads; volatile makes a write from the thread
// calling stop() visible to the thread looping in run().
private volatile boolean running = true;

/** Clears the flag so that the loop in run() exits at its next check. */
public void stop() {
    running = false;
}

/** Repeats the task for as long as the flag stays set. */
public void run() {
    while (running) {
        // perform the task
    }
}