SQLAlchemy + PostgreSQL で Upsert を行う(ユニークキーに重複があるデータのバルクインサート)
こんにちは。吉田弁二郎です。
タイトルにある Upsert とは、Update or Insert のことです。あるテーブルにデータを insert しようとするとき、ユニーク制約が効いているキーが過去データのものと重複して insert できない場合、update に切り替える処理のことですね。データ量が多くキー重複の懸念がある状況で multiple insert したい場合などに有効な手段の一つです。
私はよく PostgreSQL を使うのですが、Python 経由で操作したい時には psycopg2 とか SQLAlchemy を選ぶことが多いです。特に SQLAlchemy はオブジェクト的にデータベースを操作できるため、web アプリを開発する時に使っています。
今回は、PostgreSQL 9.5 以上で利用可能な upsert 構文 ON CONFLICT ... DO UPDATE
を SQLAlchemy (>= 1.1) から使う方法について書いていきます。
PostgreSQL での ON CONFLICT ... DO UPDATE
まず PostgreSQL でテーブルを用意してしまいましょう。
-- テーブル定義 create table test_conflict ( id serial , message varchar(256) , constraint pk_on_id primary key(id)); -- データの準備 insert into test_conflict (id, message) values (1, 'msg1'), (2, 'msg2'); -- 結果 -- id | message -- ----+--------- -- 1 | msg1 -- 2 | msg2
id
が pk_on_id
という名前の主キー制約を受けており、重複データは許容されていない状況です。実際、
-- id=1 に insert しようとするとエラー insert into test_conflict (id, message) values (1, 'msg1_updated'); -- ERROR: duplicate key value violates unique constraint "pk_on_id" -- DETAIL: Key (id)=(1) already exists.
となってしまいますね。そこで、upsert するようにしましょう。
-- upsert する insert into test_conflict values (1, 'msg1_updated') on conflict on constraint pk_on_id do update set message=excluded.message; -- INSERT 0 1 -- 結果 -- id | message -- ----+-------------- -- 2 | msg2 -- 1 | msg1_updated
確かに id=1 のレコードの message が更新されていますね。on constraint
で指定しているのはユニーク制約の効いているキーの名前です。また、do update set message=excluded.message
の excluded
は、いま insert しようとしているレコードを指すエイリアスです。このエイリアスを使わないで、update
する値を固定値で与えておくこともできます。
これで PostgreSQL では upsert ができるようになったので、次は SQLAlchemy で同じことをやってみましょう。
SQLAlchemy での ON CONFLICT ... DO UPDATE
まずはコードから。upsert するテーブルは上で用意したものをそのまま使います。
from sqlalchemy import create_engine, MetaData from sqlalchemy.dialects.postgresql import insert # on_conflict_do_update が使える # テーブルの取得 engine = create_engine(db_url) meta = MetaData(bind=engine) meta.reflect() table = meta.tables["test_conflict"] table.select().execute().fetchall() # [(2, 'msg2'), (1, 'msg1_updated')] # on_conflict_do_update の挙動を設定する # set_ で定義しているのが、PostgreSQL での 'do update set ' 以降の部分にあたる insert_stmt = insert(table) set_ = dict(id=insert_stmt.excluded.id, message=insert_stmt.excluded.message) insert_stmt = insert_stmt.on_conflict_do_update(index_elements=['id'], set_=set_) # upsert の実行 with engine.connect() as conn: values = [dict(id=1, message='msg1_updated2'), dict(id=3, message='msg3'), dict(id=3, message='msg3_updated')] conn.execute(insert_stmt, values) table.select().execute().fetchall() # [(2, 'msg2'), (1, 'msg1_updated2'), (3, 'msg3_updated')]
ポイントは sqlalchemy.dialects.postgresql
で定義されている insert
を使う部分ですね。この insert には on_conflict_do_update
メソッドが実装されており、SQL でやったのと同じことができるようになっています。
細かな点はコードにコメントとして書いた通りで、set_
で使っている insert_stmt.exclude
が insert しようとしているレコードに対するエイリアスになっています。また、制約の指定方法は他にも選択肢があるので、適宜ドキュメントを参照していただくとよいと思います。
雑記
なんでこの記事を書いたかというと、クローラを実装する際に複数のレコードを一度に insert する必要があったのですが、その際に前処理として過去の登録済みデータとの重複を全件検索する形でチェックしエラーハンドリングしているのをやめたかったからなんです。時間が経つほど検索対象のデータが増える状況になっていたので。結構同じ課題を抱えている人も多そうに思うのですが、意外とピンポイントな情報が出てこなかったので記事化しました。
■ 動作確認環境
- PostgreSQL: 9.5.6
- Python: 3.6.1
- SQLAlchemy: 1.1.10
■ 参考資料
- PostgreSQL での on conflict do update:PostgreSQL: Documentation: 9.5: INSERT
- SQLAlchemy での on conflict do update:PostgreSQL — SQLAlchemy 1.2 Documentation