Postgres版本:11
程序代码:
CREATE OR REPLACE PROCEDURE public.prc_insert_proc(table_name character varying)
LANGUAGE plpgsql
AS $procedure$ declare StartTime timestamptz;
EndTime timestamptz;
Delta double precision;
err_context text;
tab_partition RECORD;
v_tab_partition varchar;
begin
for tab_partition in (SELECT
child.relname AS child
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relname=table_name) loop
v_tab_partition = tab_partition.child;
begin
execute 'insert
into
test_tab (id,
report_date,
effective_asof_date,
effective_until_date,
active_flag) select id,
to_date(report_date,''YYMMDD'') report_date,
now(),
''31-Dec-2200'',
''Y''
from '||
tab_partition.child ||' on
conflict (id) do update set report_date=excluded.report_date;';
commit;
end;
end loop;
exception
when others then
GET STACKED DIAGNOSTICS err_context = PG_EXCEPTION_CONTEXT;
RAISE INFO 'Error Name:%',SQLERRM;
RAISE INFO 'Error State:%', SQLSTATE;
RAISE INFO 'Error Context:%', err_context;
end;
$procedure$;
我通过创建sqlalchemy引擎调用上面的过程,然后调用此过程 .
def load_test(session, table, log): # pragma: no cover
try:
sql = "call prc_insert_proc('" + \
table + "');"
session.execute(text(sql).execution_options(autocommit=False))
session.commit()
except ErrorBase as e:
log.error(f"{str(e)}")
raise
它引发了以下错误 . 会话是使用sessionmaker和create_engine创建的 .
engine = create_engine(config.connection_string, echo=False)
Session = sessionmaker()
Session.configure(bind=engine)
session = Session()
无效的交易终止
我对sqlalchemy和PostgreSQL都很陌生 . 表的每个分区都有大约一百万条记录,我想在每次插入后提交 . 其他选项是获取python脚本中的分区列表,并在过程中添加分区名称参数 . 循环遍历python中的过程并在每次调用后提交 . 但是,当我在PostgreSQL 11中读到它时,我们可以在程序内部提交,而不是使用它 .