国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

在Python程序和Flask框架中使用SQLAlchemy的教程

2019-11-25 16:45:19
字體:
來源:轉載
供稿:網友

ORM 江湖
曾幾何時,程序員因為懼怕SQL而在開發的時候小心翼翼的寫著sql,心中總是少不了恐慌,萬一不小心sql語句出錯,搞壞了數據庫怎么辦?又或者為了獲取一些數據,什么內外左右連接,函數存儲過程等等。毫無疑問,不搞懂這些,怎么都覺得變扭,說不定某天就跳進了坑里,叫天天不應,喊地地不答。

ORM 的出現,讓畏懼SQL的開發者,在坑里看見了爬出去的繩索,仿佛天空并不是那么黑暗,至少再暗,我們也有了眼睛。顧名思義,ORM 對象關系映射,簡而言之,就是把數據庫的一個個table(表),映射為編程語言的class(類)。

python中比較著名的ORM框架有很多,大名頂頂的 SQLAlchemy 是python世界里當仁不讓的ORM框架。江湖中peewee,strom, pyorm,SQLObject 各領風騷,可是最終還是SQLAlchemy 傲視群雄。

SQLAlchemy 簡介
SQLAlchemy 分為兩個部分,一個用于 ORM 的對象映射,另外一個是核心的 SQL expression 。第一個很好理解,純粹的ORM,后面這個不是 ORM,而是DBAPI的封裝,當然也提供了很多方法,避免了直接寫sql,而是通過一些sql表達式。使用 SQLAlchemy 則可以分為三種方式。

  • 使用 sql expression ,通過 SQLAlchemy 的方法寫sql表達式,簡介的寫sql
  • 使用 raw sql, 直接書寫 sql
  • 使用 ORM 避開直接書寫 sql

本文先探討 SQLAlchemy的 sql expresstion 部分的用法。主要還是跟著官方的 SQL Expression Language Tutorial.介紹

為什么要學習 sql expresstion ,而不直接上 ORM?因為后面這個兩個是 orm 的基礎。并且,即是不使用orm,后面這兩個也能很好的完成工作,并且代碼的可讀性更好。純粹把SQLAlchemy當成dbapi使用。首先SQLAlchemy 內建數據庫連接池,解決了連接操作相關繁瑣的處理。其次,提供方便的強大的log功能,最后,復雜的查詢語句,依靠單純的ORM比較難實現。

實戰
連接數據庫
首先需要導入 sqlalchemy 庫,然后建立數據庫連接,這里使用 mysql。通過create_engine方法進行

from sqlalchemy import create_engineengine = create_engine("mysql://root:@localhost:3306/webpy?charset=utf8",encoding="utf-8", echo=True)

create_engine 方法進行數據庫連接,返回一個 db 對象。里面的參數表示

數據庫類型://用戶名:密碼(沒有密碼則為空,不填)@數據庫主機地址/數據庫名?編碼
echo = True 是為了方便 控制臺 logging 輸出一些sql信息,默認是False
通過這個engine對象可以直接execute 進行查詢,例如 engine.execute("SELECT * FROM user") 也可以通過 engine 獲取連接在查詢,例如 conn = engine.connect() 通過 conn.execute()方法進行查詢。兩者有什么差別呢?

直接使用engine的execute執行sql的方式, 叫做connnectionless執行,
借助 engine.connect()獲取conn, 然后通過conn執行sql, 叫做connection執行
主要差別在于是否使用transaction模式, 如果不涉及transaction, 兩種方法效果是一樣的. 官網推薦使用后者。
定義表
定義數據表,才能進行sql表達式的操作,畢竟sql表達式的表的確定,是sqlalchemy制定的,如果數據庫已經存在了數據表還需要定義么?當然,這里其實是一個映射關系,如果不指定,查詢表達式就不知道是附加在那個表的操作,當然定義的時候,注意表名和字段名,代碼和數據的必須保持一致。定義好之后,就能創建數據表,一旦創建了,再次運行創建的代碼,數據庫是不會創建的。

# -*- coding: utf-8 -*-__author__ = 'ghost'from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey# 連接數據庫 engine = create_engine("mysql://root:@localhost:3306/webpy?charset=utf8",encoding="utf-8", echo=True)# 獲取元數據metadata = MetaData()# 定義表user = Table('user', metadata,  Column('id', Integer, primary_key=True),  Column('name', String(20)),  Column('fullname', String(40)), )address = Table('address', metadata,  Column('id', Integer, primary_key=True),  Column('user_id', None, ForeignKey('user.id')),  Column('email', String(60), nullable=False) )# 創建數據表,如果數據表存在,則忽視metadata.create_all(engine)# 獲取數據庫連接conn = engine.connect()

