Performance — Distributed Batch Process — Using as many servers as you have

Sometimes we have too many business data to process that even improving the algorithms and using strategies like threads seems to be not enough to complete everything on schedule.

The data processing strategy using “more than one computer” is not new at all. But there’s times when we lack the idea of “how can I use this kind of thing on my project”.

Below I will explain the overall idea, tools and the strategy itself. Then, I will provide an example using C#, Postgres and Docker.

Disclaimer: its not a framework, it’s not the fittest approach of anything. It’s just a working idea that, of course, should be improved before using in a production environment.

The overall idea:

We still have a monolithic system, but we starting feeling that the amount of data we have, become too large to process in a single computer or, we don’t want to vertical scale it anymore.

The concept written here it’s not a monolithic system solver, but it can start a journey to horizontalize some hard performed tasks. The strategy still need a centralized relational database, it’s not mandatory, but knowing that most systems still don’t have a “cloud nature”, I think an example using it, it’s a good start.

In order to distribute some tasks we will need:

  • A program that will create the list of tasks (jobs, batches, processes) that need to be performed;
  • Another program, that will look more like a service than an application, has a job of capture a bulk of tasks, process them and sinalize it’s conclusion;
  • A centralized task list storage (in this case, a relational database), that will control which tasks are being performed (blocking new services from capturing them again).

An User Application will queue all needed tasks on a centralized task manager then, servers will seek “n” tasks at manager, retrieving them, processing and marking as completed.

Disclaimer 2: servers won’t help each other to perform the same “task” (maybe it was split before queuing), instead they will process entirely different tasks.

  1. We will need two interfaces, the first one represents the task itself and the second its execution parameters that I called “contract”;
  2. Using both interfaces we can create the task that we want to distributed in the most granular, not deadlocking way;
  3. Create the storage task engine, in this case, a relational table called “batchjob” that will store some data and some reflection data. The suggestion is the structure have, at least, the fields below:
    ClassName => string that will store the class job program name;
    ContractName => string that will store the contract name class;
    ContractJson => string that will store serialized contract data;
    Status => int or enum that hold the task status, example: 0 = need to be performed, 1 = done;
    StartedAt => date and time when the task was started;
    StoppedAt => date and time when the task was completed.
  4. The client application need to create the task list and persist the data at the storage engine, something like:
    1) MyBatchClass | MyContractClass | {“parm1” : “0”, “parm2”: “1” }| 0 | NULL | NULL
    2) MyBatchClass | MyContractClass | {“parm1” : “2”, “parm2”: “3” }| 0 | NULL | NULL
    3) MyBatchClass | MyContractClass | {“parm1” : “4”, “parm2”: “5” }| 0 | NULL | NULL
    4) MyBatchClass | MyContractClass | {“parm1” : “6”, “parm2”: “7” }| 0 | NULL | NULL
  5. The service application must collect the task and hold to itself. There are a lot of strategies to peek something and mark as “processing”. Here, using a relational database can be really useful, because you can use artifacts like: use repeatable read isolation, lock the row and skip locked rows when selecting new tasks.
    If you choose not use a storage database with such technology, you will need to implement some kind of new status like “executing”, preventing other services to peek the same task and, maybe, implement something to unmark the flag on error (or putting the task back to list);
  6. Run the service application in more than one computer, VM, container.

Implementing

My choice is always biased because I really like the C# code structure and .NET [Core] Framework. It has a beautiful documentation, many examples all around the internet and the code it’s pretty nice to read.

But, if you don’t use or don’t have an affinity with C#, it’s not a problem because this example was a bit dried to be understandable and, hopefully, easy to replicate in your preferable technologies.

The choose scenario is the same as the other post, where we needed to calculate how much (in money) and how much (in quantity) we bought from every vendor in the database.

Well, as I said before in this post, I have choose a relational database to be the task list manager.

