Python SQLAlchemy basic operations and common tips (great with lots of examples)

  • 2020-04-02 13:38:52
  • OfStack

First of all, since the latest version 0.8 is the development version, so I'm using version 0.79, the API may be a little different.
Since I'm using MySQL and InnoDB with other databases, I can't completely copy this article.

Let's start with the installation, take Debian/Ubuntu for example (make sure you have admin rights) :
1. The MySQL

apt-get install mysql-server
apt-get install mysql-client
apt-get install libmysqlclient15-dev

2. The python - mysqldb
apt-get install python-mysqldb

3. The easy_install
wget http://peak.telecommunity.com/dist/ez_setup.py

Python ez_setup. Py
4. MySQL - Python
easy_install MySQL-Python

5. SQLAlchemy
easy_install SQLAlchemy

If you're using another operating system, just Google the problem. I was working on Mac OS X, and I had some problems along the way, but I didn't write them down...
It is worth mentioning that I used mysql-python to connect MySQL, because it does not support asynchronous calls, so it is not very compatible with Tornado. But the performance is actually pretty good, so let's explore other options later...

Once installed, it can be used:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

DB_CONNECT_STRING = 'mysql+mysqldb://root:123@localhost/ooxx?charset=utf8'
engine = create_engine(DB_CONNECT_STRING, echo=True)
DB_Session = sessionmaker(bind=engine)
session = DB_Session()

The DB_CONNECT_STRING here is the path to the database. "Mysql +mysqldb" specifies the use of mysql-python connection, "root" and "123" are the username and password, respectively, "localhost" is the domain name of the database, "ooxx" is the database name used (omitted), and "charset" specifies the character set to be used for connection (omitted).
Create_engine () returns a database engine, and when the echo parameter is True, each executed SQL statement is displayed and can be closed in production.
Sessionmaker () generates a database session class. An instance of this class ACTS as a database connection, and it also records the data for some queries and decides when to execute the SQL statement. Since SQLAlchemy maintains a database connection pool on its own (five connections by default), it is not expensive to initialize a session. For Tornado, it can be initialized in BaseHandler's initialize () :

class BaseHandler(tornado.web.RequestHandler):
    def initialize(self):
        self.session = models.DB_Session()
    def on_finish(self):
        self.session.close()

For other Web servers, sqlalchemy.orm.scoped_session can be used to ensure that each thread gets a unique session object. Tornado itself is single-threaded, and if you use asynchronous, you might have a problem, so I didn't use it.

After you get the session, you can execute the SQL:

session.execute('create database abc')
print session.execute('show databases').fetchall()
session.execute('use abc')
#  build  user  The process is abbreviated 
print session.execute('select * from user where id = 1').first()
print session.execute('select * from user where id = :id', {'id': 1}).first()

However, this is no different from using mysql-python directly, so I won't introduce it; I still like the ORM approach, which is the only reason I adopted SQLAlchemy.

So let's define a table:

from sqlalchemy import Column
from sqlalchemy.types import CHAR, Integer, String
from sqlalchemy.ext.declarative import declarative_base

BaseModel = declarative_base()
def init_db():
    BaseModel.metadata.create_all(engine)
def drop_db():
    BaseModel.metadata.drop_all(engine)

class User(BaseModel):
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True)
    name = Column(CHAR(30)) # or Column(String(30))
init_db()

Declarative_base () creates a BaseModel class, a subclass of which can be automatically associated with a table.
Take the User class as an example. Its s/p attribute is the name of the table in the database. It has two fields, id and name, which are integer and 30 fixed-length characters respectively. Column has some other parameters that I won't explain.
Finally, BaseModel. Metadata. Create_all (engine) will find BaseModel all subclasses, and set up the tables in the database; Drop_all () drops the tables.

Then start using the table:

from sqlalchemy import func, or_, not_

user = User(name='a')
session.add(user)
user = User(name='b')
session.add(user)
user = User(name='a')
session.add(user)
user = User()
session.add(user)
session.commit()
query = session.query(User)
print query #  According to SQL  statements 
print query.statement #  Same as above 
for user in query: #  Traversal query 
    print user.name
