Все говорят, что в Java превосходная поддержка многопоточности, я хочу показать что есть некоторое количество боли, и неочевидных решений в этой поддержке.

Тут я привожу не все примеры боли, например про безопасную публикацию объектов, безопасное конструирование [4], про happens-before, про spurious wakeup(wait() - notify()), про некорректность инкремента оператором ++ поля, в том числе volatile, про 2 потокобезопасных ленивых синглтона, ради одного из которых пришлось[15] пофиксить[16] Java Memory Model

Поток нельзя прервать

Это все знают, но пусть будет на всякий случай:

import java.util.Date;

import java.util.concurrent.TimeUnit;

public class ThreadPainNoInterrupt {

public static void main(String[] args) throws InterruptedException {

Thread t = new Thread(() -> {

while(true) {

System.out.println("" + new Date() + " ololo am a bad boy");

}

});

t.start();

TimeUnit.SECONDS.sleep(2);

t.interrupt();

}

}

...этот код никогда сам не остановится.

Решение - использовать встроенный флаг interrupted

import java.util.Date;

import java.util.concurrent.TimeUnit;

public class ThreadPainNoInterruptFix {

public static void main(String[] args) throws InterruptedException {

Thread t = new Thread(() -> {

while(!Thread.currentThread().isInterrupted()) {

System.out.println("" + new Date() + " ololo am a bad boy");

}

});

t.start();

TimeUnit.SECONDS.sleep(2);

t.interrupt();

}

}

...с которым есть некоторые нюансы - а именно, методы (ниже метод sleep()), бросающие

InterruptedException

С ним(и) есть частое заблуждение:

import java.util.Date;

import java.util.concurrent.TimeUnit;

