Performance — Distributed Batch Process — Using as many servers as you have

The overall idea:

We still have a monolithic system, but we start feeling that the amount of data has become too large to process on a single computer, or that we no longer want to scale it vertically.

  • A program that will create the list of tasks (jobs, batches, processes) that need to be performed;
  • Another program, which will look more like a service than an application, whose job is to capture a batch of tasks, process them, and signal their completion;
  • A centralized task list storage (in this case, a relational database) that will control which tasks are being performed (blocking other services from capturing them again).

Topology:

Path to implementation:

  1. We will need two interfaces: the first represents the task itself, and the second its execution parameters, which I called the “contract” (both are sketched right after this list);
  2. Using both interfaces, we can create the tasks we want to distribute in the most granular, deadlock-free way possible;
  3. Create the task storage engine; in this case, a relational table called “batchjob” that will store task data and some reflection metadata. The suggestion is that the structure has, at least, the fields below:
    ClassName => string that will store the job class name;
    ContractName => string that will store the contract class name;
    ContractJson => string that will store serialized contract data;
    Status => int or enum that hold the task status, example: 0 = need to be performed, 1 = done;
    StartedAt => date and time when the task was started;
    StoppedAt => date and time when the task was completed.
  4. The client application needs to create the task list and persist the data in the storage engine, something like:
    1) MyBatchClass | MyContractClass | {"parm1": "0", "parm2": "1"} | 0 | NULL | NULL
    2) MyBatchClass | MyContractClass | {"parm1": "2", "parm2": "3"} | 0 | NULL | NULL
    3) MyBatchClass | MyContractClass | {"parm1": "4", "parm2": "5"} | 0 | NULL | NULL
    4) MyBatchClass | MyContractClass | {"parm1": "6", "parm2": "7"} | 0 | NULL | NULL
  5. The service application must collect tasks and hold them for itself. There are many strategies to pick a task and mark it as “processing”. Here, a relational database can be really useful, because you can rely on features such as repeatable read isolation, row locking, and skipping locked rows when selecting new tasks.
    If you choose a storage engine without such features, you will need to implement some kind of extra status like “executing” to prevent other services from picking the same task, and perhaps a mechanism to clear that flag on error (or to put the task back on the list);
  6. Run the service application on more than one computer, VM, or container.
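
To make steps 1 and 2 concrete, here is a minimal sketch of what the two interfaces could look like. The names ITaskfy and IContract match the ones used later in this post; treating IContract as a plain marker interface and giving SummedSalesContract an int VendorId are my assumptions:

public interface IContract
{
    // marker interface: each concrete contract defines its own parameters
}

public interface ITaskfy
{
    // receives the deserialized contract and performs one unit of work
    void Execute(IContract contract);
}

// example contract for the scenario used later in this post
public sealed class SummedSalesContract : IContract
{
    public int VendorId { get; set; }
}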

Implementing

My choice is always biased because I really like the C# code structure and the .NET [Core] framework. It has beautiful documentation, many examples all around the internet, and the code is pretty nice to read.

Choosing the Centralized Task List Manager

Well, as I said before in this post, I have chosen a relational database (PostgreSQL, accessed through Npgsql below) to be the task list manager. The query below shows how a service can claim pending tasks without racing other workers:

SELECT * FROM blog.batchjob WHERE Status = 0 FOR UPDATE SKIP LOCKED LIMIT ?
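
A minimal sketch of a worker claiming tasks with that query, using the BlogContext and BatchJob types defined later in this post, could look like the code below (the LIMIT of 10 stands in for the ? parameter and is my choice):

using var ctx = new BlogContext();
using var tx = ctx.Database.BeginTransaction();

// SKIP LOCKED makes concurrent workers ignore rows already locked by others
var claimed = ctx.BatchJob
    .FromSqlRaw("SELECT * FROM blog.batchjob WHERE status = 0 FOR UPDATE SKIP LOCKED LIMIT 10")
    .ToList();

foreach (var job in claimed)
{
    job.StartedAt = DateTime.UtcNow;
    // ...resolve and execute the task (see the reflection sketch further below)...
    job.Status = 1; // done
    job.StoppedAt = DateTime.UtcNow;
}

ctx.SaveChanges();
tx.Commit(); // committing releases the row locks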

Choosing the language and its technologies

The C# choice was already explained, but a few things chosen inside .NET Core deserve an explanation.

protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    => optionsBuilder
        .UseNpgsql(
            string.Format(
                "Server={0};Username={1};Database={2};Port={3};Password={4};SSLMode=Prefer",
                Host, User, DBname, Port, Password),
            x => x.MigrationsHistoryTable("_ef_migration_history", "blog"))
        // from the EFCore.NamingConventions package: maps PascalCase members
        // to lowercase database identifiers, friendlier to PostgreSQL
        .UseLowerCaseNamingConvention();
[Index(nameof(Status))]
public class BatchJob
{
    public long Id { get; set; }
    public string ClassName { get; set; }
    public string ContractName { get; set; }
    public string ContractJson { get; set; }
    public int Status { get; set; } = 0;
    public DateTime StartedAt { get; set; }
    public DateTime StoppedAt { get; set; }
}
Add-Migration InitialMigration -Context BlogContext -OutputDir Migrations/Blog   # scaffold the migration
Update-Database -Context BlogContext                                             # apply it to the database
Update-Database 0 -Context BlogContext                                           # revert all migrations (reset)
static void Main(string[] args)
{
    Migration();
    CreateTaskList();
}

static void Migration()
{
    using var blogCtx = new BlogContext();
    blogCtx.Database.Migrate();
}
static void CreateTaskList()
{
    using var blogCtx = new BlogContext();

    // discover the schema and table name that EF mapped for BatchJob
    var entityType = blogCtx.Model.FindEntityType(typeof(BatchJob));
    var schema = entityType.GetSchema();
    var tableName = entityType.GetTableName();

    // start from a clean task list on every run
    blogCtx.Database.ExecuteSqlRaw($"TRUNCATE TABLE {schema}.{tableName}");

    // one task will be created per vendor (assuming Vendor is exposed
    // by the PurchasingContext used later in this post)
    using var ctx = new PurchasingContext();
    var vendors = ctx.Vendor
        .Select(v => new Vendor
        {
            BusinessEntityId = v.BusinessEntityId
        }).ToList();

    foreach (var item in vendors)
    {
        var contract = new SummedSalesContract
        {
            VendorId = item.BusinessEntityId
        };

        blogCtx.BatchJob.Add(new BatchJob
        {
            ClassName = typeof(SummedSalesBatch).AssemblyQualifiedName,
            ContractName = typeof(SummedSalesContract).AssemblyQualifiedName,
            ContractJson = JsonSerializer.Serialize(contract),
            Status = 0 // Pending
        });
    }

    blogCtx.SaveChanges();
}
public sealed class SummedSalesBatch : ITaskfy
{
    public void Execute(IContract _contract)
    {
        var contract = _contract as SummedSalesContract;
        var vendorId = contract.VendorId;

        using var ctx = new PurchasingContext();

        decimal total = ctx.PurchaseOrderHeader
            .Where(poh => poh.VendorId == vendorId)
            .Sum(poh => poh.SubTotal);

        decimal qty = (from details in ctx.PurchaseOrderDetail
                       join header in ctx.PurchaseOrderHeader
                           on details.PurchaseOrderId equals header.PurchaseOrderId
                       where header.VendorId == vendorId
                       select details.PurchaseOrderDetailsId)
                      .Count();

        Console.WriteLine($"VendorId: {vendorId}, Total: {total}, Items {qty}.");
    }
}
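
It is also worth seeing how a batchjob row turns back into running code inside the service. A minimal dispatch sketch, assuming the reflection metadata stored earlier and the interfaces from the beginning of this post, could be:

// resolve the types stored as AssemblyQualifiedName strings
var taskType = Type.GetType(job.ClassName);
var contractType = Type.GetType(job.ContractName);

// rebuild the contract from its serialized JSON and instantiate the task
var contract = (IContract)JsonSerializer.Deserialize(job.ContractJson, contractType);
var task = (ITaskfy)Activator.CreateInstance(taskType);

task.Execute(contract); // e.g. runs SummedSalesBatch for one vendor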

Choosing the stack to reproduce (and execute) a scenario

Using Docker Compose we can simulate as many containers as we want as separate servers, each one running a copy of the service application.
I really don't know if containers are the most suitable tool for increasing distributed processing power. But since we only need to show the working idea, that is a problem we don't need to worry about now.

private static string DbAddress()
{
    string dbAddress = Environment.GetEnvironmentVariable("DB_ADDRESS");
    if (string.IsNullOrWhiteSpace(dbAddress))
    {
        dbAddress = "localhost";
    }
    return dbAddress;
}
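
Presumably the Host value used back in OnConfiguring is fed by this helper; a hypothetical one-line wiring would be:

private static string Host => DbAddress(); // hypothetical: not shown in the original code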
version: '3.4'

services:
  simplebatchrunner:
    image: ${DOCKER_REGISTRY-}simplebatchrunner
    build:
      context: .
      dockerfile: SimpleBatchRunner/Dockerfile
    environment:
      DB_ADDRESS: ${DOCKER_GATEWAY_HOST:-host.docker.internal}

On Linux, where host.docker.internal may not resolve, point DOCKER_GATEWAY_HOST at the default docker0 bridge gateway before bringing the stack up:

export DOCKER_GATEWAY_HOST=172.17.0.1

Running the scenario

Here, it's just a matter of bringing the Docker Compose stack up and watching the services work.

cd /path/to/copy/of/repository
docker-compose build
docker-compose up -d --scale simplebatchrunner=20

Conclusion

It is known that more than one computer can outperform the power of a single one, but the overhead introduced by this kind of strategy should be evaluated.
Even being distributed, this strategy can be combined with the previous one, where each service application instance runs more than one task using threads, as in the sketch below.
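
For instance, a minimal sketch where one service instance processes its claimed batch with multiple threads; Parallel.ForEach and the degree of parallelism are my assumptions here:

// process the claimed batch concurrently inside a single service instance
Parallel.ForEach(
    claimed,
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    job =>
    {
        // same reflection dispatch as shown earlier, one task per thread
        var task = (ITaskfy)Activator.CreateInstance(Type.GetType(job.ClassName));
        var contract = (IContract)JsonSerializer.Deserialize(
            job.ContractJson, Type.GetType(job.ContractName));
        task.Execute(contract);
    });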