print query.all() #  What is returned is a list-like object 
print query.first().name #  When the record doesn't exist, first()  Returns the  None
# print query.one().name #  An exception is thrown when there is no record, or when there are multiple rows 
print query.filter(User.id == 2).first().name
print query.get(2).name #  Get by primary key, equivalent to the above sentence 
print query.filter('id = 2').first().name #  Support string 
query2 = session.query(User.name)
print query2.all() #  Each row is a tuple 
print query2.limit(1).all() #  Most return  1  records 
print query2.offset(1).all() #  From the first  2  A record begins to return 
print query2.order_by(User.name).all()
print query2.order_by('name').all()
print query2.order_by(User.name.desc()).all()
print query2.order_by('name desc').all()
print session.query(User.id).order_by(User.name.desc(), User.id).all()
print query2.filter(User.id == 1).scalar() #  If there is a record, return the first element of the first record 
print session.query('id').select_from(User).filter('id = 1').scalar()
print query2.filter(User.id > 1, User.name != 'a').scalar() # and
query3 = query2.filter(User.id > 1) #  Multiple spliced  filter  Is also  and
query3 = query3.filter(User.name != 'a')
print query3.scalar()
print query2.filter(or_(User.id == 1, User.id == 2)).all() # or
print query2.filter(User.id.in_((1, 2))).all() # in
query4 = session.query(User.id)
print query4.filter(User.name == None).scalar()
print query4.filter('name is null').scalar()
print query4.filter(not_(User.name == None)).all() # not
print query4.filter(User.name != None).all()
print query4.count()
print session.query(func.count('*')).select_from(User).scalar()
print session.query(func.count('1')).select_from(User).scalar()
print session.query(func.count(User.id)).scalar()
print session.query(func.count('*')).filter(User.id > 0).scalar() # filter()  Contained in the  User , so there is no need to specify a table 
print session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == 1 #  You can use  limit()  limit  count()  The number of return 
print session.query(func.sum(User.id)).scalar()
print session.query(func.now()).scalar() # func  Can be followed by any function name, as long as the database supports 
print session.query(func.current_timestamp()).scalar()
print session.query(func.md5(User.name)).filter(User.id == 1).scalar()
query.filter(User.id == 1).update({User.name: 'c'})
user = query.get(1)
print user.name
user.name = 'd'
session.flush() #  Write to the database, but do not commit 
print query.get(1).name
session.delete(user)
session.flush()
print query.get(1)
session.rollback()
print query.get(1).name
query.filter(User.id == 1).delete()
session.commit()
print query.get(1)

Add, delete, change check are involved in, oneself look at the output of the SQL statement to know, so the basic knowledge is introduced here.


So let's start with some more advanced stuff.

How to batch insert large amount of data?

Non-orm methods can be used:

session.execute(
    User.__table__.insert(),
    [{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)]
)
session.commit()

Above I batch inserted 10000 records, in half a second finished; The ORM approach takes a long time.

How do I prefix executed SQL statements?

Using the prefix_with() method of the query object:

session.query(User.name).prefix_with('HIGH_PRIORITY').all()
session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})

How do I replace a record with an existing primary key?

Session.merge () instead of session.add(), SELECT + UPDATE:

user = User(id=1, name='ooxx')
session.merge(user)
session.commit()

Or using MySQL's INSERT... ON DUPLICATE KEY UPDATE, need to use @compiles decorator, a bit confusing, search it yourself: SQLAlchemy ON DUPLICATE KEY UPDATE and sqlalchemy_mysql_ext.

How to use unsigned integers?

You can use MySQL dialect:

from sqlalchemy.dialects.mysql import INTEGER
id = Column(INTEGER(unsigned=True), primary_key=True)

What if the property name of the model needs to be different from the field name of the table?

A strange requirement was encountered during development. A table of other systems contained a "from" field, which is the keyword in Python.

from_ = Column('from', CHAR(10))

How do I get the length of a field?

The Column will generate a very complex object, and it is difficult to get the length. Here, user-name is used as an example:

User.name.property.columns[0].type.length

How do I specify the use of InnoDB and the use of utf-8 encoding?

The easiest way is to change the default configuration of the database. If you must specify this in code, you can do this:

class User(BaseModel):
    __table_args__ = {
        'mysql_engine': 'InnoDB',
        'mysql_charset': 'utf8'
    }

MySQL 5.5 now supports the storage of 4 bytes of utf-8 encoded characters, as well as emoji in iOS (e.g. 🍎 Character) belongs to this category.
If you are setting the table, you can change utf8 in the above code to utf8mb4, and charset in DB_CONNECT_STRING as well.
If you set the database or field, it is more convenient to write your own SQL statement. For details, please refer to How to support full Unicode in MySQL databases.
It is not recommended to replace utf8 entirely with utf8 as utf8mb4 is slower and the index takes up more space.

