Laravel 8 ships with a neat feature that lets us dispatch a group of jobs to the queue to be executed in parallel. We can monitor these jobs and run certain logic when any job fails or when all jobs have been processed.
In this post, we're going to look into how this works under the hood.
Here's an example from the official docs:
$batch = Bus::batch([
    new ProcessPodcast(Podcast::find(1)),
    new ProcessPodcast(Podcast::find(2)),
    new ProcessPodcast(Podcast::find(3)),
    new ProcessPodcast(Podcast::find(4)),
    new ProcessPodcast(Podcast::find(5)),
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
    // First batch job failure detected...
})->finally(function (Batch $batch) {
    // The batch has finished executing...
})->name('Process Podcasts')
    ->allowFailures(false)
    ->onConnection('redis')
    ->onQueue('podcasts')
    ->dispatch();
Storing the Batch
When you call dispatch(), Laravel stores the batch information in the database:
$this->connection->table($this->table)->insert([
    'id' => $id,
    'name' => $batch->name,
    'total_jobs' => 0,
    'pending_jobs' => 0,
    'failed_jobs' => 0,
    'failed_job_ids' => '[]',
    'options' => serialize($batch->options),
    'created_at' => time(),
    'cancelled_at' => null,
    'finished_at' => null,
]);
This snippet is from the store() method of the Illuminate\Bus\DatabaseBatchRepository class. Each batch is assigned a UUID and an optional name. The batch options array is serialized and stored as a string.
That options array holds the closures provided to the then, catch, and finally methods. It also holds the values passed to allowFailures, onConnection, and onQueue.
The closures are serialized by the framework's implementation of Opis\Closure\SerializableClosure so they can be stored in the database.
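To see why the closures need a wrapper at all, here's a minimal plain-PHP sketch. PHP's native serialize() refuses raw closures, which is why the callbacks have to be wrapped in a serializable object before the options array can be turned into a string. The $options array below is a hypothetical stand-in for the batch options, not the framework's exact structure.

```php
<?php

// Hypothetical stand-in for the scalar part of the batch options array.
$options = [
    'allowFailures' => false,
    'connection' => 'redis',
    'queue' => 'podcasts',
];

// Scalar options survive a serialize()/unserialize() round trip unchanged.
$stored = serialize($options);
$restored = unserialize($stored);

// A raw closure, on the other hand, cannot be serialized by PHP itself.
$closureError = null;

try {
    serialize(['then' => [function () {
        // All jobs completed successfully...
    }]]);
} catch (Throwable $e) {
    $closureError = $e->getMessage();
}

var_dump($restored === $options);
echo $closureError, PHP_EOL;
```

The second line of output is the exception message PHP raises for the closure, which is the problem the serializable wrapper solves.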
Dispatching the Jobs
Now that the batch information is stored in the database, Laravel stores the batch ID in each job instance we pass to the batch and then dispatches the jobs to the queue:
$jobs->each->withBatchId($this->id);
$this->repository->incrementTotalJobs(
    $this->id,
    count($jobs)
);
$this->queue
    ->connection($this->options['connection'] ?? null)
    ->bulk(
        $jobs->all(),
        $data = '',
        $this->options['queue'] ?? null
    );
This snippet is from the add() method of the Illuminate\Bus\Batch class.
While dispatching the jobs to the queue, Laravel increments the jobs count in the database row where the batch is stored. That way it can track the remaining jobs to check whether the batch has finished or not.
Notice that Laravel uses the bulk() method to dispatch the jobs. That way it can hand all the jobs to the queue store in one call instead of calling dispatch() for every single job.
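The three steps above can be sketched in plain PHP with in-memory stand-ins. FakeJob, FakeQueue, and the $row array are hypothetical names made up for illustration; they are not Laravel classes. I'm also assuming the pending counter is bumped together with the total, since the pending count is what gets decremented later.

```php
<?php

// Hypothetical stand-in for a batchable job.
final class FakeJob
{
    public string $name;
    public ?string $batchId = null;

    public function __construct(string $name)
    {
        $this->name = $name;
    }

    public function withBatchId(string $id): self
    {
        $this->batchId = $id;

        return $this;
    }
}

// Hypothetical stand-in for a queue connection.
final class FakeQueue
{
    /** @var array<string, FakeJob[]> */
    public array $queues = [];
    public int $writes = 0;

    // Accept every job in one call and count it as a single write.
    public function bulk(array $jobs, string $queue = 'default'): void
    {
        $this->queues[$queue] = array_merge($this->queues[$queue] ?? [], $jobs);
        $this->writes++;
    }
}

// The batch row as it looks right after being stored.
$row = ['id' => 'batch-1', 'total_jobs' => 0, 'pending_jobs' => 0];

$jobs = [new FakeJob('podcast-1'), new FakeJob('podcast-2'), new FakeJob('podcast-3')];

// 1. Tag every job with the batch ID.
foreach ($jobs as $job) {
    $job->withBatchId($row['id']);
}

// 2. Increment the counters by the number of jobs being added (assumption: both counters).
$row['total_jobs'] += count($jobs);
$row['pending_jobs'] += count($jobs);

// 3. Hand all the jobs to the queue in one call.
$queue = new FakeQueue();
$queue->bulk($jobs, 'podcasts');

printf("total=%d pending=%d writes=%d\n", $row['total_jobs'], $row['pending_jobs'], $queue->writes);
```

Three jobs end up on the queue after a single bulk() call, and every one of them carries the batch ID it needs to report back later.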
Monitoring the Batch
After each successful job execution, the recordSuccessfulJob() method of the Illuminate\Bus\Batch class is called. This method decrements the number of pending batch jobs in the database and calls the proper callbacks:
$counts = $this->decrementPendingJobs($jobId);
if ($counts->pendingJobs === 0) {
    $this->repository->markAsFinished($this->id);
}
if ($counts->pendingJobs === 0 && $this->hasThenCallbacks()) {
    collect($this->options['then'])->each(/* Invoke */);
}
if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
    collect($this->options['finally'])->each(/* Invoke */);
}
If there aren't any pending jobs in the batch, Laravel will mark the batch as finished by updating the finished_at timestamp in the database. It'll also call any then callbacks if all jobs in the batch ran successfully, and any finally callbacks if all the jobs in the batch were attempted at least once.
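Here's a small plain-PHP sketch of that bookkeeping. SuccessSketchBatch is a hypothetical class that keeps the counters in memory and only records which callbacks would fire. The "pending minus failed equals zero" check is my assumption of what "all jobs were attempted" boils down to.

```php
<?php

// Hypothetical, simplified stand-in for the batch row and its callbacks.
final class SuccessSketchBatch
{
    public int $pendingJobs;
    public int $failedJobs = 0;
    public ?int $finishedAt = null;

    /** @var string[] Names of the callback groups that would be invoked. */
    public array $called = [];

    public function __construct(int $totalJobs)
    {
        $this->pendingJobs = $totalJobs;
    }

    public function recordSuccessfulJob(): void
    {
        // One fewer job left to process.
        $this->pendingJobs--;

        if ($this->pendingJobs === 0) {
            // Nothing is pending: mark the batch finished and run "then".
            $this->finishedAt = time();
            $this->called[] = 'then';
        }

        // Every job has either succeeded or failed: run "finally".
        if ($this->pendingJobs - $this->failedJobs === 0) {
            $this->called[] = 'finally';
        }
    }
}

$batch = new SuccessSketchBatch(3);

$batch->recordSuccessfulJob();
$batch->recordSuccessfulJob();
$calledAfterTwo = $batch->called; // Still empty, one job is pending.

$batch->recordSuccessfulJob();

echo implode(', ', $batch->called), PHP_EOL;
```

Nothing fires while a job is still pending. Once the last job succeeds, the batch is marked finished and both callback groups run.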
Similarly, the recordFailedJob() method is called after any job failure. By failure, I mean a job that has used up all its attempts:
$counts = $this->incrementFailedJobs($jobId);
if ($counts->failedJobs === 1 && ! $this->allowsFailures()) {
    $this->cancel();
}
if ($counts->failedJobs === 1 && $this->hasCatchCallbacks()) {
    $batch = $this->fresh();
    collect($this->options['catch'])->each(/* Invoke */);
}
if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
    $batch = $this->fresh();
    collect($this->options['finally'])->each(/* Invoke */);
}
If the batch doesn't allow failures, it's going to be cancelled after the first recorded failure. The cancelled_at field in the database will be updated with the current timestamp.
In addition to that, any catch callbacks will be called if it was the first recorded failure, and any finally callbacks will be called if all the jobs in the batch were attempted at least once.
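The failure path can be sketched the same way. FailureSketchBatch is a hypothetical in-memory class, not framework code. I'm assuming a failed job stays in the pending count, so "all jobs attempted" means the pending and failed counters are equal.

```php
<?php

// Hypothetical, simplified stand-in for the batch row and its callbacks.
final class FailureSketchBatch
{
    public int $pendingJobs;
    public int $failedJobs = 0;
    public bool $allowFailures;
    public ?int $cancelledAt = null;

    /** @var string[] Names of the callback groups that would be invoked. */
    public array $called = [];

    public function __construct(int $totalJobs, bool $allowFailures)
    {
        $this->pendingJobs = $totalJobs;
        $this->allowFailures = $allowFailures;
    }

    public function recordFailedJob(): void
    {
        $this->failedJobs++;

        // First failure on a strict batch: cancel it.
        if ($this->failedJobs === 1 && ! $this->allowFailures) {
            $this->cancelledAt = time();
        }

        // "catch" only runs for the first recorded failure.
        if ($this->failedJobs === 1) {
            $this->called[] = 'catch';
        }

        // Every remaining pending job is a failed one: run "finally".
        if ($this->pendingJobs - $this->failedJobs === 0) {
            $this->called[] = 'finally';
        }
    }
}

// A strict batch of two jobs where both jobs fail.
$strict = new FailureSketchBatch(2, false);
$strict->recordFailedJob();
$strict->recordFailedJob();

// A lenient batch of two jobs where one job fails.
$lenient = new FailureSketchBatch(2, true);
$lenient->recordFailedJob();

echo 'strict: ', implode(', ', $strict->called), PHP_EOL;
echo 'lenient: ', implode(', ', $lenient->called), PHP_EOL;
```

The strict batch is cancelled on its first failure and runs catch only once, while the lenient batch runs catch but is never cancelled.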
The recordSuccessfulJob() and recordFailedJob() methods are called from within the Illuminate\Queue\CallQueuedHandler class.
Exploring Further
If you wish to learn more about how batches work, you can check these classes and methods:
- Illuminate\Bus\Dispatcher::batch()
- Illuminate\Bus\PendingBatch
- Illuminate\Bus\DatabaseBatchRepository
- Illuminate\Bus\Batch
I'm Mohamed Said. I work with companies and teams all over the world to build and scale web applications in the cloud. Find me on Twitter @themsaid.