WebClub - Всероссийский Клуб Веб-разработчиков
WebClub.RU » Архив » Вычислительные потоки

Вычислительные потоки


Дата публикации: 17-03-2013

Введение.
Многопоточный режим - хорошо известное выражение в компьютерном жаргоне. Для некоторых, это - источник опасения, для некоторых - это просто "кусок пирога". Однако, вычислительные потоки имеют глубокие корни в программировании на Java и их понимание может помочь в написании лучших программ на Java.

Так, что же такое многопоточный режим? Большинство из нас знает, что многозадачный режим относится к выполнению более одной программы в одно и то же время. Многопоточный режим выполняет это немного глубже, создавая запуск программы, как множество потоков с поведением каждого потока как подпрограммы. Типичный пример многопоточного приложения - это броузер, который позволяет вам скачивать файл и в это же время продолжать серфинг по веб'у.

Вычислительный поток - легкий процесс. Обычно, когда вы запускаете процесс (если вы знакомы с системным вызовом fork() на UNIX), сегмент данных дублируется, что приводит к потреблению ресурсов системы. Вычислительный поток, напротив, совместно использует сегмент данных с его создателем. Следовательно, он легкий.

Использование вычислительных потоков является общепринятым в ежедневном программировании на Java. Например, в типичном приложении Java, когда вызывается метод main(), он запускается в вычислительном потоке Java - главном потоке. Конечно, это - однопоточная модель. Однако, более сложные приложения нуждались бы в более чем одном потоке управления. Java упрощает это, обеспечивая удобный набор API.

Сначала давайте посмотрим как создать вычислительный поток. Существуют два способа, которыми вы можете сделать это на Java:

1. Классифицируя на подклассы класс java.lang.Thread
2. Выполняя интерфейс java.lang.Runnable

Следующий фрагмент кода показывает, как создать поток, классифицируя на подклассы java.lang.Thread.


/** Demonstrates subclassing of java.lang.Thread. */
public class ThreadDemo extends Thread {

/** The main method for thread. */
public void run() {
System.out.println("run(): Invoked");
}

/** Main. */
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
System.out.println("main(): Starting thread");
td.start();
try {
Thread.sleep(2000);
}
catch(InterruptedException e) {
}
System.out.println("main(): Exiting");
}

}
Здесь, метод main() создает образец ThreadDemo, который распространяется из java.lang.Thread. Затем он вызывает метод start(), который внутренне вызывает метод run(), который является основным методом для потока. Каждый поток должен отменить этот метод соответственно.
Так как класс Java может распространяться только из одиночного класса, вы можете выполнять java.lang.Runnable интерфейс для создания потока. Следующая типовая программа показывает это.

/** Demonstrates implementing the java.lang.Runnable interface. */
public class RunnableDemo implements Runnable {

/** The main method for thread. */
public void run() {
System.out.println("run(): Invoked");
}

/** Main. */
public static void main(String[] args) {
RunnableDemo rd = new RunnableDemo();
// Create a launcher thread for this runnable
Thread t = new Thread(rd);
System.out.println("main(): Starting thread");
t.start();
try {
Thread.sleep(2000);
}
catch(InterruptedException e) {
}
System.out.println("main(): Exiting");
}

}
Это абсолютно подобно предыдущему примеру, за исключением предложения, что мы должны создать дополнительный объект потока, чтобы запустить наш образец. Это необходимо, так как java.lang.Thread инкапсулирует логику, чтобы запустить и управлять потоком.
Поток имеет жизненный цикл, который включает различные состояния.

1. Готов к запуску: это вводится когда вы вызываете метод start()
2. Выполнение: это то, где выполняется метод run()
3. Ожидание: поток может входить в это состояние, например, при ожидании ввода/вывода или когда бездействует
4. Мертвый: Вводится, когда метод run() выходит или когда либо метод stop(), либо destroy() вызываются

