Java 多线程并发编程

在Java中开启新线程执行,一般做法是继承Runnable接口,实现run(),然后使用new Tread().start(run)。如果我们需要对执行结果处理或者线程限制,使用Java提供的并发工具会使多并发编程变得简单。

Java并发工具在 java.util.concurrent 包及其子包 java.util.concurrent.atomic 和 java.util.concurrent.locks 下。本章会介绍相关工具用法。

创建

在并发工具中我们用 Executor 代替 Thread 异步执行。

Executor
一个简单的标准化接口,用于定义定制的类线程子系统,包括线程池、异步I/O和轻量级任务框架。根据所使用的具体执行器类,任务可以在新创建的线程、现有的任务执行线程或调用execute的线程中执行,并且可以顺序执行或并发执行。

ExecutorService
继承Executor接口,提供了一个更完整的异步任务执行框架,理任务的排队和调度,并允许受控关机。

ScheduledExecutorService
继承ExecutorService接口,添加了对延迟和周期性任务执行的支持。

我们一般使用 Executors 创建线程池,获得执行器。

方法说明备注
newSingleThreadExecutor()创建单线程执行器返回ExecutorService
newSingleThreadScheduledExecutor()创建单线程任务执行器返回ScheduledExecutorService
newCachedThreadPool()创建线程池执行根据需要创建新线程,返回ExecutorService
newFixedThreadPool(5)创建固定线程数的线程池固定5个线程,返回ExecutorService
newScheduledThreadPool(5)创建固定线程数的线程池固定5个线程,返回ScheduledExecutorService

执行

实现一个 Runnable 接口:

/**
 * @author Engr-Z
 * @since 2021/2/10
 */
@Slf4j
public class DemoRunnable implements Runnable {

    @Override
    public void run() {
        log.info("demo runnable");
    }
}
  • 立即执行
// 创建单线程执行器
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new DemoRunnable());
  • 延迟执行
@Test
// 创建单线程任务执行器
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
// 延迟5秒执行,可以指定单位,天/时/分/秒 等
scheduledExecutorService.schedule(new DemoRunnable(), 5, TimeUnit.SECONDS);
  • 周期性执行
// 创建单线程任务执行器
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
// 延迟5秒执行,然后每隔1分钟执行一次
scheduledExecutorService.scheduleAtFixedRate(new DemoRunnable(), 5, 1, TimeUnit.MINUTES);

如果遇到异常,执行会停止。
scheduleWithFixedDelay 和 scheduleAtFixedRate 最大区别是:scheduleWithFixedDelay 会等待上一次任务执行完后再开始下一次执行, scheduleAtFixedRate 以固定周期间隔执行,无论上一次任务是否执行完成都会开始。
使用返回ScheduledFuture的对象执行 cancel(true) 可以终止任务

异步执行结果

如果需要获取异步执行结果,再进行下一步计算,需要关现 Callable 接口中 call 方法。它比 Runnable 的 run 方法不同的是方法执行完需要返回值。

使用 FutureTask 执行 Callable :

FutureTask futureTask = new FutureTask(new Callable() {
    @Override
    public Object call() throws Exception {
        log.info("call method");
        return "hello world";
    }
});
// 执行
futureTask.run();
// 阻塞
Object obj = futureTask.get();
log.info("obj->{}", obj);

使用 futureTask.get() 会阻塞当前线程,直到 call 方法执行完返回结果。

计数器

当我们需要等待一组异步任务执行完后再往下执行,可以使用 CountDownLatch 。

// 初始化10个计数
CountDownLatch countDownLatch = new CountDownLatch(10);

Executors.newSingleThreadExecutor().execute(new Runnable() {
    @SneakyThrows
    @Override
    public void run() {
        while (countDownLatch.getCount() > 0) {
            log.info("count={}", countDownLatch.getCount());
            // 减一
            countDownLatch.countDown();
            Thread.sleep(1000);
        }
    }
});
// 等待
countDownLatch.await();
log.info("finish");

创建 CountDownLatch 对象,在需要等待的线程中执行 await() 阻塞。在线程中任务完成后执行 countDownLatch.countDown() 减一,直到 count 为0时,继续执行。

同步锁

在并发编程时,对于共享数据,要保证线程安全(同一时间只被一个线程操作),可以在方法,或代码块中加锁。

public synchronized void method() {
    // ....
}
public void method() {
    // this 做用于该实例,使用 class 为线程锁
    synchronized (this) {

    }
}

除了使用 synchronized 关键字,可以使用 ReentrantLock 手动加锁解锁,操作更灵活。

private final ReentrantLock lock = new ReentrantLock();

public void m() {
    lock.lock();  // 加锁
    try {
        // ...
    } finally {
        lock.unlock()
    }
}

加锁和解锁需要手动进行,如果次数不一至就无法获得锁。
synchronized 不可响应中断,一个线程获取不到锁就一直等着,ReentrantLock可以相应中断。

voliate 和 atomic包

volidate 是Java中的关键字,用来保证变量线程安全。和 synchronized 相比, volatile 更轻量级。

volatile保证一个线程对变量的修改对其他线程可见,无法保证原子性。当只有一个线程修改共享变量时,适合使用volatile 。示例:

private volatile int i = 0;

java.util.concurrent.atomic 是一个支持在单个变量上进行无锁线程安全编程的类的小工具包。

private final AtomicInteger sequenceNumber = new AtomicInteger(0);

public long next() {
    // 增加
    return sequenceNumber.getAndIncrement();
}

有 AtomicBoolean,AtomicLongInteger,AtomicLong,AtomicLongArray 等,分别对应不同类型。可查阅JDK文档。

ThreadLocal

ThreadLocal 类属于 java.lang 包中的类,它可以在线程中保存一个值。

ThreadLocal<Integer> threadId = new ThreadLocal<>();
threadId.set(1);

获取使用 get 方法。

在 ThreadLocal 中保存的值是存放在当前线程中的,其他线程无法获取。需要注意的是,如果使用了线程程,该值是会保存线程上一次的值。所有用完后在线程释放前使用 threadId.remove() 清除,以免造成bug 。


已发布

分类

来自

标签:

评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注