note
This article was last updated on September 15, 2023, 1 year ago. The content may be out of date.
In a previous post, I mentioned using yt-dlp
as a service to extract YouTube information. The method I used has one serious problems: it uses a custom protocol that serves one request per connection and dumps the error as a string if any. To find out if there is an error, we have to parse the result first and if it’s not a valid json string, we can be sure there’s an error.
Golang stdlib provides a rpc package. Normally it’s supposed to be used with services written in go, it can be used with json rpc, and json rpc is a very simple protocol.
Server Implementation
As before, we’re using asyncio due to the fact that yt-dlp is I/O bound thus benefits more from asyncio than from multithread/multiprocess.
|
|
- First, we define the request handler when a request has been received. Since there is only one type of request (calling yt-dlp), we just pass the parameter to yt-dlp. The result and error are encoded as defined by the json rpc specification.
note
We don’t keep track of the request id nor check whether the request contains all the necessary field for simplicity’s sake. The client will have to handle them properly.
- We define several variables that will keep track of running tasks and the connection status. We keep those tasks in a set which is an iterable so that we can wait for one of them to complete.
note
There can only be one python coroutine that reads from the connection. And python asyncio task can be assigned a name when created. Use it to differentiate among different types of tasks.
The rest of the code deal with the completed tasks and connection tear-down. We break out of the loop when any exception is encountered, which can only happen when there is an error with the underlying socket or the client has sent an invalid request. As an edge case, there won’t be an exception when the client has closed the connection and the line received will be empty.
Client Usage
Golang rpc client usage is straight forward and a client can be safely shared among a group of goroutines. The method name will be sent as is, as there is no restriction on the method name whatsoever.
And to tell between a connection error and a request error, we can test to see if the error can be type-asserted to rpc.ServerError
.
info
The error type is rpc.ServerError
not *rpc.ServerError
.
type ydlRequest struct {
Url string `json:"url"`
WritePages bool `json:"write_pages"`
}
type ydlJson struct {
Duration int `json:"duration"`
RequestedFormats []struct {
FormatID string `json:"format_id"`
Url string `json:"url"`
} `json:"requested_formats"`
}
func callYdl(id string) {
client, _ := jsonrpc.Dial("unix", "/path/to/yt-dlp.socket")
var r ydlRequest
r.Url = "https://www.youtube.com/watch?v=" + id
r.WritePages = true
var yj ydlJson
_ = client.Call("ydl", r, &yj)
// yj contains information about assets url
}
To use a *rpc.Client
with many goroutines and reconnect in case of an unrecoverable error, we can use the following code:
// global variables
var ready int32
var lock sync.Mutex
var client *rpc.Client
func callYdl(id string) {
if atomic.LoadInt32(&ready) == 0 {
ydlLock.Lock()
if ready == 0 {
var err error
client, err = jsonrpc.Dial("unix", "/path/to/yt-dlp.socket")
if err != nil {
ydlLock.Unlock()
return
}
atomic.StoreInt32(&ready, 1)
}
ydlLock.Unlock()
}
var r ydlRequest
r.Url = "https://www.youtube.com/watch?v=" + id
r.WritePages = true
var yj ydlJson
err := client.Call("ydl", r, &yj)
if err != nil {
if _, ok := err.(rpc.ServerError); !ok {
if atomic.LoadInt32(&ready) == 1 {
ydlLock.Lock()
if ready == 1 {
_ = client.Close()
atomic.StoreInt32(&ready, 0)
}
ydlLock.Unlock()
}
}
return
}
// yj contains information about assets url
}
Both creating the client and closing it are similar: first we use an atomic load to test if the status is already changed; then we use a sync.Mutex
to serialize the access so that only one goroutine can change the status, and if the status is not changed, we take the necessary actions and atomically change the status if successful; finally we unlock the mutex if we locked it previously.
info
Inside the critical section when comparing the status we used a plain comparison instead of an atomic load because there is no way the status can be changed at that moment. When updating the status we use an atomic store because there may be a goroutine testing if the status has already changed.