Skip to content

并发编程基础

线程创建

继承Thread类

java
/**
 * Demonstrates creating a thread by subclassing {@link Thread}.
 *
 * <p>The thread body prints five numbered lines, each prefixed with the name
 * of the thread that is executing {@link #run()}.
 */
public class MyThread extends Thread {
    /** Prints the executing thread's name followed by each counter from 0 to 4. */
    @Override
    public void run() {
        int step = 0;
        while (step < 5) {
            System.out.println(Thread.currentThread().getName() + ": " + step);
            step++;
        }
    }

    /**
     * Creates one {@code MyThread} and starts it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyThread worker = new MyThread();
        // start() launches the thread, which then executes run().
        worker.start();
    }
}

实现Runnable接口

java
/**
 * Demonstrates creating a thread from a {@link Runnable} implementation,
 * both as a named class and as a lambda expression.
 */
public class MyRunnable implements Runnable {
    /** Prints the executing thread's name followed by each counter from 0 to 4. */
    @Override
    public void run() {
        for (int n = 0; n != 5; ++n) {
            String line = Thread.currentThread().getName() + ": " + n;
            System.out.println(line);
        }
    }

    /**
     * Starts one thread backed by a {@code MyRunnable} and one backed by a lambda.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Runnable task = new MyRunnable();
        Thread classBacked = new Thread(task);
        classBacked.start();

        // The same thing with a lambda supplying the Runnable.
        Thread lambdaBacked = new Thread(() -> System.out.println("Lambda线程"));
        lambdaBacked.start();
    }
}

实现Callable接口

java
import java.util.concurrent.*;

/**
 * Demonstrates producing a result from a background task by implementing
 * {@link Callable} and retrieving it through a {@link Future}.
 */
public class MyCallable implements Callable<Integer> {
    /**
     * Sums the integers from 1 to 100 inclusive.
     *
     * @return the sum (5050)
     * @throws Exception declared by {@link Callable#call()}; this implementation
     *     contains no statement that throws
     */
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 1; i <= 100; i++) {
            sum += i;
        }
        return sum;
    }

    /**
     * Submits a {@code MyCallable} to a single-thread executor and prints its result.
     *
     * @param args unused
     * @throws Exception if waiting for the result is interrupted or the task fails
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(new MyCallable());

            // get() blocks until the task has finished.
            System.out.println("计算结果: " + future.get());
        } finally {
            // Shut down even when get() throws; otherwise the executor's
            // worker thread would keep the JVM alive.
            executor.shutdown();
        }
    }
}

线程生命周期

线程状态

| 状态 | 说明 |
| --- | --- |
| NEW | 新建,未启动 |
| RUNNABLE | 可运行,正在运行或等待CPU |
| BLOCKED | 阻塞,等待锁 |
| WAITING | 等待,需要其他线程唤醒 |
| TIMED_WAITING | 超时等待 |
| TERMINATED | 终止,执行完毕 |
java
/**
 * Prints the {@link Thread.State} of a worker thread at four points of its life:
 * after construction, right after {@code start()}, while it sleeps, and after it ends.
 */
public class ThreadState {
    /**
     * Runs the demonstration; takes about one second because the worker sleeps.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while sleeping or joining
     */
    public static void main(String[] args) throws Exception {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of just printing the stack
                // trace, so the interruption stays visible on this thread.
                Thread.currentThread().interrupt();
            }
        });

        System.out.println("创建后: " + thread.getState());  // NEW
        thread.start();
        // Normally RUNNABLE; depends on scheduling, so it may already be
        // TIMED_WAITING if the worker reached sleep() first.
        System.out.println("启动后: " + thread.getState());
        Thread.sleep(100);
        // The worker sleeps for 1000 ms, so 100 ms in it is expected to be TIMED_WAITING.
        System.out.println("睡眠中: " + thread.getState());
        thread.join();
        System.out.println("结束后: " + thread.getState());  // TERMINATED
    }
}

线程同步

synchronized关键字

java
/**
 * Shows the four places the {@code synchronized} keyword can appear:
 * instance method, instance block, static method, and static block.
 */
public class SynchronizedDemo {
    private int count = 0;

    /** Synchronized instance method. */
    public synchronized void increment() {
        count += 1;
    }