public class ThreadPainNoInterruptSleep {

public static void main(String[] args) throws InterruptedException {

Thread t = new Thread(() -> {

while(!Thread.currentThread().isInterrupted()) {

System.out.println("" + new Date() + " ololo am a bad boy");

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

t.start();

TimeUnit.SECONDS.sleep(2);

t.interrupt();

}

}

... это код, который нам помогает написать IntelliJ IDEA. Он тоже никогда не остановится.

Дело в том, что InterruptedException обладает особым смыслом, это не просто исключение[1], которое можно "проглотить" или залогировать, а сигнал нам (разработчику) что когда поток висел (state=WAITING или TIMED_WAITING) на блокирующей операции (метод sleep()) он был прерван каким-то другим потоком, и, сейчас можно/нужно сделать какие-либо завершающие операции(например удалить документы из mongodb, очистить временные файлы, и т. д.), а затем как только завершающие операции сделаны - мы действительно можем прервать поток, выставив ему флаг interrupted самостоятельно. Иными словами, когда мы поймали InterruptedException - флаг interrupted не выставлен, и мы должны сделать завершающие операции, и не забыть его выставить самостоятельно:

import java.util.Date;

import java.util.concurrent.TimeUnit;

public class ThreadPainNoInterruptSleepFix {

public static void main(String[] args) throws InterruptedException {

Thread t = new Thread(() -> {

while(!Thread.currentThread().isInterrupted()) {

System.out.println("" + new Date() + " ololo am a bad boy");

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

// make clean tasks here

Thread.currentThread().interrupt();

}

}

});

t.start();

TimeUnit.SECONDS.sleep(2);

t.interrupt();

}

}

... это работает.

Интересно, что неудобство InterruptedException заметил [13] Рон Пресслер, делая Project Loom, но конкретных предложений пока нет. Looking forward to.

Почему статус interrupted сбрасывается внутри потока и мы должны выставить его обратно сами как в коде выше? Почему нельзя было сделать проще, чтобы не было неявного сброса и последующей ручной установки сего замечательного флага ?

Как объясняет Егор Бугаенко [14], могут быть завершающие действия внутри блока catch (InterruptedException e), которые вызывают например Thread.sleep() (или другие "interrupted aware" методы), который так сделан что не будет спать, если у потока interrupted=true, а быстро выбросит InterruptedException. Получается, если бы статус не сбросился, мы бы не смогли например поспать в завершающем действии. Вроде редкий кейс, но при проектировании многопоточности Java его учли.

Пример кода со StackOverflow, демонстрирущий сказанное в абзаце выше (что Thread.sleep непременно бросит исключение, если у потока стоит флаг interrupted), а также что простановка флага interrupted сократит время отмены Future

import java.util.concurrent.*;

public class ThreadStopper {

public static void main(String... ignored) throws InterruptedException, ExecutionException {

ExecutorService es = Executors.newSingleThreadExecutor();

Future<?> future = es.submit(new Runnable() {

@Override

public void run() {

methodA();

}

});

Thread.sleep(1000);

future.cancel(true); // interrupts task.

long start = System.nanoTime();

try {

future.get();

} catch (CancellationException expected) {

// ignored

}

es.shutdown();

es.awaitTermination(1, TimeUnit.MINUTES);

long time = System.nanoTime() - start;

System.out.printf("Time to cancel/shutdown was %.1f milli-seconds%n", time / 1e6);

}

private static void methodA() { // doesn't throw any checked exception

for (int i = 0; i < 100; i++)

methodB();

}

private static void methodB() {

// uncomment if you don't want to see multiple InterruptedException

// if (!Thread.currentThread().isInterrupted()) {

try {

Thread.sleep(500);

} catch (InterruptedException ie) {

System.out.println("catch InterruptedException");

// comment it and you have increase wait time up to 50 sec

Thread.currentThread().interrupt();

}

// }

}

}

Возникает вопрос, а можно ли, вызывая Thread.currentThread().interrupt() в одной таске "испортить" флаг у потока - так что последующие таски не выполнятся ?

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

public class CooperativeInterruptedNotFail {

public static void main(String[] args) throws InterruptedException {

final ExecutorService es = Executors.newFixedThreadPool(2);

List<Future<?>> futures = new ArrayList<>();

for (int i=0; i<10; ++i) {

futures.add(es.submit(() -> {

if (!Thread.currentThread().isInterrupted()) {

System.out.println("Make heavy computations and sleep "+Thread.currentThread().toString());

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

System.out.println("Interrupting "+Thread.currentThread().toString());

Thread.currentThread().interrupt();

}

} else {

System.out.println("Skipping task because thread "+Thread.currentThread().toString()+" is considered interrupted");

}

}));

}

TimeUnit.MILLISECONDS.sleep(500);

futures.get(0).cancel(true);

futures.get(1).cancel(true);

es.shutdown();

}

}

У меня не получилось, все таски завершились, несмотря на то что первые 2 были отменены и проставили флаг interrupted.

Make heavy computations and sleep Thread[pool-1-thread-2,5,main]

Make heavy computations and sleep Thread[pool-1-thread-1,5,main]

Interrupting Thread[pool-1-thread-2,5,main]

Interrupting Thread[pool-1-thread-1,5,main]

Make heavy computations and sleep Thread[pool-1-thread-2,5,main]

Make heavy computations and sleep Thread[pool-1-thread-1,5,main]

Make heavy computations and sleep Thread[pool-1-thread-2,5,main]

Make heavy computations and sleep Thread[pool-1-thread-1,5,main]

Make heavy computations and sleep Thread[pool-1-thread-2,5,main]

Make heavy computations and sleep Thread[pool-1-thread-1,5,main]

Make heavy computations and sleep Thread[pool-1-thread-2,5,main]

Make heavy computations and sleep Thread[pool-1-thread-1,5,main]

Исключение внутри ScheduledExecutorService

import java.util.Date;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceSwallowsException {

public static void main(String[] args) throws InterruptedException {

java.util.concurrent.ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

scheduledExecutorService.scheduleWithFixedDelay(() -> {

while (true) {

System.out.println("" + new Date() + " ololo am a bad boy");

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

System.out.println("Catched InterruptedException");

Thread.currentThread().interrupt();

}

throw new RuntimeException("Because I can");

}

}, 0L, 1L, TimeUnit.SECONDS);

}

}

...Этот код отработает ровно 1 раз, и "молча" "остановится", и приложение будет висеть и ничего не делать.

Это нормальный сценарий согласно javadoc scheduleWithFixedDelay(), исполнение периодической таски(но не всего приложения) останавливается на первом исключении. Собственно "висит" потому что мы не вызвали scheduledExecutorService.shutdownNow().

"Достать" ошибку можно например так:

import java.util.Date;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledFuture;

import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceWithFuture {

public static void main(String[] args) throws InterruptedException, ExecutionException {

java.util.concurrent.ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {

while (true) {

System.out.println("" + new Date() + " ololo am a bad boy");

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

System.out.println("Catched InterruptedException");

Thread.currentThread().interrupt();

}

throw new RuntimeException("Because I can");

}

}, 0L, 1L, TimeUnit.SECONDS);

