博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python基础-项目-day3 编写 数据库增删改查 ORM
阅读量:4147 次
发布时间:2019-05-25

本文共 8736 字,大约阅读时间需要 29 分钟。

前言

学习笔记来源-廖雪峰老师

orm.py

#!/usr/bin/env python3# -*- coding: utf-8 -*-# Python基础-编写 数据库增删改查 ORMimport logging; logging.basicConfig(level = logging.INFO)import asyncio# 创建连接池@asyncio.coroutinedef create_pool(loop, **kw):    logging.info("建立数据库连接池")    # 全局的连接池    global __pool    __pool = yield from aiomysql.create_pool(        host = kw.get("host", "localhost"),        port = kw.get("port", 3306),        user = kw["user"],        password = kw["123456"],        db = kw["db"],        # 防止乱码        charset = kw.get("charset", "utf8"),        # True 表示不需要在commit提交事务        autocommit = kw.get("autocommit", True),        maxsize = kw.get("maxsize", 10),        minsize = kw.get("minsize", 1),        loop = loop    )# Select 语句 查询语句# sql 为sql语句, args为占位符参数列表, siez为查询数量def select(sql, args, size = None):    log(sql, args)    global __pool    with (yield from __pool) as conn:        cur = yield from conn.cursor(aiomysql.DictCursor)        # SQL 语句的占位符是 ? ,而 MySQL 的占位符是 %s,股这里进行转换        # yield form 将调用一个子协程(即一个协程调用宁一个协程),并直接获得子协程的返回结果        yield from cur.execute(sql.replace("?", "%s"), args or ())        if size:            # 获取指定 size 的记录            rs = yield from cur.fetchmany(size)        else:            # 获取所有记录            rs = yield from cur.fetchall()        yield from cur.close()        logging.info('rows returned %s' % len(rs))        return rs# 用于增,删,改的数据库操作@asyncio.coroutinedef execute(sql, args):    log(sql)    with (yield from __pool) as conn:        try:            cur = yield from conn.cursor()            # SQL 语句的占位符是 ? ,而 MySQL 的占位符是 %s,股这里进行转换            # yield form 将调用一个子协程(即一个协程调用宁一个协程),并直接获得子协程的返回结果            yield from cur.execute(sql.replace("?", "%s"), args)            # 返回结果集            affected = cur.rowcount            yield from cur.close()        except BaseException as e:            raise        return affecteddef create_args_string(num):    L = []    for n in range(num):        L.append('?')    return ', '.join(L)# 定义基类class ModelMetaclass(type):    def __new__(cls, name, bases, attrs):        # 排除 Model 类本身        if name == "Model":            return type.__new__(cls, name, bases, attrs)        # 获取 table 名称        tableName = attrs.get("__table__", None) or name        logging.info("found model : %s (table: %s)" % (name, tableName))        # 获取所有的Field和主键名        mappings = dict()        fields = []        primaryKey = None        for k, v in attrs.items():            if isinstance(v, Field):                logging.info("Found mapping: %s --> %s" % (k, v))                mappings[k] = v                if v.primary_key:                    # 找到主键                    if primaryKey:                        raise RuntimeError('Duplicate primary key for field: %s' % k)                    primaryKey = k                else:                    fields.append(k)        if not primaryKey:            raise RuntimeError('Primary key not found.')        for k in mappings.keys():            attrs.pop(k)        escaped_fields = list(map(lambda f: '`%s`' % f, fields))        # 保存属性和列的映射关系        attrs['__mappings__'] = mappings         attrs['__table__'] = tableName        # 主键属性名        attrs['__primary_key__'] = primaryKey        # 除主键外的属性名        attrs['__fields__'] = fields         # 构造默认的SELECT, INSERT, UPDATE和DELETE语句:        attrs['__select__'] = 'select `%s`, %s from `%s`' % (primaryKey, ', '.join(escaped_fields), tableName)        attrs['__insert__'] = 'insert into `%s` (%s, `%s`) values (%s)' % (tableName, ', '.join(escaped_fields), primaryKey, create_args_string(len(escaped_fields) + 1))        attrs['__update__'] = 'update `%s` set %s where `%s`=?' % (tableName, ', '.join(map(lambda f: '`%s`=?' % (mappings.get(f).name or f), fields)), primaryKey)        attrs['__delete__'] = 'delete from `%s` where `%s`=?' % (tableName, primaryKey)        return type.__new__(cls, name, bases, attrs)# 定义 Modelclass Model(dict, metaclass = ModelMetaclass):    def __init__(self, **kw):        super(Model, self).__init__(**kw)    def __getattr__(self, key):        try:            return self[key]        except KeyError:            raise AttributeError(r"'Model' object has no attribute '%s'" % key)    def __setattr__(self, key, value):        self[key] = value    def getValue(self, key):        return getattr(self, key, None)    def getValueOrDefault(self, key):        value = getattr(self, key, None)        if value is None:            field = self.__mappings__[key]            if field.default is not None:                value = field.default() if callable(field.default) else field.default                logging.debug("using default value for %s : %s" % (key, str(value)))                setattr(self, key, value)        return value    @asyncio.coroutine    def findAll(cls, where=None, args=None, **kw):        ' find objects by where clause. '        sql = [cls.__select__]        if where:            sql.append('where')            sql.append(where)        if args is None:            args = []        orderBy = kw.get('orderBy', None)        if orderBy:            sql.append('order by')            sql.append(orderBy)        limit = kw.get('limit', None)        if limit is not None:            sql.append('limit')            if isinstance(limit, int):                sql.append('?')                args.append(limit)            elif isinstance(limit, tuple) and len(limit) == 2:                sql.append('?, ?')                args.extend(limit)            else:                raise ValueError('Invalid limit value: %s' % str(limit))        rs = yield from select(' '.join(sql), args)        return [cls(**r) for r in rs]    @asyncio.coroutine    def findNumber(cls, selectField, where=None, args=None):        ' find number by select and where. '        sql = ['select %s _num_ from `%s`' % (selectField, cls.__table__)]        if where:            sql.append('where')            sql.append(where)        rs = yield from select(' '.join(sql), args, 1)        if len(rs) == 0:            return None        return rs[0]['_num_']    @asyncio.coroutine    def find(cls, pk):        ' find object by primary key. '        rs = yield from select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1)        if len(rs) == 0:            return None        return cls(**rs[0])    @asyncio.coroutine    def save(self):        args = list(map(self.getValueOrDefault, self.__fields__))        args.append(self.getValueOrDefault(self.__primary_key__))        rows = yield from execute(self.__insert__, args)        if rows != 1:            logging.warn('failed to insert record: affected rows: %s' % rows)    @asyncio.coroutine    def update(self):        args = list(map(self.getValue, self.__fields__))        args.append(self.getValue(self.__primary_key__))        rows = yield from execute(self.__update__, args)        if rows != 1:            logging.warn('failed to update by primary key: affected rows: %s' % rows)    @asyncio.coroutine    def remove(self):        args = [self.getValue(self.__primary_key__)]        rows = yield from execute(self.__delete__, args)        if rows != 1:            logging.warn('failed to remove by primary key: affected rows: %s' % rows)class Field(object):    def __init__(self, name, column_type, primary_key, default):        self.name = name        self.column_type = column_type        self.primary_key = primary_key        self.default = default    def __str__(self):        return "<%s, %s : %s>" % (self.__class__.__name__, self.column_type, self.name)class StringField(Field):    def __init(self, name = None, primary_key = False, default = None, ddl = "varchar(100)"):        super().__init__(name, ddl, primary_key, default)class BooleanField(Field):    def __init__(self, name=None, default=False):        super().__init__(name, 'boolean', False, default)class IntegerField(Field):    def __init__(self, name=None, primary_key=False, default=0):        super().__init__(name, 'bigint', primary_key, default)class FloatField(Field):    def __init__(self, name=None, primary_key=False, default=0.0):        super().__init__(name, 'real', primary_key, default)class TextField(Field):    def __init__(self, name=None, default=None):        super().__init__(name, 'text', False, default)