    /** Synchronized block inside an instance method, locking on {@code this}. */
    public void decrement() {
        synchronized (this) {
            count -= 1;
        }
    }

    /** Synchronized static method. */
    public static synchronized void staticMethod() {
        // The lock is the Class object.
    }

    /** Synchronized block inside a static method, locking on the class literal. */
    public static void staticBlock() {
        synchronized (SynchronizedDemo.class) {
            // The lock is the Class object.
        }
    }
}

同步示例

java
public class Counter {
    private int count = 0;
    
    public synchronized void increment() {
        count++;
    }
    
    public int getCount() {
        return count;
    }
    
    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();
        
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        });
        
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        });
        
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        
        System.out.println("最终计数: " + counter.getCount());  // 20000
    }
}

线程通信

wait/notify

java
/**
 * Producer/consumer coordination with {@code wait}/{@code notifyAll} on a
 * private lock object. The stock level is kept between 0 and 10.
 */
public class ProducerConsumer {
    private final Object lock = new Object();
    private int product = 0;

    /**
     * Adds one unit of stock and prints the new level, waiting while the level
     * is 10 or more.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void produce() throws InterruptedException {
        synchronized (lock) {
            // Loop rather than "if": the condition is re-checked after every wake-up.
            while (!(product < 10)) {
                lock.wait();
            }
            int updated = product + 1;
            product = updated;
            System.out.println("生产: " + updated);
            lock.notifyAll();
        }
    }

    /**
     * Prints the current stock level and then removes one unit, waiting while
     * the level is 0 or less.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void consume() throws InterruptedException {
        synchronized (lock) {
            while (!(product > 0)) {
                lock.wait();
            }
            int before = product;
            System.out.println("消费: " + before);
            product = before - 1;
            lock.notifyAll();
        }
    }
}

生产者消费者示例

java
import java.util.LinkedList;
import java.util.Queue;

/**
 * A blocking FIFO buffer with a fixed maximum size, built on
 * {@code wait}/{@code notifyAll} using the internal queue as the lock.
 *
 * @param <T> element type
 */
public class BoundedBuffer<T> {
    private final Queue<T> queue = new LinkedList<>();
    private final int capacity;

    /**
     * Creates a buffer that holds at most {@code capacity} items.
     *
     * @param capacity maximum number of buffered items; must be at least 1
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    public BoundedBuffer(int capacity) {
        // A capacity of 0 would make put() block forever, and a negative one
        // would never trip the "full" check, leaving the buffer unbounded.
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1, got " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Appends an item, waiting while the buffer is full.
     *
     * @param item the item to append
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void put(T item) throws InterruptedException {
        synchronized (queue) {
            // ">=" rather than "==" so the bound holds even if the size were
            // ever to exceed the capacity.
            while (queue.size() >= capacity) {
                queue.wait();
            }
            queue.offer(item);
            queue.notifyAll();
        }
    }

    /**
     * Removes and returns the oldest item, waiting while the buffer is empty.
     *
     * @return the item at the head of the buffer
     * @throws InterruptedException if interrupted while waiting for an item
     */
    public T take() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait();
            }
            T item = queue.poll();
            queue.notifyAll();
            return item;
        }
    }
}

线程池

ExecutorService

java
import java.util.concurrent.*;

/**
 * Shows the four common {@link Executors} factory methods, submits ten tasks
 * to a fixed-size pool, and shuts every pool down.
 */
