Из Википедии, бесплатной энциклопедии
Перейти к навигации Перейти к поиску

В вычислении , то проблема , производитель-потребитель [1] [2] (также известные как проблема ограниченно-буфера ) является классическим примером мульти- процесса синхронизации проблемы, первая версия которого была предложена Дейкстром в 1965 году в его неопубликованной рукописи [3], в которой буфер был неограничен, а затем опубликован с ограниченным буфером в 1972 году. [4] В первой версии проблемы есть два циклических процесса, производитель и потребитель, которые совместно используют обычный буфер фиксированного размера, используемый в качестве очереди. Производитель многократно генерирует данные и записывает их в буфер. Потребитель многократно считывает данные в буфере, удаляя их в процессе чтения и используя эти данные каким-либо образом. В первой версии проблемы с неограниченным буфером проблема состоит в том, как разработать код производителя и потребителя так, чтобы при их обмене данными данные не терялись или не дублировались, данные считывались потребителем в том порядке, в котором они написан производителем, и оба процесса достигают максимального прогресса. В более поздней формулировке проблемы Дейкстра предложил несколько производителей и потребителей, совместно использующих конечный набор буферов. Это добавляло дополнительную проблему, заключающуюся в предотвращении попыток производителей записывать данные в буферы, когда все они были заполнены, и в попытке запретить потребителям читать буфер, когда все они были пусты.

Первый случай, который следует рассмотреть, - это тот, в котором есть один производитель и один потребитель, и есть буфер конечного размера. Решение для производителя - либо перейти в спящий режим, либо сбросить данные, если буфер заполнен. В следующий раз, когда потребитель удаляет элемент из буфера, он уведомляет производителя, который снова начинает заполнять буфер. Таким же образом потребитель может перейти в спящий режим, если обнаружит, что буфер пуст. В следующий раз, когда производитель помещает данные в буфер, он будит спящего потребителя. Решение может быть достигнуто посредством межпроцессного взаимодействия , обычно с использованием семафоров . Неадекватное решение может привести к тупиковой ситуации, когда оба процесса ожидают пробуждения.

Неадекватная реализация [ править ]

Чтобы решить эту проблему, возникает соблазн предложить "решение", показанное ниже. В решении используются две библиотечные подпрограммы, sleepи wakeup. Когда вызывается спящий режим, вызывающий блокируется до тех пор, пока другой процесс не разбудит его с помощью процедуры пробуждения. Глобальная переменная itemCountсодержит количество элементов в буфере.

int  itemCount  =  0 ; производитель процедуры ()   то время как  ( истина )  {  элемент  =  produItem (); если  ( itemCount  ==  BUFFER_SIZE )  {  спать ();  } putItemIntoBuffer ( элемент );  itemCount  =  itemCount  +  1 ; если  ( itemCount  ==  1 )  {  пробуждение ( потребитель );  }  } }процедура  consumer ()  {  while  ( true )  { если  ( itemCount  ==  0 )  {  спать ();  } item  =  removeItemFromBuffer ();  itemCount  =  itemCount  -  1 ; if  ( itemCount  ==  BUFFER_SIZE  -  1 )  {  пробуждение ( производитель );  } потреблятьItem ( элемент );  } }

Проблема с этим решением состоит в том, что оно содержит состояние гонки, которое может привести к тупиковой ситуации . Рассмотрим следующий сценарий:

  1. Он consumerтолько что прочитал переменную itemCount, заметил, что она равна нулю, и вот-вот переместится внутрь ifблока.
  2. Непосредственно перед вызовом сна потребитель прерывается, а производитель возобновляет работу.
  3. Производитель создает элемент, помещает его в буфер и увеличивает его itemCount.
  4. Поскольку перед последним добавлением буфер был пуст, производитель пытается разбудить потребителя.
  5. К сожалению, потребитель еще не спал, и пробуждение было потеряно. Когда потребитель возобновляет работу, он засыпает и больше никогда не проснется. Это связано с тем, что потребитель пробуждается производителем только тогда, когда itemCountон равен 1.
  6. Производитель будет повторять цикл до тех пор, пока буфер не заполнится, после чего он также перейдет в спящий режим.

