luggage baggage

Machine learning, data analysis, web technologies and things around me.

SQLAlchemy + PostgreSQL で Upsert を行う(ユニークキーに重複があるデータのバルクインサート)

こんにちは。吉田弁二郎です。

タイトルにある Upsert とは、Update or Insert のことです。あるテーブルにデータを insert しようとするとき、ユニーク制約が効いているキーが過去データのものと重複して insert できない場合、update に切り替える処理のことですね。データ量が多くキー重複の懸念がある状況で multiple insert したい場合などに有効な手段の一つです。

私はよく PostgreSQL を使うのですが、Python 経由で操作したい時には psycopg2 とか SQLAlchemy を選ぶことが多いです。特に SQLAlchemy はオブジェクト的にデータベースを操作できるため、web アプリを開発する時に使っています。

今回は、PostgreSQL 9.5 以上で利用可能な upsert 構文 ON CONFLICT ... DO UPDATE を SQLAlchemy (>= 1.1) から使う方法について書いていきます。

PostgreSQL での ON CONFLICT ... DO UPDATE

まず PostgreSQL でテーブルを用意してしまいましょう。

-- テーブル定義
create table test_conflict (
id serial
, message varchar(256)
, constraint pk_on_id primary key(id));

-- データの準備
insert into test_conflict (id, message) values
(1, 'msg1'),
(2, 'msg2');

-- 結果
--  id | message
-- ----+---------
--   1 | msg1
--   2 | msg2

idpk_on_id という名前の主キー制約を受けており、重複データは許容されていない状況です。実際、

-- id=1 に insert しようとするとエラー
insert into test_conflict (id, message) values
(1, 'msg1_updated');
-- ERROR:  duplicate key value violates unique constraint "pk_on_id"
-- DETAIL:  Key (id)=(1) already exists.

となってしまいますね。そこで、upsert するようにしましょう。

-- upsert する
insert into test_conflict values
(1, 'msg1_updated')
on conflict on constraint pk_on_id
do update set message=excluded.message;
-- INSERT 0 1

-- 結果
--  id |   message
-- ----+--------------
--  2 | msg2
--  1 | msg1_updated

確かに id=1 のレコードの message が更新されていますね。on constraint で指定しているのはユニーク制約の効いているキーの名前です。また、do update set message=excluded.messageexcluded は、いま insert しようとしているレコードを指すエイリアスです。このエイリアスを使わないで、update する値を固定値で与えておくこともできます。

これで PostgreSQL では upsert ができるようになったので、次は SQLAlchemy で同じことをやってみましょう。

SQLAlchemy での ON CONFLICT ... DO UPDATE

まずはコードから。upsert するテーブルは上で用意したものをそのまま使います。

from sqlalchemy import create_engine, MetaData
from sqlalchemy.dialects.postgresql import insert  # on_conflict_do_update が使える


# テーブルの取得
engine = create_engine(db_url)
meta = MetaData(bind=engine)
meta.reflect()
table = meta.tables["test_conflict"]

table.select().execute().fetchall()
# [(2, 'msg2'), (1, 'msg1_updated')]

# on_conflict_do_update の挙動を設定する
# set_ で定義しているのが、PostgreSQL での 'do update set ' 以降の部分にあたる
insert_stmt = insert(table)
set_ = dict(id=insert_stmt.excluded.id, message=insert_stmt.excluded.message)
insert_stmt = insert_stmt.on_conflict_do_update(index_elements=['id'],
                                                set_=set_)
# upsert の実行
with engine.connect() as conn:
    values = [dict(id=1, message='msg1_updated2'),
              dict(id=3, message='msg3'),
              dict(id=3, message='msg3_updated')]
    conn.execute(insert_stmt, values)

table.select().execute().fetchall()
# [(2, 'msg2'), (1, 'msg1_updated2'), (3, 'msg3_updated')]

ポイントは sqlalchemy.dialects.postgresql で定義されている insert を使う部分ですね。この insert には on_conflict_do_update メソッドが実装されており、SQL でやったのと同じことができるようになっています。

細かな点はコードにコメントとして書いた通りで、set_ で使っている insert_stmt.exclude が insert しようとしているレコードに対するエイリアスになっています。また、制約の指定方法は他にも選択肢があるので、適宜ドキュメントを参照していただくとよいと思います。

雑記

なんでこの記事を書いたかというと、クローラを実装する際に複数のレコードを一度に insert する必要があったのですが、その際に前処理として過去の登録済みデータとの重複を全件検索する形でチェックしエラーハンドリングしているのをやめたかったからなんです。時間が経つほど検索対象のデータが増える状況になっていたので。結構同じ課題を抱えている人も多そうに思うのですが、意外とピンポイントな情報が出てこなかったので記事化しました。

■ 動作確認環境

  • PostgreSQL: 9.5.6
  • Python: 3.6.1
  • SQLAlchemy: 1.1.10

■ 参考資料