2025, Nov 29 12:02

Как получить glob-подобные сигнатуры логов с difflib в Python

Показываем, как с помощью Python и difflib кластеризовать строки логов и вывести компактные glob-сигнатуры. Базовые и исчерпывающие методы, кэширование.

Когда вы группируете похожие сообщения журнала, одного показательного примера из бакета обычно недостаточно. Чаще всего нужна компактная сигнатура, которая сохраняет общую часть текста, а переменные фрагменты заменяет подстановочным символом. Иначе говоря, шаблон наподобие glob, который совпадает с каждым сообщением в бакете и при этом максимально удерживает общие символы.

Пример результата

Рассмотрим эти строки журнала, собранные в один бакет:

Error: [MODULE_FOO] File foo has 5 unsolved dependencies and 4 errors.
Error: [MODULE_BLA] Files bar and yaz have 123 unsolved dependencies.
Error: [MODULE_123] File baz has 45 unsolved dependencies and 3 warnings.

Полезный общий шаблон мог бы выглядеть так:

Error: [MODULE_*] File* * ha* * unsolved dependencies*.

Стойкие фрагменты остаются как есть, различия схлопываются в звездочки.

Сначала кластеризация

Если вы ещё на этапе объединения похожих строк, практичный вариант — использовать difflib как эвристику сходства. Ниже компактная процедура, которая группирует строки по быстрому порогу схожести.

import difflib

def group_lines(items: list[str], cutoff: float = 0.9) -> list[list[str]]:
    buckets: list[list[str]] = []
    bidx = 0
    while len(items) > 0:
        pivot = items.pop()
        buckets.append([pivot])
        drop: list[int] = []
        for j, peer in enumerate(items):
            if difflib.SequenceMatcher(None, pivot, peer).quick_ratio() > cutoff:
                buckets[bidx].append(peer)
                drop.append(j)
        items = [v for j, v in enumerate(items) if j not in drop]
        bidx += 1
    return buckets

Когда бакет сформирован, остаётся решить, как получить glob-подобную сигнатуру для всех его строк.

Как получить общий шаблон с помощью difflib

Базовая идея проста. Берём строку-зерно в качестве текущей сигнатуры. Для каждой другой строки в бакете вычисляем diff-opcodes из difflib и заменяем в текущей сигнатуре каждый неравный участок на звёздочку. Эту свёртку выполняем по всему списку. Обратный порядок прохода по opcodes обеспечивает корректность индексов срезов при редактировании строки.

Базовый подход

Эта версия намеренно простая. Она быстрая и на практике обычно достаточна, но может упустить наилучший шаблон в некоторых пограничных случаях, таких как ["ab", "ba", "cb", "bc"], где результатом будет "*" вместо "*b*".

import difflib

def derive_glob(shards: list[str]) -> str:
    if len(shards) == 0:
        return ""
    signature = shards[0]
    if len(shards) == 1:
        return signature
    for probe in shards[1:]:
        for op, a1, a2, _b1, _b2 in reversed(
            difflib.SequenceMatcher(None, signature, probe).get_opcodes()
        ):
            if op != "equal":
                signature = signature[:a1] + "*" + signature[a2:]
    return signature

Применение к примеру выше даёт:

Error: [MODULE_*] File* * ha* * unsolved dependencies*.

Более исчерпывающий вариант

Результат предыдущего метода зависит от того, какую строку вы выбрали в качестве зерна. Чтобы повысить устойчивость, выполните ту же свёртку несколько раз, каждый раз начиная с другого элемента, и выберите самый длинный из полученных шаблонов. Это сглаживает крайние случаи ценой дополнительных вычислений.

import difflib

def derive_glob_exhaustive(shards: list[str]) -> str:
    if len(shards) == 0:
        return ""
    if len(shards) == 1:
        return shards[0]
    candidates: set[str] = set()
    for k, seed in enumerate(shards):
        candidates.add(derive_glob([seed] + shards[:k] + shards[k + 1 :]))
    return sorted(candidates, key=len, reverse=True)[0]