Поскольку оба процесса будут спать вечно, мы зашли в тупик. Следовательно, это решение неудовлетворительно.

Альтернативный анализ заключается в том, что если язык программирования не определяет семантику одновременного доступа к общим переменным (в данном случае itemCount) с использованием синхронизации, то решение неудовлетворительно по этой причине, без необходимости явно демонстрировать состояние гонки.

Использование семафоров [ править ]

Семафоры решают проблему потерянных пробуждающих вызовов. В приведенном ниже решении мы используем два семафора fillCountи emptyCountдля решения проблемы. fillCount- это количество элементов, уже находящихся в буфере и доступных для чтения, а emptyCount- количество доступных пространств в буфере, где элементы могут быть записаны. fillCountувеличивается и emptyCountуменьшается, когда новый элемент помещается в буфер. Если производитель пытается уменьшить, emptyCountкогда его значение равно нулю, производитель переводится в спящий режим. В следующий раз, когда элемент будет потреблен, emptyCountувеличивается на единицу, и производитель просыпается. Аналогично работает потребитель.

семафор  fillCount  =  0 ;  // семафор  произведенных элементов emptyCount  =  BUFFER_SIZE ;  // оставшееся место производитель процедуры ()   то время как  ( истина )  {  элемент  =  produItem ();  вниз ( emptyCount );  putItemIntoBuffer ( элемент );  вверх ( fillCount );  } }процедура-  потребитель ()   то время как  ( истина )  {  вниз ( fillCount );  item  =  removeItemFromBuffer ();  вверх ( emptyCount );  потреблятьItem ( элемент );  } }

Приведенное выше решение отлично работает, когда есть только один производитель и потребитель. Поскольку несколько производителей используют одно и то же пространство памяти для буфера элементов, или несколько потребителей используют одно и то же пространство памяти, это решение содержит серьезное состояние гонки, которое может привести к одновременному чтению или записи двух или более процессов в один и тот же слот. Чтобы понять, как это возможно, представьте, как putItemIntoBuffer()можно реализовать процедуру . Он может содержать два действия: одно определяет следующий доступный слот, а другое записывает в него. Если процедура может выполняться одновременно несколькими производителями, то возможен следующий сценарий:

  1. Два производителя уменьшаются emptyCount
  2. Один из производителей определяет следующий пустой слот в буфере
  3. Второй производитель определяет следующий пустой слот и получает тот же результат, что и первый производитель
  4. Оба производителя записывают в один и тот же слот

Чтобы решить эту проблему, нам нужен способ убедиться, что одновременно выполняется только один продюсер putItemIntoBuffer(). Другими словами, нам нужен способ выполнить критическую секцию с взаимным исключением . Ниже показано решение для нескольких производителей и потребителей.

мьютекс  buffer_mutex ;  // похоже на "семафор buffer_mutex = 1", но отличается (см. примечания ниже) semaphore  fillCount  =  0 ; семафор  emptyCount  =  BUFFER_SIZE ; производитель процедуры ()   то время как  ( истина )  {  элемент  =  produItem ();  вниз ( emptyCount );  вниз ( buffer_mutex );  putItemIntoBuffer ( элемент );  вверх ( buffer_mutex );  вверх ( fillCount );  } }процедура-  потребитель ()   то время как  ( истина )  {  вниз ( fillCount );  вниз ( buffer_mutex );  item  =  removeItemFromBuffer ();  вверх ( buffer_mutex );  вверх ( emptyCount );  потреблятьItem ( элемент );  } }