Поток также имеет приоритет. Поток более высокого приоритета получает предпочтение над низкими приоритетами. Недавно созданный поток имеет приоритет, установленный при создании. Он может получать и устанавливать свой приоритет вызывая методы getPriority() и setPriority() соответственно.

Вычислительные потоки распределяются по времени процессора (CPU) планировщиком. Хотя индивидуальное планирование может меняться от платформы к платформе, в общем случае оно классифицируется либо как приоритетное, либо неприоритетное. Приоритетный планировщик резервирует (приостанавливает) выполняющийся поток, чтобы позволить выполниться другим потокам. Напротив неприоритетный планировщик не приостанавливает поток. Он исключительно полагается на поток, чтобы освободить управление. Поток может делать это, вызывая метод yield(). И это хороший этикет, чтобы следовать интенсивным потокам CPU.

Некоторые вычислительные потоки используются как сервисные потоки. Они существуют для обслуживания других потоков и называются "daemon threads". Коллектор мусора - хорошо известный пример daemon thread'а. Обычно такие потоки запускаются как потоки низкого приоритета. JVM выходит только когда daemon threads остаются действующими. Метод SetDaemon() используется, чтобы сделать поток daemon, а метод isDaemon() сообщает является ли этот поток daemon'ом или нет.

Каждый поток Java принадлежит группе потоков, которая является образцом java.lang.ThreadGroup. Группа обеспечивает удобный API для управления группой потоков. Например, Вы можете остановить все потоки, принадлежащие к группе, используя метод stop(). Она также позволяет сохранять иерархию других групп и потоков, то есть группа потоков может иметь другие группы и потоки как элементы одного уровня и т.д и т.п. Самая верхняя группа вычислительных потоков называется системой. Она содержит несколько системных потоков типа коллектора мусора. Она также содержит объект основной группы потоков, который содержит основной поток. Этот поток фактически выполняет метод main().

Это были потоки в "ореховой скорлупе"! В следующей статье, мы обсудим более серьезную реализацию и проблемы синхронизации.

Мониторинг.
Часто, вычислительные потоки должны сотрудничать друг с другом, чтобы поддержать целостность данных. Технически, это сотрудничество включает доступные общие данные. Это может привести к условию "гонок", в которых один поток может писать данные, в то время как другой может обращаться к ним.

Хотя, в начале, это может не показаться трудным, конечные результаты могут быть нежелательными, если об этом не позаботиться. Например, рассмотрите следующий класс:

/** A terse implementation of a bank account. */
public class Account {

/** The account id. */
private int id;
/** The account balance. */
private float balance;

/** Returns the account number. */
public int getID() {
return id;
}

/** Returns the account balance. */
public float getBalance() {
return balance;
}

/** Withdraws amount from the account. */
public void withdraw(float amount) {
// ... The withdraw logic
// Update balance in the end
balance -= amount;
}

}
Этот невинный класс Account может стать потенциальной жертвой когда он используется множественными вычислительными потоками одновременно. Чтобы понять это, рассмотрим ситуацию в которой поток выполняет метод withdraw(), чтобы снять некоторую величину со счета и выгрузится на полпути. Тем временем, другой поток вызывает метод getBalance(), чтобы отобразить равновесие счета. При этом все еще будет показываться старое равновесие (потому что общие данные, баланс, еще не обновлялись методом withdraw()).

Это - типичная взаимная проблема исключения, в которой только одному методу должен быть предоставлен доступ к общим данным одновременно. Java обеспечивает взаимное исключение через "мониторы". Java монитор обеспечивает механизм блокировки. Только поток, который имеет ключ может обращаться к контролируемым методам. Контролируемый метод объявляется синхронизированным ключевым словом в сигнатуре как показано ниже.

public synchronized void monitoredMethod() {
// ...
}
Здесь показано как это работает. Когда вы синхронизируете метод объекта, только один вычислительный поток одновременно может выполнять метод на том объекте. Этот поток приобретает блокировку на том объекте. Любой другой поток, который должен обратиться к синхронизированному методу этого объекта, консультируется с монитором. Монитор не одобряет запрос пока предыдущий поток не снимает блокировку. Блокировка снимается когда метод заканчивает работу.

