2025, Nov 25 06:02

Эффективное суммирование «рваной» диагонали в 3D NumPy

Как посчитать b[i,j]=Σ a[i+k,j+k,k] без циклов: быстрый способ в NumPy через векторизацию и предвычисленные индексы, кеширование буферов для повторных запусков.

Суммировать «рваную» диагональ в 3D-массиве кажется простой задачей, пока не начинаешь векторизовать её. Пусть задан кубический тензор a формы N×N×N; нужно получить b так, чтобы b[i, j] = Σ_k a[i+k, j+k, k]. Прямолинейное решение — двойной цикл с вычислениями внутри через einsum — работает, но тормозит. Ниже — практичный способ сделать это эффективно в NumPy и вариант ускорения для многократных запусков с неизменными формой и dtype.

Базовый вариант: прямой подход

Фрагмент ниже демонстрирует «наивную» реализацию. Она корректна, однако вложенные циклы и постоянные срезы обходятся дорого.

import numpy as np

N = 10
cube = np.random.rand(N, N, N)

grid = np.zeros((N, N))
for r in range(N):
    for c in range(N):
        span = np.maximum(r, c)
        grid[r, c] = np.einsum('iii', cube[r:N-span+r, c:N-span+c, :N-span])

Почему это медленно

Суммирование идёт не по прямоугольному блоку: для каждой пары (i, j) нужно пройти диагональ переменной длины, зависящей и от i, и от j. Такая «рваная» структура плохо поддаётся прямому broadcasting. Итерация по элементам и срезы внутри цикла только наращивают издержки.

Быстрое вычисление через формирование индексов

Более быстрый подход — заранее построить индексы, которые выбирают все нужные диагональные отрезки для каждой длины, а затем выполнить суммирование по нужной оси. Так мы избавляемся от покомпонентных накладных расходов Python и используем векторизованную индексацию и редукции NumPy.

import time
import numpy as np

def pairwise_scan(cube: np.ndarray) -> np.ndarray:
    grid = np.zeros(cube.shape[:2], dtype=cube.dtype)
    for r in range(cube.shape[0]):
        for c in range(cube.shape[1]):
            kmax = min(cube.shape[0] - r, cube.shape[1] - c)
            for k in range(kmax):
                grid[r, c] += cube[r+k, c+k, k]
    return grid


def diagonal_pack(cube: np.ndarray) -> np.ndarray:
    h, w = hw = cube.shape[:2]
    grid = np.zeros(hw, dtype=cube.dtype)

    for seg in range(1, 1 + w):
        idx_mat = np.arange(w - seg + 1)[:, np.newaxis] + np.arange(seg)
        tail = range(w - seg, w)
        head = range(seg)

        grid[w - seg, range(1 + w - seg)] = cube[
            tail, idx_mat, head,
        ].sum(axis=1)

        grid[range(w - seg), w - seg] = cube[
            idx_mat[:-1, :], tail, head,
        ].sum(axis=1)

    return grid


def diag_trace(cube: np.ndarray) -> np.ndarray:
    N = cube.shape[0]
    grid = np.zeros((N, N))
    for r in range(N):
        for c in range(N):
            u = cube[r:, c:, :]
            grid[r, c] = np.trace([u[k, k] for k in range(min(u.shape))])
    return grid


def run_check() -> None:
    rng = np.random.default_rng(seed=0)
    cube = rng.integers(low=0, high=10, size=(12, 12, 12))
    g1 = pairwise_scan(cube)
    g2 = diagonal_pack(cube)
    g3 = diag_trace(cube)
    assert np.array_equal(g1, g2)
    assert np.array_equal(g1, g3.astype(cube.dtype))


def timeit_run() -> None:
    rng = np.random.default_rng(seed=0)
    cube = rng.integers(low=0, high=99, size=(100, 100, 100))
    for fn in (diagonal_pack, diag_trace):
        t0 = time.perf_counter()
        fn(cube)
        t1 = time.perf_counter()
        print(f'{fn.__name__}: {t1-t0:.4f}')

if __name__ == '__main__':
    run_check()
    timeit_run()

На показательном запуске получились такие времена:

semivectorised: 0.0082
literal_trace: 0.2085

Это показывает, что суммирование по заранее сформированным индексам заметно быстрее, чем буквальное «протаскивание» по диагоналям в данном сценарии.

Если операция повторяется: кешируйте индексы и переиспользуйте буферы

Во многих реальных задачах одно и то же вычисление вызывается многократно при неизменных форме и dtype. Частый шаблон — формировать a умножением заранее посчитанного x на y, приходящий в каждом вызове, с приведением формы к y[..., np.newaxis]. Если размерность тензора и тип данных постоянны, индексацию можно подготовить один раз, а временные буферы — переиспользовать, уменьшая накладные расходы на выделение памяти.

import typing
import numpy as np

class DiagRunner(typing.NamedTuple):
    n: int
    buf3: np.ndarray
    out2: np.ndarray
    slots: tuple[tuple, ...]

    @classmethod
    def from_like(cls, arr: np.ndarray) -> typing.Self:
        return cls.make(n=arr.shape[0], dtype=arr.dtype)

    @classmethod
    def make(cls, n: int, dtype: np.dtype) -> typing.Self:
        ir = np.arange(1 + n, dtype=np.int32)
        ij = ir[:, np.newaxis] + ir

        return cls(
            n=n,
            buf3=np.empty((n, n, n), dtype=dtype),
            out2=np.empty((n, n), dtype=dtype),
            slots=tuple([
                (
                    (tail := range(n - seg, n), ij[:n - seg + 1, :seg], head := range(seg)),
                    (ij[:n - seg, :seg], tail, head),
                    (n - seg, range(1 + n - seg)),
                    (range(n - seg), n - seg),
                )
                for seg in range(1, 1 + n)
            ]),
        )

    def apply(self, x: np.ndarray, y: np.ndarray) -> np.ndarray:
        a, b = self.buf3, self.out2
        np.multiply(x, y[..., np.newaxis], out=a)
        for av1, av2, bv1, bv2 in self.slots:
            b[bv1] = a[av1].sum(axis=1)
            b[bv2] = a[av2].sum(axis=1)
        return b

# использование
# runner = DiagRunner.from_like(x)
# result = runner.apply(x, y)

Такая стратегия хорошо ложится на кейсы, где вычисление выполняется порядка 10²–10⁴ раз, размеры — около 10² по каждой оси, тип данных — комплексное число с плавающей точкой; при этом x общий для всех вызовов, а y меняется. Кешируя план индексации и переиспользуя рабочие буферы, вы сокращаете накладные расходы на уровне Python и уменьшаете количество выделений памяти на вызов.

Почему это важно

Умение векторизовать нерегулярные редукции — полезный навык для производительных задач в NumPy. Он позволяет оставаться в чистом NumPy, не прибегая к Cython. При многократных запусках вынос решений об индексации и раскладке памяти за пределы горячего пути даёт стабильный прирост скорости и не превращает индексацию в узкое место.

Итоги

Если вам нужно b[i, j] = Σ_k a[i+k, j+k, k], избегайте покомпонентных циклов и срезов. Сформируйте наборы индексов для каждой длины диагонали, суммируйте по последней оси и записывайте результат в соответствующие строки и столбцы выходного массива. Когда одна и та же операция выполняется много раз при неизменных форме и типе, предварительно посчитайте индексацию и переиспользуйте буферы. Проверьте корректность на простой эталонной реализации и замерьте производительность на ваших реальных размерах данных, чтобы подтвердить ожидаемый выигрыш.