口腔医院网站开发,牡丹江最新信息网0453,淘宝店招免费做的网站有,西安有哪些做网站建设的公司文章目录 连接池实例flask定制命令flask 缓存的使用flask信号的使用sqlalchemy原生操作sqlalchemy操作表flask orm操作表一对多的增加和跨表查询 #xff08;一对一只需要关联字段加上 ,uniqueTrue#xff09;多对多关系的增加和查询多对多基本的增删改查 连接池 import pymy… 文章目录 连接池实例flask定制命令flask 缓存的使用flask信号的使用sqlalchemy原生操作sqlalchemy操作表flask orm操作表一对多的增加和跨表查询 一对一只需要关联字段加上 ,uniqueTrue多对多关系的增加和查询多对多基本的增删改查 连接池 import pymysqlfrom dbutils.pooled_db import PooledDBPOOL PooledDB(creatorpymysql, # 使用链接数据库的模块maxconnections6, # 连接池允许的最大连接数0和None表示不限制连接数mincached2, # 初始化时链接池中至少创建的空闲的链接0表示不创建maxcached5, # 链接池中最多闲置的链接0和None不限制maxshared3, # 链接池中最多共享的链接数量0和None表示全部共享。PS: 无用因为pymysql和MySQLdb等模块的 threadsafety都为1所有值无论设置为多少_maxcached永远为0所以永远是所有链接都共享。blockingTrue, # 连接池中如果没有可用连接后是否阻塞等待。True等待False不等待然后报错maxusageNone, # 一个链接最多被重复使用的次数None表示无限制setsession[], # 开始会话前执行的命令列表。ping0,# ping MySQL服务端检查是否服务可用。host127.0.0.1,port3306,userroot,password123,databaseluffy,charsetutf8)-第二步:使用from sql_pool import POOL # 做成单例conn POOL.connection() # 从连接池种取一个链接如果没有阻塞在这curser conn.cursor()curser.execute(select * from luffy_order where id2)rescurser.fetchall()print(res)实例
from flask import Flask
app Flask(__name__)app.route(/)
def hello_world(): # put applications code hereconn POOL.connection() # 从连接池种取一个链接如果没有阻塞在这curser conn.cursor()curser.execute(select * from luffy_course where id2)res curser.fetchall()return 数据查到了 %s % resif __name__ __main__:app.run()flask定制命令
使用 flask-script定制命令(老版本不用了)
# flask 老版本中没有命令运行项目自定制命令
# flask-script 解决了这个问题flask项目可以通过命令运行可以定制命令
# 新版的flask--》官方支持定制命令 click 定制命令这个模块就弃用了
# flask-migrate 老版本基于flask-script新版本基于flask-click写的### 使用步骤-1 pip3 install Flask-Script2.0.3-2 pip3 install flask1.1.4-3 pip3 install markupsafe1.1.1-4 使用from flask_script import Managermanager Manager(app)if __name__ __main__:manager.run()-5 自定制命令manager.commanddef custom(arg):自定义命令python manage.py custom 123print(arg)- 6 执行自定制命令python manage.py custom 123新版本定制命令
from flask import Flask
import click
app Flask(__name__)app.cli.command(create-user)
click.argument(name)
def create_user(args):# from pool import POOL# connPOOL.connection()# cursorconn.cursor()# cursor.excute(insert into user (username,password) values (%s,%s),args[name,123456])# conn.commit()print(name)app.route(/)
def index():return indexif __name__ __main__:app.run()# 运行项目的命令是flask --app py文件名字:app run
# 命令行中执行
# flask --app 7-flask命令:app create-user lqz
# 简写成 前提条件是 app所在的py文件名字叫 app.py
# flask create-user lqz# django定制命令
# 1 app下新建文件夹management/commands/
# 2 在该文件夹下新建py文件随便命名命令名# 3 在py文件中写代码
from django.core.management.base import BaseCommand
class Command(BaseCommand):help 命令提示def handle(self, *args, **kwargs):命令逻辑
# 4 使用命令
python manage.py py文件(命令名)flask 缓存的使用
from flask import Flask
from flask_caching import Cacheconfig {DEBUG: True, # some Flask specific configsCACHE_TYPE: SimpleCache, # Flask-Caching related configsCACHE_DEFAULT_TIMEOUT: 300
}app Flask(__name__)
# tell Flask to use the above defined config
app.config.from_mapping(config)
cache Cache(app)app.route(/)
def index():cache.set(name, xxx)return indexapp.route(/get)
def get():rescache.get(name)return resif __name__ __main__:app.run()flask信号的使用
# 内置信号--》flask请求过程中--》源码中定义的---》不需要我们定义和触发---》只要写了函数跟它对应--》执行到这就会触发函数执行
# Flask框架中的信号基于blinker其主要就是让开发者可是在flask请求过程中定制一些用户行为request_started _signals.signal(request-started) # 请求到来前执行
request_finished _signals.signal(request-finished) # 请求结束后执行before_render_template _signals.signal(before-render-template) # 模板渲染前执行
template_rendered _signals.signal(template-rendered) # 模板渲染后执行got_request_exception _signals.signal(got-request-exception) # 请求执行出现异常时执行request_tearing_down _signals.signal(request-tearing-down) # 请求执行完毕后自动执行无论成功与否
appcontext_tearing_down _signals.signal(appcontext-tearing-down)# 应用上下文执行完毕后自动执行无论成功与否appcontext_pushed _signals.signal(appcontext-pushed) # 应用上下文push时执行
appcontext_popped _signals.signal(appcontext-popped) # 应用上下文pop时执行
message_flashed _signals.signal(message-flashed) # 调用flask在其中添加数据时自动触发# 内置信号的使用
# 案例
### 我们现在想在模板渲染之 记录日志 使用内置信号实现# 1 写一个函数
def before_render(*args, **kwargs):print(args)print(kwargs)# 谁ip 在什么时间 访问了哪个页面(template)print(记录日志模板要渲染了)# 2 跟内置信号绑定
from flask.signals import before_render_template
before_render_template.connect(before_render) # 信号触发的函数# 3 源码中触发信号执行(我们不需要动)
# before_render_template.send() 源码再模板渲染之前它写死了app.route(/)
def index():return render_template(index.html)app.route(/login)
def login():return render_template(login.html)if __name__ __main__:app.run()# 自定义信号
from flask import Flask, render_template
from flask.signals import _signals
import pymysqlapp Flask(__name__)
app.debug True# 1 定义信号
# 自定义信号
db_save _signals.signal(db_save)# 2 写一个函数
def db_save_fun(*args, **kwargs):print(args)print(kwargs)print(表数据插入了)# 3 跟自定义置信号绑定
db_save.connect(db_save_fun)# 3 触发信号执行(需要我们做)
# before_render_template.send() 源码再模板渲染之前它写死了app.route(/)
def index():return render_template(index.html)app.route(/create_article)
def create_article():conn pymysql.connect(host127.0.0.1, userroot, password1234, databasecnblogs)cursor conn.cursor()cursor.execute(insert into article (title,author) VALUES (%s,%s), args[测试测试标题, 测试作者测试])conn.commit()# 手动触发信号db_save.send(table_namearticle,info{title:测试测试标题,author:测试作者测试})return 插入成功if __name__ __main__:app.run()# 步骤
# 1 定义信号
db_save _signals.signal(db_save)
# 2 写一个函数
def db_save_fun(*args, **kwargs):print(args)print(kwargs)print(表数据插入了)# 3 跟自定义置信号绑定
db_save.connect(db_save_fun)# 3 触发信号执行(需要我们做)
db_save.send() # 需要在代码中写可以传参数传入的参数--》db_save_fun 能拿到sqlalchemy原生操作
# 操作原生sql ---》用得少
import pymysql
import threading
# 1 导入
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
# 2 创建引擎
engine create_engine(mysqlpymysql://root:1234127.0.0.1:3306/cnblogs, max_overflow0, # 超过连接池大小外最多创建的连接pool_size5, # 连接池大小pool_timeout30, # 池中没有线程最多等待的时间否则报错pool_recycle-1 # 多久之后对线程池中的线程进行一次连接的回收重置
)# 3 使用引擎拿到链接
# conn engine.raw_connection()
# # 4 剩下的一样了
# cursorconn.cursor(pymysql.cursors.DictCursor)
# cursor.execute(select * from article limit 10)
# rescursor.fetchall()
# print(res)## 多线程测试
def task(arg):conn engine.raw_connection()cursor conn.cursor()cursor.execute(select * from article)result cursor.fetchall()print(result)cursor.close()conn.close()for i in range(20):t threading.Thread(targettask, args(i,))t.start()sqlalchemy操作表
# 1 建个 models.py 里面写 表模型
# 2 把表模型---》同步到数据库中
# 3 增删查改
# 1 导入
import datetime # 时间模块
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, IndexBase declarative_base() # Base 当成 django的 models.Model# 2 创建表模型
class User(Base): # 模型层的名字__tablename__ users # 数据库表的名字id Column(Integer, primary_keyTrue, autoincrementTrue)name Column(String(32), indexTrue, nullableTrue)email Column(String(32), uniqueTrue)# datetime.datetime.now不能加括号加了括号以后永远是当前时间ctime Column(DateTime, defaultdatetime.datetime.now)extra Column(Text)# 3 没有迁移命令---》后期使用第三方模块可以有命令
# 目前需要手动做
# sqlalchemy 不能创建数据库能创建表删除表不能删除增加字段第三方模块
# 3.1 创建引擎
engine create_engine(mysqlpymysql://root:123456127.0.0.1:3306/sqlalchemy01,max_overflow0, # 超过连接池大小外最多创建的连接pool_size5, # 连接池大小pool_timeout30, # 池中没有线程最多等待的时间否则报错pool_recycle-1 # 多久之后对线程池中的线程进行一次连接的回收重置
)
## 3.2 把表模型同步到数据库中
Base.metadata.create_all(engine) # 3.3 同步和删除表只能执行一条
# Base.metadata.drop_all(engine)
flask orm操作表
基本操作
from models import User
from sqlalchemy import create_engine# 1 创建引擎
engine create_engine(mysqlpymysql://root:1234127.0.0.1:3306/sqlalchemy01,max_overflow0, # 超过连接池大小外最多创建的连接pool_size5, # 连接池大小pool_timeout30, # 池中没有线程最多等待的时间否则报错pool_recycle-1 # 多久之后对线程池中的线程进行一次连接的回收重置
)# 2 orm操作--》借助于 engine 得到sessionconn对象
from sqlalchemy.orm import sessionmakerConnection sessionmaker(bindengine) # 绑定连接
session Connection()# 3 使用conn---》进行orm操作
# 3.1 增加数据# user User(namelqz, email3qq.com)
# # 插入到数据库
# conn.add(user) # 放个对象# # 提交
# conn.commit()
# # 关闭链接
# conn.close()# 3.2 查询数据
# 查询User表中id为1的所有记录--》放到列表中
# resconn.query(User).filter_by(id1).all()
# print(res)## 3.3 删除
# res conn.query(User).filter_by(namelqz).delete()
# print(res)
# conn.commit()## 3.4 修改
# resconn.query(User).filter_by(name9999).update({extra:xxsss})
# conn.commit()一对多的增加和跨表查询 一对一只需要关联字段加上 ,uniqueTrue
modles.py文件
import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, IndexBase declarative_base() # Base 当成 models.Model# 一对多 一个兴趣被多个人喜欢 一个人只喜欢一个兴趣
class Hobby(Base):__tablename__ hobbyid Column(Integer, primary_keyTrue)caption Column(String(50), default篮球)def __str__(self):return self.captiondef __repr__(self):return self.captionclass Person(Base):__tablename__ personid Column(Integer, primary_keyTrue)name Column(String(32), indexTrue, nullableTrue)# hobby指的是tablename而不是类名uselistFalse# 外键关联--》强外键--》物理外键hobby_id Column(Integer, ForeignKey(hobby.id)) # 如果是一对一加上uniqueTrue# 跟数据库无关不会新增字段只用于快速连表操作# 类名backref用于反向查询hobby relationship(Hobby, backrefpers) # 重点 正反向查询要通过这个字段def __str__(self):return self.namedef __repr__(self):return self.name# views.py中
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Hobby, Person, Userengine create_engine(mysqlpymysql://root:123456127.0.0.1:3306/sqlalchemy01,max_overflow0, # 超过连接池大小外最多创建的连接pool_size5, # 连接池大小pool_timeout30, # 池中没有线程最多等待的时间否则报错pool_recycle-1 # 多久之后对线程池中的线程进行一次连接的回收重置
)Session sessionmaker(bindengine)
session Session()# 1 增加 Hobby
# hobby Hobby(caption足球)
# hobby1 Hobby(caption篮球)
# session.add_all([hobby, hobby1]) # 列表套对象
# session.commit() # 一定要记得提交# 2 增加Person
# p1 Person(name彭于晏, hobby_id1) # 可以是查询出来的对象 也可以是id 但是id写死了
# p2 Person(name刘亦菲, hobby_id2)
# session.add_all([p1, p2])
# session.commit()# 3 简便方式增加person---》增加Person直接新增Hobby
# hobby1 Hobby(caption乒乓球)
# # 同时把数据插入两张表
# p1 Person(name彭于晏, hobbyhobby1) # 前提是表模型 必须有 relationship
# session.add(p1)
# session.commit()# # 4 基于对象的跨表查询---正向
# per session.query(Person).filter_by(name彭于晏).first()
# print(per)
# # 正向 Person的对象通过 hobby 查询到 Hobby的caption字段
# print(per.hobby.caption)# 5 基于对象的跨表查询---正向
# hobbysession.query(Hobby).filter_by(caption乒乓球).first()
# print(hobby)# 反向---拿到多条 可以通过索引取值
hobbysession.query(Hobby).filter_by(caption乒乓球).first()
print(hobby.pers)
print(hobby.pers[0].name) # 列表套对象
多对多关系的增加和查询
# 多对多 models.py
class Boy2Girl(Base):__tablename__ boy2girlid Column(Integer, primary_keyTrue, autoincrementTrue)girl_id Column(Integer, ForeignKey(girl.id))boy_id Column(Integer, ForeignKey(boy.id))# boy relationship(Boy, backrefboydef __str__(self):return self.namedef __repr__(self):return self.nameclass Girl(Base):__tablename__ girlid Column(Integer, primary_keyTrue)name Column(String(64), uniqueTrue, nullableFalse)class Boy(Base):__tablename__ boyid Column(Integer, primary_keyTrue, autoincrementTrue)name Column(String(64), uniqueTrue, nullableFalse)# 与生成表结构无关仅用于查询方便,放在哪个单表中都可以--等同于manytomanygirls relationship(Girl, secondaryboy2girl, backrefboys)def __str__(self):return self.namedef __repr__(self):return self.nameif __name__ __main__:# 3.1 创建引擎engine create_engine(mysqlpymysql://root:123456127.0.0.1:3306/sqlalchemy01,max_overflow0, # 超过连接池大小外最多创建的连接pool_size5, # 连接池大小pool_timeout30, # 池中没有线程最多等待的时间否则报错pool_recycle-1 # 多久之后对线程池中的线程进行一次连接的回收重置)## 3.2 把表模型同步到数据库中Base.metadata.create_all(engine)# 3.3 删除表# Base.metadata.drop_all(engine)# 视图类中
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Hobby, Person, User, Girl, Boy2Girl, Boyengine create_engine(mysqlpymysql://root:123456127.0.0.1:3306/sqlalchemy01,max_overflow0, # 超过连接池大小外最多创建的连接pool_size5, # 连接池大小pool_timeout30, # 池中没有线程最多等待的时间否则报错pool_recycle-1 # 多久之后对线程池中的线程进行一次连接的回收重置
)Session sessionmaker(bindengine)
session Session()# 第一种简便方式增加# obj2 Girl(name张亦菲)
# obj3 Girl(name李娜扎)
# obj1 Boy(name张小勇, girls[obj2, obj3]) # 在Boy 添加一个 通过girls关键字往 Girl 加上两个并建立关系
#
# session.add(obj1)
# session.commit()# # 4 基于对象的跨表查询---正向
boy session.query(Boy).filter_by(name张小勇).first()
print(boy.girls[0].name)# 5 基于对象的跨表查询---反向
# girlsession.query(Girl).filter_by(name张亦菲).first()
# print(girl.boys)# 第二中添加方式 一张一张表的添加
# 1 增加 Boy
# boy Boy(name王小刚)
# boy2 Boy(name王小明)
# boy3 Boy(name王小勇)
# session.add_all([boy,boy2,boy3])
# session.commit()# 2 增加Girl
# girl Girl(name张小华)
# girl2 Girl(name刘小红)
# girl3 Girl(name李小丽)
# session.add_all([girl3, girl2, girl])
# session.commit()# 3 增加Boy2Girl
# obj1Boy2Girl(boy_id1,girl_id1)
# obj2Boy2Girl(boy_id1,girl_id2)
# obj3Boy2Girl(boy_id1,girl_id3)
# session.add_all([obj2, obj3, obj1])
# session.commit()多对多基本的增删改查
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import User, Person, Hobby, Boy, Girl, Boy2Girl
from sqlalchemy.sql import text# 添加
# obj1 Boy(name张小勇, girls[obj2, obj3]) # 在Boy 添加一个 通过girls关键字往 Girl 加上两个并建立关系
# session.add(obj1)# 2 删除
# 2.1 session.query(Users).filter_by(id1).delete()
# 2.1 session.delete(对象)
# user session.query(User).filter_by(id1).first()
# session.delete(user)
# session.commit()# 修改
# 1 方式一
# session.query(Boy).filter_by(id1).update({name:lqz})
# session.commit()# # 2 方式二 类名.属性名作为要修改的key
# session.query(Boy).filter_by(id4).update({Boy.name:lqz1})
# session.commit()# id为4的人的名字后 _nb 类似于django的 F 查询
# session.query(User).filter_by(id2).update({name:User.name_nb},synchronize_sessionFalse) # 字符串拼接
# session.query(User).filter_by(id2).update({id:User.id6}, synchronize_sessionevaluate) # 数字之间加
# session.commit()# # 3 方式三
# 对象.namexxx
#session.add(对象)
# boysession.query(Boy).filter_by(id1).first()
# boy.namexxzzyy
# session.add(boy) # 有id就是修改没有就是新增
# session.commit()