Collecting site-wide Bilibili live danmaku with the asyncio coroutine framework
- 2020-05-10 18:23:14
- OfStack
Preface
Although the title says site-wide, so far only all-day danmaku collection for the top 100 live rooms has been implemented.
The collection system is adapted from my earlier Python client for Bilibili live danmaku. For the detailed protocol analysis, see the article on that earlier version.
The live danmaku protocol runs directly over TCP, so it should be hard for Bilibili to take countermeasures against this kind of collection. That said, there are probably techniques I don't know about for detecting abusive clients like mine.
I have connected to 100 rooms at once, and to a single room 100 times, without any problems.
With more than 150 connections, the server closes the link.
Selecting live rooms
For now, room selection in the collection system is simple: it takes the top 100 rooms by level.
I plan to change this so that it periodically checks http://live.bilibili.com/all for rooms that have just started broadcasting and adds tasks dynamically.
Asynchronous tasks and danmaku storage
The collection system still uses the asyncio framework. For each live room, two tasks are added to the loop as follows.
danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
task1 = asyncio.ensure_future(danmuji.connectServer())
task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())
In fact, the code would look more elegant if the heartbeat task HeartbeatLoop were started inside connectServer. I keep the two separate because I need to maintain a task list, which is described later.
I spent some time choosing how to store the danmaku.
Writing to a database is synchronous I/O, which would block the danmaku collection tasks. Asynchronous interfaces such as aiomysql exist, but configuring a database server is cumbersome, and I want this small system to be easy to deploy.
In the end I chose sqlite3, which ships with Python. Because sqlite3 does not handle concurrent writes well, a separate thread is dedicated to database storage. In the other thread, 100 * 2 tasks collect the danmaku and the viewer counts and put them into the queues commentq and numq. The storage thread wakes up every 10s, writes the queued data to sqlite3, and empties the queues.
Combining multithreading with asynchronous tasks keeps the network traffic from being blocked.
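The storage thread described above can be sketched roughly as follows. This is a minimal sketch, not the code from the repository: the table layout, the function names drain and storage_loop, and the use of queue.Queue and threading.Event are all assumptions made for illustration.

```python
import queue
import sqlite3
import threading


def drain(q):
    """Remove and return every item currently in the queue."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


def storage_loop(db_path, commentq, stop_event, interval=10.0):
    """Wake up every `interval` seconds and flush queued comments to SQLite."""
    # The connection is created and used only inside this thread.
    conn = sqlite3.connect(db_path)
    # Hypothetical table layout, chosen only for this example.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS comments (room TEXT, user TEXT, text TEXT)"
    )
    while True:
        # wait() returns True as soon as a stop has been requested.
        stopping = stop_event.wait(interval)
        rows = drain(commentq)
        if rows:
            with conn:  # one transaction per batch
                conn.executemany("INSERT INTO comments VALUES (?, ?, ?)", rows)
        if stopping:
            break
    conn.close()
```

With this layout, the asyncio tasks would only call commentq.put_nowait(...), which does not block the event loop, while all database I/O stays in the storage thread.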
Handling possible connection failures
The danmaku protocol runs directly over TCP, and every byte depends on the bytes before it, so a single parsing error easily raises an Exception (TCP is a reliable transport, but I suspect Bilibili's servers can still send bad data). An automatic reconnection mechanism is therefore needed.
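To illustrate why one bad value breaks everything after it, here is a small sketch of parsing a length-prefixed byte stream. The layout used here (a 4-byte big-endian total length followed by the payload) is a hypothetical simplification, not Bilibili's actual packet format, and split_frames and FrameError are names invented for this example.

```python
import struct


class FrameError(Exception):
    """Raised when the stream can no longer be parsed reliably."""


def split_frames(buffer, max_len=65536):
    """Split a byte buffer into length-prefixed frames.

    Assumed (hypothetical) layout: 4-byte big-endian total length,
    followed by the payload. Returns (frames, remaining_bytes).
    """
    frames = []
    offset = 0
    while len(buffer) - offset >= 4:
        (total,) = struct.unpack_from(">I", buffer, offset)
        if total < 4 or total > max_len:
            # Once the length field is wrong, every later frame boundary is
            # wrong too, so the only safe recovery is to reconnect.
            raise FrameError(f"implausible frame length {total}")
        if len(buffer) - offset < total:
            break  # incomplete frame, wait for more data
        frames.append(buffer[offset + 4 : offset + total])
        offset += total
    return frames, buffer[offset:]
```

Because each frame boundary is derived from the previous length field, a client that hits FrameError has no way to resynchronize within the same connection.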
As the asyncio documentation says,
Done means either that a result / exception are available, or that the future was cancelled.
That is, a task finishes when its function returns normally, raises an exception, or is cancelled. You can check for this with done().
Each live room has two tasks. The parsing task is the one most likely to die, but its death does not affect the heartbeat task, so the corresponding heartbeat task must be found and stopped.
When the tasks are created, a dictionary records the two tasks for each room,
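The three cases can be checked with a small experiment. This is a minimal sketch using only standard asyncio calls; the coroutine names are made up for the demonstration.

```python
import asyncio


async def returns_normally():
    return "ok"


async def raises():
    raise ValueError("parse error")


async def runs_forever():
    while True:
        await asyncio.sleep(3600)


async def demo():
    t_ok = asyncio.ensure_future(returns_normally())
    t_err = asyncio.ensure_future(raises())
    t_cancel = asyncio.ensure_future(runs_forever())
    t_running = asyncio.ensure_future(runs_forever())

    await asyncio.sleep(0)  # give every task a chance to start
    t_cancel.cancel()
    # Wait for the three tasks that are expected to finish; collecting the
    # exceptions here also marks them as retrieved.
    await asyncio.gather(t_ok, t_err, t_cancel, return_exceptions=True)

    states = {
        "returned": t_ok.done(),
        "raised": t_err.done(),
        "cancelled": t_cancel.done(),
        "still_running": t_running.done(),
    }

    # Clean up the task that was left running.
    t_running.cancel()
    await asyncio.gather(t_running, return_exceptions=True)
    return states


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Only the task that is still looping reports done() as False; the other three report True regardless of how they ended.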
self.tasks[url] = [task1, task2]
While running, the system checks every 10s,
for url in self.tasks:
    task1, task2 = self.tasks[url]
    if task1.done() or task2.done():
        # One task has ended: cancel the survivor, then restart both.
        if not task1.done():
            task1.cancel()
        if not task2.done():
            task2.cancel()
        danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
        task11 = asyncio.ensure_future(danmuji.connectServer())
        task22 = asyncio.ensure_future(danmuji.HeartbeatLoop())
        self.tasks[url] = [task11, task22]
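To try this restart logic without a network connection, the check can be run against a stub client. The sketch below is an illustration under assumptions: StubClient is a hypothetical stand-in for bilibiliClient that only simulates a failure, and start_room and restart_finished are helper names invented here.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in for bilibiliClient; it only simulates failures."""

    def __init__(self, url, fail_first=False):
        self.url = url
        self.fail_first = fail_first

    async def connectServer(self):
        if self.fail_first:
            raise ConnectionError("simulated parse failure")
        await asyncio.sleep(3600)

    async def HeartbeatLoop(self):
        while True:
            await asyncio.sleep(30)


def start_room(url, fail_first=False):
    """Create the parsing task and the heartbeat task for one room."""
    client = StubClient(url, fail_first)
    return [
        asyncio.ensure_future(client.connectServer()),
        asyncio.ensure_future(client.HeartbeatLoop()),
    ]


def restart_finished(tasks):
    """Restart every room in which at least one task has finished."""
    restarted = []
    for url, pair in list(tasks.items()):
        if any(t.done() for t in pair):
            for t in pair:
                if not t.done():
                    t.cancel()  # stop the surviving task
                elif not t.cancelled():
                    t.exception()  # mark the exception as retrieved
            tasks[url] = start_room(url)
            restarted.append(url)
    return restarted


async def demo():
    tasks = {
        "room-a": start_room("room-a"),
        "room-b": start_room("room-b", fail_first=True),
    }
    await asyncio.sleep(0.01)
    first = restart_finished(tasks)   # room-b has failed by now
    await asyncio.sleep(0.01)
    second = restart_finished(tasks)  # nothing left to restart
    pending = [t for pair in tasks.values() for t in pair]
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return first, second
```

Calling exception() on a finished task is an addition of this sketch; it keeps asyncio from logging "exception was never retrieved" for the failed parsing task.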
In practice, I have seen only one case of task failure: the streamer's room had been banned, so the live room could not be entered.
Conclusion
Finally, the source code is on GitHub: https://github.com/lyyyuna/bilibili_danmu_colloector