Чтобы понять это лучше, давайте синхронизируем методы нашего вышеописанного класса Account.

/** A terse implementation of a bank account. */
public class Account {

/** The account id. */
private int id;
/** The account balance. */
private float balance;

/** Returns the account number. */
public int getID() {
return id;
}

/** Returns the account balance. */
public synchronized float getBalance() {
return balance;
}

/** Withdraws amount from the account. */
public synchronized void withdraw(float amount) {
// ... The withdraw logic
// Update balance in the end
balance -= amount;
}

}
В этом случае, когда один поток выполняет метод withdraw(), он приобретает блокировку на том объекте. Даже если он резервируется, блокировка не снимается. Тем временем, если другой поток пытается вызывать метод getBalance(), он должен ждать, потому что он не может получить блокировку. Таким образом взаимное исключение достигнуто.

Обратите внимание, что метод getID() не синхронизирован, потому что он обращается к данным только для чтения. Это предполагает, что счет id не должен быть изменен, что является достоверным предположением. Дело в том, что вы должны синхронизировать только те методы, имеющие доступ к данным, которые могли бы быть изменены другими потоками.

Java обеспечивает два уровня синхронизации - объектный уровень и уровень класса. Мы видели выше использование синхронизации объектного уровня, в которой только один поток одновременно имеет доступ к синхронизированному методу объекта. В синхронизации уровня класса только один поток одновременно имеет доступ к синхронизированному методу класса. Метод класса, также известный как статический метод, объявляется статическим ключевым словом. Снова, чтобы синхронизировать метод класса используют синхронизированное ключевое слово как показано ниже.

public static synchronized void monitoredClassMethod() {
// ...
}
Когда вам нужно сделать синхронизацию уровня класса? Вам нужно будет сделать это, когда ваши вычислительные потоки используют статические методы класса, чтобы обращаться или манипулировать данными.

Иногда вы можете захотеть синхронизировать доступ к фрагменту кода, основанного на некотором другом объекте. Например, рассмотрите следующий код:

public void normalMethod() {
synchronized(commonData) {
// ...Privileged code
}
}
Это синхронизация блочного уровня, то есть, вместо того, чтобы синхронизировать метод, вы синхронизируете блок кода. Обратите внимание, что метод normalMethod() непосредственно не синхронизирован. Выше описанный CommonData мог бы быть любым объектом, приобретающим потребности блокировки, чтобы выполнить привилегированный код. Этот тип синхронизации используется в методах внутреннего класса, при доступе к данным внешнего класса, который может изменяться другими потоками как показано ниже.

class Outer {
/** Private data. */
private int data;

/** Updates data. */
public synchronized void updateData(int newData) {
this.data = newData;
}

// ... Other methods

/** Inner class. */
class Inner {
public synchronized void update() {
synchronized(Outer.this) {
// ...Use "data"
}
}
}
}
Большинство из нас слышало о критических разделах, которые используются, чтобы выполнить привилегированные части кода. Хотя синхронизация Java обеспечивает взаимное исключение, это не то же самое, что критический раздел. Важное различие между двумя понятиями может анализироваться, понимая, как синхронизация объектного уровня отличается от критического раздела. Критический раздел предоставляет доступ к фрагменту кода только одному потоку. Доступ мог бы быть осуществлен через различные объекты, но это не существенно. В это же время в синхронизации объектного уровня только один поток имеет доступ к синхронизированным методам этого объекта. Другой поток не может вызывать синхронизированный метод на тот же самый объект, пока он не приобретет блокировку. Но можно свободно вызвать синхронизированный метод на другой объект того же самого класса. Вы можете, конечно, выполнять критический раздел в Java, используя синхронизацию блочного уровня.

