Эта статья включает в себя список общих ссылок , но он остается в значительной степени непроверенным, поскольку в нем отсутствует достаточное количество соответствующих встроенных ссылок . ( Сентябрь 2014 г. ) ( Узнайте, как и когда удалить этот шаблон сообщения ) |
Эта статья содержит инструкции, советы или практические советы . ( Декабрь 2020 г. ) |
В вычислении , то проблема , производитель-потребитель [1] [2] (также известные как проблема ограниченно-буфера ) является классическим примером мульти- процесса синхронизации проблемы, первая версия которого была предложена Дейкстром в 1965 году в его неопубликованной рукописи [3], в которой буфер был неограничен, а затем опубликован с ограниченным буфером в 1972 году. [4] В первой версии проблемы есть два циклических процесса, производитель и потребитель, которые совместно используют обычный буфер фиксированного размера, используемый в качестве очереди. Производитель многократно генерирует данные и записывает их в буфер. Потребитель многократно считывает данные в буфере, удаляя их в процессе чтения и используя эти данные каким-либо образом. В первой версии проблемы с неограниченным буфером проблема состоит в том, как разработать код производителя и потребителя так, чтобы при их обмене данными данные не терялись или не дублировались, данные считывались потребителем в том порядке, в котором они написан производителем, и оба процесса достигают максимального прогресса. В более поздней формулировке проблемы Дейкстра предложил несколько производителей и потребителей, совместно использующих конечный набор буферов. Это добавляло дополнительную проблему, заключающуюся в предотвращении попыток производителей записывать данные в буферы, когда все они были заполнены, и в попытке запретить потребителям читать буфер, когда все они были пусты.
Первый случай, который следует рассмотреть, - это тот, в котором есть один производитель и один потребитель, и есть буфер конечного размера. Решение для производителя - либо перейти в спящий режим, либо сбросить данные, если буфер заполнен. В следующий раз, когда потребитель удаляет элемент из буфера, он уведомляет производителя, который снова начинает заполнять буфер. Таким же образом потребитель может перейти в спящий режим, если обнаружит, что буфер пуст. В следующий раз, когда производитель помещает данные в буфер, он будит спящего потребителя. Решение может быть достигнуто посредством межпроцессного взаимодействия , обычно с использованием семафоров . Неадекватное решение может привести к тупиковой ситуации, когда оба процесса ожидают пробуждения.
Неадекватная реализация [ править ]
Чтобы решить эту проблему, возникает соблазн предложить "решение", показанное ниже. В решении используются две библиотечные подпрограммы, sleep
и wakeup
. Когда вызывается спящий режим, вызывающий блокируется до тех пор, пока другой процесс не разбудит его с помощью процедуры пробуждения. Глобальная переменная itemCount
содержит количество элементов в буфере.
int itemCount = 0 ; производитель процедуры () {в то время как ( истина ) { элемент = produItem (); если ( itemCount == BUFFER_SIZE ) { спать (); } putItemIntoBuffer ( элемент ); itemCount = itemCount + 1 ; если ( itemCount == 1 ) { пробуждение ( потребитель ); } } }процедура consumer () { while ( true ) { если ( itemCount == 0 ) { спать (); } item = removeItemFromBuffer (); itemCount = itemCount - 1 ; if ( itemCount == BUFFER_SIZE - 1 ) { пробуждение ( производитель ); } потреблятьItem ( элемент ); } }
Проблема с этим решением заключается в том, что оно содержит состояние гонки, которое может привести к тупиковой ситуации . Рассмотрим следующий сценарий:
- Он
consumer
только что прочитал переменнуюitemCount
, заметил, что она равна нулю, и вот-вот переместится внутрьif
блока. - Непосредственно перед вызовом сна потребитель прерывается, а производитель возобновляет работу.
- Производитель создает элемент, помещает его в буфер и увеличивает его
itemCount
. - Поскольку перед последним добавлением буфер был пуст, производитель пытается разбудить потребителя.
- К сожалению, потребитель еще не спал, и пробуждение было потеряно. Когда потребитель возобновляет работу, он засыпает и больше никогда не проснется. Это связано с тем, что потребитель пробуждается производителем только тогда, когда
itemCount
он равен 1. - Производитель будет повторять цикл до тех пор, пока буфер не заполнится, после чего он также перейдет в спящий режим.
Поскольку оба процесса будут спать вечно, мы зашли в тупик. Следовательно, это решение неудовлетворительно.
Альтернативный анализ заключается в том, что если язык программирования не определяет семантику одновременного доступа к общим переменным (в данном случае itemCount
) с использованием синхронизации, то решение неудовлетворительно по этой причине, без необходимости явно демонстрировать состояние гонки.
Использование семафоров [ править ]
Семафоры решают проблему потерянных пробуждающих вызовов. В приведенном ниже решении мы используем два семафора fillCount
и emptyCount
для решения проблемы. fillCount
- это количество элементов, уже находящихся в буфере и доступных для чтения, а emptyCount
- количество доступных пространств в буфере, где элементы могут быть записаны. fillCount
увеличивается и emptyCount
уменьшается, когда новый элемент помещается в буфер. Если производитель пытается уменьшить, emptyCount
когда его значение равно нулю, производитель переводится в спящий режим. В следующий раз, когда элемент будет потреблен, emptyCount
увеличивается на единицу, и производитель просыпается. Аналогично работает потребитель.
семафор fillCount = 0 ; // семафор произведенных элементов emptyCount = BUFFER_SIZE ; // оставшееся место производитель процедуры () {в то время как ( истина ) { элемент = produItem (); вниз ( emptyCount ); putItemIntoBuffer ( элемент ); вверх ( fillCount ); } }процедура- потребитель () {в то время как ( истина ) { вниз ( fillCount ); item = removeItemFromBuffer (); вверх ( emptyCount ); потреблятьItem ( элемент ); } }
Приведенное выше решение отлично работает, когда есть только один производитель и потребитель. Когда несколько производителей используют одно и то же пространство памяти для буфера элементов, или несколько потребителей используют одно и то же пространство памяти, это решение содержит серьезное состояние гонки, которое может привести к одновременному чтению или записи двух или более процессов в один и тот же слот. Чтобы понять, как это возможно, представьте, как putItemIntoBuffer()
можно реализовать процедуру . Он может содержать два действия: одно определяет следующий доступный слот, а другое записывает в него. Если процедура может выполняться одновременно несколькими производителями, то возможен следующий сценарий:
- Два производителя уменьшаются
emptyCount
- Один из производителей определяет следующий пустой слот в буфере
- Второй производитель определяет следующий пустой слот и получает тот же результат, что и первый производитель
- Оба производителя записывают в один и тот же слот
Чтобы решить эту проблему, нам нужен способ убедиться, что одновременно выполняется только один продюсер putItemIntoBuffer()
. Другими словами, нам нужен способ выполнить критическую секцию с взаимным исключением . Ниже показано решение для нескольких производителей и потребителей.
мьютекс buffer_mutex ; // похоже на "семафор buffer_mutex = 1", но отличается (см. примечания ниже) semaphore fillCount = 0 ; семафор emptyCount = BUFFER_SIZE ; производитель процедуры () {в то время как ( истина ) { элемент = produItem (); вниз ( emptyCount ); вниз ( buffer_mutex ); putItemIntoBuffer ( элемент ); вверх ( buffer_mutex ); вверх ( fillCount ); } }процедура- потребитель () {в то время как ( истина ) { вниз ( fillCount ); вниз ( buffer_mutex ); item = removeItemFromBuffer (); вверх ( buffer_mutex ); вверх ( emptyCount ); потреблятьItem ( элемент ); } }
Обратите внимание, что порядок, в котором разные семафоры увеличиваются или уменьшаются, имеет важное значение: изменение порядка может привести к тупиковой ситуации. Здесь важно отметить, что, хотя мьютекс, кажется, работает как семафор со значением 1 (двоичный семафор), но есть разница в том, что мьютекс имеет концепцию владения. Владение означает, что мьютекс может быть "увеличен" обратно (установлен в 1) тем же процессом, который "уменьшил" его (установлен в 0), а все другие задачи ждут, пока мьютекс не станет доступным для уменьшения (фактически означает, что ресурс доступен) , что обеспечивает взаимную исключительность и позволяет избежать тупика. Таким образом, неправильное использование мьютексов может остановить многие процессы, когда монопольный доступ не требуется, но мьютекс используется вместо семафора.
Использование мониторов [ править ]
Следующий псевдокод показывает решение проблемы производителя и потребителя с помощью мониторов . Поскольку в случае мониторов взаимное исключение подразумевается, не требуется дополнительных усилий для защиты критического раздела. Другими словами, представленное ниже решение работает с любым количеством производителей и потребителей без каких-либо модификаций. Также стоит отметить, что для программиста менее вероятно, что он напишет код, который страдает от состояния гонки при использовании мониторов, чем при использовании семафоров. [ необходима цитата ]
монитор ProducerConsumer { int itemCount = 0 ; состояние полное ; состояние пусто ; процедура add ( item ) { if ( itemCount == BUFFER_SIZE ) { ждать ( полный ); } putItemIntoBuffer ( элемент ); itemCount = itemCount + 1 ; если ( itemCount == 1 ) { уведомить ( пусто ); } } процедура remove () { если ( itemCount == 0 ) { ждать ( пусто ); } item = removeItemFromBuffer (); itemCount = itemCount - 1 ; если ( itemCount == BUFFER_SIZE - 1 ) { уведомление ( полное ); } возврат товара ; } } производитель процедуры () {в то время как ( истина ) { элемент = produItem (); ПроизводительПотребитель . добавить ( элемент ); } }процедура consumer () { while ( true ) { item = ProducerConsumer . удалить (); потреблятьItem ( элемент ); } }
Без семафоров и мониторов [ править ]
Проблема производителя и потребителя, особенно в случае одного производителя и одного потребителя, в значительной степени связана с реализацией FIFO или канала . Шаблон производитель-потребитель может обеспечить высокоэффективную передачу данных, не полагаясь на семафоры, мьютексы или мониторы для передачи данных . Использование этих примитивов может быть дорогостоящим с точки зрения производительности по сравнению с базовой атомарной операцией чтения / записи. Каналы и FIFO популярны только потому, что они избегают необходимости в сквозной атомарной синхронизации. Базовый пример, закодированный на C, показан ниже. Обратите внимание, что:
- Атомарный доступ для чтения-изменения-записи к общим переменным исключается, поскольку каждая из двух
Count
переменных обновляется только одним потоком. Кроме того, эти переменные поддерживают неограниченное количество операций приращения; отношение остается правильным, когда их значения возвращаются к целочисленному переполнению. - В этом примере потоки не переводятся в спящий режим, что может быть приемлемо в зависимости от системного контекста.
schedulerYield()
Вставляются как попытка улучшить производительность, и может быть опущена. Библиотеки потоков обычно требуют семафоров или условных переменных для управления режимом сна / пробуждением потоков. В многопроцессорной среде спящий / пробуждение потока будет происходить гораздо реже, чем передача токенов данных, поэтому полезно избегать атомарных операций при передаче данных. - Этот пример не работает для нескольких производителей и / или потребителей, поскольку при проверке состояния возникает состояние гонки. Например, если в буфере хранилища находится только один токен, и два потребителя обнаруживают, что буфер непустой, то оба будут использовать один и тот же токен и, возможно, увеличат счетчик потребленных токенов выше счетчика для произведенных токенов.
- В этом примере, как написано, требуется, чтобы
UINT_MAX + 1
оно делилось без остатка наBUFFER_SIZE
; если это не нацело,[Count % BUFFER_SIZE]
дает неверный индекс буфера послеCount
обертываний последниеUINT_MAX
обратно к нулю. Альтернативное решение, позволяющее избежать этого ограничения, использует две дополнительныеIdx
переменные для отслеживания текущего индекса буфера для заголовка (производителя) и хвоста (потребителя). ЭтиIdx
переменные будут использоваться вместо[Count % BUFFER_SIZE]
, и каждый из них должен быть увеличен в то же время , как соответствующаяCount
переменная увеличивается на единицу, следующим образом :Idx = (Idx + 1) % BUFFER_SIZE
. - Две
Count
переменные должны быть достаточно маленькими, чтобы поддерживать атомарные операции чтения и записи. В противном случае возникает состояние гонки, когда другой поток считывает частично обновленное и, следовательно, неправильное значение.
летучий беззнаковый INT produceCount = 0 , consumeCount = 0 ; TokenType sharedBuffer [ BUFFER_SIZE ];недействительный производитель ( void ) {в то время как ( 1 ) {в то время как ( produCount - consumerCount == BUFFER_SIZE ) { schedulerYield (); / * sharedBuffer заполнен * / } / * Запись в sharedBuffer _ до_ увеличения produCount * / sharedBuffer [ producount % BUFFER_SIZE ] = produToken (); / * Здесь требуется барьер памяти для обеспечения того, чтобы обновление sharedBuffer было видимым для других потоков перед обновлением produCount * / ++produCount ; } }недействительный потребитель ( void ) {в то время как ( 1 ) {в то время как ( producount - consumerCount == 0 ) { schedulerYield (); / * sharedBuffer пуст * / } consumerToken ( & sharedBuffer [ consumerCount % BUFFER_SIZE ]); ++ consumerCount ; } }
В приведенном выше решении используются счетчики, которые при частом использовании могут перегружаться и достигать максимального значения UINT_MAX
. Идея указано на четвертой пули, первоначально предложенный Лэмпорт , [5] объясняет , как счетчики могут быть заменены на счетчики конечной дальности. В частности, их можно заменить счетчиками конечного диапазона с максимальным значением N - емкостью буфера.
Спустя четыре десятилетия после представления проблемы производителя-потребителя Агилера, Гафни и Лампорт показали, что проблема может быть решена так, что процессы обращаются только к счетчикам с фиксированным диапазоном (т. Е. Диапазон, который не зависит от размера буфера). при определении, пуст ли буфер или нет. [6] Мотивация для этой меры эффективности состоит в том, чтобы ускорить взаимодействие между процессором и устройствами, которые взаимодействуют через каналы FIFO. Они предложили решение, при котором считываются счетчики максимального значения , чтобы определить, безопасен ли доступ к буферу. Однако их решение по-прежнему использует неограниченные счетчики, которые бесконечно растут, только к этим счетчикам не обращаются во время описанной фазы проверки.
Позже Абрахам и Амрам [7] предложили более простое решение, представленное ниже в псевдокоде, которое обладает обсуждаемым свойством фиксированного диапазона. Решение использует счетчики максимального значения N. Однако, чтобы определить , является ли пустым или полным буфер, процессы получить доступ только к конечному радиусу действия одиночных регистров писателя . Каждому из процессов принадлежит одно записывающее устройство с 12 значениями. Процесс-производитель записывает в массив Flag_p
, а процесс-потребитель записывает в Flap_c
массивы с 3 полями. Flag_p[2]
и Flag_c[2]
может хранить «полный», «пустой» или «безопасный», что соответственно указывает, является ли буфер полным, пустым или ни полным, ни пустым.
Идея алгоритма заключается в следующем. Процессы подсчитывают количество доставленных и удаленных элементов по модулю N + 1 через регистры CountDelivered
и CountRemoved
. Когда процесс доставляет или удаляет элемент, он сравнивает эти счетчики и, таким образом, успешно определяет состояние буфера и сохраняет эти данные в Flag_p[2]
или Flag_c[2]
. На этапе проверки выполняющийся процесс считывает Flag_p
и Flag_c
пытается оценить, какое значение среди Flag_p[2]
и Flag_c[2]
отражает текущее состояние буфера. Достичь этой цели помогают два метода синхронизации.
- После доставки товара, производитель пишет
Flag_p[0]
значения , которое он прочитал сFlag_c[0]
, и после удаления элемента, потребитель пишет кFlag_c[1]
значению:1-Flag_p[0]
. Следовательно, условиеFlag_p[0] == Flag_c[0]
предполагает, что производитель недавно проверил состояние буфера, аFlag_p[0] != Flag_c[0]
предполагает обратное. - Операция доставки (удаления) заканчивается записью в
Flag_p[1]
(Flag_c[1]
) значения, хранящегося вFlag_p[0]
(Flag_c[0]
). Следовательно, условиеFlag_p[0] == Flag_p[1]
предполагает, что производитель завершил свою последнюю операцию доставки. Аналогичным образом, ConditionFlag_c[0] == Flag_c[1]
предполагает, что последнее удаление потребителя уже было прекращено.
Следовательно, на этапе проверки, если производитель обнаруживает это Flag_c[0] != Flag_p[0] & Flag_c[0] == Flag_c[1]
, он действует в соответствии со значением Flag_c[2]
, а в противном случае - в соответствии со значением, сохраненным в Flag_p[2]
. Аналогично, если потребитель обнаруживает это Flag_p[0] == Flag_c[0] & Flag_p[0] == Flag_p[1]
, он действует в соответствии со значением Flag_p[2]
, а в противном случае - в соответствии со значением, хранящимся в Flag_c[2]
. В приведенном ниже коде переменные с заглавной буквы обозначают общие регистры, записанные одним из процессов и прочитанные обоими процессами. Переменные без заглавной буквы - это локальные переменные, в которые процессы копируют значения, считанные из общих регистров.
countDelivered = 0 ; countRemoved = 0 ; Flag_p [ 0 ] = 0 ; Flag_p [ 1 ] = 0 ; Flag_p [ 2 ] = "пустой" ; Flag_c [ 0 ] = 0 ; Flag_c [ 1 ] = 0 ; Flag_c [ 2 ] = "пустой" ; производитель процедуры () {в то время как ( истина ) { элемент = produItem (); / * фаза проверки: занято ждать, пока буфер не заполнится * / repeat { flag_c = Flag_c ; если ( flag_c [ 0 ] ! = Flag_p [ 0 ] & flag_c [ 0 ] == flag_c [ 1 ]) ans = flag_c [ 2 ]; иначе ans = Flag_p [ 2 ]; } до ( ans ! = "полный" ) / * фаза доставки товара * / putItemIntoBuffer ( item ); CountDeliverd = countDelivered + 1 % N + 1 ; flag_c = Flag_c ; Flag_p [ 0 ] = flag_c [ 0 ]; удалено = CountRemoved ; если ( CountDelivered - удалено == N ) { Flag_p [ 1 ] = flag_c [ 0 ]; Flag_p[ 2 ] = "полный" ; } если ( CountDelivered - удалено == 0 ) { Flag_p [ 1 ] = flag_c [ 0 ]; Flag_p [ 2 ] = "пустой" ; } если ( 0 < CountDelivered - удалено < N ) { Flag_p [ 1 ] = flag_c [ 0 ]; Flag_p [2 ] = "безопасно" ; } } }процедура consumer () { while ( true ) { / * фаза проверки: занято ожидание, пока буфер не станет пустым * / repeat { flag_p = Flag_p ; если ( flag_p [ 0 ] == Flag_c [ 0 ] & flag_p [ 1 ] == flag_p [ 0 ]) ans = flag_p [ 2 ]); иначе ans = Flag_c [ 2 ]; } до ( ans ! = "пусто" ) / * этап удаления элемента * / Item = removeItemFromBuffer (); countRemoved = countRemoved + 1 % N + 1 ; flag_p = Flag_p ; Flag_c [ 0 ] = 1 - flag_p [ 0 ]; доставлено = CountDelivered ; если ( доставлено - CountRemoved == N ) { Flag_c [ 1 ] = 1 - flag_p[ 0 ]; Flag_c [ 2 ] = "полный" ; } если ( доставлено - CountRemoved == 0 ) { Flag_c [ 1 ] = 1 - flag_p [ 0 ]; Flag_c [ 2 ] = "пустой" ; } if ( 0 < доставлено - CountRemoved < N ) { Flag_c [ 1 ] = 1 - flag_p [ 0 ]; Flag_c [ 2 ] = "безопасно" ; } } }
Правильность кода зависит от предположения, что процессы могут читать весь массив или записывать в несколько полей массива за одно атомарное действие. Поскольку это предположение нереально, на практике следует заменить Flag_p
и Flag_c
(log (12) -битными) целыми числами, которые кодируют значения этих массивов. Flag_p
и Flag_c
представлены здесь в виде массивов только для удобства чтения кода.
См. Также [ править ]
- Атомная операция
- Шаблон дизайна
- ФИФО
- Трубопровод
- Реализация на Java: Служба сообщений Java
Ссылки [ править ]
- ^ Arpaci-Dusseau, Remzi H .; Арпачи-Дюссо, Андреа К. (2014), Операционные системы: три простых элемента [Глава: переменные состояния] (PDF) , Книги Арпачи-Дюссо
- ^ Arpaci-Dusseau, Remzi H .; Arpaci-Dusseau, Andrea C. (2014), Операционные системы: три простых элемента [Глава: семафоры] (PDF) , Arpaci-Dusseau Books
- ^ Dijkstra, EW "Cooperating Sequential Processes", неопубликованная рукопись EWD123 (1965), http://www.cs.utexas.edu/users/EWD/ewd01xx/EWD123.PDF .
- ^ Дейкстра, EW "Информационные потоки, разделяющие конечный буфер". Письма об обработке информации 1.5 (1972): 179-180.
- ^ Лэмпорт, Лесли. «Доказательство правильности многопроцессорных программ». Транзакции IEEE по разработке программного обеспечения 2 (1977): 125-143.
- ^ Агилера, Маркос К., Эли Гафни и Лесли Лэмпорт. «Проблема с почтовым ящиком». Распределенные вычисления 23.2 (2010): 113-134.
- ↑ Авраам, Ури и Гал Амрам. «Двухпроцессная синхронизация». Теоретическая информатика 688 (2017): 2-23.
Дальнейшее чтение [ править ]
- Mark Grand Patterns in Java, Volume 1, Каталог многоразовых шаблонов проектирования, иллюстрированных с помощью UML
- Журнал пользователей C / C ++ (Dr.Dobb's), январь 2004 г., «Библиотека шаблонов параллелизма между производителями и потребителями C ++» , автор Тед Юан, представляет собой готовую к использованию библиотеку шаблонов C ++. Исходный код небольшой библиотеки шаблонов и примеры можно найти здесь