How do I set a foreign key constraint?

from random import randint
from sqlalchemy import ForeignKey

class User(BaseModel):
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True)
    age = Column(Integer)

class Friendship(BaseModel):
    __tablename__ = 'friendship'
    id = Column(Integer, primary_key=True)
    user_id1 = Column(Integer, ForeignKey('user.id'))
    user_id2 = Column(Integer, ForeignKey('user.id'))

for i in xrange(100):
    session.add(User(age=randint(1, 100)))
session.flush() #  or  session.commit() After execution, user  The object's  id  Properties are accessible (because  id  It's self-increasing.) 
for i in xrange(100):
    session.add(Friendship(user_id1=randint(1, 100), user_id2=randint(1, 100)))
session.commit()
session.query(User).filter(User.age < 50).delete()

When you execute this code, you should encounter an error:

sqlalchemy.exc.IntegrityError: (IntegrityError) (1451, 'Cannot delete or update a parent row: a foreign key constraint fails (`ooxx`.`friendship`, CONSTRAINT `friendship_ibfk_1` FOREIGN KEY (`user_id1`) REFERENCES `user` (`id`))') 'DELETE FROM user WHERE user.age < %s' (50,)

The reason is that deleting data from the user table may cause the foreign key of friendship not to point to a real record. By default, MySQL denies this operation, aka RESTRICT. InnoDB also allows for ON DELETE to be specified as CASCADE NULL, which deletes invalid records in friendship, and SET NULL for the foreign key of those records.
In addition to deletion, it is possible to change the primary key, which can also cause the foreign key of friendship to fail. So we have ON UPDATE. Instead of deleting, CASCADE updates the corresponding foreign key.
In SQLAlchemy, this is how it works:
class Friendship(BaseModel):
    __tablename__ = 'friendship'
    id = Column(Integer, primary_key=True)
    user_id1 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))
    user_id2 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))

How do I join tables?

from sqlalchemy import distinct
from sqlalchemy.orm import aliased

Friend = aliased(User, name='Friend')
print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).all() #  All users with friends 
print session.query(distinct(User.id)).join(Friendship, User.id == Friendship.user_id1).all() #  All users with friends (no duplicates) 
print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).distinct().all() #  Same as above 
print session.query(Friendship.user_id2).join(User, User.id == Friendship.user_id1).order_by(Friendship.user_id2).distinct().all() #  All users who are considered friends by others 
print session.query(Friendship.user_id2).select_from(User).join(Friendship, User.id == Friendship.user_id1).order_by(Friendship.user_id2).distinct().all() #  Same as above, join  In the opposite direction, but because it's not  STRAIGHT_JOIN , so  MySQL  You can choose your order 
print session.query(User.id, Friendship.user_id2).join(Friendship, User.id == Friendship.user_id1).all() #  Users and their friends 
print session.query(User.id, Friendship.user_id2).join(Friendship, User.id == Friendship.user_id1).filter(User.id < 10).all() # id  Less than  10  Users and their friends 
print session.query(User.id, Friend.id).join(Friendship, User.id == Friendship.user_id1).join(Friend, Friend.id == Friendship.user_id2).all() #  two  join Because you are using the same table, you need an alias 
print session.query(User.id, Friendship.user_id2).outerjoin(Friendship, User.id == Friendship.user_id1).all() #  Users and their friends (no friends is  None , using the left connection) 

I didn't mention relationship here. Although it looks very convenient, there is too much to learn and many performance problems to consider, so let's just join by ourselves.

Why can't you delete the records from the in operation?

session.query(User).filter(User.id.in_((1, 2, 3))).delete()

Throws an exception like this:
sqlalchemy.exc.InvalidRequestError: Could not evaluate current criteria in Python.  Specify 'fetch' or False for the synchronize_session parameter.

But that's fine:
session.query(User).filter(or_(User.id == 1, User.id == 2, User.id == 3)).delete()

Searching for Sqlalchemy delete subquery, I mentioned one important point of delete: when deleting a record, it will try to delete the qualified object in session by default, and the in operation estimation is not supported, so I made an error. The solution is to delete without synchronizing, and then let all the entities in the session expire:
session.query(User).filter(User.id.in_((1, 2, 3))).delete(synchronize_session=False)
session.commit() # or session.expire_all()