Обратите внимание, что порядок, в котором разные семафоры увеличиваются или уменьшаются, имеет важное значение: изменение порядка может привести к тупиковой ситуации. Здесь важно отметить, что, хотя мьютекс, кажется, работает как семафор со значением 1 (двоичный семафор), но есть разница в том, что мьютекс имеет концепцию владения. Владение означает, что мьютекс может быть "увеличен" обратно (установлен в 1) тем же процессом, который "уменьшил" его (установлен в 0), а все другие задачи ждут, пока мьютекс не станет доступным для уменьшения (фактически означает, что ресурс доступен) , что обеспечивает взаимную исключительность и позволяет избежать тупиковых ситуаций. Таким образом, неправильное использование мьютексов может остановить многие процессы, когда монопольный доступ не требуется, но мьютекс используется вместо семафора.

Использование мониторов [ править ]

Следующий псевдокод показывает решение проблемы производителя и потребителя с помощью мониторов . Поскольку в случае мониторов взаимное исключение подразумевается, не требуется дополнительных усилий для защиты критического раздела. Другими словами, представленное ниже решение работает с любым количеством производителей и потребителей без каких-либо модификаций. Также стоит отметить, что для программиста менее вероятно, что он напишет код, который страдает от состояния гонки при использовании мониторов, чем при использовании семафоров. [ необходима цитата ]

монитор  ProducerConsumer  {  int  itemCount  =  0 ;  состояние  полное ;  состояние  пусто ; процедура  add ( item )  {  if  ( itemCount  ==  BUFFER_SIZE )  {  ждать ( полный );  } putItemIntoBuffer ( элемент );  itemCount  =  itemCount  +  1 ; если  ( itemCount  ==  1 )  {  уведомить ( пусто );  }  } процедура  remove ()  {  если  ( itemCount  ==  0 )  {  ждать ( пусто );  } item  =  removeItemFromBuffer ();  itemCount  =  itemCount  -  1 ; если  ( itemCount  ==  BUFFER_SIZE  -  1 )  {  уведомление ( полное );  } возврат  товара ;  } } производитель процедуры ()   то время как  ( истина )  {  элемент  =  produItem ();  ПроизводительПотребитель . добавить ( элемент );  } }процедура  consumer ()  {  while  ( true )  {  item  =  ProducerConsumer . удалить ();  потреблятьItem ( элемент );  } }

Без семафоров и мониторов [ править ]

Проблема производителя и потребителя, особенно в случае одного производителя и одного потребителя, в значительной степени связана с реализацией FIFO или канала . Шаблон производитель-потребитель может обеспечить высокоэффективную передачу данных, не полагаясь на семафоры, мьютексы или мониторы для передачи данных . Использование этих примитивов может быть дорогостоящим с точки зрения производительности по сравнению с базовой атомарной операцией чтения / записи. Каналы и FIFO популярны только потому, что они избегают необходимости в сквозной атомарной синхронизации. Базовый пример, закодированный на C, показан ниже. Обратите внимание, что:

  • Атомарный доступ для чтения-изменения-записи к общим переменным исключается, поскольку каждая из двух Countпеременных обновляется только одним потоком. Кроме того, эти переменные поддерживают неограниченное количество операций приращения; отношение остается правильным, когда их значения возвращаются к целочисленному переполнению.
  • В этом примере потоки не переводятся в спящий режим, что может быть приемлемо в зависимости от системного контекста. schedulerYield()Вставляются как попытка улучшить производительность, и может быть опущена. Библиотеки потоков обычно требуют семафоров или условных переменных для управления режимом сна / пробуждением потоков. В многопроцессорной среде спящий / пробуждение потока будет происходить гораздо реже, чем передача токенов данных, поэтому полезно избегать атомарных операций при передаче данных.
  • Этот пример не работает для нескольких производителей и / или потребителей, поскольку при проверке состояния возникает состояние гонки. Например, если в буфере хранилища находится только один токен, и два потребителя обнаруживают, что буфер непустой, то оба будут использовать один и тот же токен и, возможно, увеличат счетчик потребленных токенов выше счетчика для произведенных токенов.
  • В этом примере, как написано, требуется, чтобы UINT_MAX + 1оно делилось без остатка на BUFFER_SIZE; если это не нацело, [Count % BUFFER_SIZE]дает неверный индекс буфера после Countобертываний последние UINT_MAXобратно к нулю. Альтернативное решение, позволяющее избежать этого ограничения, использует две дополнительные Idxпеременные для отслеживания текущего индекса буфера для заголовка (производителя) и хвоста (потребителя). Эти Idxпеременные будут использоваться вместо [Count % BUFFER_SIZE], и каждый из них должен быть увеличен в то же время , как соответствующая Countпеременная увеличивается на единицу, следующим образом : Idx = (Idx + 1) % BUFFER_SIZE.
  • Две Countпеременные должны быть достаточно маленькими, чтобы поддерживать атомарные операции чтения и записи. В противном случае возникает состояние гонки, когда другой поток считывает частично обновленное и, следовательно, неправильное значение.


