2025, Dec 23 09:01
Эффективная загрузка DataFrame в PostgreSQL: COPY и ON CONFLICT
Почему df.to_sql с chunksize тормозит и как ускорить импорт pandas DataFrame в PostgreSQL: COPY FROM, staging-таблица, INSERT ON CONFLICT, индексы — быстрее.
Если вы регулярно загружаете DataFrame из pandas в PostgreSQL и вынуждены дробить операции, чтобы поддерживать стареющий сервер на плаву, вы не одиноки. Частый подход — разбивать отправку df.to_sql() на порции и проверять существующие строки с помощью огромной цепочки WHERE по столбцу первичного ключа. Это кажется надежным, но зря тратит процессорное время. PostgreSQL умеет намного эффективнее справляться с массовой загрузкой и разрешением конфликтов, если опираться на правильные механизмы.
Как выглядит проблемный шаблон
Один из частых узких мест — предварительная проверка наличия строк через постоянно растущее дизъюнктивное условие по первичному ключу типа timestamp. Это выглядит так:
SELECT ts_col FROM audit_log
WHERE ts_col = ts_row1_df
OR ts_col = ts_row2_df
...
OR ts_col = ts_row3500_df
Вы можете пытаться снизить нагрузку на CPU при записи, разбивая загрузки df.to_sql() на части, например по 150 строк. Такое дробление кажется логичным, но для PostgreSQL это далеко не самый действенный рычаг.
Почему это мешает производительности
Дробя проверки существования на множество мелких запросов, вы заставляете базу повторять работу. Каждая порция запускает новый скан и оценку длинной цепочки OR, добавляя накладные расходы без выгоды для логики. СУБД умеет определять «уже есть или новое» напрямую, если задать корректный ключ и позволить движку его соблюдать. Для умеренных объемов — несколько тысяч строк — PostgreSQL очень быстро разрешает конфликты, если передавать данные набором, а не выполнять построчные проверки из Python.
Со стороны записи дробление df.to_sql() удобно, но это не лучший способ снизить загрузку CPU на сервере. В PostgreSQL есть нативный механизм пакетной загрузки, который и быстрее, и экономичнее для больших вставок.
Лучший подход: COPY и передать обработку конфликтов самой базе
Рекомендуемый путь загрузки — COPY. Он переносит данные в PostgreSQL с минимальными накладными расходами, при условии что ваш DataFrame очищен, а порядок и имена столбцов совпадают с целевой таблицей.
COPY core_events FROM '/app/batch_records.csv' csv header;
Подготовьте DataFrame в Python, согласуйте схему и сохраните CSV, который сможет прочитать COPY. Валидацию и очистку данных выполняйте на DataFrame до загрузки.
Потоковая загрузка через psycopg
Строки можно передавать программно через интерфейс copy в psycopg. Как задокументировано:
С помощью операции copy вы можете загружать данные в базу из любого итерируемого объекта Python (списка кортежей или любого итерируемого набора последовательностей): значения Python будут адаптированы так же, как при обычных запросах. Для выполнения такой операции используйте COPY ... FROM STDIN с Cursor.copy() и вызывайте write_row() у возвращаемого объекта внутри блока with. При выходе из блока операция завершается. Если внутри блока возникнет исключение, операция прерывается, а уже вставленные записи отбрасываются.
payloads = [(10, 20, "hello"), (40, None, "world")]
with db_cur.copy("COPY demo_sink (c1, c2, c3) FROM STDIN") as cp:
for item in payloads:
cp.write_row(item)
Если вы работаете с psycopg2, для COPY используйте copy_expert. Практичный способ передавать DataFrame потоком — сериализовать его в памяти в CSV и передать буфер в COPY:
from io import StringIO
import csv
csv_buf = StringIO()
writer = csv.writer(csv_buf)
writer.writerows(df_values)
csv_buf.seek(0)
cur.copy_expert(sql=copy_sql, file=csv_buf)
Итерироваться по большому DataFrame в Python и вставлять построчно не рекомендуется: это тоже займет много процессорного времени.
Обработка дубликатов: промежуточная таблица и ON CONFLICT
Если входная партия может пересекаться с уже существующими строками, сначала загрузите данные во временную промежуточную таблицу, затем выполните одну наборную вставку с ON CONFLICT, чтобы апсертом обновить только затронутые строки. Так вы избавитесь от дорогих проверок «существует ли?» из Python.
sql_block = """
CREATE TEMPORARY TABLE stage_load (
ts_key ,
full_name,
details
);
COPY stage_load FROM STDIN With CSV;
INSERT INTO prod_main(ts_key, full_name, details)
SELECT *
FROM stage_load ON conflict (ts_key)
DO update set full_name=EXCLUDED.full_name, details=EXCLUDED.details;
DROP TABLE stage_load;
"""
Помните, что ON CONFLICT опирается на индексы. Если нужно обнаруживать конфликты, соответствующий индекс должен быть включен для этой вставки.
Когда индексы тормозят и когда их можно отключать
При массовых вставках основную стоимость могут составлять индексы, потому что база обновляет каждую индексную запись для каждой строки. Если гарантировано, что данные из staging не конфликтуют с приемником, иногда быстрее временно отключить индексы на время вставки и затем их перестроить. Как резюмировал один из практиков:
Когда вы запускаете большой запрос (insert/update) по огромной таблице с несколькими индексами, эти индексы могут сильно замедлить выполнение. В PostgreSQL часто гораздо быстрее отключить индексы перед запуском запроса и затем переиндексировать всю таблицу.
И из отчета из практики:
Проблема была в индексах. В таблице history было 160M проиндексированных строк. При COPY FROM или INSERT INTO .. SELECT время уходило не на вставку строк, а на обновление индексов. Когда я отключил индексы, импортировал 3M строк за 10 секунд. Теперь мне нужно найти более быстрый способ переиндексации большой таблицы.
Важно учитывать компромисс. Нельзя использовать INSERT ... ON CONFLICT, пока индексы отключены, потому что для обнаружения конфликтов они необходимы. Включайте индексацию, когда выполняете апсерт из промежуточной таблицы в основную.
Ответы на два ключевых вопроса
Во‑первых, df.to_sql() с chunksize — не лучший способ снизить нагрузку на CPU на стороне PostgreSQL. Для загрузки предпочтительнее COPY FROM; это рекомендуемый путь для быстрого ввода больших наборов данных при условии очистки DataFrame и совпадения имен и порядка столбцов с таблицей.
Во‑вторых, разбиение огромного условия WHERE на несколько мелких запросов обычно только увеличивает работу. Вместо проверок существования из Python доверьте обработку конфликтов базе. Определите корректные первичные/альтернативные ключи и позвольте PostgreSQL обеспечивать их с ON CONFLICT. Если конфликты возможны — используйте промежуточную таблицу и INSERT ... ON CONFLICT DO UPDATE. Если конфликтов нет — можно временно отключить индексы на время массовой вставки и затем переиндексировать таблицу, чтобы снизить накладные расходы.
Почему это важно
Переход от построчной логики на стороне Python и множества мелких запросов к наборным операциям помогает держать загрузку CPU под контролем и снижает риск зависаний на скромном железе. Загрузка через COPY создана для высокой пропускной способности. Обработка конфликтов с ON CONFLICT — сильная сторона PostgreSQL при корректно определенных ключах. Эти практики надежнее, понятнее и проще в эксплуатации.
Практическое резюме
Сначала подготовьте и очистите DataFrame, точно сопоставив его схеме PostgreSQL. Загружайте через COPY FROM — это почти мгновенная вставка. Если нужны апсерты, копируйте во временную таблицу и выполняйте один INSERT ... ON CONFLICT DO UPDATE в основную таблицу при включенном необходимом индексе. Если партия гарантированно без конфликтов, ускорить вставку поможет временное отключение индексов с последующей переиндексацией. Избегайте циклов Python по большим DataFrame и откажитесь от точечных проверок существования на стороне клиента: позвольте PostgreSQL выполнить эту логику за один проход.