2025, Nov 25 06:02
Эффективное суммирование «рваной» диагонали в 3D NumPy
Как посчитать b[i,j]=Σ a[i+k,j+k,k] без циклов: быстрый способ в NumPy через векторизацию и предвычисленные индексы, кеширование буферов для повторных запусков.
Суммировать «рваную» диагональ в 3D-массиве кажется простой задачей, пока не начинаешь векторизовать её. Пусть задан кубический тензор a формы N×N×N; нужно получить b так, чтобы b[i, j] = Σ_k a[i+k, j+k, k]. Прямолинейное решение — двойной цикл с вычислениями внутри через einsum — работает, но тормозит. Ниже — практичный способ сделать это эффективно в NumPy и вариант ускорения для многократных запусков с неизменными формой и dtype.
Базовый вариант: прямой подход
Фрагмент ниже демонстрирует «наивную» реализацию. Она корректна, однако вложенные циклы и постоянные срезы обходятся дорого.
import numpy as np
N = 10
cube = np.random.rand(N, N, N)
grid = np.zeros((N, N))
for r in range(N):
for c in range(N):
span = np.maximum(r, c)
grid[r, c] = np.einsum('iii', cube[r:N-span+r, c:N-span+c, :N-span])
Почему это медленно
Суммирование идёт не по прямоугольному блоку: для каждой пары (i, j) нужно пройти диагональ переменной длины, зависящей и от i, и от j. Такая «рваная» структура плохо поддаётся прямому broadcasting. Итерация по элементам и срезы внутри цикла только наращивают издержки.
Быстрое вычисление через формирование индексов
Более быстрый подход — заранее построить индексы, которые выбирают все нужные диагональные отрезки для каждой длины, а затем выполнить суммирование по нужной оси. Так мы избавляемся от покомпонентных накладных расходов Python и используем векторизованную индексацию и редукции NumPy.
import time
import numpy as np
def pairwise_scan(cube: np.ndarray) -> np.ndarray:
grid = np.zeros(cube.shape[:2], dtype=cube.dtype)
for r in range(cube.shape[0]):
for c in range(cube.shape[1]):
kmax = min(cube.shape[0] - r, cube.shape[1] - c)
for k in range(kmax):
grid[r, c] += cube[r+k, c+k, k]
return grid
def diagonal_pack(cube: np.ndarray) -> np.ndarray:
h, w = hw = cube.shape[:2]
grid = np.zeros(hw, dtype=cube.dtype)
for seg in range(1, 1 + w):
idx_mat = np.arange(w - seg + 1)[:, np.newaxis] + np.arange(seg)
tail = range(w - seg, w)
head = range(seg)
grid[w - seg, range(1 + w - seg)] = cube[
tail, idx_mat, head,
].sum(axis=1)
grid[range(w - seg), w - seg] = cube[
idx_mat[:-1, :], tail, head,
].sum(axis=1)
return grid
def diag_trace(cube: np.ndarray) -> np.ndarray:
N = cube.shape[0]
grid = np.zeros((N, N))
for r in range(N):
for c in range(N):
u = cube[r:, c:, :]
grid[r, c] = np.trace([u[k, k] for k in range(min(u.shape))])
return grid
def run_check() -> None:
rng = np.random.default_rng(seed=0)
cube = rng.integers(low=0, high=10, size=(12, 12, 12))
g1 = pairwise_scan(cube)
g2 = diagonal_pack(cube)
g3 = diag_trace(cube)
assert np.array_equal(g1, g2)
assert np.array_equal(g1, g3.astype(cube.dtype))
def timeit_run() -> None:
rng = np.random.default_rng(seed=0)
cube = rng.integers(low=0, high=99, size=(100, 100, 100))
for fn in (diagonal_pack, diag_trace):
t0 = time.perf_counter()
fn(cube)
t1 = time.perf_counter()
print(f'{fn.__name__}: {t1-t0:.4f}')
if __name__ == '__main__':
run_check()
timeit_run()
На показательном запуске получились такие времена:
semivectorised: 0.0082
literal_trace: 0.2085
Это показывает, что суммирование по заранее сформированным индексам заметно быстрее, чем буквальное «протаскивание» по диагоналям в данном сценарии.
Если операция повторяется: кешируйте индексы и переиспользуйте буферы
Во многих реальных задачах одно и то же вычисление вызывается многократно при неизменных форме и dtype. Частый шаблон — формировать a умножением заранее посчитанного x на y, приходящий в каждом вызове, с приведением формы к y[..., np.newaxis]. Если размерность тензора и тип данных постоянны, индексацию можно подготовить один раз, а временные буферы — переиспользовать, уменьшая накладные расходы на выделение памяти.
import typing
import numpy as np
class DiagRunner(typing.NamedTuple):
n: int
buf3: np.ndarray
out2: np.ndarray
slots: tuple[tuple, ...]
@classmethod
def from_like(cls, arr: np.ndarray) -> typing.Self:
return cls.make(n=arr.shape[0], dtype=arr.dtype)
@classmethod
def make(cls, n: int, dtype: np.dtype) -> typing.Self:
ir = np.arange(1 + n, dtype=np.int32)
ij = ir[:, np.newaxis] + ir
return cls(
n=n,
buf3=np.empty((n, n, n), dtype=dtype),
out2=np.empty((n, n), dtype=dtype),
slots=tuple([
(
(tail := range(n - seg, n), ij[:n - seg + 1, :seg], head := range(seg)),
(ij[:n - seg, :seg], tail, head),
(n - seg, range(1 + n - seg)),
(range(n - seg), n - seg),
)
for seg in range(1, 1 + n)
]),
)
def apply(self, x: np.ndarray, y: np.ndarray) -> np.ndarray:
a, b = self.buf3, self.out2
np.multiply(x, y[..., np.newaxis], out=a)
for av1, av2, bv1, bv2 in self.slots:
b[bv1] = a[av1].sum(axis=1)
b[bv2] = a[av2].sum(axis=1)
return b
# использование
# runner = DiagRunner.from_like(x)
# result = runner.apply(x, y)
Такая стратегия хорошо ложится на кейсы, где вычисление выполняется порядка 10²–10⁴ раз, размеры — около 10² по каждой оси, тип данных — комплексное число с плавающей точкой; при этом x общий для всех вызовов, а y меняется. Кешируя план индексации и переиспользуя рабочие буферы, вы сокращаете накладные расходы на уровне Python и уменьшаете количество выделений памяти на вызов.
Почему это важно
Умение векторизовать нерегулярные редукции — полезный навык для производительных задач в NumPy. Он позволяет оставаться в чистом NumPy, не прибегая к Cython. При многократных запусках вынос решений об индексации и раскладке памяти за пределы горячего пути даёт стабильный прирост скорости и не превращает индексацию в узкое место.
Итоги
Если вам нужно b[i, j] = Σ_k a[i+k, j+k, k], избегайте покомпонентных циклов и срезов. Сформируйте наборы индексов для каждой длины диагонали, суммируйте по последней оси и записывайте результат в соответствующие строки и столбцы выходного массива. Когда одна и та же операция выполняется много раз при неизменных форме и типе, предварительно посчитайте индексацию и переиспользуйте буферы. Проверьте корректность на простой эталонной реализации и замерьте производительность на ваших реальных размерах данных, чтобы подтвердить ожидаемый выигрыш.