In addition, the update operation has the same parameters, and if it is committed immediately afterwards, adding the synchronize_session=False parameter is faster.


How do I extend the base classes of the model?

Declarative_base () generates a class object whose subclasses typically correspond to a table. If you want to add methods or properties to this base class so that subclasses can use them, you can do this in three ways:

1. Define a new class and set its methods as the methods of the base class:


class ModelMixin(object):
    @classmethod
    def get_by_id(cls, session, id, columns=None, lock_mode=None):
        if hasattr(cls, 'id'):
            scalar = False
            if columns:
                if isinstance(columns, (tuple, list)):
                    query = session.query(*columns)
                else:
                    scalar = True
                    query = session.query(columns)
            else:
                query = session.query(cls)
            if lock_mode:
                query = query.with_lockmode(lock_mode)
            query = query.filter(cls.id == id)
            if scalar:
                return query.scalar()
            return query.first()
        return None
    BaseModel.get_by_id = get_by_id
    @classmethod
    def get_all(cls, session, columns=None, offset=None, limit=None, order_by=None, lock_mode=None):
        if columns:
            if isinstance(columns, (tuple, list)):
                query = session.query(*columns)
            else:
                query = session.query(columns)
                if isinstance(columns, str):
                    query = query.select_from(cls)
        else:
            query = session.query(cls)
        if order_by is not None:
            if isinstance(order_by, (tuple, list)):
                query = query.order_by(*order_by)
            else:
                query = query.order_by(order_by)
        if offset:
            query = query.offset(offset)
        if limit:
            query = query.limit(limit)
        if lock_mode:
            query = query.with_lockmode(lock_mode)
        return query.all()
    BaseModel.get_all = get_all
    @classmethod
    def count_all(cls, session, lock_mode=None):
        query = session.query(func.count('*')).select_from(cls)
        if lock_mode:
            query = query.with_lockmode(lock_mode)
        return query.scalar()
    BaseModel.count_all = count_all
    @classmethod
    def exist(cls, session, id, lock_mode=None):
        if hasattr(cls, 'id'):
            query = session.query(func.count('*')).select_from(cls).filter(cls.id == id)
            if lock_mode:
                query = query.with_lockmode(lock_mode)
            return query.scalar() > 0
        return False
    BaseModel.exist = exist
    @classmethod
    def set_attr(cls, session, id, attr, value):
        if hasattr(cls, 'id'):
            session.query(cls).filter(cls.id == id).update({
                attr: value
            })
            session.commit()
    BaseModel.set_attr = set_attr
    @classmethod
    def set_attrs(cls, session, id, attrs):
        if hasattr(cls, 'id'):
            session.query(cls).filter(cls.id == id).update(attrs)
            session.commit()
    BaseModel.set_attrs = set_attrs

It's clumsy, but it works. It comes with something useful, you know.
2. Set CLS parameters for declarative_base() :
BaseModel = declarative_base(cls=ModelMixin)

This method does not require code like "basemodel.get_by_id = get_by_id." The downside is that PyCharm still cannot find the locations of these methods.
3. Set the cascade abstract__ property:
class BaseModel(BaseModel):
    __abstract__ = True
    __table_args__ = { #  You can omit subclasses  __table_args__  the 
        'mysql_engine': 'InnoDB',
        'mysql_charset': 'utf8'
    }
    # ...

This is the simplest and allows you to inherit from multiple classes.

How to use transactions correctly?

Imagine a simple banking system with two users:

class User(BaseModel):
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True)
    money = Column(DECIMAL(10, 2))
class TanseferLog(BaseModel):
    __tablename__ = 'tansefer_log'
    id = Column(Integer, primary_key=True)
    from_user = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))
    to_user = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))
    amount = Column(DECIMAL(10, 2))
user = User(money=100)
session.add(user)
user = User(money=0)
session.add(user)
session.commit()

Then open two sessions and conduct two transfer operations at the same time:
session1 = DB_Session()
session2 = DB_Session()
user1 = session1.query(User).get(1)
user2 = session1.query(User).get(2)
if user1.money >= 100:
    user1.money -= 100
    user2.money += 100
    session1.add(TanseferLog(from_user=1, to_user=2, amount=100))