插入 insert
有了數據表和連接對象,對應數據庫操作就簡單了。

>>> i = user.insert() # 使用查詢>>> i <sqlalchemy.sql.dml.Insert object at 0x0000000002637748>>>> print i # 內部構件的sql語句INSERT INTO "user" (id, name, fullname) VALUES (:id, :name, :fullname)>>> u = dict(name='jack', fullname='jack Jone')>>> r = conn.execute(i, **u) # 執行查詢,第一個為查詢對象,第二個參數為一個插入數據字典,如果插入的是多個對象,就把對象字典放在列表里面>>> r<sqlalchemy.engine.result.ResultProxy object at 0x0000000002EF9390>>>> r.inserted_primary_key # 返回插入行 主鍵 id[4L]>>> addresses[{'user_id': 1, 'email': 'jack@yahoo.com'}, {'user_id': 1, 'email': 'jack@msn.com'}, {'user_id': 2, 'email': 'www@www.org'}, {'user_id': 2, 'email': 'wendy@aol.com'}]>>> i = address.insert()>>> r = conn.execute(i, addresses) # 插入多條記錄>>> r<sqlalchemy.engine.result.ResultProxy object at 0x0000000002EB5080>>>> r.rowcount #返回影響的行數4L>>> i = user.insert().values(name='tom', fullname='tom Jim')>>> i.compile()<sqlalchemy.sql.compiler.SQLCompiler object at 0x0000000002F6F390>>>> print i.compile()INSERT INTO "user" (name, fullname) VALUES (:name, :fullname)>>> print i.compile().params{'fullname': 'tom Jim', 'name': 'tom'}>>> r = conn.execute(i)>>> r.rowcount1L

查詢 select
查詢方式很靈活,多數時候使用 sqlalchemy.sql 下面的 select方法

>>> s = select([user]) # 查詢 user表>>> s<sqlalchemy.sql.selectable.Select at 0x25a7748; Select object>>>> print sSELECT "user".id, "user".name, "user".fullname FROM "user"

如果需要查詢自定義的字段,可是使用 user 的cloumn 對象,例如

>>> user.c # 表 user 的字段column對象<sqlalchemy.sql.base.ImmutableColumnCollection object at 0x0000000002E804A8>>>> print user.c['user.id', 'user.name', 'user.fullname']>>> s = select([user.c.name,user.c.fullname])>>> r = conn.execute(s)>>> r<sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748>>>> r.rowcount # 影響的行數5L>>> ru = r.fetchall() >>> ru[(u'hello', u'hello world'), (u'Jack', u'Jack Jone'), (u'Jack', u'Jack Jone'), (u'jack', u'jack Jone'), (u'tom', u'tom Jim')]>>> r <sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748>>>> r.closed # 只要 r.fetchall() 之后,就會自動關閉 ResultProxy 對象True

同時查詢兩個表

>>> s = select([user.c.name, address.c.user_id]).where(user.c.id==address.c.user_id) # 使用了字段和字段比較的條件>>> s<sqlalchemy.sql.selectable.Select at 0x2f03390; Select object>>>> print sSELECT "user".name, address.user_id FROM "user", address WHERE "user".id = address.user_id

操作符

>>> print user.c.id == address.c.user_id # 返回一個編譯的字符串"user".id = address.user_id>>> print user.c.id == 7"user".id = :id_1 # 編譯成為帶參數的sql 語句片段字符串>>> print user.c.id != 7"user".id != :id_1>>> print user.c.id > 7"user".id > :id_1>>> print user.c.id == None"user".id IS NULL>>> print user.c.id + address.c.id # 使用兩個整形的變成 +"user".id + address.id>>> print user.c.name + address.c.email # 使用兩個字符串 變成 ||"user".name || address.email

操作連接
這里的連接指條件查詢的時候,邏輯運算符的連接,即 and or 和 not

>>> print and_(  user.c.name.like('j%'),  user.c.id == address.c.user_id,  or_(   address.c.email == 'wendy@aol.com',   address.c.email == 'jack@yahoo.com'  ),  not_(user.c.id>5))"user".name LIKE :name_1 AND "user".id = address.user_id AND (address.email = :email_1 OR address.email = :email_2) AND "user".id <= :id_1>>> 