TimeUnit.SECONDS.sleep(3);

Object o = scheduledFuture.get();

}

}

Вывод

Sun Nov 10 20:42:41 MSK 2019 ololo am a bad boy

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Because I can

at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)

at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)

at com.github.nkonev.blog.ScheduledExecutorServiceWithFuture.main(ScheduledExecutorServiceWithFuture.java:29)

at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:567)

at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)

Caused by: java.lang.RuntimeException: Because I can

at com.github.nkonev.blog.ScheduledExecutorServiceWithFuture.lambda$main$0(ScheduledExecutorServiceWithFuture.java:22)

at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)

at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)

at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

at java.base/java.lang.Thread.run(Thread.java:830)

Собственно чтобы приложение завершилось - можно вызвать shutdownNow():

import java.util.Date;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledFuture;

import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceShutdownNow {

public static void main(String[] args) throws InterruptedException, ExecutionException {

java.util.concurrent.ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {

while (true) {

System.out.println("" + new Date() + " ololo am a bad boy");

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

System.out.println("Catched InterruptedException");

Thread.currentThread().interrupt();

}

throw new RuntimeException("Because I can");

}

}, 0L, 1L, TimeUnit.SECONDS);

TimeUnit.SECONDS.sleep(3);

scheduledExecutorService.shutdownNow();

Object o = scheduledFuture.get();

}

}

Ещё вариант как получить эксепшн внутри таски, напрашивающийся по аналогии с @ExceptionHandler - это задать ExceptionHandler. Давайте это сделаем, позаимствовав код из [2]:

import java.util.Arrays;

import java.util.Date;

import java.util.concurrent.*;

class MyThreadFactory implements ThreadFactory {

private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

private final Thread.UncaughtExceptionHandler handler;

public MyThreadFactory(Thread.UncaughtExceptionHandler handler) {

this.handler = handler;

}

@Override

public Thread newThread(Runnable run) {

Thread thread = defaultFactory.newThread(run);

thread.setUncaughtExceptionHandler(handler);

return thread;

}

}

class MyExceptionHandler implements Thread.UncaughtExceptionHandler {

@Override

public void uncaughtException(Thread thread, Throwable t) {

System.err.println("Uncaught exception is detected! " + t

+ " st: " + Arrays.toString(t.getStackTrace()));

// ... Handle the exception

}

}

public class St {

public static void main(String[] args) throws InterruptedException, ExecutionException {

ThreadFactory factory = new MyThreadFactory(new MyExceptionHandler());

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(factory);

scheduledExecutorService.scheduleWithFixedDelay(() -> {

while (true) {

System.out.println("" + new Date() + " ololo am a bad boy");

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

System.out.println("Catched InterruptedException");

Thread.currentThread().interrupt();

}

throw new RuntimeException("Because I can");

}

}, 0L, 1L, TimeUnit.SECONDS);

TimeUnit.SECONDS.sleep(3);

scheduledExecutorService.shutdownNow();

}

}

... этот код не выведет выброшенный нами RuntimeException("Because I can").

Говорят, для ScheduledExecutorService нужно ловить исключение в afterExecute(). Вот тут[17] описано, каким неочевидным образом его можно "выцепить".

В Spring (Boot?) @Scheduled это упущение исправлено, там предусмотрен по умолчанию логирующий ExceptionHandler:

@EnableScheduling

@SpringBootApplication

public class SpringScheduling {

public static void main(String[] args) throws Exception {

SpringApplication.run(SpringScheduling.class, args);

}

private static final Logger LOGGER = LoggerFactory.getLogger(SpringScheduling.class);

@Scheduled(cron = "* * * * * *")

public void crono() {

LOGGER.info("I will survive");

throw new RuntimeException("Or not");

}

}

Этот код работает, и не останавливается на первом исключении:

2020-08-23 08:04:29.000 INFO 55487 traceId= spanId= --- [pool-3-thread-1] com.github.nkonev.aaa.SpringScheduling:22 : I will survive

2020-08-23 08:04:29.001 ERROR 55487 traceId= spanId= --- [pool-3-thread-1] o.s.s.support.TaskUtils$LoggingErrorHandler:95 : Unexpected error occurred in scheduled task

java.lang.RuntimeException: Or not

at com.github.nkonev.aaa.SpringScheduling.crono(SpringScheduling.java:23) ~[classes/:na]