ormTEST.py

#!/usr/bin/env python3# -*- coding: utf-8 -*-# Python基础-测试 ormfrom orm import Model, StringField, IntegerFieldimport asyncioclass User(Model):    __table__ = "users"    id = IntegerField(primary_key = True)@asyncio.coroutinedef runTest():    user = User(id = 123, name = "王大锤")    yield from user.save()    yield from user.findAll("王大锤")    print(user)runTest()

提交git

echo "# python3-webapp-Su" >> README.mdgit add .git commit -m "Python基础-项目-day3 编写 数据库增删改查 ORM"git push -u origin master

转载地址:http://zacti.baihongyu.com/

你可能感兴趣的文章
MySQL字段类型的选择与MySQL的查询效率
查看>>
Java的Properties配置文件用法【续】
查看>>
JAVA操作properties文件的代码实例
查看>>
IPS开发手记【一】
查看>>
Java通用字符处理类
查看>>
文件上传时生成“日期+随机数”式文件名前缀的Java代码
查看>>
Java代码检查工具Checkstyle常见输出结果
查看>>
北京十大情人分手圣地
查看>>
Android自动关机代码
查看>>
Android中启动其他Activity并返回结果
查看>>
2009年33所高校被暂停或被限制招生
查看>>
GlassFish 部署及应用入门
查看>>
X-code7 beta error: warning: Is a directory
查看>>
Error: An App ID with identifier "*****" is not avaliable. Please enter a different string.
查看>>
3.5 YOLO9000: Better,Faster,Stronger(YOLO9000:更好,更快,更强)
查看>>
iOS菜鸟学习--如何避免两个按钮同时响应
查看>>
iOS菜鸟学习—— NSSortDescriptor的使用
查看>>
C语言8
查看>>
Qt实现简单延时
查看>>
qml有关矩形说明
查看>>