当前位置:网站首页>SQLAlchemy使用相关
SQLAlchemy使用相关
2022-07-28 05:22:00 【Alex_z0897】
子查询时做分页
sub_query = db.session.query(Table,Table.id).offset((page_no - 1) * page_size).limit(page_size).subquery()
query1 = db.session.query(sub_query, Table4, Table3, Table2) \
.outerjoin(Table2) \
.outerjoin(Table3) \
.outerjoin(Table4) \
SQLAlchemy查询之exists
SELECT *
FROM table1
WHERE EXISTS (SELECT *
FROM (SELECT max(table2.event_type) AS event_type, table2.event_code AS event_code
FROM table2
WHERE table2.`status` = 200 GROUP BY table2.event_code) AS anon_1
WHERE table1.id = anon_1.event_code AND anon_1.event_type = 100 ) ORDER BY table1.create_time DESC
query = db.session.query(Table1).order_by(Table1.create_time.desc())
func_criteria = db.session.query(func.max(Table2.event_type).label('event_type'),
Table2.event_code.label('event_code'))\
.group_by(Table2.event_code).filter(Table2.status == 200).subquery()
query = query.filter(exists().where(Table1.id == func_criteria.c.event_code)\
.where(func_criteria.c.event_type== status))
官网样例
>>> subq = (
... select(func.count(address_table.c.id)).
... where(user_table.c.id == address_table.c.user_id).
... group_by(address_table.c.user_id).
... having(func.count(address_table.c.id) > 1)
... ).exists()
>>> with engine.connect() as conn:
... result = conn.execute(
... select(user_table.c.name).where(subq)
... )
... print(result.all())
sqlalchemy的update手动commit之前通过yield执行远程异步调用时session对象被替换
SQLAlchemy==1.2.18
result = db.session.query(ObjectModel).get(ids.get("id"))
result.status = 2 #此时 session._is_clean() = False
#中间执行一次yield远程调用
#外部请求函数
def onAyncFetch():
req = ...省略其它代码
response = yield async_http.fetch(req)
...省略其它代码
return Return(json_decode(response.body))
print id(result.query.session)
data = yield onAyncFetch(...)
print id(result.query.session) #两次打印内存地址不相同
...省略其它代码
#执行yield调用结束之后 session._is_clean() = True
db.session.commit() #再进行commit的会进行如下代码段,上面修改的status字段无法被commit
上面是一段伪代码.流程大概就这样,下面贴sqlalchemy时commiit的源码
sqlalchemy/orm/session.py
def _prepare_impl(self):
self._assert_active()
if self._parent is None or self.nested:
self.session.dispatch.before_commit(self.session)
stx = self.session.transaction
if stx is not self:
for subtransaction in stx._iterate_self_and_parents(upto=self):
subtransaction.commit()
if not self.session._flushing:
for _flush_guard in range(100):
if self.session._is_clean(): # 判断对象信息是否变更,如果变更则执行flush()
break
self.session.flush()
else:
raise exc.FlushError(
"Over 100 subsequent flushes have occurred within "
"session.commit() - is an after_flush() hook "
"creating new objects?"
)
if self._parent is None and self.session.twophase:
try:
for t in set(self._connections.values()):
t[1].prepare()
except:
with util.safe_reraise():
self.rollback()
self._state = PREPARED
边栏推荐
- 分布式锁-数据库实现
- 小程序开发
- 微信上的小程序店铺怎么做?
- Regular verification rules of wechat applet mobile number
- MYSQL的select语句查询;运算符课后练习
- MySQL trigger
- At the moment of the epidemic, online and offline travelers are trapped. Can the digital collection be released?
- FlinkX安装及使用
- ModuleNotFoundError: No module named ‘pip‘
- 【1】 Introduction to redis
猜你喜欢

Distributed lock redis implementation

幂等性组件

Structured streaming in spark

微信小程序制作模板套用时需要注意什么呢?

There is a problem with MySQL paging

Distributed cluster architecture scenario optimization solution: session sharing problem

Xshell suddenly failed to connect to the virtual machine

Flink CDC (MySQL as an example)

小程序开发

【4】 Redis persistence (RDB and AOF)
随机推荐
3:Mysql 主从复制搭建
MySQL select statement query; Operator after class practice
数字藏品以虚强实,赋能实体经济发展
小程序开发解决零售业的焦虑
Sort method for sorting
Distributed cluster architecture scenario optimization solution: distributed ID solution
CentOS7 安装Mysql
高端大气的小程序开发设计有哪些注意点?
Sqoop安装及使用
Spark中的Structured Streaming
分布式锁-数据库实现
分布式集群架构场景优化解决方案:分布式调度问题
Distinguish between real-time data, offline data, streaming data and batch data
项目不报错,正常运行,无法请求到服务
小程序制作小程序开发适合哪些企业?
No module named yum
Xshell suddenly failed to connect to the virtual machine
KubeSphere安装版本问题
单行函数,聚合函数课后练习
Invalid packaging for parent POM x, must be “pom“ but is “jar“ @