In this case, I used my previous posts stack that consists on a Postgres with Microsoft AdventureWorks database. The news here is that I updated the Postgres version to ≥ 9.5 because I needed the SKIP LOCKED option during querying. I did not test the strategy on others DBMS, but you can find SKIP LOCKED on Oracle and READPAST on SQL Server, that should work pretty much the same as Postgres.

But, why?
Well, to prevent different services to retrieve the same task, the strategy was open a transaction, in this case a Repeatable Read transaction, that will peek 1 or more tasks “for update”, during its processing these rows will be locked on the database engine so, every other services that are running, when they will “select skip locked”, the service will just see tasks that weren’t retrieve at that time, because they are on other’s transactions.

The query peeking task will look something like:

SELECT * FROM blog.batchjob WHERE Status = 0 FOR UPDATE SKIP LOCKED LIMIT ?

The C# choice was already explained, but there are few things that was choose inside the .NET Core that deserve an explanation.

You can download the entire source HERE.

The Entity Framework Core
I like to use Entity Framework because you have a lot of possibilites to do the same stuff, it all depends of your scenario. In my case EF is always the first choice because you can measure the query performance using the Visual Studio Profiler, among other things like context migrations, Code First migrations and so on.

In order to persists the migrations, I changed the EF extended DbContext class to use a table below the “blog” schema, this schema is located inside adventureworks database.

Snippet:

protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
=> optionsBuilder
.UseNpgsql(string.Format(
"Server={0};Username={1};Database={2};Port={3};Password={4};SSLMode=Prefer",
Host,
User,
DBname,
Port,
Password),
x => x.MigrationsHistoryTable("_ef_migration_history", "blog")
)
.UseLowerCaseNamingConvention();

Furthermore, I created a model class BatchJob (Code First) that will be the table that persists the tasks, this class it’s in a DbSet on BlogContext:

[Index(nameof(Status))]
public class BatchJob
{
public long Id { get; set; }
public string ClassName { get; set; } public string ContractName { get; set; } public string ContractJson { get; set; } public int Status { get; set; } = 0; public DateTime StartedAt { get; set; } public DateTime StoppedAt { get; set; }
}

Then, I ran the Code First migration tool:
As almost always Windows and Visual Studio user, to run the migration you need to follow these steps:

1) Set “DataLayer” project as Startup Project:

2) Run the Package Manager Console

3) Run the Migration Creation tool command

Add-Migration InitialMigration -Context BlogContext -OutputDir Migrations/Blog

4) Run the Migration (it’s not mandatory, because in the source I call the migration from C# on the client application):

Update-Database -Context BlogContext

5) In case you miss something or if something goes wrong, you can revert the migration using:

Update-Database 0 -Context BlogContext

The client, code, C#, reflection and serializer overall idea
To make the example work, some shortcuts were taken. Like mentioned before, Migration is called on client application, here named as: Blog.Performance.Batch.

Running the migration on the client application:

static void Main(string[] args)
{
Migration();
CreateTaskList();
}
static void Migration()
{
using var blogCtx = new BlogContext();
blogCtx.Database.Migrate();

}

But not just the migration was a shortcut, you can see in the source that every time that client run, a truncate table instruction is sent to database.

using var blogCtx = new BlogContext();var entityType = blogCtx.Model.FindEntityType(typeof(BatchJob));
var schema = entityType.GetSchema();
var tableName = entityType.GetTableName();
var vendors = ctx.Vendor
.Select(v => new Vendor()
{
BusinessEntityId = v.BusinessEntityId
}).ToList();
blogCtx.Database.ExecuteSqlRaw($"TRUNCATE TABLE {schema}.{tableName}");

All of these database code instructions explanation was just to show how the application was build. Because just running the client will performed all the code shown above.

After all the database stuff, it’s time to create the Task List and save it on the BatchJob table.
Maybe here, we can see “where the magic happens”, because it’s here where we will store the “batch”, “contract” and “parameters” that will be executed in some server.