Теперь мы готовы для дальнейшего обсуждения проблем синхронизации и решений. Я оставлю их до следующей недели. До этого времени контролируйте ваши вычислительные потоки.

Синхронизация.
Помимо потокового сотрудничества, в котором потоки обращаются к синхронизированному коду взаимно исключающим способом, имеются другие общие ситуации, в которых сотрудничество является основой события. Это означает, что потоки ждут специфического условия, чтобы появиться и когда это условие возникает, поток сообщает одному или большему количеству потоков выйти из спящего состояния и приступить к дальнейшей обработке.

Это кажущееся простым описание нуждается в более точном понимании в уровне программирования. Далее представлено то, чего мы хотим достичь:

1. Мы должны быть способны заставить поток ждать на специфическом условии
2. Мы должны быть способны оповестить поток, что условие возникло

Java.lang.Object обеспечивает удобный API для выполнения вышеупомянутого - wait() и notify(). Метод wait() заставляет текущий поток (это выполняемый код) ждать. Метод notify() пробуждает одиночный поток.

Давайте разберемся в этом. Вы вызываете wait() в синхронизированном методе или утверждении для ожидания по условию. Так как поток входит в ждущее состояние в этом случае, другой поток должен сообщить этому потоку, что событие возникло. Это работа метода notify(). Как упомянуто выше, метод или блок должен быть синхронизирован, чтобы получить доступ к монитору объекта. Если вы не вызываете эти методы из синхронизированного кода, вы получаете IllegalMonitorStateException во время выполнения.

Следующий фрагмент кода показывает проблему популярной программы читатель-писатель, где потоки читателя и писателя должены сотрудничать. Читатель должен ждать пока появятся данные для чтения. Писатель должн сообщить читателю, когда он записал данные. Точно так же читатель должен читать данные и затем сообщать писателю. Писатель должна ждать, если буфер полон, пока читатель не возьмет данные из него.

/** The common buffer used by the reader and writer. */
class Buffer {

/** The maximum buffer capacity. */
public final static int BUF_CAPACITY = 10;
/** The buffer data. */
private char[] data;
/** Indicates if there's no data to read. */
private boolean isEmpty;
/** Indicates if the buffer is full. */
private boolean isFull;
/** The count of data items in the buffer. */
private int count;

/** Constructor. */
Buffer() {
this.data = new char[BUF_CAPACITY];
// The buffer is empty initially
this.isEmpty = true;
this.isFull = false;
this.count = 0;
}

/** Returns data in the buffer. */
public synchronized char read() {

// Wait till there's some data to read
while(isEmpty) {
try {
wait();
}
catch(InterruptedException e) {
}
}

// Consume the first character and update the count
char ch = data[0];
System.arraycopy(data, 1, data, 0, --count);

// If the count drops to zero, the buffer is empty
if (count == 0) {
this.isEmpty = true;
}

// Reset the condition for writer
this.isFull = false;

// Notify the writer that the data has been read
notify();

return ch;

}

/** Writes data to the buffer. */
public synchronized void write(char ch) {

// Wait till the buffer is full
while(isFull) {
try {
wait();
}
catch(InterruptedException e) {
}
}

// Store the data and update the count
data[count++] = ch;

// Check if the buffer has reached its maximum capacity
if (count == BUF_CAPACITY) {
this.isFull = true;
}

// Reset the condition for reader
this.isEmpty = false;

// Notify the reader
notify();

}

}

/** The reader. */
class Reader extends Thread {

/** The buffer. */
private Buffer buf;

/** Constructor. */
public Reader(Buffer buf) {
this.buf = buf;
}

/** The main method for the thread. */
public void run() {
// Read the data in the buffer
for(int i = 0; i < Buffer.BUF_CAPACITY; i++) {
char ch = buf.read();
System.out.println("Reader: Read " + ch);
}
}

}

