Performance — Distributed Batch Process — Using as many servers as you have

The overall idea:

  • A program that will create the list of tasks (jobs, batches, processes) that need to be performed;
  • Another program, closer to a service than to an application, whose job is to capture a batch of tasks, process them and signal their completion;
  • A centralized task list store (in this case, a relational database) that controls which tasks are being performed, preventing other services from capturing them again.

Topology:

Path to implementation:

  1. We will need two interfaces: the first represents the task itself and the second its execution parameters, which I called the “contract”;
  2. Using both interfaces, we can create the task we want to distribute, split as granularly as possible and in a way that does not cause deadlocks;
  3. Create the task storage engine, in this case a relational table called “batchjob” that stores the task data along with the reflection data needed to rebuild it. I suggest the structure have at least the fields below:
    ClassName => string holding the name of the job class;
    ContractName => string holding the name of the contract class;
    ContractJson => string holding the serialized contract data;
    Status => int or enum holding the task status, for example: 0 = pending, 1 = done;
    StartedAt => date and time when the task was started;
    StoppedAt => date and time when the task was completed.
  4. The client application needs to create the task list and persist it in the storage engine, something like:
    1) MyBatchClass | MyContractClass | {"parm1": "0", "parm2": "1"} | 0 | NULL | NULL
    2) MyBatchClass | MyContractClass | {"parm1": "2", "parm2": "3"} | 0 | NULL | NULL
    3) MyBatchClass | MyContractClass | {"parm1": "4", "parm2": "5"} | 0 | NULL | NULL
    4) MyBatchClass | MyContractClass | {"parm1": "6", "parm2": "7"} | 0 | NULL | NULL
  5. The service application must collect a task and hold it for itself. There are many strategies for picking a task and marking it as “processing”. A relational database is really useful here, because it offers features such as the repeatable read isolation level, row locking and skipping locked rows when selecting new tasks.
    If you choose a storage engine without such features, you will need to implement an additional status such as “executing”, which prevents other services from picking the same task, and probably a mechanism that clears the flag on error (putting the task back on the list);
  6. Run the service application on more than one computer, VM or container.
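Without row locks, the fallback from step 5 can be sketched as below. This is a minimal, single-process sketch under stated assumptions. A C# `lock` stands in for whatever atomic conditional update the real store offers. The `Executing = 2` status is a hypothetical addition to the article's 0/1 values. `JobRow`, `JobStatus` and `InMemoryJobTable` are illustrative names. A task left in `Executing` longer than `staleAfter` is treated as abandoned (for example, because its service crashed) and becomes claimable again, while `Release` puts a failed task back on the list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// 0 and 1 follow the article; Executing = 2 is a hypothetical extra status.
public enum JobStatus { Pending = 0, Done = 1, Executing = 2 }

public sealed class JobRow
{
    public long Id { get; init; }
    public string ContractJson { get; init; } = "";
    public JobStatus Status { get; set; } = JobStatus.Pending;
    public DateTime? StartedAt { get; set; }
    public DateTime? StoppedAt { get; set; }
}

// In-memory stand-in for the batchjob table; the lock plays the role of an
// atomic conditional update in a real store.
public sealed class InMemoryJobTable
{
    private readonly List<JobRow> _rows = new();
    private readonly object _gate = new();

    public void Add(JobRow row)
    {
        lock (_gate) _rows.Add(row);
    }

    // Flips up to `limit` Pending rows, or Executing rows started before
    // now - staleAfter, to Executing and hands them to the caller.
    public IReadOnlyList<JobRow> ClaimBatch(int limit, TimeSpan staleAfter, DateTime now)
    {
        lock (_gate)
        {
            var claimed = _rows
                .Where(r => r.Status == JobStatus.Pending
                         || (r.Status == JobStatus.Executing && r.StartedAt < now - staleAfter))
                .Take(limit)
                .ToList();
            foreach (var row in claimed)
            {
                row.Status = JobStatus.Executing;
                row.StartedAt = now;
            }
            return claimed;
        }
    }

    public void MarkDone(JobRow row, DateTime now)
    {
        lock (_gate)
        {
            row.Status = JobStatus.Done;
            row.StoppedAt = now;
        }
    }

    // Puts a failed task back on the list so another service can pick it up.
    public void Release(JobRow row)
    {
        lock (_gate)
        {
            row.Status = JobStatus.Pending;
            row.StartedAt = null;
        }
    }
}
```

With a real database, `ClaimBatch` would usually become a single conditional `UPDATE` so that two services cannot flip the same row.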

Implementing

Choosing the Centralized Task List Manager

SELECT * FROM blog.batchjob WHERE Status = 0 LIMIT ? FOR UPDATE SKIP LOCKED
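The sketch below imitates `SKIP LOCKED` in memory, which shows why it lets many service instances share one table without waiting on each other. It is an analogy, not how PostgreSQL is implemented.

- Each row carries a hypothetical `SemaphoreSlim` as its row lock.
- `ClaimPending` takes up to `limit` pending rows whose lock it can acquire without blocking, and skips the others.
- It re-checks `Status` after taking the lock.
- Disposing the returned `Claim` plays the role of committing the transaction.

In PostgreSQL, the row locks taken by `FOR UPDATE` are held until the transaction ends. A worker would therefore either process and update its rows inside that transaction, or flag them before committing. `LockableRow`, `Claim` and `SkipLockedTable` are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public sealed class LockableRow
{
    public long Id { get; init; }
    public int Status { get; set; }                     // 0 = pending, 1 = done
    public SemaphoreSlim RowLock { get; } = new(1, 1);  // stand-in for a row lock
}

// Rows locked by one claim; disposing it releases the locks, like a commit.
public sealed class Claim : IDisposable
{
    public List<LockableRow> Rows { get; } = new();

    public void Dispose()
    {
        foreach (var row in Rows) row.RowLock.Release();
        Rows.Clear(); // makes a second Dispose harmless
    }
}

public static class SkipLockedTable
{
    // Rough analogue of: SELECT ... WHERE Status = 0 LIMIT n FOR UPDATE SKIP LOCKED
    public static Claim ClaimPending(IEnumerable<LockableRow> rows, int limit)
    {
        var claim = new Claim();
        foreach (var row in rows)
        {
            if (claim.Rows.Count >= limit) break;
            // Wait(0) never blocks: a row held by another worker is skipped.
            if (!row.RowLock.Wait(0)) continue;
            if (row.Status == 0) claim.Rows.Add(row);
            else row.RowLock.Release(); // already done, do not keep the lock
        }
        return claim;
    }
}
```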

Choosing the language and its technologies

protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    => optionsBuilder
        .UseNpgsql(string.Format(
                "Server={0};Username={1};Database={2};Port={3};Password={4};SSLMode=Prefer",
                Host,
                User,
                DBname,
                Port,
                Password),
            x => x.MigrationsHistoryTable("_ef_migration_history", "blog"))
        .UseLowerCaseNamingConvention();

[Index(nameof(Status))]
public class BatchJob
{
    public long Id { get; set; }
    public string ClassName { get; set; }
    public string ContractName { get; set; }
    public string ContractJson { get; set; }
    public int Status { get; set; } = 0;      // 0 = pending, 1 = done
    public DateTime? StartedAt { get; set; }  // nullable: NULL until the task starts
    public DateTime? StoppedAt { get; set; }  // nullable: NULL until the task completes
}

Add-Migration InitialMigration -Context BlogContext -OutputDir Migrations/Blog
Update-Database -Context BlogContext
Update-Database 0 -Context BlogContext

static void Main(string[] args)
{
    Migration();
    CreateTaskList();
}

static void Migration()
{
    using var blogCtx = new BlogContext();
    blogCtx.Database.Migrate();
}

using var blogCtx = new BlogContext();
var entityType = blogCtx.Model.FindEntityType(typeof(BatchJob));
var schema = entityType.GetSchema();
var tableName = entityType.GetTableName();

var vendors = ctx.Vendor
    .Select(v => new Vendor
    {
        BusinessEntityId = v.BusinessEntityId
    })
    .ToList();

// Start from an empty task list; schema and table names come from the EF model, not user input
blogCtx.Database.ExecuteSqlRaw($"TRUNCATE TABLE {schema}.{tableName}");

static void CreateTaskList()
{
    //code not shown

    foreach (var item in vendors)
    {
        var contract = new SummedSalesContract
        {
            VendorId = item.BusinessEntityId
        };
        blogCtx.BatchJob.Add(new BatchJob
        {
            ClassName = typeof(SummedSalesBatch).AssemblyQualifiedName,
            ContractName = typeof(SummedSalesContract).AssemblyQualifiedName,
            ContractJson = JsonSerializer.Serialize(contract),
            Status = 0 // Pending
        });
    }
    blogCtx.SaveChanges();
}

public sealed class SummedSalesBatch : ITaskfy
{
    public void Execute(IContract _contract)
    {
        var contract = _contract as SummedSalesContract;
        var vendorId = contract.VendorId;
        using var ctx = new PurchasingContext();
        decimal total = ctx.PurchaseOrderHeader
            .Where(poh => poh.VendorId == vendorId)
            .Sum(poh => poh.SubTotal);
        decimal qty = (from details in ctx.PurchaseOrderDetail
                       join header in ctx.PurchaseOrderHeader
                           on details.PurchaseOrderId equals header.PurchaseOrderId
                       where header.VendorId == vendorId
                       select details.PurchaseOrderDetailsId)
            .Count();
        Console.WriteLine($"VendorId: {vendorId}, Total: {total}, Items {qty}.");
    }
}
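The service side has to turn a `batchjob` row back into a running job. The article does not show `ITaskfy` or `IContract`. This sketch therefore assumes `IContract` is an empty marker interface and `ITaskfy` has only the `Execute(IContract)` method used by `SummedSalesBatch`.

- `Type.GetType` resolves the assembly-qualified names written by `CreateTaskList`.
- `JsonSerializer.Deserialize` rebuilds the contract from `ContractJson`.
- `Activator.CreateInstance` creates the job through its parameterless constructor.

`JobDispatcher`, `EchoContract` and `EchoBatch` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Assumed shapes: the article only shows ITaskfy.Execute(IContract).
public interface IContract { }

public interface ITaskfy
{
    void Execute(IContract contract);
}

public static class JobDispatcher
{
    // Rebuilds the job and its contract from the ClassName, ContractName and
    // ContractJson columns, then runs it.
    public static void Run(string className, string contractName, string contractJson)
    {
        Type jobType = Type.GetType(className, throwOnError: true)!;
        Type contractType = Type.GetType(contractName, throwOnError: true)!;

        var contract = (IContract)JsonSerializer.Deserialize(contractJson, contractType)!;
        var job = (ITaskfy)Activator.CreateInstance(jobType)!;
        job.Execute(contract);
    }
}

// Hypothetical demo types standing in for SummedSalesContract / SummedSalesBatch.
public sealed class EchoContract : IContract
{
    public int VendorId { get; set; }
}

public sealed class EchoBatch : ITaskfy
{
    // Records every VendorId it was asked to process.
    public static readonly List<int> Seen = new();

    public void Execute(IContract contract)
    {
        var echo = (EchoContract)contract;
        Seen.Add(echo.VendorId);
    }
}
```

Storing `AssemblyQualifiedName` rather than the short class name is what lets `Type.GetType` find a type that lives in another assembly. The job class also needs a public parameterless constructor for `Activator.CreateInstance`.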

Choosing the stack to reproduce (and execute) a scenario

private static string DbAddress()
{
    // Falls back to localhost when DB_ADDRESS is not set (e.g. running outside Docker)
    string dbAddress = Environment.GetEnvironmentVariable("DB_ADDRESS");
    if (string.IsNullOrWhiteSpace(dbAddress))
    {
        dbAddress = "localhost";
    }
    return dbAddress;
}

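version: '3.4'

services:
  simplebatchrunner:
    image: ${DOCKER_REGISTRY-}simplebatchrunner
    build:
      context: .
      dockerfile: SimpleBatchRunner/Dockerfile
    environment:
      # host.docker.internal is used when DOCKER_GATEWAY_HOST is unset or empty
      DB_ADDRESS: ${DOCKER_GATEWAY_HOST:-host.docker.internal}

# On Linux, point the containers at the default Docker bridge gateway
export DOCKER_GATEWAY_HOST=172.17.0.1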

Running the scenario

cd /path/to/copy/of/repository
docker-compose build
docker-compose up -d --scale simplebatchrunner=20

Conclusion

Glauber Cini

Software Performance Engineer wannabe