летучий  беззнаковый  INT  produceCount  =  0 ,  consumeCount  =  0 ; TokenType  sharedBuffer [ BUFFER_SIZE ];недействительный  производитель ( void )  то время как  ( 1 )  то время как  ( produCount  -  consumerCount  ==  BUFFER_SIZE )  { schedulerYield ();  / * sharedBuffer заполнен * / } / * Запись в sharedBuffer _ перед_ увеличением producount * / sharedBuffer [ produCount  %  BUFFER_SIZE ]  =  produToken (); / * Здесь требуется барьер памяти для обеспечения того, чтобы обновление sharedBuffer было видимым для других потоков перед обновлением produCount * / ++produCount ; } }недействительный  потребитель ( void )  то время как  ( 1 )  то время как  ( producount  -  consumerCount  ==  0 )  { schedulerYield ();  / * sharedBuffer пуст * / } consumerToken ( & sharedBuffer [ consumerCount  %  BUFFER_SIZE ]); ++ consumerCount ; } }

В приведенном выше решении используются счетчики, которые при частом использовании могут перегружаться и достигать максимального значения UINT_MAX. Идея указано на четвертой пули, первоначально предложенный Лэмпорт , [5] объясняет , как счетчики могут быть заменены на счетчики конечной дальности. В частности, их можно заменить счетчиками конечного диапазона с максимальным значением N - емкостью буфера.

Спустя четыре десятилетия после представления проблемы производителя-потребителя Агилера, Гафни и Лампорт показали, что проблема может быть решена так, что процессы обращаются только к счетчикам с фиксированным диапазоном (т. Е. Диапазон, который не зависит от размера буфера). при определении, пуст ли буфер или нет. [6] Мотивация для этой меры эффективности состоит в том, чтобы ускорить взаимодействие между процессором и устройствами, которые взаимодействуют через каналы FIFO. Они предложили решение, при котором считываются счетчики максимального значения , чтобы определить, безопасен ли доступ к буферу. Однако их решение по-прежнему использует неограниченные счетчики, которые бесконечно растут, только к этим счетчикам не обращаются во время описанной фазы проверки.

Позже Абрахам и Амрам [7] предложили более простое решение, представленное ниже в псевдокоде, которое обладает обсуждаемым свойством фиксированного диапазона. Решение использует счетчики максимального значения N. Однако, чтобы определить , является ли пустым или полным буфер, процессы получить доступ только к конечному радиусу действия одиночных регистров писателя . Каждому из процессов принадлежит одно записывающее устройство с 12 значениями. Процесс-производитель записывает в массив Flag_p, а процесс-потребитель записывает в Flap_cмассивы с 3 полями. Flag_p[2]и Flag_c[2]может хранить «полный», «пустой» или «безопасный», что соответственно указывает, является ли буфер полным, пустым или ни полным, ни пустым.

