2025, Sep 23 10:01
Фикс ошибки сериализации value state в PySpark Structured Streaming
Как исправить ошибку сериализации value state в PySpark Structured Streaming: сбой в StatefulProcessor устраняется переносом массивов в два list state.
При создании прототипа потокового обнаружения мошенничества в реальном времени на PySpark Structured Streaming с использованием StatefulProcessor и transformWithStateInPandas может всплыть тонкая проблема сериализации состояния сразу после первого микропакета. Инициализация проходит нормально, первый пакет обрабатывается, но на втором выполнение падает: при чтении состояния выбрасывается PythonException, указывающий на PySparkRuntimeError и невозможность проинспектировать JavaBean с сообщением «wrong number of arguments». Сбой происходит при чтении сохранённого value state, в котором хранятся массивы, из‑за чего конвейер обрывается на ходу.
Воспроизводимый пример
Следующая программа демонстрирует проблему. Она сохраняет два массива во value state и обновляет их по ключу по мере поступления новых микропакетов. Общая логика стрима неизменна; для ясности отличаются только названия.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, unix_timestamp
from pyspark.sql.types import (
StructType,
StructField,
StringType,
DoubleType,
TimestampType,
ArrayType,
LongType,
)
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
import pandas as pd
class AccumProc(StatefulProcessor):
def init(self, ctrl: StatefulProcessorHandle):
self._bundle = ctrl.getValueState(
"bundle",
StructType([
StructField("places", ArrayType(StringType())),
StructField("ticks", ArrayType(LongType())),
])
)
def handleInputRows(self, k, batches, timers):
if not self._bundle.exists():
snapshot = {"places": [], "ticks": []}
else:
snapshot = {"places": self._bundle.get()[0], "ticks": self._bundle.get()[1]}
inc_places = []
inc_ticks = []
for frame in batches:
inc_places.extend(frame["location"].tolist())
inc_ticks.extend(frame["unix_timestamp"].tolist())
snapshot["places"].extend(inc_places)
snapshot["ticks"].extend(inc_ticks)
self._bundle.update((snapshot["places"], snapshot["ticks"]))
yield pd.DataFrame()
def run():
sess = (
SparkSession.builder.appName("RealTimeFraudDetector")
.config(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)
.getOrCreate()
)
sess.sparkContext.setLogLevel("WARN")
tx_schema = StructType(
[
StructField("transaction_id", StringType(), True),
StructField("card_number", StringType(), True),
StructField("card_holder", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("currency", StringType(), True),
StructField("location", StringType(), True),
StructField("timestamp", TimestampType(), True),
]
)
out_schema = StructType(
[
StructField("card_number", StringType(), True),
StructField("is_fraud", StringType(), True),
StructField("message", StringType(), True),
]
)
kafka_stream = (
sess.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker:29092")
.option("subscribe", "transaction")
.load()
)
tx_df = (
kafka_stream.select(from_json(col("value").cast("string"), tx_schema).alias("payload"))
.select("payload.*")
.withColumn("unix_timestamp", unix_timestamp(col("timestamp")))
)
enriched_df = (
tx_df.withWatermark("timestamp", "10 minutes")
.groupBy("card_number")
.transformWithStateInPandas(
AccumProc(), outputStructType=out_schema, outputMode="append", timeMode="None"
)
)
query = enriched_df.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
if __name__ == "__main__":
run()
Что именно идёт не так
Сбой происходит на втором микропакете при чтении ранее сохранённого value state. По стеку вызовов видно, что исключение возникает при вызове get на хэндле состояния и всплывает через сериализацию Arrow, заканчиваясь сообщением «couldn’t introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments». По сути, value state на основе StructType, содержащий массивы примитивов, записывается, но при следующем обращении в этой конфигурации не десериализуется.
Практическое решение
Надёжный способ избежать ошибки десериализации — разбить составной value state на два list state. У каждого list state схема из одного поля, и каждая коллекция хранится отдельно. С этим изменением запись и чтение состояния становятся стабильными.
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
class AccumProc(StatefulProcessor):
def init(self, ctrl: StatefulProcessorHandle):
ts_list_schema = StructType([StructField("ts", LongType(), True)])
loc_list_schema = StructType([StructField("loc", StringType(), True)])
self._ts_state = ctrl.getListState(stateName="tsState", schema=ts_list_schema)
self._loc_state = ctrl.getListState(stateName="locState", schema=loc_list_schema)
С раздельными list state процессор может сохранять и читать массивы меток времени и локаций, не натыкаясь на проблемы десериализации Arrow.
Почему это важно
Управление состоянием — опора любой потоковой системы выявления мошенничества, особенно когда нужно сопоставлять события между микропакетами. Если хранилище состояния не умеет надёжно «туда‑обратно» прогонять ваши данные, поток рушится на полпути, и вы теряете непрерывность, критичную для логики обнаружения аномалий. Простые, минималистичные схемы состояния помогают обойти «мины» сериализации и повышают устойчивость конвейера в проде. Кроме того, независимо от этой конкретной ошибки, DataFrame, который возвращает ваш процессор, должен соответствовать объявленному outputStructType; расхождения там приводят к своим сбоям.
Ключевые выводы
Если при чтении value state в PySpark StatefulProcessor вы сталкиваетесь с ошибкой «couldn’t introspect javabean», попробуйте хранить массивы в двух list state с одно-полевыми схемами вместо одного value state на StructType. Так состояние будет корректно сериализоваться и десериализоваться через Arrow. Держите состояние стрима минимальным и строго типизированным и проверяйте, что выдаваемые DataFrame соответствуют объявленной выходной схеме — так Structured Streaming работает без сюрпризов.