public class ThreadPoolDemo {
    /**
     * Runs the demonstration.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Fixed-size thread pool
        ExecutorService fixedPool = Executors.newFixedThreadPool(5);

        // Cached thread pool
        ExecutorService cachedPool = Executors.newCachedThreadPool();

        // Single-thread pool
        ExecutorService singlePool = Executors.newSingleThreadExecutor();

        // Scheduled thread pool
        ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);

        try {
            // Submit tasks
            for (int i = 0; i < 10; i++) {
                final int taskId = i;
                fixedPool.submit(() -> {
                    System.out.println("任务 " + taskId + " 由 " +
                        Thread.currentThread().getName() + " 执行");
                });
            }
        } finally {
            // Every pool that was created is shut down, including the three
            // that were only created to show their factory method.
            cachedPool.shutdown();
            singlePool.shutdown();
            scheduledPool.shutdown();
            fixedPool.shutdown();
        }

        // Wait for the submitted tasks, cancelling them if they take too long.
        try {
            if (!fixedPool.awaitTermination(60, TimeUnit.SECONDS)) {
                fixedPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            fixedPool.shutdownNow();
            // Keep the interruption visible to whoever called main().
            Thread.currentThread().interrupt();
        }
    }
}

ThreadPoolExecutor

java
import java.util.concurrent.*;

/**
 * Builds a {@link ThreadPoolExecutor} with every constructor argument spelled
 * out, runs one task on it, and shuts it down.
 */
public class CustomThreadPool {
    /**
     * Runs the demonstration.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int corePoolSize = 5;
        int maximumPoolSize = 10;
        long keepAliveTime = 60L;  // idle-thread keep-alive, in the unit below
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory, handler);

        Runnable task = () -> System.out.println("任务执行");
        executor.submit(task);
        executor.shutdown();
    }
}

拒绝策略

| 策略 | 说明 |
| --- | --- |
| AbortPolicy | 抛出RejectedExecutionException(默认) |
| CallerRunsPolicy | 由调用线程执行任务 |
| DiscardPolicy | 直接丢弃任务 |
| DiscardOldestPolicy | 丢弃队列中最老的任务,然后重试提交当前任务 |

ThreadLocal

java
/**
 * Shows that each thread sees its own value of a {@link ThreadLocal}: two
 * workers set 100 and 200 while the main thread still reads the initial 0.
 */
public class ThreadLocalDemo {
    // final: the ThreadLocal handle itself is never reassigned.
    private static final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);

    /**
     * Runs the demonstration.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            try {
                threadLocal.set(100);
                System.out.println("Thread-0: " + threadLocal.get());
            } finally {
                // Each thread that sets a value clears its own copy when done.
                threadLocal.remove();
            }
        });

        Thread t2 = new Thread(() -> {
            try {
                threadLocal.set(200);
                System.out.println("Thread-1: " + threadLocal.get());
            } finally {
                threadLocal.remove();
            }
        });

        t1.start();
        t2.start();

        try {
            // The main thread never called set(), so it reads the initial value 0.
            System.out.println("Main: " + threadLocal.get());
        } finally {
            // Clean up after use to prevent memory leaks.
            threadLocal.remove();
        }
    }
}

线程安全集合

同步包装器

java
// Each line wraps a freshly created plain collection with the matching
// Collections.synchronizedXxx factory.

// Synchronized wrapper around a new ArrayList
List<String> syncList = Collections.synchronizedList(new ArrayList<>());
// Synchronized wrapper around a new HashSet
Set<String> syncSet = Collections.synchronizedSet(new HashSet<>());
// Synchronized wrapper around a new HashMap
Map<String, String> syncMap = Collections.synchronizedMap(new HashMap<>());

并发集合

java
import java.util.concurrent.*;

// Concurrent queue
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

// Blocking queues: the array-backed one is created with capacity 100,
// the linked one is created without a capacity argument
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(100);
LinkedBlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>();

// Concurrent map
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

// Skip-list map
ConcurrentSkipListMap<String, Integer> skipListMap = new ConcurrentSkipListMap<>();

常用方法

线程控制

java
// Quick reference of thread-control calls; the statements list the API and
// are not meant to be run in this order as a program.
Thread thread = new Thread(() -> {
    // thread task
});

thread.start();           // start the thread
thread.join();            // wait for the thread to finish
thread.join(1000);        // wait at most 1 second
thread.interrupt();       // interrupt the thread
thread.isAlive();         // whether the thread is alive
thread.isInterrupted();   // whether the thread has been interrupted

Thread.sleep(1000);       // sleep for 1 second
Thread.yield();           // give up the CPU
Thread.currentThread();   // get the current thread

线程属性

java
// Quick reference of thread property accessors.
Thread thread = new Thread();
thread.setName("MyThread");           // set the name
thread.setPriority(Thread.MAX_PRIORITY);  // set the priority
thread.setDaemon(true);               // mark as a daemon thread
thread.getId();                       // get the ID
thread.getState();                    // get the state