TaskRunner

Efficient and scalable task execution with Redis Streams

Overview

TaskRunner is a powerful and efficient Go library that simplifies distributed task execution by leveraging the capabilities of Redis Streams. It’s designed to help developers manage and scale asynchronous tasks in a fault-tolerant and highly scalable manner.

How to Install

To install TaskRunner, simply run the following command:

go get github.com/soroosh-tanzadeh/taskrunner

Asynchronous Task Processing

Execute tasks asynchronously, keeping your application responsive and performant even under heavy loads.

Scalable and Distributed

Scale your task processing horizontally by adding more workers without disrupting existing infrastructure.

Fault Tolerance

Automatic task retries and error handling ensure tasks are not lost or inconsistencies introduced during failures.

Why Use TaskRunner?

TaskRunner is built for developers who need a robust solution for managing asynchronous tasks in distributed systems. Whether you're handling millions of tasks per day or need a simple way to ensure tasks are processed reliably, TaskRunner provides the tools to get the job done.

Throughput (tasks/sec)

Memory Usage (MB)

Latency (ms)

Example Usage


package main

import (
    "context"
    "fmt"
    "strconv"
    "sync"
    "time"

    "github.com/soroosh-tanzadeh/taskrunner/redisstream"
    "github.com/soroosh-tanzadeh/taskrunner/runner"
    "github.com/redis/go-redis/v9"
)

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr:     "127.0.0.1:6379",
        DB:       5,
        PoolSize: 100,
    })
    wg := sync.WaitGroup{}

    queue := redisstream.NewRedisStreamMessageQueue(rdb, "example_tasks", "default", time.Second*30, true)
    taskRunner := runner.NewTaskRunner(runner.TaskRunnerConfig{
        BatchSize:         10,
        ConsumerGroup:     "example",
        ConsumersPrefix:   "default",
        NumWorkers:        10,
        ReplicationFactor: 1,
    }, rdb, queue)

    taskRunner.RegisterTask(&runner.Task{
        Name:     "exampletask",
        MaxRetry: 10,
        Action: func(ctx context.Context, payload any) error {
            fmt.Printf("Hello from example task %s\n", payload)
            return nil
        },
        Unique: false,
    })

    wg.Add(1)
    go func() {
        defer wg.Done()
        taskRunner.Start(context.Background())
    }()

    for i := 0; i < 100; i++ {
        taskRunner.Dispatch(context.Background(), "exampletask", strconv.Itoa(i))
    }

    wg.Wait()
}