Collecting live danmaku across Bilibili with the asyncio coroutine framework

  • 2020-05-10 18:23:14
  • OfStack

Preface

Although the title says "whole site", so far I have only implemented all-day danmaku collection for the top 100 live rooms.

The danmaku collection system is a modified version of my earlier Python client for Bilibili live danmaku. For the detailed protocol analysis, see that earlier article.

The live danmaku protocol runs directly over TCP, so it would be hard for Bilibili to counter this kind of behavior. Presumably there are techniques I'm not aware of for detecting abusive clients like mine.

I have connected to 100 rooms at once, and to a single room 100 times, without any problems. Beyond 150 connections, the link gets closed.

Selecting live rooms

Room selection in the collection system is currently very simple: it just takes the top 100 rooms by level.

I plan to change this to periodically check http://live.bilibili.com/all for rooms that have just started broadcasting and add collection tasks for them dynamically.
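A minimal sketch of that planned periodic discovery, assuming a hypothetical `fetch` coroutine that returns the URLs of rooms currently live (parsing the /all page is not covered here) and a placeholder `collect_room` standing in for one room's connect/heartbeat pair. Both names, and `discover_rooms` itself, are illustrative rather than part of the collector.

```python
import asyncio

async def collect_room(url):
    # Hypothetical placeholder for one room's connectServer()/HeartbeatLoop() pair.
    await asyncio.sleep(3600)

async def discover_rooms(tasks, fetch, interval=60.0, rounds=None):
    """Poll fetch() every `interval` seconds and start a task for each new room.

    `tasks` maps room URL -> task; `rounds=None` keeps polling forever.
    """
    done_rounds = 0
    while rounds is None or done_rounds < rounds:
        for url in await fetch():
            if url not in tasks:  # skip rooms we are already collecting
                tasks[url] = asyncio.ensure_future(collect_room(url))
        done_rounds += 1
        await asyncio.sleep(interval)
```

Keying the dictionary by URL makes the polling idempotent: a room that stays live across several polls is only started once.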

Asynchronous tasks and danmaku storage

The collection system still uses the asyncio coroutine framework. Each live room is added to the event loop as follows:


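danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)  # one client per room
task1 = asyncio.ensure_future(danmuji.connectServer())  # connect, receive and parse danmaku
task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())  # send heartbeats to keep the link alive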

Starting the heartbeat task HeartbeatLoop from inside connectServer would arguably make the code more elegant. I keep them separate because I need to maintain a task list, which is described later.
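For comparison, a sketch of the "more elegant" variant: connectServer() starts HeartbeatLoop() itself and cancels it in a `finally` block, so the heartbeat can never outlive its connection. The class name, the simulated failure in receive_loop() and the heartbeat interval are hypothetical. The trade-off is that the heartbeat task is then hidden inside connectServer() instead of sitting in a task list that can be inspected from outside.

```python
import asyncio

class DanmakuClient:
    """Hypothetical client: connectServer() owns its heartbeat task."""

    def __init__(self, heartbeat_interval=30.0):
        self.heartbeat_interval = heartbeat_interval  # assumed value
        self.heartbeats_sent = 0

    async def HeartbeatLoop(self):
        while True:
            self.heartbeats_sent += 1  # real code would write a heartbeat packet here
            await asyncio.sleep(self.heartbeat_interval)

    async def receive_loop(self):
        # Placeholder for reading and parsing danmaku packets; fails after a while.
        await asyncio.sleep(0.05)
        raise ConnectionError("simulated parse failure")

    async def connectServer(self):
        heartbeat = asyncio.ensure_future(self.HeartbeatLoop())
        try:
            await self.receive_loop()
        finally:
            # Tie the heartbeat's lifetime to the connection that started it.
            heartbeat.cancel()
            await asyncio.gather(heartbeat, return_exceptions=True)
```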

I spent some time deciding how to store the danmaku.

Writing to a database is synchronous I/O, which would block the collection tasks. Asynchronous interfaces such as aiomysql exist, but setting up a database server is cumbersome, and I wanted this small system to be easy to deploy.

In the end I chose Python's built-in sqlite3. However, sqlite3 does not handle concurrent writes well, so a dedicated thread does the database storage. In the other thread, 100 × 2 tasks collect all danmaku and viewer-count data and put them into the queues commentq and numq. The storage thread wakes up every 10 s, writes the queued data into sqlite3, and empties the queues.
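The storage side might look roughly like this, assuming commentq and numq are `queue.Queue` objects (which are thread-safe, so the asyncio thread can keep calling put() on them). A dedicated thread owns the sqlite3 connection and, every `interval` seconds, drains both queues into the database in one transaction. The function name, the (room, text) and (room, num) tuple formats and the table layout are assumptions for illustration, not the collector's actual schema.

```python
import queue
import sqlite3
import threading

def storage_worker(db_path, commentq, numq, stop, interval=10.0):
    """Drain both queues into SQLite every `interval` seconds until `stop` is set."""
    conn = sqlite3.connect(db_path)  # created and used only in this thread
    conn.execute("CREATE TABLE IF NOT EXISTS danmu (room TEXT, text TEXT)")
    conn.execute("CREATE TABLE IF NOT EXISTS online (room TEXT, num INTEGER)")

    def drain(q):
        # Take everything currently queued without blocking.
        items = []
        while True:
            try:
                items.append(q.get_nowait())
            except queue.Empty:
                return items

    while True:
        stopping = stop.wait(interval)  # sleep, but wake early on shutdown
        with conn:  # commit each batch as one transaction
            conn.executemany("INSERT INTO danmu VALUES (?, ?)", drain(commentq))
            conn.executemany("INSERT INTO online VALUES (?, ?)", drain(numq))
        if stopping:
            break  # the final flush above has already run
    conn.close()
```

Batching every few seconds keeps the number of commits low, and because only this thread touches the connection, sqlite3's same-thread check is never an issue.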

Combining a storage thread with asynchronous collection tasks keeps the network traffic from being blocked by database writes.

Handling possible connection failures

The danmaku protocol runs directly over TCP, and consecutive bytes are tightly coupled: each packet's header determines how the following bytes are read. Once a single parsing error occurs, an exception is easily thrown. (Although TCP is reliable transport, I suspect the Bilibili server itself can occasionally send bad data.) An automatic reconnection mechanism is therefore necessary.
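To see why one bad byte is fatal, here is a sketch of splitting a TCP byte stream into packets. It assumes the 16-byte big-endian header commonly described for this protocol (packet length, header length, protocol version, operation, sequence id); treat that layout, and the helper names split_packets and make_packet, as assumptions. Each packet's length field decides where the next packet starts, so a corrupted header throws off every later boundary and the parser can only give up.

```python
import struct

# Assumed header: total length (uint32), header length (uint16),
# protocol version (uint16), operation (uint32), sequence id (uint32).
HEADER = struct.Struct("!IHHII")

def split_packets(buf):
    """Split bytes into (operation, body) pairs; return them plus leftover bytes.

    Raises ValueError on an inconsistent header, because every later
    packet boundary depends on this packet's length field.
    """
    packets = []
    offset = 0
    while len(buf) - offset >= HEADER.size:
        total, hlen, _ver, op, _seq = HEADER.unpack_from(buf, offset)
        if hlen != HEADER.size or total < hlen:
            raise ValueError(f"corrupt header at offset {offset}")
        if len(buf) - offset < total:
            break  # incomplete packet: wait for more bytes from the socket
        packets.append((op, buf[offset + hlen:offset + total]))
        offset += total
    return packets, buf[offset:]

def make_packet(op, body, ver=1, seq=1):
    # Build a packet with the assumed header, e.g. for testing the parser.
    return HEADER.pack(HEADER.size + len(body), HEADER.size, ver, op, seq) + body
```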

The asyncio documentation says of a future's done() method:

Done means either that a result / exception are available, or that the future was cancelled.

So whether the coroutine returns normally, raises an exception, or is cancelled, the task ends, and this can be detected with done().
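A small demonstration that done() covers all three endings: a task that returns, one that raises, and one that is cancelled. The coroutine names are made up for the example.

```python
import asyncio

async def returns():
    return "ok"

async def raises():
    raise ConnectionResetError("server closed the link")

async def runs_forever():
    while True:
        await asyncio.sleep(1)

async def demo():
    t_ok = asyncio.ensure_future(returns())
    t_err = asyncio.ensure_future(raises())
    t_cancel = asyncio.ensure_future(runs_forever())
    await asyncio.sleep(0)  # let all three tasks take their first step
    done_before_cancel = t_cancel.done()  # still parked inside sleep()
    t_cancel.cancel()
    await asyncio.wait([t_ok, t_err, t_cancel])  # wait for every task to finish
    # All three now report done(), whatever the reason they stopped.
    states = {
        "returned": (t_ok.done(), t_ok.result()),
        "raised": (t_err.done(), type(t_err.exception()).__name__),
        "cancelled": (t_cancel.done(), t_cancel.cancelled()),
    }
    return done_before_cancel, states

if __name__ == "__main__":
    print(asyncio.run(demo()))
```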

Each live room has two tasks. The parsing task is the one most likely to die, and its failure does not stop the heartbeat task, so the matching heartbeat task must be found and cancelled.
When creating the tasks, a dictionary records the two tasks for each room:

self.tasks[url] = [task1, task2]

While the system runs, the tasks are checked every 10 s:


for url, (task1, task2) in self.tasks.items():
    # If either task of the pair has ended, the room needs a fresh start.
    if task1.done() or task2.done():
        # Cancel whichever task is still running (usually the heartbeat).
        if not task1.done():
            task1.cancel()
        if not task2.done():
            task2.cancel()
        # Recreate the client and both tasks for this room.
        danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
        task11 = asyncio.ensure_future(danmuji.connectServer())
        task22 = asyncio.ensure_future(danmuji.HeartbeatLoop())
        self.tasks[url] = [task11, task22]
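A self-contained sketch of the same check loop, runnable without a Bilibili connection. FlakyClient is a hypothetical stand-in for bilibiliClient whose parser dies after a delay; Supervisor and its method names are also illustrative. One addition over the loop above: before discarding a finished task it calls exception(), so asyncio does not later log "Task exception was never retrieved" for the dead parser. It also iterates over a copy of the dictionary, which stays safe even if rooms are added or removed during a check.

```python
import asyncio

class FlakyClient:
    """Hypothetical stand-in for bilibiliClient: the parser fails after a delay."""

    def __init__(self, url, fail_after):
        self.url = url
        self.fail_after = fail_after

    async def connectServer(self):
        await asyncio.sleep(self.fail_after)
        raise ConnectionError(f"parse error in {self.url}")

    async def HeartbeatLoop(self):
        while True:
            await asyncio.sleep(0.01)

class Supervisor:
    def __init__(self, make_client):
        self.make_client = make_client  # callable: url -> client
        self.tasks = {}
        self.restarts = 0

    def start(self, url):
        client = self.make_client(url)
        self.tasks[url] = [
            asyncio.ensure_future(client.connectServer()),
            asyncio.ensure_future(client.HeartbeatLoop()),
        ]

    def check_once(self):
        for url, (parser, heartbeat) in list(self.tasks.items()):
            if parser.done() or heartbeat.done():
                for t in (parser, heartbeat):
                    if not t.done():
                        t.cancel()  # stop the surviving half of the pair
                    elif not t.cancelled():
                        t.exception()  # mark any error as retrieved
                self.restarts += 1
                self.start(url)

    async def watch(self, interval=10.0, rounds=None):
        n = 0
        while rounds is None or n < rounds:
            await asyncio.sleep(interval)
            self.check_once()
            n += 1
```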

In practice I have only seen one kind of task failure: the streamer's room had been banned, so the client could not enter it.

Conclusion

Bilibili's viewer count is computed from the number of connections to the danmaku server. By controlling the number of connections you could instantly inflate a room's viewer count by any amount; is there a business opportunity here? Over a few days of running, I found that most rooms showed more than 5 viewers even when they were not live, including in the small hours. My only guess is that other people, like me, are collecting danmaku 24 hours a day. The top 100 rooms produce about 40 MB of danmaku data per day on average. What can the collected danmaku be used for? I haven't thought it through yet, but perhaps it could support user behavior analysis.

Finally, here is the source code on GitHub: https://github.com/lyyyuna/bilibili_danmu_colloector