static void CreateTaskList()
{
//code not shown

foreach (var item in vendors)
{
var contract = new SummedSalesContract
{
VendorId = item.BusinessEntityId
};
blogCtx.BatchJob.Add(new BatchJob
{
ClassName = typeof(SummedSalesBatch).AssemblyQualifiedName,
ContractName = typeof(SummedSalesContract).AssemblyQualifiedName,
ContractJson = JsonSerializer.Serialize(contract),
Status = 0 //Pending

});
}
blogCtx.SaveChanges();
}

After running the client application, the data on table batchjob should be looking like this:

I have chosen the “AssemblyQualifiedName”, because it was the fastest way to create an object using Activator. We will see it on the service side as far as below.

The service
The service side it’s just a one class program. Even this example being dried, the service do not hold a big complexity, once its job it’s read the “next tasks” from the database and call the respective objects.

As said before, the service holds a lock of each row that will be used. You can see the strategy below:

There’s a Thread.Sleep in order to see the running progress through the containers.

The scenario
The scenario that is used as example is just simple. For each vendor on the database a task was created. The contract holds the Vendor Id, and the BatchJob uses this contract to calculate the amount of value and quantity bought from him.

public sealed class SummedSalesBatch : ITaskfy
{
public void Execute(IContract _contract)
{
var contract = _contract as SummedSalesContract;
var vendorId = contract.VendorId; using var ctx = new PurchasingContext(); decimal total = ctx.PurchaseOrderHeader
.Where(poh => poh.VendorId == vendorId)
.Sum(poh => poh.SubTotal);
decimal qty = (from details in ctx.PurchaseOrderDetail
join header in ctx.PurchaseOrderHeader
on details.PurchaseOrderId equals header.PurchaseOrderId
where header.VendorId == vendorId
select details.PurchaseOrderDetailsId)
.Count();
Console.WriteLine($"VendorId: {vendorId}, Total: {total}, Items {qty}.");
}
}

Using docker compose we can simulate many containers as separate servers, each one running a copy of the service application.
I really don’t know if using containers to increase the distributed process power is the most suitable tool. But, as far as we need just to show the working idea, it’s a problem that we don’t need to bother now.

Once we have the client application and docker containers connection to another docker container that holds the database, few precautions must be taken.

A) The DB address for the client application:
The stack that holds the database can be resolved on host machine just using “localhost”, for that, I created a method that validate if some environment variable exists, if not, it will use “localhost”.

B) The DB address for the docker service containers:
They need to reach the database to peek some tasks, for this I just send to containers the host address as an environment variable.

C# method — DefaultDBContext:

private static string DbAddress()
{
string dbAddress = Environment.GetEnvironmentVariable("DB_ADDRESS");
if(string.IsNullOrWhiteSpace(dbAddress))
{
dbAddress = "localhost";
}
return dbAddress;
}

docker-compose.yml for service application:

version: '3.4'services:
simplebatchrunner:
image: ${DOCKER_REGISTRY-}simplebatchrunner
build:
context: .
dockerfile: SimpleBatchRunner/Dockerfile
environment:
DB_ADDRESS: ${DOCKER_GATEWAY_HOST:-host.docker.internal}

The above strategy to send DB_ADDRESS should work in any OS. But, for Linux it’s needed to export the environment variable (source):

export DOCKER_GATEWAY_HOST=172.17.0.1

Here, it’s just to up docker compose and see the services working.

Building and running:

cd /path/to/copy/of/repository
docker-compose build
docker-compose up -d --scale simplebatchrunner=20

The parameter “scale” will performed the creation of 20 containers running the same application.

Output:

After the execution, the BatchJob table should be looking like:

Conclusion

Its know that using more than one computer could outperform a single computer power. But the overhead caused by this kind of strategy should be evaluated.
Even being distributed this strategy can be combined with the previous one here. Where you can think about the service application running more than one task using threads.

Software Performance Engineer wannabe