本文实例讲述了Python使用sqlalchemy模块连接数据库操作。分享给大家供大家参考,具体如下:
安装:
1
2
3
4
|
pip install sqlalchemy # 安装数据库驱动: pip install pymysql pip install cx_oracle |
举例:(在url后面加入?charset=utf8可以防止乱码)
1
2
|
from sqlalchemy import create_engine engine = create_engine( 'mysql+pymysql://username:password@hostname:port/dbname' , echo = True ) #echo=True 打印sql语句信息 |
create_engine
接受一个url,格式为:
1
2
3
4
5
6
7
8
9
|
# '数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名' # 常用的 engine = create_engine( 'sqlite:///:memory:' , echo = True ) # sqlite内存 engine = create_engine( 'sqlite:///./cnblogblog.db' ,echo = True ) # sqlite文件 engine = create_engine( "mysql+pymysql://username:password@hostname:port/dbname" ,echo = True ) # mysql+pymysql engine = create_engine( 'mssql+pymssql://username:password@hostname:port/dbname' ,echo = True ) # mssql+pymssql engine = create_engine( 'postgresql://scott:tiger@hostname:5432/dbname' ) # postgresql示例 engine = create_engine( 'oracle://scott:tiger@hostname:1521/sidname' ) # oracle engine = create_engine( 'oracle+cx_oracle://scott:tiger@tnsname' ) #pdb就可以用tns连接 |
简单demo:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base engine = create_engine( 'oracle://spark:a@orclpdb' ,echo = True ) #echo要求打印sql语句等调试信息 session_maker = sessionmaker(bind = engine) session = session_maker() Base = declarative_base() #对应一张表 class Student(Base): __tablename__ = 'STUDENT' id = Column( 'STUID' , Integer, primary_key = True ) name = Column( 'STUNAME' , String( 32 ), nullable = False ) age = Column( 'STUAGE' , Integer) def __repr__( self ): return '<Student(id:%s, name:%s, age:%s)>' % ( self . id , self .name, self .age) Base.metadata.create_all(engine) #若存在STUDENT表则不做,不存在则创建。 queryObject = session.query(Student).order_by(Student. id .desc()) for ins in queryObject: print (ins. id , ins.name, ins.age) ''' 4 hey 24 3 lwtxxs 27 2 gyb 89 1 ns 23 ''' |
将查询结果映射为DataFrame:
1
2
3
4
5
6
7
8
9
|
import pandas as pd df = pd.read_sql(session.query(Student). filter (Student. id > 1 ).statement, engine) print (df) ''' STUID STUNAME STUAGE 0 4 hey 24 1 2 gyb 89 2 3 lwtxxs 27 ''' |
查询:
session的query方法除了可以接受Base子类对象作为参数外,还可以是字段,如:
1
2
3
|
query = session.query(Student.name, Student.age) # query为一个sqlalchemy.orm.query.Query对象 for stu_name, stu_age in query: print (stu_name, stu_age) |
查询条件filter:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# = / like query. filter (Student.name = = 'wendy' ) query. filter (Student.name.like( '%ed%' )) # in query. filter (Student.name.in_([ 'wendy' , 'jack' ])) query. filter (Student.name.in_( session.query(User.name). filter (User.name.like( '%ed%' )) )) # not in query. filter (~Student.name.in_([ 'ed' , 'wendy' , 'jack' ])) # is null / is not null query. filter (Student.name = = None ) query. filter (Student.name.is_( None )) query. filter (Student.name ! = None ) query. filter (Student.name.isnot( None )) # and from sqlalchemy import and_, or_ query. filter (and_(Student.name = = 'ed' , Student.age ! = 23 )) query. filter (Student.name = = 'ed' , Student.age ! = 23 ) query. filter (Student.name = = 'ed' ). filter (Student.age ! = 23 ) # or query. filter (or_(Student.name = = 'ed' , Student.name = = 'wendy' )) # match query. filter (Student.name.match( 'wendy' )) |
Query的方法:
all()
方法以列表形式返回结果集:
1
2
3
4
5
|
from sqlalchemy import or_, and_ queryObject = session.query(Student). filter (or_(Student. id = = 1 , Student. id = = 2 )) print (queryObject. all ()) # [<Student(id:1, name:ns, age:23)>, <Student(id:2, name:gyb, age:89)>] queryObject = session.query(Student.name). filter (or_(Student. id = = 1 , Student. id = = 2 )) print (queryObject. all ()) # [('ns',), ('gyb',)] |
first()
方法返回单个结果。(若结果集为空则返回None)
1
|
print (queryObject.first()) # ('ns',) |
one()
方法返回单个结果,与first()
方法不同的是:当结果集中没有元素或有多于一个元素会抛出异常。
one_or_none()
方法同one()
一样,不同是结果集为空则返回None,为多个抛出异常。
查询数量:
1
2
|
from sqlalchemy import func session.query(func.count(Student. id )).scalar() # SELECT count("STUDENT"."STUID") AS count_1 FROM "STUDENT" |
分组:
1
|
session.query(func.count(Student. id ), Student.name).group_by(Student.name). all () |
嵌套SQL语句:
1
2
3
4
|
from sqlalchemy import text query = session.query(Student. id , Student.name). filter (text( 'stuid>2' )) query = session.query( 'stuid' , 'stuname' , 'stuage' ).from_statement(\ text( "select * from student where stuname=:stuname" )).params(stuname = 'hey' ). all () #[(4, 'hey', 24)] |
希望本文所述对大家Python程序设计有所帮助。
原文链接:https://blog.csdn.net/xuejianbest/article/details/85159552