Все говорят, что в Java превосходная поддержка многопоточности, я хочу показать что есть некоторое количество боли, и неочевидных решений в этой поддержке.
Тут я привожу не все примеры боли, например про безопасную публикацию объектов, безопасное конструирование [4], про happens-before, про spurious wakeup(wait() - notify()), про некорректность инкремента оператором ++ поля, в том числе volatile, про 2 потокобезопасных ленивых синглтона, ради одного из которых пришлось[15] пофиксить[16] Java Memory Model
Поток нельзя прервать
Это все знают, но пусть будет на всякий случай:
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class ThreadPainNoInterrupt {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
while(true) {
System.out.println("" + new Date() + " ololo am a bad boy");
}
});
t.start();
TimeUnit.SECONDS.sleep(2);
t.interrupt();
}
}
...этот код никогда сам не остановится.
Решение - использовать встроенный флаг interrupted
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class ThreadPainNoInterruptFix {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
while(!Thread.currentThread().isInterrupted()) {
System.out.println("" + new Date() + " ololo am a bad boy");
}
});
t.start();
TimeUnit.SECONDS.sleep(2);
t.interrupt();
}
}
...с которым есть некоторые нюансы - а именно, методы (ниже метод sleep()), бросающие
InterruptedException
С ним(и) есть частое заблуждение:
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class ThreadPainNoInterruptSleep {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
while(!Thread.currentThread().isInterrupted()) {
System.out.println("" + new Date() + " ololo am a bad boy");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
TimeUnit.SECONDS.sleep(2);
t.interrupt();
}
}
... это код, который нам помогает написать IntelliJ IDEA. Он тоже никогда не остановится.
Дело в том, что InterruptedException обладает особым смыслом, это не просто исключение[1], которое можно "проглотить" или залогировать, а сигнал нам (разработчику) что когда поток висел (state=WAITING или TIMED_WAITING) на блокирующей операции (метод sleep()) он был прерван каким-то другим потоком, и, сейчас можно/нужно сделать какие-либо завершающие операции(например удалить документы из mongodb, очистить временные файлы, и т. д.), а затем как только завершающие операции сделаны - мы действительно можем прервать поток, выставив ему флаг interrupted самостоятельно. Иными словами, когда мы поймали InterruptedException - флаг interrupted не выставлен, и мы должны сделать завершающие операции, и не забыть его выставить самостоятельно:
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class ThreadPainNoInterruptSleepFix {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
while(!Thread.currentThread().isInterrupted()) {
System.out.println("" + new Date() + " ololo am a bad boy");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// make clean tasks here
Thread.currentThread().interrupt();
}
}
});
t.start();
TimeUnit.SECONDS.sleep(2);
t.interrupt();
}
}
... это работает.
Интересно, что неудобство InterruptedException заметил [13] Рон Пресслер, делая Project Loom, но конкретных предложений пока нет. Looking forward to.
Почему статус interrupted сбрасывается внутри потока и мы должны выставить его обратно сами как в коде выше? Почему нельзя было сделать проще, чтобы не было неявного сброса и последующей ручной установки сего замечательного флага ?
Как объясняет Егор Бугаенко [14], могут быть завершающие действия внутри блока catch (InterruptedException e), которые вызывают например Thread.sleep() (или другие "interrupted aware" методы), который так сделан что не будет спать, если у потока interrupted=true, а быстро выбросит InterruptedException. Получается, если бы статус не сбросился, мы бы не смогли например поспать в завершающем действии. Вроде редкий кейс, но при проектировании многопоточности Java его учли.
Пример кода со StackOverflow, демонстрирущий сказанное в абзаце выше (что Thread.sleep непременно бросит исключение, если у потока стоит флаг interrupted), а также что простановка флага interrupted сократит время отмены Future
import java.util.concurrent.*;
public class ThreadStopper {
public static void main(String... ignored) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newSingleThreadExecutor();
Future<?> future = es.submit(new Runnable() {
@Override
public void run() {
methodA();
}
});
Thread.sleep(1000);
future.cancel(true); // interrupts task.
long start = System.nanoTime();
try {
future.get();
} catch (CancellationException expected) {
// ignored
}
es.shutdown();
es.awaitTermination(1, TimeUnit.MINUTES);
long time = System.nanoTime() - start;
System.out.printf("Time to cancel/shutdown was %.1f milli-seconds%n", time / 1e6);
}
private static void methodA() { // doesn't throw any checked exception
for (int i = 0; i < 100; i++)
methodB();
}
private static void methodB() {
// uncomment if you don't want to see multiple InterruptedException
// if (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
System.out.println("catch InterruptedException");
// comment it and you have increase wait time up to 50 sec
Thread.currentThread().interrupt();
}
// }
}
}
Возникает вопрос, а можно ли, вызывая Thread.currentThread().interrupt() в одной таске "испортить" флаг у потока - так что последующие таски не выполнятся ?
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class CooperativeInterruptedNotFail {
public static void main(String[] args) throws InterruptedException {
final ExecutorService es = Executors.newFixedThreadPool(2);
List<Future<?>> futures = new ArrayList<>();
for (int i=0; i<10; ++i) {
futures.add(es.submit(() -> {
if (!Thread.currentThread().isInterrupted()) {
System.out.println("Make heavy computations and sleep "+Thread.currentThread().toString());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("Interrupting "+Thread.currentThread().toString());
Thread.currentThread().interrupt();
}
} else {
System.out.println("Skipping task because thread "+Thread.currentThread().toString()+" is considered interrupted");
}
}));
}
TimeUnit.MILLISECONDS.sleep(500);
futures.get(0).cancel(true);
futures.get(1).cancel(true);
es.shutdown();
}
}
У меня не получилось, все таски завершились, несмотря на то что первые 2 были отменены и проставили флаг interrupted.
Make heavy computations and sleep Thread[pool-1-thread-2,5,main]
Make heavy computations and sleep Thread[pool-1-thread-1,5,main]
Interrupting Thread[pool-1-thread-2,5,main]
Interrupting Thread[pool-1-thread-1,5,main]
Make heavy computations and sleep Thread[pool-1-thread-2,5,main]
Make heavy computations and sleep Thread[pool-1-thread-1,5,main]
Make heavy computations and sleep Thread[pool-1-thread-2,5,main]
Make heavy computations and sleep Thread[pool-1-thread-1,5,main]
Make heavy computations and sleep Thread[pool-1-thread-2,5,main]
Make heavy computations and sleep Thread[pool-1-thread-1,5,main]
Make heavy computations and sleep Thread[pool-1-thread-2,5,main]
Make heavy computations and sleep Thread[pool-1-thread-1,5,main]
Исключение внутри ScheduledExecutorService
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceSwallowsException {
public static void main(String[] args) throws InterruptedException {
java.util.concurrent.ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleWithFixedDelay(() -> {
while (true) {
System.out.println("" + new Date() + " ololo am a bad boy");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println("Catched InterruptedException");
Thread.currentThread().interrupt();
}
throw new RuntimeException("Because I can");
}
}, 0L, 1L, TimeUnit.SECONDS);
}
}
...Этот код отработает ровно 1 раз, и "молча" "остановится", и приложение будет висеть и ничего не делать.
Это нормальный сценарий согласно javadoc scheduleWithFixedDelay(), исполнение периодической таски(но не всего приложения) останавливается на первом исключении. Собственно "висит" потому что мы не вызвали scheduledExecutorService.shutdownNow().
"Достать" ошибку можно например так:
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceWithFuture {
public static void main(String[] args) throws InterruptedException, ExecutionException {
java.util.concurrent.ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {
while (true) {
System.out.println("" + new Date() + " ololo am a bad boy");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println("Catched InterruptedException");
Thread.currentThread().interrupt();
}
throw new RuntimeException("Because I can");
}
}, 0L, 1L, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(3);
Object o = scheduledFuture.get();
}
}
Вывод
Sun Nov 10 20:42:41 MSK 2019 ololo am a bad boy
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Because I can
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at com.github.nkonev.blog.ScheduledExecutorServiceWithFuture.main(ScheduledExecutorServiceWithFuture.java:29)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)
Caused by: java.lang.RuntimeException: Because I can
at com.github.nkonev.blog.ScheduledExecutorServiceWithFuture.lambda$main$0(ScheduledExecutorServiceWithFuture.java:22)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:830)
Собственно чтобы приложение завершилось - можно вызвать shutdownNow():
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceShutdownNow {
public static void main(String[] args) throws InterruptedException, ExecutionException {
java.util.concurrent.ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {
while (true) {
System.out.println("" + new Date() + " ololo am a bad boy");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println("Catched InterruptedException");
Thread.currentThread().interrupt();
}
throw new RuntimeException("Because I can");
}
}, 0L, 1L, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(3);
scheduledExecutorService.shutdownNow();
Object o = scheduledFuture.get();
}
}
Ещё вариант как получить эксепшн внутри таски, напрашивающийся по аналогии с @ExceptionHandler - это задать ExceptionHandler. Давайте это сделаем, позаимствовав код из [2]:
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.*;
class MyThreadFactory implements ThreadFactory {
private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
private final Thread.UncaughtExceptionHandler handler;
public MyThreadFactory(Thread.UncaughtExceptionHandler handler) {
this.handler = handler;
}
@Override
public Thread newThread(Runnable run) {
Thread thread = defaultFactory.newThread(run);
thread.setUncaughtExceptionHandler(handler);
return thread;
}
}
class MyExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread thread, Throwable t) {
System.err.println("Uncaught exception is detected! " + t
+ " st: " + Arrays.toString(t.getStackTrace()));
// ... Handle the exception
}
}
public class St {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadFactory factory = new MyThreadFactory(new MyExceptionHandler());
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(factory);
scheduledExecutorService.scheduleWithFixedDelay(() -> {
while (true) {
System.out.println("" + new Date() + " ololo am a bad boy");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println("Catched InterruptedException");
Thread.currentThread().interrupt();
}
throw new RuntimeException("Because I can");
}
}, 0L, 1L, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(3);
scheduledExecutorService.shutdownNow();
}
}
... этот код не выведет выброшенный нами RuntimeException("Because I can").
Говорят, для ScheduledExecutorService нужно ловить исключение в afterExecute(). Вот тут[17] описано, каким неочевидным образом его можно "выцепить".
В Spring (Boot?) @Scheduled это упущение исправлено, там предусмотрен по умолчанию логирующий ExceptionHandler:
@EnableScheduling
@SpringBootApplication
public class SpringScheduling {
public static void main(String[] args) throws Exception {
SpringApplication.run(SpringScheduling.class, args);
}
private static final Logger LOGGER = LoggerFactory.getLogger(SpringScheduling.class);
@Scheduled(cron = "* * * * * *")
public void crono() {
LOGGER.info("I will survive");
throw new RuntimeException("Or not");
}
}
Этот код работает, и не останавливается на первом исключении:
2020-08-23 08:04:29.000 INFO 55487 traceId= spanId= --- [pool-3-thread-1] com.github.nkonev.aaa.SpringScheduling:22 : I will survive
2020-08-23 08:04:29.001 ERROR 55487 traceId= spanId= --- [pool-3-thread-1] o.s.s.support.TaskUtils$LoggingErrorHandler:95 : Unexpected error occurred in scheduled task
java.lang.RuntimeException: Or not
at com.github.nkonev.aaa.SpringScheduling.crono(SpringScheduling.java:23) ~[classes/:na]
...
2020-08-23 08:04:30.000 INFO 55487 traceId= spanId= --- [pool-3-thread-1] com.github.nkonev.aaa.SpringScheduling:22 : I will survive
2020-08-23 08:04:30.002 ERROR 55487 traceId= spanId= --- [pool-3-thread-1] o.s.s.support.TaskUtils$LoggingErrorHandler:95 : Unexpected error occurred in scheduled task
java.lang.RuntimeException: Or not
at com.github.nkonev.aaa.SpringScheduling.crono(SpringScheduling.java:23) ~[classes/:na]
...
Состояние потока и чтение из сокета
Когда поток висит на чтении из сокета, он остаётся в статусе RUNNABLE, вот пример [5] тред-дампов, вопрос[6] на SO.
Остановка стрима ProjectReactor и RxJava при исключении
Если внутри стрима случился эксепшн, то по умолчанию наш реактивный стрим останавливается:
import reactor.core.publisher.Flux;
import java.util.Arrays;
import java.util.List;
public class ExceptionInProjectReactor {
public static void main(String[] args) {
List<Integer> tickets = Arrays.asList(2, 1, 6, 8, 9, 11, 13, 15);
Flux.fromIterable(tickets)
.map(integer -> {
if (integer % 3 == 0) {
throw new RuntimeException("divided by 3");
}
return integer;
})
.subscribe(System.out::println);
}
}
с ошибкой
21:58:44.308 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
2
1
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: divided by 3
Caused by: java.lang.RuntimeException: divided by 3
at com.github.nkonev.blog.ExceptionInProjectReactor.lambda$main$0(ExceptionInProjectReactor.java:14)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
...
Это далеко не всегда желаемо.
Зачастую мы хотим иметь возможность как-то обработать исключение, например залогировать, но не останавливать при этом реактивный стрим.
Можно оборачивать в try-catch, ещё проблему решали таким [7] неочевидным способом. Сообщество работало над хоть каким-то решением [8]. Сейчас это проще - если мы не хотим останавливать реактивный стрим, то нам нужно использовать оператор onErrorContinue():
import reactor.core.publisher.Flux;
import java.util.Arrays;
import java.util.List;
public class ExceptionInProjectReactorHandled {
public static void main(String[] args) {
List<Integer> tickets = Arrays.asList(2, 1, 6, 8, 9, 11, 13, 15);
Flux.fromIterable(tickets)
.map(integer -> {
if (integer % 3 == 0) {
throw new RuntimeException("divided by 3");
}
return integer;
})
.onErrorContinue((throwable, o) -> process(o))
.subscribe(System.out::println);
}
private static void process(Object o) {
System.err.println("got error: " + o);
}
}
Вывод:
22:44:27.687 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
2
1
got error: 6
8
got error: 9
11
13
got error: 15
Для RxJava2 такого красивого решения нет, тут либо try-catch, либо flatMap + onErrorResumeNext [9].
Список литературы
1 Работа с InterruptedException ru pdf mirror
1.1 Dealing with InterruptedException en pdf mirror
2 How to Handle Died Threads due to Uncaught Exceptions in Java
3 Understanding Thread Interruption in Java
4 Методы безопасного конструирования ru pdf mirror
4.1 Safe construction techniques en pdf mirror
5 Threads Stuck in java.net.SocketInputStream.socketRead0 API
7 Need an error handler method on Flux when processing a stream of elements
8 Sneak peek at Reactor-Core 3.2 with Milestone 1
9 How to ignore error and continue infinite stream?
10 The RxJava2 Default Error Handler
11 ConcurrentUnit - Toolkit for testing multi-threaded and asynchronous applications
12 Daily Reactive: Where is my exception?!
14 What Do You Do With InterruptedException?
15 Fixing the Java Memory Model, Part 1
16 Fixing the Java Memory Model, Part 2
17 Handling exceptions from Java ExecutorService tasks
18 "Project Loom: Modern Scalable Concurrency for the Java" - Ron Pressler
19 Sonar Rule about InterruptedException
20 Алексей Шипилёв — Прагматика Java Memory Model
21 Прагматика JMM - слайды
22 Happens before (HB)
23 Неблокирующие алгоритмы Atomic compare-and-swap
SEO:
недостатки многопоточности Java, проблемы многопоточности Java