Идея алгоритма заключается в следующем. Процессы подсчитывают количество доставленных и удаленных элементов по модулю N + 1 через регистры CountDeliveredи CountRemoved. Когда процесс доставляет или удаляет элемент, он сравнивает эти счетчики и, таким образом, успешно определяет состояние буфера и сохраняет эти данные в Flag_p[2]или Flag_c[2]. На этапе проверки выполняющийся процесс считывает Flag_pи Flag_cпытается оценить, какое значение среди Flag_p[2]и Flag_c[2]отражает текущее состояние буфера. Достичь этой цели помогают два метода синхронизации.

  1. После доставки товара, производитель пишет Flag_p[0]значения , которое он прочитал с Flag_c[0], и после удаления элемента, потребитель пишет к Flag_c[1]значению: 1-Flag_p[0]. Следовательно, условие Flag_p[0] == Flag_c[0]предполагает, что производитель недавно проверил состояние буфера, а Flag_p[0] != Flag_c[0]предполагает обратное.
  2. Операция доставки (удаления) заканчивается записью в Flag_p[1]( Flag_c[1]) значения, хранящегося в Flag_p[0]( Flag_c[0]). Следовательно, условие Flag_p[0] == Flag_p[1]предполагает, что производитель завершил свою последнюю операцию доставки. Аналогичным образом, Condition Flag_c[0] == Flag_c[1]предполагает, что последнее удаление потребителя уже было прекращено.

Следовательно, на этапе проверки, если производитель обнаруживает это Flag_c[0] != Flag_p[0] & Flag_c[0] == Flag_c[1], он действует в соответствии со значением Flag_c[2], а в противном случае - в соответствии со значением, сохраненным в Flag_p[2]. Аналогично, если потребитель обнаруживает это Flag_p[0] == Flag_c[0] & Flag_p[0] == Flag_p[1], он действует в соответствии со значением Flag_p[2], а в противном случае - в соответствии со значением, хранящимся в Flag_c[2]. В приведенном ниже коде переменные с заглавной буквы обозначают общие регистры, записанные одним из процессов и прочитанные обоими процессами. Переменные без заглавной буквы - это локальные переменные, в которые процессы копируют значения, считанные из общих регистров.

countDelivered  =  0 ; countRemoved  =  0 ; Flag_p [ 0 ]  =  0 ;  Flag_p [ 1 ]  =  0 ;  Flag_p [ 2 ]  =  "пустой" ; Flag_c [ 0 ]  =  0 ;  Flag_c [ 1 ]  =  0 ;  Flag_c [ 2 ]  =  "пустой" ;  производитель процедуры ()   то время как  ( истина )  {  элемент  =  produItem ();  / * фаза проверки: занято ждать, пока буфер не  заполнится * / repeat  {  flag_c  =  Flag_c ;  если  ( flag_c [ 0 ]  ! =  Flag_p [ 0 ]  &  flag_c [ 0 ]  ==  flag_c [ 1 ])  ans  =  flag_c [ 2 ];  иначе  ans  =  Flag_p [ 2 ];  }  до  ( ans  ! =  "полный" )  / * фаза доставки товара * /  putItemIntoBuffer ( item );  CountDeliverd  =  countDelivered + 1  %  N + 1 ;  flag_c  =  Flag_c ;  Flag_p [ 0 ]  =  flag_c [ 0 ];  удалено  =  CountRemoved ;  если  ( CountDelivered  -  удалено  ==  N )  {  Flag_p [ 1 ]  =  flag_c [ 0 ];  Flag_p[ 2 ]  =  "полный" ;  }  если  ( CountDelivered  -  удалено  ==  0 )  {  Flag_p [ 1 ]  =  flag_c [ 0 ];  Flag_p [ 2 ]  =  "пустой" ;  }  если  ( 0  <  CountDelivered  -  удалено  <  N )  {  Flag_p [ 1 ]  =  flag_c [ 0 ];  Flag_p [2 ]  =  "безопасно" ;  }  } }процедура  consumer ()  {  while  ( true )  {  / * фаза проверки: занято ожидание, пока буфер не станет пустым * /  repeat  {  flag_p  =  Flag_p ;  если  ( flag_p [ 0 ]  ==  Flag_c [ 0 ]  &  flag_p [ 1 ]  ==  flag_p [ 0 ])  ans  =  flag_p [ 2 ]);  иначе  ans  =  Flag_c [ 2 ];  } до  ( ans  ! =  "пусто" )  / * этап удаления элемента * /  Item  =  removeItemFromBuffer ();  countRemoved  =  countRemoved + 1  %  N + 1 ;  flag_p  =  Flag_p ;  Flag_c [ 0 ]  =  1 - flag_p [ 0 ];  доставлено  =  CountDelivered ;  если  ( доставлено  -  CountRemoved  ==  N )  {  Flag_c [ 1 ]  =  1 - flag_p[ 0 ];  Flag_c [ 2 ]  =  "полный" ;  }  если  ( доставлено  -  CountRemoved  ==  0 )  {  Flag_c [ 1 ]  =  1 - flag_p [ 0 ];  Flag_c [ 2 ]  =  "пустой" ;  }  if  ( 0  <  доставлено  -  CountRemoved  <  N )  {  Flag_c [ 1 ]  = 1 - flag_p [ 0 ];  Flag_c [ 2 ]  = "безопасно" ;  }  } }