得到的結果為 編譯的sql語句片段,下面看一個完整的例子

>>> se_sql = [(user.c.fullname +", " + address.c.email).label('title')]>>> wh_sql = and_(    user.c.id == address.c.user_id,    user.c.name.between('m', 'z'),    or_(     address.c.email.like('%@aol.com'),     address.c.email.like('%@msn.com')    )   )>>> print wh_sql"user".id = address.user_id AND "user".name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2)>>> s = select(se_sql).where(wh_sql)>>> print sSELECT "user".fullname || :fullname_1 || address.email AS title FROM "user", address WHERE "user".id = address.user_id AND "user".name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2)>>> r = conn.execute(s)>>> r.fetchall()

使用 raw sql 方式

遇到負責的sql語句的時候,可以使用 sqlalchemy.sql 下面的 text 函數。將字符串的sql語句包裝編譯成為 execute執行需要的sql對象。例如:、

>>> text_sql = "SELECT id, name, fullname FROM user WHERE id=:id" # 原始sql語句,參數用( :value)表示>>> s = text(text_sql)>>> print sSELECT id, name, fullname FROM user WHERE id=:id>>> s<sqlalchemy.sql.elements.TextClause object at 0x0000000002587668>>>> conn.execute(s, id=3).fetchall() # id=3 傳遞:id參數[(3L, u'Jack', u'Jack Jone')]

連接 join
連接有join 和 outejoin 兩個方法,join 有兩個參數,第一個是join 的表,第二個是on 的條件,joing之后必須要配合select_from 方法:

>>> print user.join(address)"user" JOIN address ON "user".id = address.user_id # 因為開啟了外鍵 ,所以join 能只能識別 on 條件>>> print user.join(address, address.c.user_id==user.c.id) # 手動指定 on 條件"user" JOIN address ON address.user_id = "user".id>>> s = select([user.c.name, address.c.email]).select_from(user.join(address, user.c.id==address.c.user_id)) # 被jion的sql語句需要用 select_from方法配合>>> s<sqlalchemy.sql.selectable.Select at 0x2eb63c8; Select object>>>> print sSELECT "user".name, address.email FROM "user" JOIN address ON "user".id = address.user_id>>> conn.execute(s).fetchall()[(u'hello', u'jack@yahoo.com'), (u'hello', u'jack@msn.com'), (u'hello', u'jack@yahoo.com'), (u'hello', u'jack@msn.com'), (u'Jack', u'www@www.org'), (u'Jack', u'wendy@aol.com'), (u'Jack', u'www@www.org'), (u'Jack', u'wendy@aol.com')]

排序 分組 分頁
排序使用 order_by 方法,分組是 group_by ,分頁自然就是limit 和 offset兩個方法配合

>>> s = select([user.c.name]).order_by(user.c.name) # order_by>>> print sSELECT "user".name FROM "user" ORDER BY "user".name>>> s = select([user]).order_by(user.c.name.desc())>>> print sSELECT "user".id, "user".name, "user".fullname FROM "user" ORDER BY "user".name DESC>>> s = select([user]).group_by(user.c.name)  # group_by>>> print sSELECT "user".id, "user".name, "user".fullname FROM "user" GROUP BY "user".name>>> s = select([user]).order_by(user.c.name.desc()).limit(1).offset(3) # limit(1).offset(3)>>> print sSELECT "user".id, "user".name, "user".fullname FROM "user" ORDER BY "user".name DESC LIMIT :param_1 OFFSET :param_2[(4L, u'jack', u'jack Jone')]

更新 update
前面都是一些查詢,更新和插入的方法很像,都是 表下面的方法,不同的是,update 多了一個 where 方法 用來選擇過濾

>>> s = user.update()>>> print sUPDATE "user" SET id=:id, name=:name, fullname=:fullname>>> s = user.update().values(fullname=user.c.name)   # values 指定了更新的字段>>> print sUPDATE "user" SET fullname="user".name>>> s = user.update().where(user.c.name == 'jack').values(name='ed') # where 進行選擇過濾>>> print s UPDATE "user" SET name=:name WHERE "user".name = :name_1>>> r = conn.execute(s)>>> print r.rowcount   # 影響行數3

還有一個高級用法,就是一次命令執行多個記錄的更新,需要用到 bindparam 方法