/** The writer. */
class Writer extends Thread {

/** The buffer. */
private Buffer buf;

/** Constructor. */
public Writer(Buffer buf) {
this.buf = buf;
}

/** The main method for the thread. */
public void run() {
// Write a character sequence to the buffer
int val = 'A';
for(int i = 0; i < Buffer.BUF_CAPACITY; i++) {
char ch = (char)(val + i);
System.out.println("Writer: Writing " + ch);
buf.write(ch);
}
}

}

/** The class to demonstrate the reader and writer. */
public class ReaderWriterDemo {

/** Main. */
public static void main(String[] args) {

// The common buffer
Buffer buf = new Buffer();

// The reader and writer threads
Reader reader = new Reader(buf);
Writer writer = new Writer(buf);

// Start the threads
reader.start();
writer.start();

// Wait for the threads to finish
try {
reader.join();
writer.join();
}
catch(InterruptedException e) {
}

}

} // ReaderWriterDemo() ENDS
Давайте обсудим это. Метод main() создает образец класса Buffer, который разделен среди читателя и писателя. Метод wait() также создает и запускает потоки читателя и писателя. Затем он ждет эти потоки, чтобы завершить выполнение, выполняя их join() методы.

Поток писателя просто записывает символьную последовательность 10-ти символов от 'A' -'J'. Поток читателя читает из буфера и отображает чтение символа.

Класс Buffer - основа этого примера. Он хранит данные, которые являются общедоступнными между читателем и писателем. Он также поддерживает информацию относительно состояния буфера - полный или пустой.

Если сначала планируется поток читателя, он будет ждать, пока буфер пустой. Даже если сначала планируется писатель, он не будет вводить состояние ожидания, потому что буфер не полон. Писатель записывает данные в буфер и увеличивает счетчик письменных символов. Если счетчик равняется максимальной буферной емкости, он выставляет флажок полного буфера. В любом случае, он сбрасывает флажок пустого буфера. В заключение, он оповещает поток читателя так, чтобы он мог читать данные.

После получения уведомления, метод read() выйдет из ждущего цикла. Затем он берет символ из буфера и проверяет, пустой ли буфер. Если пустой, он устанавливает флажок пустого буфера. В любом случае, он сбрасывает флажок полного буфера. Затем он оповещает писателя, чтобы он мог записывать данные.

Существуют интересные варианты этого примера. Например, могло бы быть много читателей и один писатель должен сообщить всем им, что он записал данные в буфер. Java.lang.Object поддерживает метод notifyAll() для достижения этого. Он оповещает все потоки ждущие на мониторе объекта. Далее представлена модифицированная версия вышеупомянутого примера с notifyAll():

/** The class to demonstrate multiple readers and a single writer. */
class Buffer {

/** The maximum buffer capacity. */
public final static int BUF_CAPACITY = 10;
/** The buffer data. */
private char[] data;
/** Indicates if there's no data to read. */
private boolean isEmpty;
/** Indicates if the buffer is full. */
private boolean isFull;
/** The count of data items in the buffer. */
private int count;

/** Constructor. */
Buffer() {
this.data = new char[BUF_CAPACITY];
// The buffer is empty initially
this.isEmpty = true;
this.isFull = false;
this.count = 0;
}

/** Returns data in the buffer. */
public synchronized char read() {

// Wait till there's some data to read
while(isEmpty) {
try {
wait();
}
catch(InterruptedException e) {
}
}

// Consume the first character and update the count
char ch = data[0];
System.arraycopy(data, 1, data, 0, --count);

// If the count drops to zero, the buffer is empty
if (count == 0) {
this.isEmpty = true;
}

// Reset the condition for writer
this.isFull = false;

// Notify the writer
notify();

return ch;

}

/** Writes data to the buffer. */
public synchronized void write(char ch) {

// Wait till the buffer is full
while(isFull) {
try {
wait();
}
catch(InterruptedException e) {
}
}

// Store the data and update the count
data[count++] = ch;

// Check if the buffer has reached its maximum capacity
if (count == BUF_CAPACITY) {
this.isFull = true;
}

// Reset the condition for reader
this.isEmpty = false;

// Notify all the readers
notifyAll();

}

}