Этот подход медленнее, но исправляет случаи, когда другая стартовая строка даёт более выразительный шаблон.

Масштабирование: дедупликация и кэширование

Для крупных бакетов помогают две очевидные оптимизации. Во‑первых, удаляйте дублирующиеся строки до обработки. Во‑вторых, кэшируйте промежуточные результаты свёртки для пар (текущая сигнатура, следующая строка). В одном тесте примерно на тысяче входов это сократило время выполнения с около 17 секунд до примерно 0.20 секунды.

import difflib
import copy

_smemo: dict[tuple[str, str], str] = {}

def _derive_glob_cached_inner(seq: list[str]) -> str:
    signature = seq[0]
    for probe in seq[1:]:
        if (signature, probe) not in _smemo:
            before = copy.copy(signature)
            for op, a1, a2, _b1, _b2 in reversed(
                difflib.SequenceMatcher(None, signature, probe).get_opcodes()
            ):
                if op != "equal":
                    signature = signature[:a1] + "*" + signature[a2:]
            _smemo[(before, probe)] = signature
        else:
            signature = _smemo[(signature, probe)]
    return signature

def derive_glob_exhaustive_cached(shards: list[str]) -> str:
    uniq = list(set(shards))
    if len(uniq) == 0:
        return ""
    if len(uniq) == 1:
        return uniq[0]
    outcomes: list[str] = []
    for k, seed in enumerate(uniq):
        ordered = [seed] + uniq[:k] + uniq[k + 1 :]
        outcomes.append(_derive_glob_cached_inner(ordered))
    _smemo.clear()
    return max(outcomes, key=len)

Что улавливают шаблоны — и чего они не улавливают

Генерация шаблонов работает на уровне символов. Это значит, что она может совмещать части слов, а не целые токены — что здесь как раз полезно. Например, "have" и "has" сходятся к "ha*". Также не предполагается, что общие фрагменты будут стоять в одинаковых позициях, что важно, когда ваши шаблоны логов выровнены не идеально. В хитрых входах вроде ["ab", "ba", "cb", "bc"] выбор зерна имеет значение для базовой версии, и потому исчерпывающий вариант способен выдавать более сильный шаблон, например "*b*" вместо универсального "*".

Зачем это нужно для анализа логов

Читаемая, компактная сигнатура делает бакеты понятными с первого взгляда. Она избегает бесполезной "*", которая формально подходит ко всему, но ничего не объясняет. Сохранение общих подстрок дословно при схлопывании остального позволяет сквозь идентификаторы, счета и имена файлов увидеть устойчивый шаблон за шумом.

Соберём всё вместе

Минимальная демонстрация сквозного процесса на тех же примерных строках.

lines = [
    "Error: [MODULE_FOO] File foo has 5 unsolved dependencies and 4 errors.",
    "Error: [MODULE_BLA] Files bar and yaz have 123 unsolved dependencies.",
    "Error: [MODULE_123] File baz has 45 unsolved dependencies and 3 warnings.",
]

pattern_simple = derive_glob(lines)
pattern_stronger = derive_glob_exhaustive_cached(lines)

print(pattern_simple)
print(pattern_stronger)

Оба варианта выдают шаблон, который сохраняет инвариантный текст, а переменные сегменты заменяет звёздочками.

Итоги

Когда нужна glob-подобная сигнатура для бакета лог-сообщений, difflib даёт ровно те строительные блоки, которые нужны. Быстрый однопроходный вариант часто достаточен. Если сталкиваетесь с краевыми случаями, где стартовая строка влияет на результат, переходите к исчерпывающему варианту. Если упираетесь во время, сначала уберите дубликаты и кэшируйте шаги свёртки.

Такой подход сохраняет именно то, что стабильно в бакете, и абстрагирует остальное — ровно то, что нужно для удобной диагностики логов.