[컴][파이썬] flask 의 SQLAlchemy 에서 query 하는 법

SQLAlchemy 에서 select 하는 법 / 플라스크 / 셀렉트 / flask 에서 db 사용법

Flask-SQLAlchemy

flask 에서 SQLAlchemy 를 사용하려고 하는데, ORM 을 좋아해서 쓰려한다기 보다, SQLAlchemy 를 많이 쓴다고 하길래 써본다. 아래 page 에 modelClass.query 의 예제만 있어서 그런데 이녀석이 select query 에 대한 결과에 대해서도 model class 를 만들어야 되는 줄 알고, 한참 삽질했다.

Session.query()

그런데 아래처럼 session 에서 바로 query 를 줄 수도 있었다.

# source from : http://docs.sqlalchemy.org/en/rel_0_9/orm/examples.html?highlight=sessionmaker
session.add(shrew)
session.flush()
q = (session.query(Animal).
     filter(Animal.facts.any(
       and_(AnimalFact.key == u'weasel-like',
            AnimalFact.value == True))))
print 'weasel-like animals', q.all()

scoped_session().session_factory()

그런데 db_session 을 scoped_session() 으로 만들어서 바로 query() 호출이 안되었다. db_session = scoped_session(sessionmaker(autocommit=False,                                          autoflush=False,                                          bind=engine)) 여기서 조금(사실 아주많이) 헤매었는데, 결론은 아래처럼 session_factory() 로 Session 을 이용할 수 있었다. 그래서 아래처럼 select 를 실행 할 수 있었다.

from sqlalchemy.orm import aliased
...
dinfo = aliased(DomainInfo) 
rmap = aliased(RelationMap)
uinfo = aliased(UserInfo)


query = db_session.session_factory().query(
        "seq", "id", "c_id", "user_name", "url") 
                .select_from(
            join(dinfo, rmap,
                 and_(dinfo.c_id == rmap.c_id, rmap.type == 'TYPE')) 
                         .join(uinfo, uinfo.d == dinfo.id)) 
                         .filter(dinfo.use == 'Y') 
                         .filter(dinfo.status.in_([100, 200])) 
                         .filter(dinfo.id.notin_([8, 9])) 
                         .group_by(dinfo.seq) 
                         .order_by(dinfo.id, dinfo.seq)

Connection 끊김

종종 db 와의 connection 이 끊어졌다는 exception 이 발생한다.

sqlalchemy.exc.OperationalError
OperationalError: (OperationalError) (2006, 'MySQL server has gone away') 'SELECT ...

이와 관련돼서는 create_engine 에서 pool_recycle 의 시간을 설정해서 connection 을 좀 더 오래 유지 할 수 있다.[see also 4.] 이밖에 자세한 사항은 see also 4. 를 참고하자. 참고로 MySQL 은 약 8시간정도 쓰이지 않는 녀석은 스스로 끊는다. (설정마다 다르다)[ref. 1]

engine = sqlalchemy.create_engine('mysql://user:pw@host/db_name?charset=utf8', pool_recycle=3600)

SQL Expression Language

가장 편하게 사용할 수 있는 방법 같다. 위 링크를 참고하자.

예제들

result = self.session.execute(stm)
res = result.fetchall()
resDict = [dict(zip(row.keys(), row)) for row in res]
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import create_engine
from sqlalchemy import Table, Column, Integer, String, ForeignKey, MetaData
from sqlalchemy import select, join, outerjoin

user = 'user'
pw = 'pw'
host = '1.1.1.1'
db = 'mydb'

engine = create_engine(
    f'mysql+mysqlconnector://{user}:{pw}@{host}/{db}?charset=utf8',
    convert_unicode=True,
    pool_recycle=3600, pool_size=10)

session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))

meta = MetaData(engine)
    
users = Table('users', meta, autoload=True)
inv2 = Table('inv2', meta, autoload=True)

# SELECT t1.user_id, 
#     t2.account AS account_no, 
#     t3.username 
# FROM bid AS t1 
# JOIN (
#    SELECT * FROM users WHERE t1.type NOT IN ('normal')
# ) AS t2 
#     ON t1.user_id = t2.id 
# LEFT JOIN users AS t3
#     ON t1.user_id = t3.id
# WHERE bid_id = :bidId AND t2.id='test'
t2 = select([
    users, 
]).select_from(users) 
.where(
    not_(inv2.c.status.in_(['normal']))
)).alias()

stm = select([bid.c.user_id, t2.c.account.label('account_no'), users.c.username])
.select_from(
    join(t2, bid, bid.c.user_id == t2.c.id)
    .outerjoin(users, bid.c.user_id == users.c.id)
).where(
    and_(
        bid.c.bid_id == bidId,
        users.c.id == 'test'
    )
)