/** The reader. */
class Reader extends Thread {

/** The id of the reader. */
private int id;
/** The buffer. */
private Buffer buf;

/** Constructor. */
public Reader(int id, Buffer buf) {
this.id = id;
this.buf = buf;
}

/** The main method for the thread. */
public void run() {
// Read the data in the buffer
for(int i = 0; i < 2; i++) {
char ch = buf.read();
System.out.println("Reader#" + id + ": Read " + ch);
}
}

}

/** The writer. */
class Writer extends Thread {

/** The buffer. */
private Buffer buf;

/** Constructor. */
public Writer(Buffer buf) {
this.buf = buf;
}

/** The main method for the thread. */
public void run() {
// Write a character sequence to the buffer
int val = 'A';
for(int i = 0; i < Buffer.BUF_CAPACITY; i++) {
char ch = (char)(val + i);
System.out.println("Writer: Writing " + ch);
buf.write(ch);
}
}

}

/** The class to demonstrate the reader and writer. */
public class MultiReaderWriterDemo {

/** Main. */
public static void main(String[] args) {

// The common buffer
Buffer buf = new Buffer();

// The reader and writer threads
Reader reader1 = new Reader(1, buf);
Reader reader2 = new Reader(2, buf);
Writer writer = new Writer(buf);

// Start the threads
reader1.start();
reader2.start();
writer.start();

// Wait for the threads to finish
try {
reader1.join();
reader2.join();
writer.join();
}
catch(InterruptedException e) {
}

}

} // ReaderWriterDemo() ENDS
Здесь, метод write() использует notifyAll(), чтобы сообщить всем читателям, что имеются данные для чтения. Обратите внимание, что в этом примере, мы не могли бы запустить оба потока читателя для полной буферной емкости, так как два потока потребляют данные в буфере. Именно поэтому я выбрал их, чтобы произвольно работать с двумя итерациями.

Поскольку мы обсуждали вышесказанное, мы знаем, что wait() и notify() могут вызываться только из синхронизированного кода. Возникает интересный вопрос: если синхронизированный метод вызвал метод wait(), как другой поток выполняет синхронизированный метод того же самого объекта, чтобы вызвать notify()? Более точно, как другой поток получает блокировку. Метод wait() реализует блокировку так, чтобы другой поток мог обращаться к монитору. Но, прежде чем wait() возвратится, он заново приобретает блокировку так, чтобы он мог выполнять синхронизированный код.

Мы обсудили много аспектов программирования вычислительных потоков, и я надеюсь, что ваше следующее столкновение с вычислительными потоками будет более приятным. Теперь я могу сказать:
Удачной работы с вычислительными потоками!

Популярное

Не так давно в сети появился новый сервис, под названием Dead Man Zero. Этот сервис сделал...
Рынок социальных площадок уже давно стал стабильным. Несмотря на то, что время от времени...
Artisteer 4 – единственный в своем роде продукт, позволяющий автоматизировать работу над созданием...
Февраль 2017 (3)
Январь 2017 (1)
Август 2016 (1)
Май 2016 (2)
Ноябрь 2015 (1)
Октябрь 2015 (1)

Карта сайта: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41

Друзья сайта

Хотите продать свой сайт?
- Мы быстро и удобно для Вас сможем его купить:
  • Заявка на продажу сайта
  • Раcсматриваем цены на каждый сайт в индивидуальном порядке.

    Случайная цитата

    Oktal:

    "Я думаю, что Microsoft назвал технологию .Net для того, чтобы она не показывалась в списках директорий Unix."

    Опрос

    Какой текстовый редактор Вы используете?

    OpenOffice
    AbiWord
    Notepad++
    UltraEdit
    PSPad
    Microsoft Office
    Microsoft Блокнот
    Другой...