Building a custom workflow engine on top of Durable Task Framework DTFx

Spyros Giannakakis

Dmitrii Burik

Building a custom workflow engine on top of Durable Task Framework (DTFx)

Introduction

In late 2022, we were approached by a large customer in the automotive industry who asked us to help them implement a self-service solution with the ability for bi-directional communication to their PLCs (Programmable Logic Controllers). Before our engagement with them, PLC operators would need to manually configure each of the relevant PLCs on the factory floor to obtain desired output/results. Our project helped eliminate or simplify these manual steps, which in turn helped the customer iterate and scale production faster, while potentially cutting down on manual errors.

Given that production of such hardware components requires a series of steps to be executed in sequence as a workflow, we found that workflow engines would be good candidates to base our solution upon. From the variety of existing workflow engines we selected the Durable Task Framework (DTFx) due to its performance characteristics, broad capabilities, big community and Microsoft support.

However, out-of-the-box DTFx also did not meet all of our requirements and we built some features on top of it. With this blog post, we detail what we built, explaining the why and how. We hope this information will be helpful if you are considering using DTFx and want to tailor it to your specific needs.

Brief overview of requirements and mapping to the features we built

A fundamental requirement of the solution was to be self-service in the sense that factory operators should be able to define workflows to cover future use-cases. Given that operators often lack coding skills, we addressed this requirement with the introduction of a Domain-specific language (DSL) that acts as an abstraction layer, enabling operators to create workflows in an easy and user-friendly way.

Another requirement was to be able to influence the workflow execution based on input provided externally at workflow execution time or depending on values generated while running the workflow e.g., the current value of a PLC node. To address this we introduced dynamic expressions and a data-flow to pass data from a workflow step to subsequent steps.

The workflows we are dealing with have (write) access to machines on the factory floor, so validation of dynamic expressions and the workflow as a whole is crucial to ensure safety and communicate issues earlier to factory operators.

In some cases, workflows could take a long time to be completed or even all together hang. This could happen due to various reasons, like an incorrect information in a workflow configuration or transient network issues on the factory floor. To avoid this and recover gracefully, we provided a way to handle workflow timeouts and cancellations.

In other cases, workflows need to execute a certain “cleanup” action independently of the result of the execution. To cover this requirement we added a workflow closure step.

Contents

Domain-specific language (DSL)

In DTFx workflows are exclusively defined through code. To empower factory operators with the ability to define workflow steps without coding, we recognized the need for a Domain-specific Language (DSL). Besides the fact that operators often lack coding skills, adding workflows via code would also be error-prone, so we aimed to create a DSL that would act as an abstraction layer, enabling operators to create workflows in an easy and user-friendly way. As an example, operators wouldn’t have to enter detailed information on how and where PLC nodes can be reached because this information was “enriched” from our backend, minimizing the workflow definition inputs required of operators.

Although, eventually end users will use a UI to interact with the solution, which will generate the underlying workflow definition JSON, having a well-designed DSL was important to onboard users fast and even before the UI was ready. It would also support more advanced future use-cases like workflow definition versioning and having definition JSON generated by external systems.

To achieve this, the concept of workflow definition was separated from workflow configuration. You can think of the definition being a “compile-time” construct which uses user-facing terms like signals, versus configuration being a “runtime” construct, containing all details needed to execute the workflow.

As for DSL base language, we chose JSON over Yaml due to easier writing and better support from C# libraries. The fact that DTFx uses JSON internally to serialize its state, was another reason to choose JSON.

Dynamic expressions and data flow

As we saw previously, the workflow definition time is not the same as the workflow execution time. We can think of it as compile-time vs runtime. With the term dynamic expressions, we refer to expressions that are evaluated at “runtime”, i.e., when a workflow is executed. This allows the PLC operators to influence the workflow execution based on input provided externally at execution time or values generated while running the workflow e.g., the current value of a PLC node.

❕ Tip: Verify whether a static workflow configuration is sufficient for your business needs or whether workflow execution could vary depending on runtime parameters. Need for runtime arguments and dynamic expressions might influence the overall design of the engine, so it’s highly recommended to identify these needs as early as possible.

To support dynamic execution of workflows, we relied heavily on Dynamic Linq functionality and more specifically for:

  • parsing, evaluating as well as validating dynamic expressions
  • storing execution input values as well as generated values at runtime.

Dynamic expression example: if-condition

