2025, Nov 29 06:02
Устранение гонок в SimPy: корректное закрытие и часы работы магазина
Разбор проблемы гонок при закрытии в SimPy и готовое решение: событие-допуск вместо приоритетов. Надежные часы работы и предсказуемая очередь. Без гонок.
Правила закрытия в дискретно-событийных моделях кажутся простыми — пока не вмешивается параллелизм. В этом сценарии на SimPy магазин работает с 8:00 до 20:00, перестает принимать посетителей в 18:00 и выполняет «жесткое» закрытие в 20:00, захватывая все ресурсы на 12 часов. Время от времени сущности всё же успевают проскочить и забрать ресурс сразу после запуска процедуры закрытия, хотя запросы на закрытие имеют более высокий приоритет. Итог получается запутанным: одни ресурсы захватываются ровно в момент закрытия, другие — с задержкой, а пара сущностей успевает кратко взять и освободить ресурс в период, который должен быть закрытым. Ниже — разбор этой проблемы и устойчивый способ обеспечить часы работы без гонок за очередь.
Минимальный пример, на котором проявляется гонка
Модель ниже воспроизводит поведение при детерминированном зерне случайности (10). Используется PriorityResource; клиенты запрашивают ресурс с приоритетом 1, а процесс закрытия на 12 часов делает запросы с приоритетом −1, рассчитывая полностью занять пул ресурсов с 20:00 до 8:00. Имена намеренно подобраны так, чтобы они были наглядными и легко читались.
import simpy
import random
import math
import numpy as np
class BaseSettings:
random.seed(10)
sim_duration = 10080
runs = 1
capacity_units = 4
interarrival = 120
service_duration = 240
open_hour = 8
cutoff_hour = 18
close_hour = 20
class Customer:
def __init__(self, ident):
self.ident = ident
self.arrived_at = np.nan
self.seized_at = np.nan
self.left_at = np.nan
class StoreSim:
def __init__(self, run_idx, cfg):
self.env = simpy.Environment()
self.cfg = cfg
self.counter = 0
self.run_idx = run_idx
self.pool = simpy.PriorityResource(self.env, capacity=cfg.capacity_units)
def clock_parts(self, t):
day = math.floor(t / (24*60))
dow = day % 7
hour = math.floor((t % (day*(24*60)) if day != 0 else t) / 60)
return day, dow, hour
def generate_arrivals(self):
yield self.env.timeout(1)
while True:
self.counter += 1
person = Customer(self.counter)
self.env.process(self.visit_flow(person))
gap = round(random.expovariate(1.0 / self.cfg.interarrival))
yield self.env.timeout(gap)
def scheduler_close(self):
while True:
if self.env.now == 0:
closed_hours = self.cfg.open_hour
next_span = self.cfg.close_hour
else:
closed_hours = 12
next_span = 24
print(f'--!!!!!CLOSING SHOP AT {self.env.now} FOR {closed_hours * 60} MINS!!!!!--')
k = 0
for _ in range(self.cfg.capacity_units):
k += 1
print(f'--claiming resource {k}')
self.env.process(self.occupy_slot(closed_hours * 60))
print(f'--!!!!!SHOP CLOSED!!!!!--')
yield self.env.timeout(next_span * 60)
def occupy_slot(self, span):
with self.pool.request(priority=-1) as rq:
yield rq
print(f'--resource claimed for close at {self.env.now} for {span} mins')
yield self.env.timeout(span)
def visit_flow(self, cust):
cust.arrived_at = self.env.now
day, dow, hour = self.clock_parts(cust.arrived_at)
print(f'entity {cust.ident} starting process at {cust.arrived_at}')
if ((hour >= self.cfg.cutoff_hour) and (hour < self.cfg.close_hour)):
next_close = ((day * 60 * 24) + (self.cfg.close_hour * 60))
pause = (next_close - cust.arrived_at) + 1
print(f'entity {cust.ident} arrived in queue after stop accepting. Time out {pause} mins until close')
yield self.env.timeout(pause)
print(f'entity {cust.ident} has waited until close at {self.env.now}')
attempt = 1
with self.pool.request(priority=1) as rq:
now = self.env.now
print(f'entity {cust.ident} requesting resource at time {now} - attempt {attempt}')
day, dow, hour = self.clock_parts(now)
next_cutoff_day = (day + 1 if hour >= self.cfg.cutoff_hour else day)
next_cutoff = ((next_cutoff_day * 60 * 24) + (self.cfg.cutoff_hour * 60))
until_cutoff = (next_cutoff - now) + 1
print(f'entity {cust.ident} has {until_cutoff} mins until shop stops accepting')
yield rq | self.env.timeout(until_cutoff)
while not rq.triggered:
attempt += 1
print(f'entity {cust.ident} did not get resource as shop stopped accepting at {self.env.now}, entity waiting 2 hours then rejoining queue for attempt {attempt}')
yield self.env.timeout(((self.cfg.close_hour - self.cfg.cutoff_hour) * 60) + 1)
print(f'entity {cust.ident} requesting resource at time {self.env.now} - attempt {attempt}')
now = self.env.now
day, dow, hour = self.clock_parts(now)
next_cutoff_day = (day + 1 if hour >= self.cfg.cutoff_hour else day)
next_cutoff = ((next_cutoff_day * 60 * 24) + (self.cfg.cutoff_hour * 60))
until_cutoff = (next_cutoff - now) + 1
print(f'entity {cust.ident} has {until_cutoff} until shop stops accepting')
yield rq | self.env.timeout(until_cutoff)
print(f'entity {cust.ident} got resource at {self.env.now} on attempt {attempt}')
cust.seized_at = self.env.now
day, dow, hour = self.clock_parts(self.env.now)
next_close_day = (day + 1 if hour >= self.cfg.close_hour else day)
hard_close = ((next_close_day * 60 * 24) + (self.cfg.close_hour * 60)) - 1
window = hard_close - self.env.now
service = round(random.expovariate(1.0 / self.cfg.service_duration))
yield self.env.timeout(min(service, window))
cust.left_at = self.env.now
def start(self):
self.env.process(self.generate_arrivals())
self.env.process(self.scheduler_close())
self.env.run(until=self.cfg.sim_duration)
def execute(cfg):
for r in range(cfg.runs):
print(f"Run {r+1} of {cfg.runs}")
model = StoreSim(r, cfg)
model.start()
execute(BaseSettings)
В чем проблема
Из логов видно две вещи. Во-первых, ровно в момент закрытия «процедура закрытия» начинает отправлять высокоприоритетные запросы, чтобы занять всю пропускную способность на ночь. Во-вторых, активность сущностей местами чередуется с отдельными захватами ресурса для закрытия. В результате два ресурса могут быть заняты сразу в момент закрытия, а оставшиеся — позже. В этот промежуток сущности успевают получить ресурс и быстро уйти. Это заметно в выводе: часть заявок на закрытие попадает точно во время закрытия, а другие — позже, после того как сущности на мгновение удержали ресурсы. Такое поведение проявляется время от времени даже при фиксированном зерне, поскольку то, как переплетаются запросы и выдача в момент закрытия, зависит от планирования процессов при одновременных событиях.
Надежный подход: «шлагбаум» для очереди через событие
Доработка ниже избавляется от гонки за очередь в момент закрытия. Вместо параллельных попыток захватить все ресурсы в 20:00 состояние магазина моделируется явно: есть событие допуска и простой планировщик. Когда магазин не принимает или закрыт, запросы сущностей либо не проходят дальше, либо отменяются и повторяются при следующем открытии. Это устраняет необходимость «заливать» ресурс заявками на закрытие и не дает никому проскочить в нерабочие часы. Дополнительно решение ведет список ожидающих сущностей для наглядности. Зерно генератора случайных чисел остается детерминированным — 10.
import simpy
import random
import math
import numpy as np
class BaseSettings:
random.seed(10)
sim_duration = 10080
runs = 1
capacity_units = 4
interarrival = 120
service_duration = 240
open_hour = 8
cutoff_hour = 18
close_hour = 20
active_days = [0, 1, 2, 3, 4]
class Customer:
def __init__(self, ident):
self.ident = ident
self.arrived_at = np.nan
self.seized_at = np.nan
self.left_at = np.nan
class StoreSim:
def __init__(self, run_idx, cfg):
self.env = simpy.Environment()
self.cfg = cfg
self.counter = 0
self.run_idx = run_idx
self.pool = simpy.PriorityResource(self.env, capacity=cfg.capacity_units)
self.waitlist = []
self.is_open = False
self.accepting = False
self.accept_evt = self.env.event()
def clock_parts(self, t):
day = math.floor(t / (24*60))
dow = day % 7
hour = math.floor((t % (day*(24*60)) if day != 0 else t) / 60)
return day, dow, hour
def schedule_open_state(self, day, dow, hour):
accepting = ((dow in self.cfg.active_days) and ((hour >= self.cfg.open_hour) and (hour < self.cfg.cutoff_hour)))
open_flag = ((dow in self.cfg.active_days) and ((hour >= self.cfg.open_hour) and (hour < self.cfg.close_hour)))
return accepting, open_flag
def generate_arrivals(self):
yield self.env.timeout(1)
while True:
self.counter += 1
person = Customer(self.counter)
self.env.process(self.visit_flow(person))
gap = round(random.expovariate(1.0 / self.cfg.interarrival))
yield self.env.timeout(gap)
def update_accept_event(self):
if self.accepting:
if not self.accept_evt.triggered:
self.accept_evt.succeed()
else:
if self.accept_evt.triggered:
self.accept_evt = self.env.event()
def gatekeeper(self):
while True:
now = self.env.now
day, dow, hour = self.clock_parts(now)
should_accept, should_open = self.schedule_open_state(day, dow, hour)
if should_accept and not self.accepting:
print(f'!!!!!SHOP START ACCEPTING AT {now}!!!!!')
self.accepting = True
self.is_open = True
self.update_accept_event()
elif not should_accept and self.accepting:
print(f'!!!!!SHOP STOP ACCEPTING AT {now}!!!!!')
self.accepting = False
self.update_accept_event()
elif not should_open and self.is_open:
print(f'!!!!!SHOP CLOSING AT {now}!!!!!')
self.is_open = False
self.accepting = False
if dow < 4:
closed_span = (24 - self.cfg.close_hour + self.cfg.open_hour) * 60
else:
closed_span = (72 - self.cfg.close_hour + self.cfg.open_hour) * 60
print(f'!!!!!SHOP CLOSED!!!!!')
yield self.env.timeout(closed_span)
print(f'!!!!!SHOP REOPENING AT {self.env.now}!!!!!')
self.is_open = True
self.accepting = True
self.update_accept_event()
yield self.env.timeout(60)
def visit_flow(self, cust):
cust.arrived_at = self.env.now
day, dow, hour = self.clock_parts(cust.arrived_at)
print(f'entity {cust.ident} starting process at {cust.arrived_at}')
self.waitlist.append(cust)
got_it = False
req = None
while not got_it:
if not self.accepting:
yield self.accept_evt
print(f'entity {cust.ident} requesting resource at {self.env.now}')
req = self.pool.request()
now = self.env.now
day, dow, hour = self.clock_parts(now)
next_cutoff_day = (day + 1 if hour >= self.cfg.cutoff_hour else day)
next_cutoff = ((next_cutoff_day * 60 * 24) + (self.cfg.cutoff_hour * 60))
until_cutoff = max((next_cutoff - now), 1)
outcome = yield req | self.env.timeout(until_cutoff)
if req in outcome:
got_it = True
print(f'++ entity {cust.ident} got resource at {self.env.now}')
else:
if not req.triggered:
req.cancel()
print(f'entity {cust.ident} did not get resource at {self.env.now} - retrying...')
cust.seized_at = self.env.now
day, dow, hour = self.clock_parts(self.env.now)
next_close_day = (day + 1 if hour >= self.cfg.close_hour else day)
hard_close = ((next_close_day * 60 * 24) + (self.cfg.close_hour * 60)) - 1
window = hard_close - self.env.now
service = round(random.expovariate(1.0 / self.cfg.service_duration))
yield self.env.timeout(min(service, window))
self.pool.release(req)
print(f'-- entity {cust.ident} released resource at {self.env.now}')
cust.left_at = self.env.now
def start(self):
self.env.process(self.generate_arrivals())
self.env.process(self.gatekeeper())
self.env.run(until=self.cfg.sim_duration)
def execute(cfg):
for r in range(cfg.runs):
print(f"Run {r+1} of {cfg.runs}")
model = StoreSim(r, cfg)
model.start()
execute(BaseSettings)
Почему это работает
Улучшенная модель фиксирует два состояния: «принимаем» и «открыто». Сущности блокируются на событии допуска и могут продолжать только тогда, когда магазин принимает. Как только прием заканчивается, незавершенные запросы отменяются, и процесс ждет следующего окна приема, чтобы попробовать снова. В момент жесткого закрытия магазин переключается в «закрыто» и остается в этом состоянии заранее рассчитанное время — ночь или выходные — без попыток занимать все ресурсы. Это убирает чередование событий, которое раньше позволяло сущностям успевать запросить и кратко удержать ресурс между отдельными захватами на закрытие.
Зачем это важно
Модели дискретных событий, которые симулируют режим работы, SLA или простои, могут искажать результаты, если граничные условия не применяются атомарно. Очередь, которая вроде бы соблюдает дедлайны приема, но иногда пропускает поздние запросы, способна сместить загрузку, время ожидания и пропускную способность, особенно при малом числе ресурсов. Явное управление допуском через события убирает двусмысленности и делает модель точнее по отношению к политике.
Выводы
Если нужно прекратить прием в конкретное время и полностью закрыться позже, оформляйте график магазина как первоклассное состояние, а не соревнуйтесь с очередью через высокие приоритеты. Используйте единое событие для управления допуском, отменяйте незавершенные запросы при завершении приема и повторяйте попытку только после открытия. Так очередь остается предсказуемой, правила закрытия соблюдаются, а пограничные случаи на стыках не создают неожиданных интерливингов. Детерминированное зерно 10 делает поведение воспроизводимым для проверки и отладки.