>>> s = user.update().where(user.c.name==bindparam('oldname')).values(name=bindparam('newname')) # oldname 與下面的傳入的從拿書進行綁定,newname也一樣>>> print sUPDATE "user" SET name=:newname WHERE "user".name = :oldname>>> u = [{'oldname':'hello', 'newname':'edd'},{'oldname':'ed', 'newname':'mary'},{'oldname':'tom', 'newname':'jake'}]>>> r = conn.execute(s, u)>>> r.rowcount5L

刪除 delete
刪除比較容易,調用 delete方法即可,不加 where 過濾,則刪除所有數據,但是不會drop掉表,等于清空了數據表

>>> r = conn.execute(address.delete()) # 清空表>>> print r<sqlalchemy.engine.result.ResultProxy object at 0x0000000002EAF550>>>> r.rowcount8L>>> r = conn.execute(users.delete().where(users.c.name > 'm')) # 刪除記錄>>> r.rowcount3L


flask-sqlalchemy
SQLAlchemy已經成為了python世界里面orm的標準,flask是一個輕巧的web框架,可以自由的使用orm,其中flask-sqlalchemy是專門為flask指定的插件。

安裝flask-sqlalchemy

pip install flask-sqlalchemy

初始化sqlalchemy

from flask import Flaskfrom flask.ext.sqlalchemy import SQLAlchemyapp = Flask(__name__)#     dialect+driver://username:password@host:port/database?charset=utf8# 配置 sqlalchemy 數據庫驅動://數據庫用戶名:密碼@主機地址:端口/數據庫?編碼app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:@localhost:3306/sqlalchemy?charset=utf8'# 初始化db = SQLAlchemy(app)

定義model

class User(db.Model): """ 定義了三個字段, 數據庫表名為model名小寫 """ id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True) email = db.Column(db.String(120), unique=True) def __init__(self, username, email):  self.username = username  self.email = email def __repr__(self):  return '<User %r>' % self.username def save(self):  db.session.add(self)  db.session.commit()

創建數據表
數據包的創建使用sqlalchemy app,如果表已經存在,則忽略,如果不存在,則新建

>>> from yourapp import db, User>>> u = User(username='admin', email='admin@example.com') # 創建實例>>> db.session.add(u)          # 添加session>>> db.session.commit()         # 提交查詢>>> users = User.query.all()        # 查詢

需要注意的是,如果要插入中文,必須插入 unicode字符串

>>> u = User(username=u'人世間', email='rsj@example.com')>>> u.save()

定義關系
關系型數據庫,最重要的就是關系。通常關系分為 一對一(例如無限級欄目),一對多(文章和欄目),多對多(文章和標簽)

one to many:
我們定義一個Category(欄目)和Post(文章),兩者是一對多的關系,一個欄目有許多文章,一個文章屬于一個欄目。

class Category(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(50)) def __init__(self, name):  self.name = name def __repr__(self):  return '<Category %r>' % self.nameclass Post(db.Model): """ 定義了五個字段,分別是 id,title,body,pub_date,category_id """ id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(80)) body = db.Column(db.Text) pub_date = db.Column(db.String(20)) # 用于外鍵的字段 category_id = db.Column(db.Integer, db.ForeignKey('category.id')) # 外鍵對象,不會生成數據庫實際字段 # backref指反向引用,也就是外鍵Category通過backref(post_set)查詢Post category = db.relationship('Category', backref=db.backref('post_set', lazy='dynamic')) def __init__(self, title, body, category, pub_date=None):  self.title = title  self.body = body  if pub_date is None:   pub_date = time.time()  self.pub_date = pub_date  self.category = category def __repr__(self):  return '<Post %r>' % self.title def save(self):  db.session.add(self)  db.session.commit()

如何使用查詢呢?

>>> c = Category(name='Python')>>> c<Category 'Python'>>>> c.post_set<sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B58F60>>>> c.post_set.all()[]>>> p = Post(title='hello python', body='python is cool', category=c)>>> p.save()>>> c.post_set<sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B73710>>>> c.post_set.all() # 反向查詢[<Post u'hello python'>]>>> p<Post u'hello python'>>>> p.category<Category u'Python'># 也可以使用category_id 字段來添加>>> p = Post(title='hello flask', body='flask is cool', category_id=1)>>> p.save()