One example of dynamic expressions are control structures which are an essential part of any programming language, including a custom DSL. Control structures allow end-users (in our case PLC operators) to specify conditions, loops, and branching statements, among others, that dictate how a workflow would execute. The exact type and number of control structures for your custom DSL will depend on its intended purpose and the business needs.

❕ Tip: Consider which control structures make most sense for your use-case and are both expressive and easy to use for your end-users and start with those.

In our use-case we identified if-conditions as a fundamental control structure to start our implementation from. We relied on Dynamic Linq DynamicExpressionParser.ParseLambda method that creates a LambdaExpression out of a condition provided as a string. This LambdaExpression can then be compiled to create a Delegate that can be invoked using the DynamicInvoke method on it, as shown below.

var condition = "1 == 1";
Delegate parsedDelegate = null;
try
{
    LambdaExpression parsedCondition = DynamicExpressionParser.ParseLambda(
        new ParsingConfig(),
        Array.Empty<ParameterExpression>(),
        typeof(bool),
        condition);
    parsedDelegate = parsedCondition.Compile();
}
catch (Exception e) when (e is ParseException or InvalidOperationException)
{
    // handle any exceptions during parsing the condition
}

if ((bool)parsedDelegate?.DynamicInvoke(null))
{
    // execute then branch
}
else
{
    // execute else branch
}

Data flow: workflow inputs and runtime data

The execution plan of a workflow could be influenced by input parameters on execution time or by values that were generated from previous steps of the workflow. Both of these were stored as properties in an instance of System.Linq.Dynamic.Core.DynamicClass.

Assuming the workflow inputs are represented using the following class:

class WorkflowInputParameter
{
    public string Name;
    public string Type;
}

The following sample code shows how we populated the workflowData for the input parameters. A similar approach was used to pass outputs of previous steps to the next steps of the workflow.

// parameters as defined in compile-time
var workflowParameters = new List<WorkflowInputParameter>()
{
    new () { Name = "temperature-threshold", Type = "System.Int32" }
};

// convert to DynamicProperties
List<DynamicProperty> dynamicProperties = workflowParameters.Select(p => new DynamicProperty(p.Name, Type.GetType(p.Type))).ToList();

// create DynamicClass to hold values
Type workflowType = DynamicClassFactory.CreateType(dynamicProperties);
DynamicClass workflowData = (DynamicClass)System.Activator.CreateInstance(workflowType) !; // consider handling nulls explicitly

// actual values passed in runtime
var parameterValues = new Dictionary<string, int>()
{
    {"temperature-threshold", 14}
};
workflowParameters.ForEach(
    p => workflowData.SetDynamicPropertyValue(p.Name, Convert.ChangeType(parameterValues.GetValueOrDefault(p.Name), Type.GetType(p.Type))));

Validation of dynamic expressions

Validation of dynamic expressions is crucial in order to provide early feedback to the user in case of errors or prevent execution of unwanted code snippets which could constitute a security issue.

❕ Tip: Consider validation if you are allowing dynamic expressions in your workflows to ensure no malicious code can be executed and errors can be communicated to the end-user early.

In our case, syntactic validation (e.g. a step is attempting to use data from a previous step that does not exist) is happening via Dynamic Linq DynamicExpressionParser.ParseLambda that throws a ParseException as shown in the previous snippets. This can happen among others when a member does not exist, or a parameter is not of proper type.

Regarding validation from a security point of view, Dynamic Linq already restricts the attack surface by allowing access only to a pre-defined set of types: only primitive types and types from the System.Math and System.Convert namespaces are accessible. This can be configured/extended using the DynamicLinqType attribute on a custom type.

Further restrictions can be introduced by using the Visitor pattern and extending the DynamicExpressionVisitor. For example, in order to check method calls the following snippet can be helpful. The same pattern can be used to check operators, fields, properties etc.

/* code omitted for brevity */

// expression as a string
string expression = "Convert.ToInt32(data.ReadOpc1.Signals[\"signalB\"].Value) > data.IntParam";
LambdaExpression parsedExpression = DynamicExpressionParser.ParseLambda(new[] { dataParam }, typeof(object), expression);

// initiate expression visit
WorkflowExpressionVisitor visitor = new ();
visitor.Visit(parsedExpression);

// Visitor class
public class WorkflowExpressionVisitor : DynamicExpressionVisitor
{
    protected override Expression VisitMethodCall(MethodCallExpression node)
    {
        // perform any required checks e.g. that the declaring type
        // or the method called is whitelisted

        return base.VisitMethodCall(node);
    }

}