...

2020-08-23 08:04:30.000 INFO 55487 traceId= spanId= --- [pool-3-thread-1] com.github.nkonev.aaa.SpringScheduling:22 : I will survive

2020-08-23 08:04:30.002 ERROR 55487 traceId= spanId= --- [pool-3-thread-1] o.s.s.support.TaskUtils$LoggingErrorHandler:95 : Unexpected error occurred in scheduled task

java.lang.RuntimeException: Or not

at com.github.nkonev.aaa.SpringScheduling.crono(SpringScheduling.java:23) ~[classes/:na]

...

Состояние потока и чтение из сокета

Когда поток висит на чтении из сокета, он остаётся в статусе RUNNABLE, вот пример [5] тред-дампов, вопрос[6] на SO.

Остановка стрима ProjectReactor и RxJava при исключении

Если внутри стрима случился эксепшн, то по умолчанию наш реактивный стрим останавливается:

import reactor.core.publisher.Flux;

import java.util.Arrays;

import java.util.List;

public class ExceptionInProjectReactor {

public static void main(String[] args) {

List<Integer> tickets = Arrays.asList(2, 1, 6, 8, 9, 11, 13, 15);

Flux.fromIterable(tickets)

.map(integer -> {

if (integer % 3 == 0) {

throw new RuntimeException("divided by 3");

}

return integer;

})

.subscribe(System.out::println);

}

}

с ошибкой

21:58:44.308 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework

2

1

Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: divided by 3

Caused by: java.lang.RuntimeException: divided by 3

at com.github.nkonev.blog.ExceptionInProjectReactor.lambda$main$0(ExceptionInProjectReactor.java:14)

at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)

...

Это далеко не всегда желаемо.

Зачастую мы хотим иметь возможность как-то обработать исключение, например залогировать, но не останавливать при этом реактивный стрим.

Можно оборачивать в try-catch, ещё проблему решали таким [7] неочевидным способом. Сообщество работало над хоть каким-то решением [8]. Сейчас это проще - если мы не хотим останавливать реактивный стрим, то нам нужно использовать оператор onErrorContinue():

import reactor.core.publisher.Flux;

import java.util.Arrays;

import java.util.List;

public class ExceptionInProjectReactorHandled {

public static void main(String[] args) {

List<Integer> tickets = Arrays.asList(2, 1, 6, 8, 9, 11, 13, 15);

Flux.fromIterable(tickets)

.map(integer -> {

if (integer % 3 == 0) {

throw new RuntimeException("divided by 3");

}

return integer;

})

.onErrorContinue((throwable, o) -> process(o))

.subscribe(System.out::println);

}

private static void process(Object o) {

System.err.println("got error: " + o);

}

}

Вывод:

22:44:27.687 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework

2

1

got error: 6

8

got error: 9

11

13

got error: 15

Для RxJava2 такого красивого решения нет, тут либо try-catch, либо flatMap + onErrorResumeNext [9].

Список литературы

1 Работа с InterruptedException ru pdf mirror

1.1 Dealing with InterruptedException en pdf mirror

2 How to Handle Died Threads due to Uncaught Exceptions in Java

3 Understanding Thread Interruption in Java

4 Методы безопасного конструирования ru pdf mirror

4.1 Safe construction techniques en pdf mirror

5 Threads Stuck in java.net.SocketInputStream.socketRead0 API

6 Java socketRead0 Issue

7 Need an error handler method on Flux when processing a stream of elements

8 Sneak peek at Reactor-Core 3.2 with Milestone 1

9 How to ignore error and continue infinite stream?

10 The RxJava2 Default Error Handler

11 ConcurrentUnit - Toolkit for testing multi-threaded and asynchronous applications

12 Daily Reactive: Where is my exception?!

13 State of Loom: Part 2

14 What Do You Do With InterruptedException?

15 Fixing the Java Memory Model, Part 1

16 Fixing the Java Memory Model, Part 2

17 Handling exceptions from Java ExecutorService tasks

18 "Project Loom: Modern Scalable Concurrency for the Java" - Ron Pressler

19 Sonar Rule about InterruptedException

20 Алексей Шипилёв — Прагматика Java Memory Model

21 Прагматика JMM - слайды

22 Happens before (HB)

23 Неблокирующие алгоритмы Atomic compare-and-swap

SEO:

недостатки многопоточности Java, проблемы многопоточности Java