2025, Sep 23 10:01

Фикс ошибки сериализации value state в PySpark Structured Streaming

Как исправить ошибку сериализации value state в PySpark Structured Streaming: сбой в StatefulProcessor устраняется переносом массивов в два list state.

При создании прототипа потокового обнаружения мошенничества в реальном времени на PySpark Structured Streaming с использованием StatefulProcessor и transformWithStateInPandas может всплыть тонкая проблема сериализации состояния сразу после первого микропакета. Инициализация проходит нормально, первый пакет обрабатывается, но на втором выполнение падает: при чтении состояния выбрасывается PythonException, указывающий на PySparkRuntimeError и невозможность проинспектировать JavaBean с сообщением «wrong number of arguments». Сбой происходит при чтении сохранённого value state, в котором хранятся массивы, из‑за чего конвейер обрывается на ходу.

Воспроизводимый пример

Следующая программа демонстрирует проблему. Она сохраняет два массива во value state и обновляет их по ключу по мере поступления новых микропакетов. Общая логика стрима неизменна; для ясности отличаются только названия.

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, unix_timestamp
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    TimestampType,
    ArrayType,
    LongType,
)
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
import pandas as pd
class AccumProc(StatefulProcessor):
    def init(self, ctrl: StatefulProcessorHandle):
        self._bundle = ctrl.getValueState(
            "bundle",
            StructType([
                StructField("places", ArrayType(StringType())),
                StructField("ticks", ArrayType(LongType())),
            ])
        )
    def handleInputRows(self, k, batches, timers):
        if not self._bundle.exists():
            snapshot = {"places": [], "ticks": []}
        else:
            snapshot = {"places": self._bundle.get()[0], "ticks": self._bundle.get()[1]}
        inc_places = []
        inc_ticks = []
        for frame in batches:
            inc_places.extend(frame["location"].tolist())
            inc_ticks.extend(frame["unix_timestamp"].tolist())
        snapshot["places"].extend(inc_places)
        snapshot["ticks"].extend(inc_ticks)
        self._bundle.update((snapshot["places"], snapshot["ticks"]))
        yield pd.DataFrame()
def run():
    sess = (
        SparkSession.builder.appName("RealTimeFraudDetector")
        .config(
            "spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
        )
        .getOrCreate()
    )
    sess.sparkContext.setLogLevel("WARN")
    tx_schema = StructType(
        [
            StructField("transaction_id", StringType(), True),
            StructField("card_number", StringType(), True),
            StructField("card_holder", StringType(), True),
            StructField("amount", DoubleType(), True),
            StructField("currency", StringType(), True),
            StructField("location", StringType(), True),
            StructField("timestamp", TimestampType(), True),
        ]
    )
    out_schema = StructType(
        [
            StructField("card_number", StringType(), True),
            StructField("is_fraud", StringType(), True),
            StructField("message", StringType(), True),
        ]
    )
    kafka_stream = (
        sess.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "broker:29092")
        .option("subscribe", "transaction")
        .load()
    )
    tx_df = (
        kafka_stream.select(from_json(col("value").cast("string"), tx_schema).alias("payload"))
        .select("payload.*")
        .withColumn("unix_timestamp", unix_timestamp(col("timestamp")))
    )
    enriched_df = (
        tx_df.withWatermark("timestamp", "10 minutes")
        .groupBy("card_number")
        .transformWithStateInPandas(
            AccumProc(), outputStructType=out_schema, outputMode="append", timeMode="None"
        )
    )
    query = enriched_df.writeStream.outputMode("append").format("console").start()
    query.awaitTermination()
if __name__ == "__main__":
    run()

Что именно идёт не так

Сбой происходит на втором микропакете при чтении ранее сохранённого value state. По стеку вызовов видно, что исключение возникает при вызове get на хэндле состояния и всплывает через сериализацию Arrow, заканчиваясь сообщением «couldn’t introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments». По сути, value state на основе StructType, содержащий массивы примитивов, записывается, но при следующем обращении в этой конфигурации не десериализуется.

Практическое решение

Надёжный способ избежать ошибки десериализации — разбить составной value state на два list state. У каждого list state схема из одного поля, и каждая коллекция хранится отдельно. С этим изменением запись и чтение состояния становятся стабильными.

from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
class AccumProc(StatefulProcessor):
    def init(self, ctrl: StatefulProcessorHandle):
        ts_list_schema = StructType([StructField("ts", LongType(), True)])
        loc_list_schema = StructType([StructField("loc", StringType(), True)])
        self._ts_state = ctrl.getListState(stateName="tsState", schema=ts_list_schema)
        self._loc_state = ctrl.getListState(stateName="locState", schema=loc_list_schema)

С раздельными list state процессор может сохранять и читать массивы меток времени и локаций, не натыкаясь на проблемы десериализации Arrow.

Почему это важно

Управление состоянием — опора любой потоковой системы выявления мошенничества, особенно когда нужно сопоставлять события между микропакетами. Если хранилище состояния не умеет надёжно «туда‑обратно» прогонять ваши данные, поток рушится на полпути, и вы теряете непрерывность, критичную для логики обнаружения аномалий. Простые, минималистичные схемы состояния помогают обойти «мины» сериализации и повышают устойчивость конвейера в проде. Кроме того, независимо от этой конкретной ошибки, DataFrame, который возвращает ваш процессор, должен соответствовать объявленному outputStructType; расхождения там приводят к своим сбоям.

Ключевые выводы

Если при чтении value state в PySpark StatefulProcessor вы сталкиваетесь с ошибкой «couldn’t introspect javabean», попробуйте хранить массивы в двух list state с одно-полевыми схемами вместо одного value state на StructType. Так состояние будет корректно сериализоваться и десериализоваться через Arrow. Держите состояние стрима минимальным и строго типизированным и проверяйте, что выдаваемые DataFrame соответствуют объявленной выходной схеме — так Structured Streaming работает без сюрпризов.

Статья основана на вопросе на StackOverflow от Marco Filippozzi и ответе Marco Filippozzi.