Workflow validation

Besides the validation of expressions that we just covered, validation of the whole workflow definition helps build a more user-friendly engine by detecting possible errors or misconfigurations and providing feedback to end-users early.

❕ Tip: Consider validating as early as possible to give feedback to end-users. In this feedback you can potentially include all issues found, to allow for resolution of them at once.

Some basic validation is already provided if you rely on concrete types instead of a generic object when you deserialize the workflow definition. We relied also on attributes to specify required JSON properties and implemented custom validation based on business rules (using FluentValidation in C#).

What is more, instead of primitive types (like int or string) consider using custom types for fields that have business value or hold domain semantics. This way if there is a need to refactor these in the future due to business changes, you can better encapsulate changes.

Workflow timeout and cancellation

In some cases, workflows could take a long time to be completed. This could happen due to various reasons, like an incorrect information in a workflow configuration or transient network issues on the factory floor.

In our specific customer scenario, where the workflow engine runs on a factory edge, workflows need to finish and free up system resources as fast as possible. Having a workflow running for a long time without any response is undesired and would in most cases mean some kind of misconfiguration or network issue. To handle this case, we were asked to give a user an opportunity to specify a timeout value for the entire workflow.

To implement this feature we followed the approach recommended in the official documentation for Azure Durable Functions.

The implementation makes use of a Durable Timer provided by the DTFx framework. The workflow orchestrator (our custom class) waits for the timer and the scheduled activities, and based on which task finishes first we know if the workflow timed out and we should cancel it. In case of the timeout, we tell DTFx to purge the running orchestration, so that it can be cleaned up. However, purging an entire orchestration in DTFx doesn’t cancel already running activities.

Hence, we needed a way to supply a cancellation token down to each activity in the workflow. Since there can be multiple orchestrations running in parallel, the cancellation token must be unique for each orchestration. To solve this, we implemented a class that maps a DTFx orchestration context to a CancellationTokenSource and stores this map in memory:

Note: The approach explained below works only if the engine and all workflows run on a single node and it won’t work in a distributed scenario. Although the Durable Task Framework (DTFx) is designed for building distributed workflows and even supports the execution of activities on different machines, we don’t make use of this feature and ensure the engine runs on just one node, to take out the complexity of distributed environment.

public class OrchestrationCancellationTokenRegistry : IOrchestrationCancellationTokenRegistry
{
    /// <summary>
    /// Holds the map between DTFx orchestrations and cancellation tokens.
    /// </summary>
    internal ConcurrentDictionary<OrchestrationInstance, CancellationTokenSource> CancellationTokenSources { get; private set; } = new ();

    /// <summary>
    /// Requests cancellation on the token for a workflow.
    /// </summary>
    void IOrchestrationCancellationTokenRegistry.Cancel(OrchestrationInstance instance)
    {
        if (CancellationTokenSources.TryGetValue(instance, out CancellationTokenSource? cts))
        {
            cts?.Cancel();
        }
    }

    /// <summary>
    /// Creates a new token for a workflow execution.
    /// </summary>
    void IOrchestrationCancellationTokenRegistry.Create(OrchestrationInstance instance)
        => CancellationTokenSources.TryAdd(instance, new CancellationTokenSource());

    /// <summary>
    /// Gets a token for the workflow.
    /// </summary>
    CancellationToken IOrchestrationCancellationTokenRegistry.Get(OrchestrationInstance instance)
        => CancellationTokenSources[instance].Token;

    /// <summary>
    /// Removes a token for a workflow.
    /// </summary>
    void IOrchestrationCancellationTokenRegistry.Remove(OrchestrationInstance instance)
        => CancellationTokenSources.TryRemove(instance, out _);
}

The following code snippet show the RunTask() method of our custom workflow orchestrator, which inherits from the DTFx TaskOrchestration class:

private readonly IOrchestrationCancellationTokenRegistry _cancellationRegistry;

  /// <summary>
  /// Runs a workflow. Triggered by DTFx.
  /// </summary>
  /// <param name="context">Orchestrator context.</param>
  /// <param name="workflowContext">Workflow context.</param>
  public override async Task<DynamicClass> RunTask(OrchestrationContext context, WorkflowContext workflowContext)
  {
      // Setup cancellation tokens
      CancellationTokenSource timerCancellationSource = new ();
      _cancellationRegistry.Create(context.OrchestrationInstance);

      try
      {
          // Setup cancellation timer
          Task<bool> timer = context.CreateTimer(
              context.CurrentUtcDateTime.AddMilliseconds(workflowContext.WorkflowTimeout),
              true,
              timerCancellationSource.Token);

          // Schedule all activities in the workflow
          Task allActivities = ScheduleAllActivities(...);

          // wait for any of them to complete
          Task first = await Task.WhenAny(timer, allActivities);

          if (first.Id == allActivities.Id)
          {
              /* Activities finished first */

              // cancel timer
              timerCancellationSource.Cancel();
          }
          else
          {
              // cancel activities
              workflowContext.WorkflowIsCancelled = true;
              _cancellationRegistry.Cancel(context.OrchestrationInstance);
          }
      }
      finally
      {
          // clean up cancellation
          _cancellationRegistry.Remove(context.OrchestrationInstance);
      }

      return workflowContext.WorkflowData;
  }

Each activity, scheduled on a OrchestrationContext has access to the current OrchestrationInstance through the DurableTask.Core.TaskContext class. By injecting the IOrchestrationCancellationTokenRegistry it can get the cancellation token for the currently running orchestration, set it to throw an exception if cancellation was requested and use the token in any other method calls it might make.

public class MyActivity
{
  private readonly IOrchestrationCancellationTokenRegistry _cancellationRegistry;

  protected override async Task<TOutput> ExecuteAsync(TaskContext context, TInput input)
  {
          // Get cancellation token from registry - this one will signal if the workflow times out.
          CancellationToken workflowTimeOutToken = _cancellationRegistry.Get(context.OrchestrationInstance);
          try
          {
              // throw if workflow times out.
              workflowTimeOutToken.ThrowIfCancellationRequested();

              return await ExecuteCodeActivityAsync(input, workflowTimeOutToken);
          }
          catch (TaskCanceledException)
          {
            // handle cancellation
          }
      }
  }

}