many to many (評論已經指出,這樣的做法無法關聯刪除,簡書沒有刪除線格式,多多對例子作廢,在此提示,以免被誤導)
對于多對多的關系,往往是定義一個兩個model的id的另外一張表,例如 Post 和 Tag之間是多對多,需要定義一個 Post_Tag的表

post_tag = db.Table('post_tag',     db.Column('post_id', db.Integer, db.ForeignKey('post.id')),     db.Column('tag_id', db.Integer, db.ForeignKey('tag.id'))    )class Post(db.Model): id = db.Column(db.Integer, primary_key=True) # ... 省略 # 定義一個反向引用,tag可以通過 post_set查詢到 post的集合 tags = db.relationship('Tag', secondary=post_tag,       backref=db.backref('post_set', lazy='dynamic'))class Tag(db.Model): id = db.Column(db.Integer, primary_key=True) content = db.Column(db.String(10), unique=True) # 定義反向查詢 posts = db.relationship('Post', secondary=post_tag,       backref=db.backref('tag_set', lazy='dynamic')) def __init__(self, content):  self.content = content def save(self):  db.session.add(self)  db.session.commit()

查詢:

>>> tag_list = []>>> tags = ['python', 'flask', 'ruby', 'rails']>>> for tag in tags:  t = Tag(tag)  tag_list.append(t)>>> tag_list[<f_sqlalchemy.Tag object at 0x0000000003B7CF28>, <f_sqlalchemy.Tag object at 0x0000000003B7CF98>, <f_sqlalchemy.Tag object at 0x0000000003B7CEB8>, <f_sqlalchemy.Tag object at 0x0000000003B7CE80>]>>> p<Post u'hello python'>>>> p.tags[]>>> p.tags = tag_list # 添加多對多的數據>>> p.save()>>> p.tags[<f_sqlalchemy.Tag object at 0x0000000003B7CF28>, <f_sqlalchemy.Tag object at 0x0000000003B7CF98>, <f_sqlalchemy.Tag object at 0x0000000003B7CEB8>, <f_sqlalchemy.Tag object at 0x0000000003B7CE80>]>>> p.tag_set   # 反向查詢<sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B7C080>>>> p.tag_set.all()[<f_sqlalchemy.Tag object at 0x0000000003B7CF28>, <f_sqlalchemy.Tag object at 0x0000000003B7CF98>, <f_sqlalchemy.Tag object at 0x0000000003B7CEB8>, <f_sqlalchemy.Tag object at 0x0000000003B7CE80>]>>> t = Tag.query.all()[1]>>> t<f_sqlalchemy.Tag object at 0x0000000003B7CF28>>>> t.contentu'python'>>> t.posts[<Post u'hello python'>]>>> t.post_set<sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B7C358>>>> t.post_set.all()[<Post u'hello python'>]self one to one

自身一對一也是常用的需求,比如無限分級欄目

class Category(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(50)) # 父級 id pid = db.Column(db.Integer, db.ForeignKey('category.id')) # 父欄目對象 pcategory = db.relationship('Category', uselist=False, remote_side=[id], backref=db.backref('scategory', uselist=False)) def __init__(self, name, pcategory=None):  self.name = name  self.pcategory = pcategory def __repr__(self):  return '<Category %r>' % self.name def save(self):  db.session.add(self)  db.session.commit()

查詢:

>>> p = Category('Python')>>> p<Category 'Python'>>>> p.pid>>> p.pcategory # 查詢父欄目>>> p.scategory # 查詢子欄目>>> f = Category('Flask', p)>>> f.save()>>> f<Category u'Flask'>>>> f.pid1L>>> f.pcategory # 查詢父欄目<Category u'Python'>>>> f.scategory # 查詢父欄目>>> p.scategory # 查詢子欄目<Category u'Flask'>

關于 flask-sqlalchemy 定義models的簡單應用就這么多,更多的技巧在于如何查詢。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 五家渠市| 元阳县| 东阿县| 乌兰浩特市| 西吉县| 台安县| 菏泽市| 泊头市| 博湖县| 泽普县| 凤城市| 兴安盟| 渭南市| 柳江县| 汨罗市| 聂荣县| 鲁山县| 富裕县| 泊头市| 柳江县| 山阳县| 汕头市| 齐河县| 耿马| 镇赉县| 左权县| 当阳市| 兖州市| 虎林市| 盱眙县| 嘉黎县| 汉川市| 万盛区| 昌黎县| 从江县| 手游| 龙泉市| 井陉县| 泊头市| 梅河口市| 和平县|