Using Go's goroutines to handle Laravel jobs

Jun 6, 2023

Laravel includes an excellent queue component that allows us to delegate time-consuming tasks to background processes. This lets us scale our web service: we respond to requests faster and can thus handle more requests.

A queue worker, no matter how well optimized, can only handle one job at a time. That means we'll need more workers to handle more jobs at the same time.

Each worker is a PHP process, and each process is isolated from the others by having its own memory space. If we start too many processes, they may consume all of our machine's memory, causing everything to slow down. Furthermore, managing too many processes places a strain on the operating system and the CPU.

The majority of the workload in web-based applications is I/O intensive, which means a process spends most of its time idle, waiting for an I/O event. And even when a process is idle, it continues to occupy memory and a share of the OS's management workload.

Wouldn't it be great if we could use the time that a process is idle to do more work? We won't need to start as many processes this way because a single process can handle other tasks while it waits for an I/O event.

This is known as concurrency. A single process will switch between multiple tasks and handle them all at the same time. When a task is in the wait state, the process searches for other tasks to handle.

Multi-threading or co-routines can be used to achieve this concurrency. PHP does not support multi-threading out of the box, but it does support co-routines through the Swoole PHP extension.

In one of my Laravel projects, I have a type of job that is queued between 50 and 60 million times per day. This job is straightforward: it reads a file from S3, processes it, and sends the output to an external endpoint.

The PHP process that handles this task is almost always idle, waiting for data from S3 and a response from the external endpoint. This is an ideal task for which concurrency could be advantageous.

Even though Swoole could have provided co-routine concurrency, I chose to use my knowledge of Go to create a single binary that runs as a daemon and processes all of these jobs concurrently. A binary has no dependencies, is lightweight, and can be deployed to the server with a simple scp command.

The task

Here's pseudocode for the task we need to perform:

$messages = $sqs->getMessages();

foreach ($messages as $message){
    $key = $message->extractKey();

    $object = $s3->getObject($key);

    $requestBody = someSimpleProcessing($object);

    Http::post('https://...', $requestBody);

    $sqs->deleteMessage($message->receiptHandle());
}

We dequeue messages from an SQS queue, then go through each message: extracting an S3 object key, fetching the object from S3, generating a request body, sending the request to an endpoint, and finally deleting the message.

Here's a list of the waiting that happens:

- Waiting for SQS to respond with messages
- Waiting for S3 to respond with the object
- Waiting for the external endpoint to respond to our request
- Waiting for SQS to confirm the message deletion

That's where most of the process time is spent: waiting for responses.

Writing a worker in Go

We'll start by creating a struct:

import (
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
)

type Worker struct {
    sqsClient *sqs.Client
    s3Client  *s3.Client
}

A struct is a collection of fields. Our struct here has two pointer fields: the first holds the memory address of a struct representing the SQS client, while the second holds the address of the S3 client.

Now, let's add a method to our struct:

func (worker Worker) Work() {

}

The Work method has a Worker struct as its receiver. In other words, calls to this method are received by a Worker value. This is Go's way of doing object-oriented programming.
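To make the receiver idea concrete, here's a tiny self-contained sketch (the Greeter type is made up for this illustration and isn't part of our worker):

package main

import "fmt"

// Greeter plays the role of a PHP class.
type Greeter struct {
    name string
}

// Greet has a Greeter receiver, so calling greeter.Greet()
// feels like calling a method on a PHP object.
func (greeter Greeter) Greet() {
    fmt.Println("Hello, " + greeter.name)
}

func main() {
    greeter := Greeter{name: "Laravel"}
    greeter.Greet() // prints "Hello, Laravel"
}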

Inside the method, we will start an infinite loop and call the ReceiveMessage method of the sqsClient to receive messages from the queue:

func (worker Worker) Work() {
    for {
        results, err := worker.sqsClient.ReceiveMessage(
            context.Background(),
            &sqs.ReceiveMessageInput{
                QueueUrl:            ptr.String("QUEUE_URL_HERE"),
                MaxNumberOfMessages: 10,
            },
        )
    }
}

Each method in the AWS SDK expects a context and a struct that represents the input that will be passed to the underlying HTTP client.

A context is a way of grouping information about the environment in which the code is being executed. Contexts become useful when your program is running as a web server that receives and handles multiple requests at the same time. In this case, the request handling code will need to know which request it is dealing with. That's when we pass the request context to the code, so it can extract the request parameters and listen for web server signals related to a specific request.
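Another common use of contexts is attaching a deadline. As a hypothetical sketch (the five-second timeout is an assumption for this example, not something our worker needs), we could bound the SDK call like this:

// A context with a five-second deadline. The SDK call will
// give up and return an error once the deadline passes.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

results, err := worker.sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
    QueueUrl:            ptr.String("QUEUE_URL_HERE"),
    MaxNumberOfMessages: 10,
})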

If you take a look at the code above, you can see that ReceiveMessage has multiple return values; one of them contains the results of the HTTP request made by the SDK, and the other contains an error (if there is any).

Consider this to be a PHP method that returns results and can also throw an exception. Exceptions do not exist in Go; errors are returned in the same way that other return values are.
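As a minimal illustration (divide is a made-up function for this example), a Go function that can fail returns the error as an extra value:

// divide returns a result and an error; callers check the
// error instead of catching an exception.
func divide(a, b float64) (float64, error) {
    if b == 0 {
        return 0, errors.New("division by zero")
    }
    return a / b, nil
}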

To catch an error, we need to assign the error returned to a variable and check this variable:

if err != nil {
    log.Printf("error: %v", err)
}

If we convert this to PHP code, it would look like this:

try {
    $results = $client->receiveMessage();
} catch (Throwable $err) {
    log($err);
}

Back to Go, once we get some results from the SDK method, we will loop through the messages and process them:

for _, message := range results.Messages {
    // TODO: Extract S3 key from Laravel's payload

    // TODO: Get the object from S3

    // TODO: Process the object

    // TODO: Send the results to the endpoint
}

To extract the S3 object key from the message, I used a simple regex that matches a pattern inside the message payload created by Laravel.

regex := regexp.MustCompile(`report-(.*)\.json`)

for _, message := range results.Messages {
    // Note: real code should check that the regex matched
    // before indexing into the result.
    s3Key := fmt.Sprintf(
        "report-%s.json",
        regex.FindStringSubmatch(*message.Body)[1],
    )
}

Now that we have the key, we can call the S3 client's GetObject method to retrieve the object:

object, err := worker.s3Client.GetObject(context.Background(), &s3.GetObjectInput{
    Bucket: ptr.String("BUCKET-NAME"),
    Key:    ptr.String(s3Key),
})

// The object's content can be read from object.Body

And then we do the processing, generate the request body, and send the request:

// TODO: Process the object

payload, _ := json.Marshal(map[string]string{
    "KEY": "VALUE",
})

response, err := http.Post(
    "https://...",
    "application/json",
    bytes.NewBuffer(payload),
)

We pass the payload to the Post method in the form of a buffer. The underlying HTTP client continues to read from this buffer and send its content over the open HTTP connection until all content has been sent.

The final step is to delete the message from the SQS queue:

_, err = worker.sqsClient.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
    QueueUrl:      ptr.String("QUEUE_URL_HERE"),
    ReceiptHandle: message.ReceiptHandle,
})

The asterisk before message.Body in the regex snippet above is used for dereferencing, that is, fetching the actual value stored in the memory space to which a pointer points. The Body field holds a string pointer; by dereferencing it, we copy that string from memory and pass it to FindStringSubmatch. The ReceiptHandle field is also a string pointer, but DeleteMessageInput expects a pointer, so we pass it along as-is.
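Here's a tiny standalone illustration of pointers and dereferencing (the variables are made up for this example):

name := "report-1.json"
pointer := &name  // pointer holds the memory address of name
value := *pointer // dereferencing copies the string value out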

All together

Here's what the complete code looks like:

func (worker Worker) Work() {
    regex := regexp.MustCompile(`report-(.*)\.json`)

    for {
        results, err := worker.sqsClient.ReceiveMessage(
            context.Background(),
            &sqs.ReceiveMessageInput{
                QueueUrl:            ptr.String("QUEUE_URL_HERE"),
                MaxNumberOfMessages: 10,
            },
        )

        for _, message := range results.Messages {
            s3Key := fmt.Sprintf(
                "report-%s.json",
                regex.FindStringSubmatch(*message.Body)[1],
            )

            object, err := worker.s3Client.GetObject(context.Background(), &s3.GetObjectInput{
                Bucket: ptr.String("BUCKET-NAME"),
                Key:    ptr.String(s3Key),
            })

            // TODO: Process the object

            payload, err := json.Marshal(map[string]string{
                "KEY": "VALUE",
            })

            response, err := http.Post(
                "https://...",
                "application/json",
                bytes.NewBuffer(payload),
            )

            _, err = worker.sqsClient.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
                QueueUrl:      ptr.String("QUEUE_URL_HERE"),
                ReceiptHandle: message.ReceiptHandle,
            })
        }
    }
}

To make the code easier to read, I removed error handling and other details.

Running the worker

In order to run our worker, we need to create the AWS clients for SQS and S3:

awsConfig, err := config.LoadDefaultConfig(context.Background())

sqsClient := sqs.NewFromConfig(awsConfig)

s3Client := s3.NewFromConfig(awsConfig)

Then, we can create a worker struct and call the Work method:

worker := Worker{
    sqsClient: sqsClient,
    s3Client:  s3Client,
}

worker.Work()

If we compile and run this program, the worker starts an infinite loop that dequeues 10 jobs at a time and processes them one by one.

Utilizing concurrency

So far, the process we started to run the executable handles jobs one at a time. Let's put Go's concurrency to use by running multiple workers inside goroutines.

waitGroup := sync.WaitGroup{}

waitGroup.Add(700)

for i := 1; i <= 700; i++ {
    go daemon(sqsClient, s3Client, &waitGroup)
}

waitGroup.Wait()

We create a wait group and tell it to wait for 700 goroutines to complete. Then we start a loop that launches 700 goroutines, each running a daemon function.

Finally, we start the waiting by calling waitGroup.Wait(). Now our program will not exit until the 700 goroutines are done.

The daemon function would look like this:

func daemon(sqsClient *sqs.Client, s3Client *s3.Client, waitGroup *sync.WaitGroup) {
    defer waitGroup.Done()

    worker := Worker{
        sqsClient: sqsClient,
        s3Client:  s3Client,
    }

    worker.Work()
}

We first defer a call to waitGroup.Done(), which marks one goroutine as done. By deferring the call, Go will only run the method when the daemon() function is about to return.

Then, we create a worker and invoke the Work method. Remember that worker.Work() starts an infinite loop and will only return if we tell it to. That is why it is critical that we handle all errors returned within the Work method and decide whether the worker should do another iteration, sleep, or return (if the error is unrecoverable).
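For illustration, here's one possible shape that error handling could take at the top of the Work loop (the five-second back-off and the log message are choices made for this sketch, not something from the original code):

results, err := worker.sqsClient.ReceiveMessage(
    context.Background(),
    &sqs.ReceiveMessageInput{
        QueueUrl:            ptr.String("QUEUE_URL_HERE"),
        MaxNumberOfMessages: 10,
    },
)
if err != nil {
    // Log the failure and back off before the next iteration
    // instead of hammering SQS with failing requests.
    log.Printf("receiving messages failed: %v", err)
    time.Sleep(5 * time.Second)
    continue
}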

The execution model

With what we have, the executable will start 700 goroutines that dequeue and process jobs. While one goroutine is waiting for an I/O event, the Go scheduler switches to another goroutine that needs CPU time. It also divides CPU time among the goroutines so that no single goroutine consumes all of it and blocks the others.

When we monitor our machine's CPU utilization, we'll be able to see whether the available CPU cores are fully utilized. If they are always fully utilized, it may indicate that we have too many goroutines competing for CPU time. If they're running at less than 100%, there's some waiting going on, and we can add more goroutines if necessary.

Other metrics to consider are memory utilization, queue size, and SQS's empty receives.

If memory utilization keeps increasing, we have a memory leak somewhere in our code. If the queue is not emptying quickly enough, we need to add more workers. And if there are too many empty receives, too many workers are calling ReceiveMessage on the SQS queue while it is empty.
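As a side note, one common way to reduce empty receives is SQS long polling. Here's a sketch of what that could look like with the same client (the 20-second wait is an assumption for this example):

results, err := worker.sqsClient.ReceiveMessage(
    context.Background(),
    &sqs.ReceiveMessageInput{
        QueueUrl:            ptr.String("QUEUE_URL_HERE"),
        MaxNumberOfMessages: 10,
        // Long polling: wait up to 20 seconds for messages to
        // arrive instead of returning an empty response immediately.
        WaitTimeSeconds: 20,
    },
)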

There are other details that I haven't covered in this post as I'm trying to keep it short and simple.

Learning Go

Go is one of the programming languages that quickly became close to my heart. I remember how much fun I had learning PHP when I was 13 years old, and Go brings me the same joy and excitement.

Not only is it a joy to use, but its concurrency model enabled me to improve the performance of several parts of the software that I write, particularly background processing.

Also, because the code is compiled into a binary that I can easily share and run on any machine with no dependencies, I was able to write infrastructure orchestration programs that could run in any CI environment or machine.

If you're interested in learning Go, I'm currently working on a series of premium Go tutorials aimed at PHP developers. I'm explaining Go syntax and concepts by comparing them to PHP and other familiar languages.

Add your email address to the waiting list and I'll let you know once publishing starts.


I'm Mohamed Said. I work with companies and teams all over the world to build and scale web applications in the cloud. Find me on Twitter @themsaid.