Правильность кода зависит от предположения, что процессы могут читать весь массив или записывать в несколько полей массива за одно атомарное действие. Поскольку это предположение нереально, на практике следует заменить Flag_pи Flag_c(log (12) -битными) целыми числами, которые кодируют значения этих массивов. Flag_pи Flag_cпредставлены здесь в виде массивов только для удобства чтения кода.

См. Также [ править ]

  • Атомная операция
  • Шаблон дизайна
  • ФИФО
  • Трубопровод
  • Реализация на Java: Служба сообщений Java

Ссылки [ править ]

  1. ^ Arpaci-Dusseau, Remzi H .; Арпачи-Дюссо, Андреа К. (2014), Операционные системы: три простых элемента [Глава: переменные состояния] (PDF) , Книги Арпачи-Дюссо
  2. ^ Arpaci-Dusseau, Remzi H .; Arpaci-Dusseau, Andrea C. (2014), Операционные системы: три простых элемента [Глава: семафоры] (PDF) , Arpaci-Dusseau Books
  3. ^ Dijkstra, EW "Cooperating Sequential Processes", неопубликованная рукопись EWD123 (1965), http://www.cs.utexas.edu/users/EWD/ewd01xx/EWD123.PDF .
  4. ^ Дейкстра, EW "Информационные потоки, разделяющие конечный буфер". Письма об обработке информации 1.5 (1972): 179-180.
  5. ^ Лэмпорт, Лесли. «Доказательство правильности многопроцессорных программ». Транзакции IEEE по разработке программного обеспечения 2 (1977): 125-143.
  6. ^ Агилера, Маркос К., Эли Гафни и Лесли Лэмпорт. «Проблема с почтовым ящиком». Распределенные вычисления 23.2 (2010): 113-134.
  7. Авраам, Ури и Гал Амрам. «Двухпроцессная синхронизация». Теоретическая информатика 688 (2017): 2-23.

Дальнейшее чтение [ править ]

  • Mark Grand Patterns in Java, Volume 1, Каталог многоразовых шаблонов проектирования, иллюстрированных с помощью UML
  • Журнал пользователей C / C ++ (Dr.Dobb's), январь 2004 г., «Библиотека шаблонов параллелизма для производителей и потребителей C ++» , автор - Тед Юань, представляет собой готовую к использованию библиотеку шаблонов C ++. Исходный код небольшой библиотеки шаблонов и примеры можно найти здесь