Fork/Join framework в Java

fork/join framework для разбиения задачи на подзадачи и чтобы эти подзадачи выполнялись в отдельных потоках.

Также эти подзадачи могут тоже делиться на подзадачи и опять же чтобы эти подзадачи выполнялись в отдельных потоках.

Повторяться этот процесс деления может пока подзадача не станет необходимо мала.

Есть пул потоков. В один из его потоков ставиться большая задача.

Эта задача и подзадачи в будущем являют собой выполнения метода compile, который нужно переопределить

Example

Переопределяется он всегда похожим образом:

if задача или подзадача уже необходимо мала, возвращаем что-то из метода compile.

else делим задачу или подзадачу на подзадачи.

Деление на подзадачи в else происходит особым рекурсивным образом с помощью методов fork и join.

fork добавляет подзадачу в пул потоков и любой свободный поток может ее подхватить для выполнения. fork вызывает метод compile подзадачи, к которой fork был применен.

join ждет пока эта форкнутая подзадача выполнится.

Search Icon

Внимание джойном останавливается выполнение кода, но поток не блокируется, поток может выполнять другую подзадачу из пула потоков.

join возвращает значение из compile, который fork вызвал.

Ясное дело, compile будет вызываться рекурсивно форком и соответственно джойны будут копиться пока не дойдет до наименьшей подзадачи.

И джойны, понятное дело, в обратном направлении начнут освобождаться.

Помещенные задачи форком в любом потоке в пул потоков могут быть взяты любым свободным потоком для выполнения.

В этом и есть профит, то что задача распараллеливается и любой поток может взять любую подзадачу для выполнения, даже если она была сгенерирована другим потоком.

Пример программы:

import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoinExample { static volatile int j; public static void main(final String[] arguments) throws InterruptedException, ExecutionException { //узнать количество ядер компьютера //(в каждом ядре эффективно выполнять 1 поток) int nThreads = Runtime.getRuntime().availableProcessors(); System.out.println(nThreads); int[] numbers = new int[1000]; //запишем в массив числа от 1 до 1000 for(int i = 0; i < numbers.length; i++) { numbers[i] = i+1; } //Будем складывать числа от 1 до 1000 //эффективным образом с помощью fork/join // сюда передаем количество // ядер (размер пула потоков) ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads); // invoke запускает задачу на выполнение // то есть вызывает compile. Long result = forkJoinPool.invoke( new Sum(numbers,0,numbers.length)); //пока invoke не вернул значение дальше //в методе мейн ничего не выполняется System.out.println(result); } static class Sum extends RecursiveTask< Long > { int low; int high; int[] array; Sum(int[] array, int low, int high) { this.array = array; this.low = low; this.high = high; } protected Long compute() { //if задача или подзадача уже необходимо мала //возвращаем что-то из метода compile if(high – low <= 10) { long sum = 0; for(int i = low; i < high; ++i) sum += array[i]; return sum; } //else - делим задачу или подзадачу //на подзадачи (в данном случае на 2 половины) else { int mid = low + (high - low) / 2; Sum left = new Sum(array, low, mid); Sum right = new Sum(array, mid, high); // добавляем одну половину задачи (подзадачу) // в пул потоков left.fork(); // добавляем вторую половину задачи (подзадачу) // в пул потоков right.fork(); // ждем когда compile в форке завершиться long leftResult = left.join(); // ждем когда compile в форке завершиться long rightResult = right.join(); return leftResult + rightResult; } } } }

Вывод:

Как можно увидеть, в консоли сумма чисел от 1 до 1000 посчитана верна – 500500.

ThreadLocal в Java: переменные потока

ThreadLocalпеременная ПОТОКА. Это не переменная объекта потока, а именно переменная ПОТОКА. То есть она принадлежит только одному потоку и существует только в одном потоке.

В примере ниже мы создаем объект потока threadDemo, который передаем на выполнение в три потока. То есть три потока будут работать с одним и тем же объектом.

Если переменная НЕ ThreadLocal и эта переменная в этом объекте меняется каким-то одним потоком, то эта переменная будет измененной и в других потоках, так как они работают с одним и тем же объектом.

Но ThreadLocal переменная не такая. Если ThreadLocal переменную в объекте изменит какой-то один поток, то она не будет измененной в других потоках не смотря на то, что они работают с одним объектом. Она будет измененной только в потоке, который ее изменил.

Пример программы:

class SomeThread implements Runnable { // локальная переменная объекта класса int counter; // локальная переменная потока ThreadLocal threadLocalCounter = new ThreadLocal<>(); // то есть, как видим, есть переменные ОБЪЕКТА, // а есть переменные ПОТОКА. // Смотри более подробно после кода public void run() { counter++; if (threadLocalCounter.get() != null) { // запись в переменную ThreadLocal // через метод set threadLocalCounter.set(threadLocalCounter.get() + 1); } else { threadLocalCounter.set(0); } System.out.println(“Counter: ” + counter); System.out.println(“threadLocalCounter: ” + threadLocalCounter.get()); } } class ThreadLocalExample { public static void main(String[] args) { SomeThread threadDemo = new SomeThread(); Thread t1 = new Thread(threadDemo); Thread t2 = new Thread(threadDemo); Thread t3 = new Thread(threadDemo); t1.start(); try { Thread.sleep(200); } catch (InterruptedException e) {} t2.start(); try { Thread.sleep(200); } catch (InterruptedException e) {} t3.start(); } }

Вывод:

Как можно увидеть в консоли, переменную counter меняли все три потока, а переменная threadLocalCounter была у каждого потока своя личная и в каждом из потоков она только успела стать 0.

Как можно увидеть ThreadLocal иногда не заменим, например в этом случае, когда создать локальную переменную для потока можно только с помощью ThreadLocal, потому что объект один, это threadDemo и все потоки работают с ним, и соответственно, например, локальная переменная counter для всех потоков общая и все потоки изменяют ее в этом объекте, а чтобы создать личную для каждого потока работающего с объектом threadDemo необходимо использовать ThreadLocal.

Search Icon

То есть counter – переменная объекта, threadLocalCounter – переменная потока.


ThreadLocal и потоконебезопасные обьекты

Одним из важных применений ThreadLocal является работа с потокоНЕбезопасными объектами.

Объекты некоторых классов (например SimpleDateFormat) потокоНЕбезопасны.

То есть если к одному и тому же объекту SimpleDateFormat одновременно обращаются несколько потоков, то нужно применять синхронизацию в потоках (используя synchronized например), иначе будет ошибка. Но синхр-я тормозит работу программы. Поможет ThreadLocal.

То есть если объект реализует Runnable, и он будет использоваться многими потоками, и в этом объекте есть потокоНЕбезопасный объект, лучше не использовать синхронизацию, а размножить его на все потоки используя ThreadLocal.

То есть с помощью ThreadLocal можно сделать так, что у каждого потока будет свой объект SimpleDateFormat.

ReadWriteLock в Java

ReadWriteLock – класс содержащий 2 лока: один для чтения, другой для записи.

Часто бывает так, что один поток пишет в ресурс, а много других потоков читают из него.

То есть один поток блокирует ресурс для записи, а другие потоки, желающие читать из ресурса, ждут пока он запишет, и после того, как записывающий закончил запись, потоки читают, и далее всё повторяется.

Пример программы:

import java.io.*; import java.util.*; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class RedWrtLockExample { static long numOfOperations=10000000000L; public static void main(String[] args) { CommonResource commonResource = new CommonResource(); //объект ресурса for (int i = 1; i < 4; i++){ //Запускаем 3 потока и передаем в каждый из них //ресурс общий для потоков. Thread t = new Thread(new CountThread(commonResource)); t.setName("Thread " + i); t.start(); } //5 раз записать 10 млрд в commonResource.value for (int i = 0; i < 5; i++) { //Здесь начинается запись в ресурс потоком мейн //и пока происходит запись в ресурс потоком мейн //другие потоки читать из ресурса не смогут. commonResource.write(); try{ Thread.sleep(100); } catch(InterruptedException e){} } } static class CommonResource { long value; private final ReadWriteLock lock = new ReentrantReadWriteLock(); void write() { //здесь происходит writeLock().lock() на время //записи десяти млрд блокируем все другие потоки, //которые дошли до readLock().lock() в этом объекте lock.writeLock().lock(); for (long i = 0; i < numOfOperations; i++){ value++; } //запускаются залоченные в read lock.writeLock().unlock(); } void read() { //Если в каком то из потоков в этом объекте //случился writeLock().lock() и пока еще не случился //writeLock().unlock() то readLock().lock() //останавливает потоки, которые дошли //до readLock().lock() в этом объекте ресурса. lock.readLock().lock(); System.out.println("Counter: " + value); lock.readLock().unlock(); } } static class CountThread implements Runnable { CommonResource res; CountThread(CommonResource res){ this.res=res; } public void run(){ for (int i = 0; i < 5; i++) { try { Thread.sleep(200); } catch(InterruptedException e){} //Пока объект заблокирован записью //чтение потоками не будет происходить. res.read(); } } } }

Вывод:

Из консоли видно, что три запущенных в мейн потока каждый раз ждут пока мейн запишет 10 миллиардов в переменную value в ресурсе, и каждый раз, как он это сделал потоки выводят переменную value с добавленными 10 миллиардами.

ExecutorService в Java: управление пулом потоков

Создание потока дорогостоящая операция.

Поэтому можно вместо создания новых потоков переиспользовать те которые завершили свою работу.

ExecutorService помогает поддерживать пул потоков, то есть поддерживает выполнение некоторого фиксированного количества потоков, которые одновременно выполняются.

Также он назначает задачи этим потокам в этом пуле (в пуле всё время находятся те же самые потоки, это важно).

Он также предоставляет возможность ставить задачи в очередь до тех пор, пока не появится свободный поток в пуле, если количество задач превышает количество доступных потоков.

Как уже было сказано ExecutorService создает некоторый пул потоков.

Example

Например:

Если пул потоков имеет размер 10 то параллельно выполняться будет всего 10 потоков и как только один из 10 потоков завершит свою работу, он не создается заново, он перезапускается для выполнения уже другой задачи.

То есть потоки в пуле потоков не создаются заново, а переиспользуются.

Search Icon

Это частно может быть полезно в клиент-серверных программах.

То есть представим, что чтобы обработать один запрос клиента сервер создает отдельный поток. Когда серверной программой обрабатываются запросы клиентов, из-за большого количества потоков может увеличиваться время отклика (промежуток времени, который требуется серверу, чтобы обработать запрос извне). То есть требуется дополнительное время для постоянного создания нитей на сервере, что приводит к увеличению времени отклика и чтобы постоянно не создавались потоки на сервере дла обработки запросов, ExecutorService может помочь. То есть запросы будут выполняться в ExecutorService, в котором потоки переиспользуются для обработки запросов клиентов, и если в ExecutorService нет места, запросы становиться в очередь и будут ждать пока освободиться поток в ExecutorService.

Также это полезно тем, что в процессе может создаться лишь ограниченное количество потоков. Благодаря ExecutorService больше заданного в конструкторе количества потоков не создастся и ясное дело ExecutorService экономит ресурсы и время. Достигается желаемая нагрузка и не подвергаются опасности системные ресурсы.

Пример программы:

import java.io.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class ServiceExecutorExample { //к этой переменной добавляется //в потоках SomeThread static volatile int j; public static void main(String[] args) { ExecutorService exeServ = //одновременно будет работать //только 10 потоков Executors.newFixedThreadPool(10); for (int i = 0; i < 1000; i++) { //в этом цикле 10 потоков будет //переполнены 100 раз. //Запустить поток в ExecutorService. exeServ.submit(new SomeThread()); //Если бы мы запускали здесь потоки //стандартным образом то создалось //бы 1000 потоков. } try { Thread.sleep(1000); } catch (InterruptedException e){} //завершить ExecutorService exeServ.shutdown(); System.out.println("Counter: " + j); } static class SomeThread implements Runnable{ @Override public void run(){ try{ for (int i = 0; i < 100; i++){ j++; } } catch(Exception e){} } } }

Вывод:

То есть ExecutorService нам нужен чтобы некоторое количество потоков, некоторое продолжительное время переиспользовалось для выполнения каких-либо задач.

Callable в Java

До этого мы рассматривали создание потоков с помощью Thread и Runnable. Последний вариант того как можно создать класс потока это реализовать Callable.

Callable – как Runnable, только вместо run() – call(), который может возвращать значение, то есть благодаря call поток может вернуть значение.

FutureTaskдля взаимодействия с потоками. Он для получения результата выполнения потока (того, что вернет call), еще имеет методы проверки состояния потока.

С помощью метода get() класса FutureTask можно получить результат выполнения потока когда он завершит свое выполнение.

get() блокирует поток, в котором он был вызван пока не выполниться поток переданный в объект FutureTask.

Из также важных методов – с помощью isDone() можем проверить завершился ли уже поток или еще нет.

Пример программы:

import java.io.*; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; class CallableFutureExample { public static void main(String[] args) throws Exception { FutureTask futTask = new FutureTask(new MyCallable()); Thread t = new Thread(futTask); t.start(); //main останавливается и get ждет пока //нить t не выполнится и после этого //возвращает значение метода call. System.out.println(“Counter: ” + futTask.get()); } } class MyCallable implements Callable { public Integer call() throws Exception { Integer j=0; try { for (int i = 0; i < 5; i++) { j++; System.out.println("j = " + j); Thread.sleep(1000); } } catch(Exception e) { return j; //возвращаем j, который //будет помещен в FutureTask } return j; } }

Вывод:

Как видим, get остановил мейн пока выполнялся поток t и когда t выполнился, мейн снова запустился и вывел Counter: 5

Корректная остановка потоков в Java с помощью Interrupt

Чтобы резко остановить поток можно использовать метод Stop(), но резко останавливать не рекомендуется, это может быть опасно.

Search Icon

Поэтому метод Stop предан Строжайшей Анафеме ))) и использовать его можно только в самом-самом крайнем случае, например, когда поток полностью завис и когда другого варианта его остановить нету.

Используют более аккуратный способ interrupt() – этот метод не останавливает поток, а устанавливает статус потока, как прерван, но САМ ПОТОК НЕ ОСТАНАВЛИВАЕТСЯ. Статус потока теперь можно будет проверить в этом потоке в нужном нам месте с помощью метода isInterrupted() и исходя из того, что он вернет завершить выполнение каких-то действий.

Пример программы:

class SomeThread extends Thread { SomeThread(String name){ super(name); } public void run(){ System.out.printf(“%s started… \n”, Thread.currentThread().getName()); int counter=1; // счетчик циклов while(!isInterrupted()){ //если возвращает true //то цикл останавливается System.out.println(“Loop ” + counter++); } //то есть получается блок while работает //пока не придет сигнал его остановить //методом interrupt и в этом вся суть System.out.printf(“%s finished… \n”, Thread.currentThread().getName()); } } public class InterruptExample { public static void main(String[] args) { System.out.println(“Main thread started…”); SomeThread t = new SomeThread(“SomeThread”); t.start(); try{ Thread.sleep(150); //до вызова interrupt метод isInterrupted //в потоке возвращает false t.interrupt();//теперь статус потока t – прерван //и теперь в потоке t вызов метода isInterrupted //должен вернуть true Thread.sleep(150); } catch(InterruptedException e){ System.out.println(“Thread has been interrupted”); } System.out.println(“Main thread finished…”); } }

Вывод:

Из консоли видно, что цикл выполнялся в потоке пока он не получил статус прерван из main.

CyclicBarrier в Java

Принцып работы CyclicBarrier такой: потоки зависают друг за другом в месте вызова в них await. Когда зависло необходимое количество потоков, зависнувшие потоки развисают и запускается другой дополнительный отдельный поток в котором можно что-то сделать, например обработать данные, полученные в предыдущих потоках.

В примере ниже мы создаем переменную volatile и потоки добавляют к ней 1, потом в месте await они останавливаются и когда зависнет указанное в CyclicBarrier количество потоков потоки развисают и сразу после этого запускается метод run в классе Run. И в этом методе run класса Run будет использована эта ранее изменяемая зависающими потоками переменная.

Пример программы:

import java.io.*; import java.util.concurrent.CyclicBarrier; class CyclicBarrierExample { //к этой переменной добавляется 1 //в потоках SomeThread static volatile int j; public static void main(String[] args) { //Только когда на await зависнут три потока все //зависнувшие потоки развиснут. //и метод run в классе Run запуститься только //когда все три потока дойдут до await CyclicBarrier cycbar = new CyclicBarrier(6, new Run()); for (int i = 1; i < 7; i++){ SomeThread t = new SomeThread(); t.setName("Thread "+ i); t.cycbar = cycbar; t.start(); } } //здесь в Run выводиться результирующее j static class Run extends Thread { @Override public void run(){ System.out.println(j); } } static class SomeThread extends Thread{ CyclicBarrier cycbar; @Override public void run(){ try{ j++; System.out.println( Thread.currentThread().getName()+" "+j); cycbar.await(); //здесь поток зависает System.out.println( Thread.currentThread().getName() + " is alive again"); } catch(Exception e){} } } }

Вывод:

Как видим шесть потоков остановили свою работу, потом запустился поток Run и вывел 6 и потом потоки продолжили работу.

Ожидание завершения потоков с помощью CountDownLatch

CountDownLatch – нужен когда требуется чтобы какой-то поток ждал пока не выполняться какое-то количество потоков.

В примере ниже мейн останавливает свою работу с помощью await и когда в потоках вызовется countDown три раза, мейн продолжает свою работу.

Пример программы:

import java.io.*; import java.util.concurrent.CountDownLatch; public class CountDownLatchExample { public static void main(String[] args) { int sleeptime = 0; CountDownLatch resources = new CountDownLatch(3); //запускаем 4 потока. каждый поток будет //уменьшать на единицу число, которое указано //в CountDownLatch. у нас 3. for (int i = 1; i < 11; i++) { SomeThread t = new SomeThread(); t.setName("Thread " + i); t.resources = resources; sleeptime=200; t.sleeptime = sleeptime; t.start(); } try { System.out.println( Thread.currentThread().getName()+" is stopped"); //этот текущий поток (то есть поток мейна) //будет остановлен пока счетчик resources //не будет ноль resources.await(); } catch(InterruptedException e){} System.out.println( Thread.currentThread().getName()+" is alive again"); } } class SomeThread extends Thread{ int sleeptime; CountDownLatch resources; @Override public void run() { try{ Thread.sleep(sleeptime); } catch (InterruptedException e){} //поток выполнился. уменьшаем счетчик на 1 resources.countDown(); System.out.println( Thread.currentThread().getName()+" finished working"); } }

Вывод:

Из консоли видно, что три потока завершили свое выполнение и сразу после этого ожил мейн.

ReentrantLock в Java – гибкая альтернатива synchronized

ReentrantLock ведет себя как synchronized, но есть некоторые отличия.

Мы помним, что synchronized блочит цельный кусок кода, то есть synchronized, это цельный метод, который не может быть выполняем параллельно несколькими потоками.

ReentrantLock же хоть и ведет себя как synchronized, но он блочит не цельный метод как это делает synchronized, а может залочить вообще любой кусок кода программы и не важно где начало этого куска а где конец.

То есть если synchronized лочит для других потоков цельный метод то ReentrantLock например может залочить для других потоков часть одного метода и часть другого метода.

Лучше пояснить на примере.

Пример программы:

import java.io.*; import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockExample { public static void main(String[] args) { CommonResource commonResource = new CommonResource(); //объект ресурса for (int i = 1; i < 6; i++) { //запускаем 5 потоков передаем в каждый //из них ресурс общий для потоков Thread t = new Thread(new CountThread(commonResource)); t.setName("Thread " + i); t.start(); } } } class CommonResource { int x; Lock lock = new ReentrantLock(); void increment() { //Заблокируем кусок кода, который состоит //из части метода increment и части метода //increment1. То есть очевидно, что это не //цельный кусок кода, а из ранных частей нашего //класса. При synchronized блокируется весь код, //который содержится в методе от первой строки //в нем до последней. Здесь же достаем часть //метода increment, которая после lock.lock() //и часть метода increment1, которая до lock.unlock(), склеиваем эти две части и //блокируем этот склеиный кусок кода как цельный. //То есть очевидно, что все что было в методе increment до вызова lock.lock() //(хотя я данном случае, как видим, там ничего нет) не блокируется, блокируется кусок //посреди метода, а не на весь метод. lock.lock(); x=1; for (int i = 1; i < 5; i++) { System.out.printf("%s %d \n", Thread.currentThread().getName(), x); x++; try { Thread.sleep(300); } catch (InterruptedException e) {} } lock.unlock(); } void increment1(){ x=1; for (int i = 1; i < 5; i++){ System.out.printf("%s %d \n", Thread.currentThread().getName(), x); x++; try{ Thread.sleep(300); } catch(InterruptedException e){} } lock.unlock(); //разблокируем кусок кода //объекта CommonResource который был //заблокирован в другом методе объекта. //то есть в отличии от synchronized //можно залочить и разлочить //НЕЦЕЛЫЙ кусок кода объекта, то есть //в разных частях объекта. } } class CountThread implements Runnable{ CommonResource res; CountThread(CommonResource res){ this.res=res; } public void run(){ //Лочиться в методе increment res.increment(); //Разлочивается в методе increment1 res.increment1(); //то есть как уже говорилось лок //и разлок нецельного куска //кода в любом месте объекта } }

Вывод:

Видим, 5 потоков выполняются по очереди.

Также видим, что каждый из 5 потоков выводит 8 значений. То есть оба цикла в объекте попадают в тот самый не цельный кусок кода, который не может быть выполняем параллельно несколькими потоками благодаря Reentrantlock.

Semaphore в Java

Обычно нужен если много потоков одновременно пытаются получить доступ к каким-то ресурсам, а нам нужно, чтобы к ним одновременно получала доступ только часть из потоков, при этом, оставшиеся потоки стояли в очереди и если какой-то поток оставил ресурс, нужно, чтобы сразу занимал его место один из стоящих в очереди.

Пример программы:

import java.io.*; import java.util.concurrent.Semaphore; public class SemaphoreExample { public static void main(String[] args) { Semaphore resources = new Semaphore(3); // три разрешения // То есть одновременно могут выполняться только // три потока, все остальные стоят в очереди пока // одно из трех мест не освободится. for (int i = 1; i < 10; i++) { SomeThread t = new SomeThread(); t.setName("Thread " + i); t.resources = resources; t.start(); } } } class SomeThread extends Thread { Semaphore resources; @Override public void run() { try { // Поток занимает одно из трех разрешений resources.acquire(); System.out.println( Thread.currentThread().getName() + " entered the resource" ); Thread.sleep(1000); System.out.println( Thread.currentThread().getName() + " left the resource" ); resources.release(); // Поток покидает одно // из трех разрешений, после этого оставленное // разрешение сразу занимает другой поток, // стоявший в очереди на занятие разрешения, // поскольку не мог занять разрешение, так как // все три были заняты другими потоками. } catch (InterruptedException e) {} } }

Вывод:

Из консоли видно, что одновременно выполняются только три потока.

Example

Программа выполняется в следующей последовательности:

Видим, что сначала выполняются только 1,2,3 поток.

Потом 1,2 поток завершают свое выполнение. И на их место стразу становятся 4,6 потоки, которые стояли в очереди.

Далее завершает свою работу 3 поток, и на его место сразу становиться 5 поток.

То есть, очевидно, что параллельно друг другу всё время работают только три потока.