engine = db.get_engine(db.app,'MY_DATABASE')df = pd.DataFrame({"ID": [1,2,3],"value": [100,200,300],"date": [date.today(),date.today(),date.today()]})temp_table = db.table('#temp_table',db.Column('ID',db.Integer),db.Column('value',db.Column('date',db.DateTime))temp_table.create(engine)df.to_sql(name='tempdb.dbo.#temp_table',con=engine,if_exists='append',index=False)query = db.session.query(Existingtable.ID).join(temp_table,temp_table.c.ID == Existingtable.ID)out_df = pd.read_sql(query.statement,engine)temp_table.drop(engine)return out_df.to_dict('records')
这不会返回任何结果,因为to_sql的插入语句没有运行(我认为这是因为它们是使用sp_prepexec运行的,但我并不完全确定).
然后我尝试写出SQL语句(CREATE table #temp_table …,INSERT INTO #temp_table …,SELECT [ID] FROM …)然后运行pd.read_sql(query,engine).我收到错误消息
This result object does not return rows. It has been closed automatically.
我想这是因为声明不仅仅是SELECT?
我该如何解决这个问题(任何一个解决方案都可以工作,虽然第一个会更好,因为它避免了硬编码的sql).要清楚,我无法修改现有数据库中的模式 – 它是供应商数据库.
解决方法 如果要插入临时表中的记录数量很小/中等,一种可能性是使用文字子查询或值CTE而不是创建临时表.# MODELclass Existingtable(Base): __tablename__ = 'existing_table' ID = sa.Column(sa.Integer,primary_key=True) name = sa.Column(sa.String) # ...
假设还将以下数据插入临时表:
# This data retrIEved from another database and used for filteringrows = [ (1,100,datetime.date(2017,1,1)),(3,300,3,(5,500,5,]
创建包含该数据的CTE或子查询:
stmts = [ # @NOTE: optimization to reduce the size of the statement: # make type cast only for first row,for other rows DB engine will infer sa.select([ sa.cast(sa.literal(i),sa.Integer).label("ID"),sa.cast(sa.literal(v),sa.Integer).label("value"),sa.cast(sa.literal(d),sa.DateTime).label("date"),]) if IDx == 0 else sa.select([sa.literal(i),sa.literal(v),sa.literal(d)]) # no type cast for IDx,(i,v,d) in enumerate(rows)]subquery = sa.union_all(*stmts)# Choose one option below.# I personally prefer B because one Could reuse the CTE multiple times in the same query# subquery = subquery.alias("temp_table") # option Asubquery = subquery.cte(name="temp_table") # option B
使用所需的连接和过滤器创建最终查询:
query = ( session .query(Existingtable.ID) .join(subquery,subquery.c.ID == Existingtable.ID) # .filter(subquery.c.date >= XXX_DATE))# TEMP: Test result outputfor res in query: print(res)
最后,获取pandas数据框:
out_df = pd.read_sql(query.statement,engine)result = out_df.to_dict('records')总结
以上是内存溢出为你收集整理的python – 将临时表与SQLAlchemy一起使用全部内容,希望文章能够帮你解决python – 将临时表与SQLAlchemy一起使用所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)