result = session.execute(stm)
prin(result.inserted_primary_key) # insert 이후, primary_key 를 얻어올 수 있다.

# convert result to dict type
res = result.fetchall()
resDict = [dict(zip(row.keys(), row)) for row in res] # 결과를 dict 로 변환
return resDict
from sqlalchemy import Table, MetaData, func
from sqlalchemy import select
from sqlalchemy.sql.expression import literal_column
from datetime import datetime, timedelta


from myproj.db import engine

meta = MetaData(engine)
trans = Table("trans", meta, autoload=True)

# SELECT user_id, SUM(send) AS send, '1' AS pledgeCode
# FROM trans
# WHERE TYPE='send'
# AND created_at > DATE_ADD(NOW(), INTERVAL -3 HOUR)
# GROUP BY user_id
before3H = datetime.now() + timedelta(hours=(-3))
stmt2 = (
    select([
        trans.c.user_id,
        func.sum(trans.c.send).label('send'),
        literal_column("1").label("pledgeCode"),

    ]).select_from(trans)
    .where(trans.c.type == 'send')
    .where(trans.c.created_at >= before3H)
    .group_by(trans.c.user_id)
    .with_for_update()
)
stmt = trans.update()
  .values(request_status=status)
  .where(trans.c.id == 100)

trans.insert().values(name="some name")
trans.insert().values({"name": "some name"})
trans.insert().values([
                    {"name": "some name"},
                    {"name": "some other name"},
                    {"name": "yet another name"},
                ])

session.execute(stmt.execution_options(autocommit=True))

그냥 Model 을 이용할 수도 있다. ref. 2에서 좀 더 자세한 이야기가 있다.

주의할 점은 expression language 로 값을 가져왔을 때 type(user1)<class 'sqlalchemy.engine.row.Row'> 이다.

stm = (
    select(
        [
            UserAccounts.email,
            UserAccounts.mobile,
            UserAccounts.job,
            CodeValues.cname.label("job_name"),
        ]
    )
    .select_from(
        UserAccounts.__table__.outerjoin(
            CodeValues, UserAccounts.job == CodeValues.code
        )
    )
    .where(UserAccounts.email == args["email"])
    .where(and_(CodeValues.code == "1008", CodeValues.use_yn == "Y"))
)
user_info = db.session.execute(stm)
user1 = user_info.fetchone()

# ORM case
UserAccounts.query.outerjoin(
    CodeValues, UserhabitAccounts.job == CodeValues.code
).filter(CodeValues.code == "1008" and CodeValues.use_yn == "Y").filter(
    UserhabitAccounts.email == args["email"]
).first()

inserted_primary_key

autocommit

The “autocommit” feature is only in effect when no Transaction has otherwise been declared. This means the feature is not generally used with the ORM, as the Session object by default always maintains an ongoing Transaction.

 orm 을 이용해서 transaction 을 사용하게 되는 case 라면 autocommit 이 동작하지 않는다. 이때는 session.commit() 과 같이 명시적으로 commit 을 해줘야 한다.

insert 시 insert 하려는 table 의 값을 이용하는 경우

table 을 이용할 때 'AS' 부분을 넣어주지 않으면 아래 같은 error 가 발생할 수 있다.

Table 'trans' is specified twice, both as a target for 'INSERT' and as a separate source for data

INSERT INTO trans 
  (type, date, seq, status, created_at, updated_at)
VALUES
  ('myType', CURDATE(), 
IFNULL(
    (SELECT MAX(seq) + 1 FROM trans AS t1 WHERE t1.date = CURDATE()), 1),
'pending', NOW(), NOW())
now = datetime.now()
trans = self.trans
t1 = trans.alias(name="t1")

stmtMaxSeq = (
    select([func.max(t1.c.seq) + 1])
    .select_from(t1)
    .where(t1.c.date == now)
    .as_scalar()
).label("max")

stmt = trans.insert().values(
    {
        "type": self.TRANS_TYPE_WITHDRAW,
        "date": now,
        "seq": func.ifnull(stmtMaxSeq, 1),
        "status": "",
        "created_at": now,
        "updated_at": now,
    }
)
res = self.session.execute(stmt)

See Also

  1. select 에서 if 등의 조건문 사용방법 : python - How can I write conditional clauses using SQLAlchemy's expression language? - Stack Overflow
  2. 복잡한 query 문들
  3. http://www.whitemiceconsulting.com/2012/02/complex-queries-with-sqlalchemy.html
  4. Avoiding “MySQL server has gone away” on infrequently used Python / Flask server with SQLAlchemy
  5. query apis : Query API — SQLAlchemy 1.4 Documentation

References

  1. Engine Configuration — SQLAlchemy 1.0 Documentation
  2. DepartmentEmployee

댓글 없음:

댓글 쓰기