This approach ensures that in case a workflow times out, all activities will be cancelled, including the already running ones.

Workflow closure step

Another specific feature we built on top of DTFx is workflow closure step.

In Durable Task Framework (DTFx), when an activity is scheduled using ScheduleTask(), the DTFx runtime creates a new task for that activity and schedules it for execution. If the scheduled activity throws an unhandled exception, the DTFx runtime will catch the exception and escalate it to the orchestrator function. If the exception is not caught and handled in the orchestrator, the orchestrator will mark the entire orchestration as failed and stop executing subsequent activities.

In our scenario this would mean that a workflow could stop executing after any step and potentially leave the environment (PLCs) in an inconsistent state. Since this is a highly undesirable outcome, we needed to provide a way for operators to specify a special step in a workflow definition, which would always execute, regardless of successful completion of other activities. Developers can think of it as a try/finally construct. Moreover, it is actually implemented using try/finally syntax in C#.

The closure step or closure activity is a normal workflow activity. The only difference to other activities is when it gets scheduled.

The following code snippet illustrates our implementation of the closure step.

/*
Some parts of the code are omitted for brevity and to better highlight the implementation of the closure.
*/

private static async Task<DynamicClass> RunTaskInternal(OrchestrationContext context, WorkflowContext workflowContext, RunnableWorkflowConfiguration runnableWorkflowConfiguration)
{
    try
    {
        foreach (var activity in runnableWorkflowConfiguration.Activities)
        {
            await context.ScheduleTask<object>(activity.ActivityType,[...]);
        }
    }
    finally
    {
        if (runnableWorkflowConfiguration.Closure is { })
        {
            await context.ScheduleTask<object>(runnableWorkflowConfiguration.Closure.ActivityType, [...]);
        }
    }

    return workflowContext.WorkflowData;
}

The RunTaskInternal() method is a method of our custom WorkflowOrchestrator and is used to execute a workflow. The runnableWorkflowConfiguration object holds all data needed to execute a workflow, including all activities, input parameters and the closure activity. First, in a try { } block we iterate through all activities in the workflow and schedule each of them using the DTFx’ OrchestrationContext.ScheduleTask() method. In the finally { } block we check if a closure activity was provided (by the way, it’s optional) and if so, we schedule it to execute on the same instance of the OrchestrationContext.

Summary

In this post we have shown how we built a workflow engine on top of DTFx and tailored it to our needs. We discussed the implementation of the Domain Specific Language (DSL) with workflow validation, dynamic expressions and data flow, workflow timeout/cancellation and closure step. We hope this information was helpful to understand how DTFx works and how you can build more features on top of it.

Feedback usabilla icon