note

This article was last updated on September 15, 2023. The content may be out of date.

In a previous post, I mentioned using yt-dlp as a service to extract YouTube information. The method I used has a serious problem. It uses a custom protocol that serves only one request per connection and, if something goes wrong, dumps the error as a plain string. To find out whether there is an error, we have to try to parse the result first: if it is not a valid JSON string, we can be sure an error occurred.

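The Go standard library provides an rpc package, net/rpc. It is normally meant for talking to services that are also written in Go. However, its net/rpc/jsonrpc subpackage speaks JSON-RPC (version 1.0), and JSON-RPC is a very simple protocol to implement on the Python side.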
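For reference, here is a rough Python sketch of the two message shapes involved. It assumes the behaviour of Go's net/rpc/jsonrpc client:

- Each call is sent as one JSON object followed by a newline.
- The single call argument is wrapped in a one-element params array.
- The reply has to echo the request id back.

The helper names encode_request and encode_response are hypothetical and do not come from any library.

```python
import json

# Hypothetical helpers (not part of any library) showing the JSON-RPC 1.0
# messages exchanged with Go's net/rpc/jsonrpc client.


def encode_request(method, arg, request_id):
    """Request as the Go client sends it: one argument wrapped in a
    one-element "params" array, one JSON object per line."""
    msg = {"method": method, "params": [arg], "id": request_id}
    return (json.dumps(msg) + "\n").encode("utf-8")


def encode_response(request_id, result=None, error=None):
    """Reply from the server: it echoes the request id; on success "error"
    is null, on failure "result" is null and "error" is a plain string."""
    msg = {"id": request_id, "result": result, "error": error}
    return (json.dumps(msg) + "\n").encode("utf-8")


# VIDEO_ID and the duration value are placeholders.
request = encode_request("ydl", {"url": "https://www.youtube.com/watch?v=VIDEO_ID"}, 0)
success = encode_response(0, result={"duration": 212})
failure = encode_response(0, error="Traceback (most recent call last): ...")

print(request.decode(), end="")
# prints: {"method": "ydl", "params": [{"url": "https://www.youtube.com/watch?v=VIDEO_ID"}], "id": 0}
```

Because every message is terminated by a newline, the server can frame requests with a simple readline.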

Server Implementation

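As before, we're using asyncio rather than a thread or process per connection, because yt-dlp is I/O bound. The blocking extract_info call itself still runs in a worker thread via asyncio.to_thread. This keeps the event loop free to read new requests while yt-dlp is working.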

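import asyncio
import json
import traceback

from yt_dlp import YoutubeDL


async def ydl(data):
    # A request without an 'id' raises KeyError here, outside the try block;
    # handle_ydl then treats the failed task as an invalid request.
    request_id = data['id']
    try:
        # The Go jsonrpc client always sends its argument as a one-element array.
        params = data['params'][0]
        url = params.pop('url')
        yt = YoutubeDL(params)

        # extract_info blocks, so run it in a worker thread.
        info = await asyncio.to_thread(yt.extract_info, url, download=False)
        # sanitize_info makes the info dict JSON-serializable.
        return {'id': request_id, 'result': yt.sanitize_info(info), 'error': None}
    except Exception:
        # JSON-RPC 1.0 errors are plain strings; Go surfaces them as rpc.ServerError.
        return {'id': request_id, 'result': None, 'error': traceback.format_exc()}


async def handle_ydl(reader, writer):
    tasks = set()        # pending readline/ydl tasks
    is_exit = False      # set when the connection should be torn down
    is_reading = False   # at most one readline task may be pending
    try:
        while not is_exit:
            if not is_reading:
                task = asyncio.create_task(reader.readline(), name='readline')
                tasks.add(task)
                is_reading = True
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

            for d in done:
                tasks.discard(d)

                # A socket error or a malformed request ends the connection.
                if d.exception() is not None:
                    is_exit = True
                    break

                if d.get_name() == 'readline':
                    is_reading = False
                    line = d.result()
                    if not line:  # b'' means the client closed the connection
                        is_exit = True
                        break
                    try:
                        data = json.loads(line)
                    except ValueError:  # invalid JSON or invalid UTF-8
                        is_exit = True
                        break
                    tasks.add(asyncio.create_task(ydl(data), name='ydl'))
                elif d.get_name() == 'ydl':
                    # Replies go out in completion order; the id lets the client match them.
                    reply = json.dumps(d.result())
                    writer.write(reply.encode('utf-8') + b'\n')
                    await writer.drain()
    finally:
        # Drop replies still in flight (their worker threads still run to completion).
        for t in tasks:
            t.cancel()
        writer.close()
        await writer.wait_closed()


# Minimal entry point (an assumption: the socket path mirrors the one the Go client dials).
async def main():
    server = await asyncio.start_unix_server(handle_ydl, path='/path/to/yt-dlp.socket')
    async with server:
        await server.serve_forever()


if __name__ == '__main__':
    asyncio.run(main())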

  1. First, we define the handler that runs for each received request. Since there is only one type of request (calling yt-dlp), we simply pass the parameters on to yt-dlp. The Go client wraps its single argument in a one-element params array, hence data['params'][0]. The result and the error are encoded as defined by the JSON-RPC specification.

note

For simplicity's sake, we neither keep track of request ids nor check whether a request contains all the necessary fields. The client has to handle them properly.

  2. Next, we define several variables that keep track of the running tasks and the connection status. The tasks are kept in a set, which is an iterable. This lets us pass the set to asyncio.wait and wait for any one of the tasks to complete.

note

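Only one coroutine may read from the connection at a time. A second concurrent readline on the same StreamReader raises RuntimeError, which is why is_reading ensures there is at most one pending readline task. An asyncio task can also be given a name when it is created, and we use that name to tell the different types of tasks apart.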

The rest of the code deals with the completed tasks and the connection tear-down. We break out of the loop when any exception is encountered. This can only happen when there is an error with the underlying socket or when the client has sent an invalid request. As an edge case, closing the connection on the client side raises no exception: readline simply returns an empty line, so we also exit when that happens.
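The two StreamReader behaviours the handler relies on can be checked in isolation. For the demo only, this sketch feeds a StreamReader by hand instead of using a real socket. It shows two things:

- A second concurrent readline fails with RuntimeError while the first is still waiting.
- Once the peer has closed, readline returns an empty bytes object instead of raising.

```python
import asyncio


async def demo():
    reader = asyncio.StreamReader()  # fed manually here instead of by a socket

    # Two coroutines waiting on the same reader: the second one fails at once.
    first = asyncio.create_task(reader.readline())
    second = asyncio.create_task(reader.readline())
    await asyncio.sleep(0)  # let both tasks start waiting for data
    second_error = second.exception()

    # A complete request line goes to the first (and only) reader.
    reader.feed_data(b'{"method": "ydl", "params": [{}], "id": 0}\n')
    line = await first

    # After the peer closes, readline() returns b"" rather than raising.
    reader.feed_eof()
    eof = await reader.readline()
    return second_error, line, eof


second_error, line, eof = asyncio.run(demo())
print(type(second_error).__name__, line, eof)
```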

Client Usage

Using the Go rpc client is straightforward, and a single client can be safely shared among many goroutines. The method name is sent as is, since the client places no restriction on it whatsoever. A plain "ydl" works even though Go RPC method names usually take the form "Service.Method".

To tell a connection error from a request error, we can check whether the error returned by Call can be type-asserted to rpc.ServerError. The client returns that type when the server's reply carries an error string.

info

The error type is rpc.ServerError, not *rpc.ServerError, because it is declared as a plain string type.
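The server answers requests as they finish, so replies can arrive out of order. The Go client pairs each reply with its pending call by id, which is why the server must echo the id back.

The following Python model sketches that bookkeeping. It also shows how a reply is turned into a result, an rpc.ServerError, or a fatal protocol error, as far as I can tell from the net/rpc/jsonrpc client codec. It is an approximation for illustration, not a port of the Go code, and PendingCalls is a hypothetical name.

```python
import json


class PendingCalls:
    """Rough model of the Go client's reply handling (illustrative only)."""

    def __init__(self):
        self.pending = {}  # request id -> method name
        self.next_id = 0

    def send(self, method):
        request_id = self.next_id
        self.next_id += 1
        self.pending[request_id] = method
        return request_id

    def receive(self, line):
        msg = json.loads(line)
        method = self.pending.pop(msg["id"], None)  # match the reply by id
        error, result = msg.get("error"), msg.get("result")
        if error is not None or result is None:
            if not isinstance(error, str):
                # Modelled on Go giving up on the whole connection here;
                # later calls on that client fail with rpc.ErrShutdown.
                return method, "protocol error", error
            # Surfaces in Go as rpc.ServerError ("" becomes "unspecified error").
            return method, "server error", error or "unspecified error"
        return method, "ok", result


calls = PendingCalls()
first_id = calls.send("ydl")
second_id = calls.send("ydl")
# Replies arrive in completion order, not request order; the id pairs them up.
reply_2 = calls.receive(json.dumps({"id": second_id, "result": None, "error": "Traceback ..."}))
reply_1 = calls.receive(json.dumps({"id": first_id, "result": {"duration": 212}, "error": None}))
print(reply_2)
print(reply_1)
```

One consequence for the Python server is that a successful reply must never carry a null result. Otherwise, under this model, the client treats it as a protocol error rather than as a success.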

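import (
	"log"
	"net/rpc/jsonrpc"
)

type ydlRequest struct {
	Url        string `json:"url"`
	WritePages bool   `json:"write_pages"`
}

type ydlJson struct {
	Duration         int `json:"duration"`
	RequestedFormats []struct {
		FormatID string `json:"format_id"`
		Url      string `json:"url"`
	} `json:"requested_formats"`
}

func callYdl(id string) {
	// Open a connection for this call and close it when done.
	client, err := jsonrpc.Dial("unix", "/path/to/yt-dlp.socket")
	if err != nil {
		log.Println("dial yt-dlp service:", err)
		return
	}
	defer client.Close()

	var r ydlRequest
	r.Url = "https://www.youtube.com/watch?v=" + id
	r.WritePages = true

	var yj ydlJson
	// "ydl" is sent verbatim as the JSON-RPC method name.
	if err := client.Call("ydl", r, &yj); err != nil {
		log.Println("ydl call:", err)
		return
	}

	// yj contains information about assets url
}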

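To share one *rpc.Client among many goroutines and reconnect after an unrecoverable error, we can use the following code. Reconnecting is necessary because, once the connection breaks, every later Call on that client fails with rpc.ErrShutdown.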

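import (
	"net/rpc"
	"net/rpc/jsonrpc"
	"sync"
	"sync/atomic"
)

// global variables
var (
	ready   int32       // 1 while client holds a usable connection
	ydlLock sync.Mutex  // serializes connecting and closing
	client  *rpc.Client // the shared client
)

func callYdl(id string) {
	if atomic.LoadInt32(&ready) == 0 {
		ydlLock.Lock()
		if ready == 0 {
			c, err := jsonrpc.Dial("unix", "/path/to/yt-dlp.socket")
			if err != nil {
				ydlLock.Unlock()
				return
			}
			client = c
			atomic.StoreInt32(&ready, 1)
		}
		ydlLock.Unlock()
	}

	// Keep a local copy so that a failure is reported against this exact client.
	c := client

	var r ydlRequest
	r.Url = "https://www.youtube.com/watch?v=" + id
	r.WritePages = true

	var yj ydlJson
	err := c.Call("ydl", r, &yj)
	if err != nil {
		// A ServerError means only the request failed; anything else means
		// the client is unusable and has to be replaced.
		if _, ok := err.(rpc.ServerError); !ok {
			if atomic.LoadInt32(&ready) == 1 {
				ydlLock.Lock()
				// Only close it if no other goroutine has replaced it meanwhile.
				if ready == 1 && client == c {
					_ = c.Close()
					atomic.StoreInt32(&ready, 0)
				}
				ydlLock.Unlock()
			}
		}
		return
	}

	// yj contains information about assets url
}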

Creating the client and closing it follow the same pattern, a form of double-checked locking:

- First, an atomic load tests whether the status has already changed.
- If it has not, a sync.Mutex serializes access so that only one goroutine can change the status.
- Inside the lock, we check the status again. If it is still unchanged, we take the necessary action and, on success, update the status atomically.
- Finally, we unlock the mutex.

info

Inside the critical section, we compare the status with a plain read instead of an atomic load. The status cannot change at that moment, because every write happens while holding the mutex. We still update it with an atomic store, because other goroutines may be reading it at the same time with an atomic load outside the lock.
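For readers who want to experiment with the double-checked pattern outside Go, here is a rough Python sketch using threading.Lock. It makes a few assumptions:

- Python has no atomic integer, so a plain attribute read stands in for atomic.LoadInt32. This relies on a single attribute read or write being atomic in CPython.
- The client reference itself doubles as the ready flag.
- dial and FakeClient are hypothetical stand-ins for jsonrpc.Dial and *rpc.Client.

reset also checks that the failing client is still the current one, so a late error report cannot close a fresh connection.

```python
import threading


class SharedClient:
    """Connect lazily, share the client, and drop it after a fatal error."""

    def __init__(self, dial):
        self._dial = dial            # hypothetical factory, like jsonrpc.Dial
        self._lock = threading.Lock()
        self._client = None          # None plays the role of ready == 0

    def get(self):
        client = self._client        # fast path: no lock once connected
        if client is not None:
            return client
        with self._lock:
            if self._client is None:  # re-check: another thread may have connected
                self._client = self._dial()
            return self._client

    def reset(self, failed):
        if self._client is not failed:  # fast path: already reset or replaced
            return
        with self._lock:
            if self._client is failed:  # re-check under the lock
                failed.close()
                self._client = None


class FakeClient:  # hypothetical stand-in for *rpc.Client
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


dialed = []


def fake_dial():
    dialed.append(FakeClient())
    return dialed[-1]


shared = SharedClient(fake_dial)
workers = [threading.Thread(target=shared.get) for _ in range(8)]
for w in workers:
    w.start()
for w in workers:
    w.join()

first = dialed[0]          # eight concurrent callers, a single dial
shared.reset(first)        # a fatal error on the first client: close and forget it
second = shared.get()      # the next caller reconnects
shared.reset(first)        # a late report about the old client is ignored
```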