user1 = session2.query(User).get(1)
user2 = session2.query(User).get(2)
if user1.money >= 100:
    user1.money -= 100
    user2.money += 100
    session2.add(TanseferLog(from_user=1, to_user=2, amount=100))
session1.commit()
session2.commit()

Now look at the results:
>>> user1.money
Decimal('0.00')
>>> user2.money
Decimal('100.00')
>>> session.query(TanseferLog).count()
2L

Both transfers were successful, but only one money was transferred, which is clearly unscientific.

It can be seen that MySQL and InnoDB support transactions, but it is not that simple, also need to manually lock.
First, try the read lock:

user1 = session1.query(User).with_lockmode('read').get(1)
user2 = session1.query(User).with_lockmode('read').get(2)
if user1.money >= 100:
    user1.money -= 100
    user2.money += 100
    session1.add(TanseferLog(from_user=1, to_user=2, amount=100))
user1 = session2.query(User).with_lockmode('read').get(1)
user2 = session2.query(User).with_lockmode('read').get(2)
if user1.money >= 100:
    user1.money -= 100
    user2.money += 100
    session2.add(TanseferLog(from_user=1, to_user=2, amount=100))
session1.commit()
session2.commit()

Now when we execute session1.com MIT (), because both user1 and user2 are read locked by session2, we wait for the lock to be released. After the timeout, session1.com MIT () will throw a timeout exception, if caught, or session2 in another process, then session2.com MIT () will still be able to commit. In this case, a transaction is bound to fail to commit, so the changes are in vain.

Next, take a look at write locks by changing the 'read' in the previous code to 'update'. This time it will block while executing the select:
User1 = session2. Query (User). With_lockmode (" update "). The get (1)
In this way, as long as session1 commits or rolls back during the timeout period, then session2 can judge user1.money normally > Is equal to 100 true or not.
Thus, if you need to change the data, it is best to add a write lock.

So when to use a read lock? If you want to ensure that the data read is not modified during the transaction, and that you do not modify it, you can simply add a read lock.
For example, if I were to query a user's expense record (including both the balance and the transfer record), I could simply make an internal connection between the user and the tansefer_log.
However, if the user's transfer record is particularly large, I want to verify the user's password (assuming it is in the user table) before the query, and then query the transfer record after confirming the match. During these two queries, the user may have received a transfer, causing his money field to be modified, but when I showed it to the user, the user's balance remained unchanged, which is abnormal.
If I add a read lock when I read user, the user cannot receive the transfer (because it cannot be changed by a write lock on another transaction to modify the money field), which ensures that no additional tansefer_log records are found. After I checked the two tables and released the read lock, the transfer could proceed, but the data I displayed was correct and consistent at the time.

Another thing to note is that if the field being queried is not indexed, the entire table will be locked:

session1.query(User).filter(User.id > 50).with_lockmode('update').all()
session2.query(User).filter(User.id < 40).with_lockmode('update').all() #  It won't lock because  id  Is a primary key 
session1.rollback()
session2.rollback()
session1.query(User).filter(User.money == 50).with_lockmode('update').all()
session2.query(User).filter(User.money == 40).with_lockmode('update').all() #  Will wait to unlock because  money  There is no index on 

To avoid this, try this:
money = Column(DECIMAL(10, 2), index=True)

Another point to note is subtransactions.
InnoDB supports subtransactions (savepoint statements) to simplify some logic.
For example, a method that overwrites a database might commit a transaction while executing, but then fail in a subsequent process, failing to roll back the committed transaction in that method. Then you can run that method as a subtransaction:

def step1():
    # ...
    if success:
        session.commit()
        return True
    session.rollback()
    return False
def step2():
    # ...
    if success:
        session.commit()
        return True
    session.rollback()
    return False
session.begin_nested()
if step1():
    session.begin_nested()
    if step2():
        session.commit()
    else:
        session.rollback()
else:
    session.rollback()

In addition, a rollback subtransaction can release locks acquired in this subtransaction, increasing concurrency and reducing deadlock probability.

How do I increment a field?

The easiest way is to get a write lock:

user = session.query(User).with_lockmode('update').get(1)
user.age += 1
session.commit()

If you don't want to read it again, you can write:
session.query(User).filter(User.id == 1).update({
    User.age: User.age + 1
})
session.commit()
#  You can also do operations between fields: 
session.query(User).filter(User.id == 1).update({
    User.age: User.age + User.id
})


Related articles: