May 11th, 2023

Launching Litmus Chaos Scenarios Programmatically

Jeremy De La Cruz
Software Engineer II

Litmus Chaos is a CNCF-hosted open-source Chaos Engineering platform. It allows teams to conduct controlled chaos tests on Kubernetes applications, revealing infrastructure weaknesses and potential causes of outages. In a recent engagement, our team was tasked with using Litmus to conduct a variety of complex experiments on Kubernetes clusters in a repeatable way.

Typically, users leverage the Litmus UI (ChaosCenter) to organize, launch, and monitor scenarios. However, we wanted to automate our simulations by running our Litmus Chaos scenarios through an Azure DevOps Pipeline. Because of this, we needed to find a way to kick off our scenarios programmatically. In this blog post, we describe how we utilized the Litmus APIs to programmatically launch and track the progress of Litmus scenarios in our pipeline.

Python Script Utilizing Litmus APIs

We took advantage of the two APIs available with a standard Litmus Chaos installation:

  • The Authentication API (REST)
    • Authenticate users
    • Update profiles
    • Reset credentials
    • Update projects
    • Create new users, etc.
  • The Server API (GraphQL)
    • Perform operations on agents/delegates
    • Perform operations on scenarios
    • Perform operations on ChaosHub experiments
    • Perform operations relevant to analytics/monitoring, etc.

With the Litmus frontend service exposed (e.g. http://localhost:9091), we sent requests to the Auth REST API at /auth and to the Server GraphQL API at /api/query.
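As a minimal illustration of how the two endpoints relate to that base URL (using a hypothetical helper name, not part of any Litmus SDK), consider:

```python
# Hypothetical helper: derive the two Litmus API endpoints from the
# exposed frontend URL, using the default paths described above.
def litmus_endpoints(frontend_url: str) -> dict:
    base = frontend_url.rstrip("/")
    return {
        "auth_api": f"{base}/auth",         # Authentication API (REST)
        "server_api": f"{base}/api/query",  # Server API (GraphQL)
    }

endpoints = litmus_endpoints("http://localhost:9091")
# endpoints["auth_api"]   -> "http://localhost:9091/auth"
# endpoints["server_api"] -> "http://localhost:9091/api/query"
```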

Note: As of February 2023, the latest published docs for the Auth API cover only v2.0.0, and the latest published docs for the Server API cover only v2.9.0. While these are considerably out of date, they are the versions linked above for reference.

For the current stable release of Litmus (v2.14.0), the Auth API docs are still mostly accurate but the Server API docs are not.

As a workaround for up-to-date Server API docs, we leveraged the “GraphQL Playground”, available at /api by default. The Playground is always up to date and has two useful tabs: one listing all supported GraphQL queries and mutations, and another listing all relevant defined data types.

These API calls are integrated into the Python script we walk through below. Our team chose Python for its simplicity and the wide availability of helpful packages (e.g. gql for working with GraphQL), but the same logic can be translated to any other language with comparable packages.

In the script, we authenticate with Litmus and then launch and track a scenario. This is done by the following sequence of events:

  1. Load and validate required inputs

    # Load environment variables (if using a .env file)
    load_dotenv(override=True)
    
    # Validate required env variables
    config = {
        "EXECUTION_ID": os.getenv("EXECUTION_ID"), # EXECUTION_ID was used as a correlation ID
        "AUTH_API_URL": os.getenv("AUTH_API_URL"),
        "LITMUS_USERNAME": os.getenv("LITMUS_USERNAME"),
        "LITMUS_PASSWORD": os.getenv("LITMUS_PASSWORD"),
        "SERVER_API_URL": os.getenv("SERVER_API_URL"),
        "SCENARIO_YAML_PATH": os.getenv("SCENARIO_YAML_PATH"),
        "CLUSTER_TYPE": os.getenv("CLUSTER_TYPE"),
        "BACKOFF_MINUTES": os.getenv("BACKOFF_MINUTES"),
        "TIMEOUT_MINUTES": os.getenv("TIMEOUT_MINUTES")
    }
    empty_env_variables = [key for key, value in config.items() if value is None]
    if empty_env_variables:
        print(f"Error: Missing the following required environment variables: {empty_env_variables}", file=sys.stderr)
        exit(1)
    
    # Convert non-string env variables
    config["BACKOFF_MINUTES"] = int(config["BACKOFF_MINUTES"])
    config["TIMEOUT_MINUTES"] = int(config["TIMEOUT_MINUTES"])
  2. Log in to retrieve a valid access token

    def get_auth_token(auth_api: str, username: str, password: str) -> str:
        payload = {
            "username": username,
            "password": password
        }
        try:
            res = requests.post(f"{auth_api}/login", json=payload)
            res.raise_for_status()
            json_res = res.json()
            return json_res["access_token"]
        except HTTPError as http_err:
            print(f"Auth API HTTP error: {http_err}", file=sys.stderr)
        except Exception as err:
            print(f"Auth API error: {err}", file=sys.stderr)
        exit(1)        
    
    # Handle authentication
    refresh_access_token = lambda: get_auth_token(
        config["AUTH_API_URL"],
        config["LITMUS_USERNAME"],
        config["LITMUS_PASSWORD"]
    )
    auth_token = refresh_access_token()
  3. Use APIs to get additional required config values

    # Note: this function assumes one Litmus Chaos project
    def get_project_id(auth_api: str, auth_token: str) -> str:
        try:
            res = requests.get(f"{auth_api}/list_projects", headers={"Authorization": f"Bearer {auth_token}"})
            res.raise_for_status()
            json_res = res.json()
            project = json_res["data"][0]
            return project["ID"]
        except HTTPError as http_err:
            print(f"Auth API HTTP error: {http_err}", file=sys.stderr)
        except Exception as err:
            print(f"Auth API error: {err}", file=sys.stderr)
        exit(1)
    
    # Note: this function assumes one Litmus Chaos cluster delegate
    def get_cluster_id(server_api: str, auth_token: str, project_id: str, cluster_type: str) -> str:
        try:
            transport = RequestsHTTPTransport(
                url=f"{server_api}/query",
                cookies={"litmus-cc-token": auth_token}
            )
    
            client = Client(transport=transport, fetch_schema_from_transport=True)
    
            query = gql(
                """
                query listClusters($projectID: String!, $clusterType: String){
                    listClusters(projectID: $projectID, clusterType: $clusterType){
                        clusterID
                    }
                }
                """
            )
    
            params = {
                "projectID": project_id,
                "clusterType": cluster_type
            }
    
            res = client.execute(query, variable_values=params)
            cluster = res["listClusters"][0]
            return cluster["clusterID"]
        except GraphQLError as gql_err:
            print(f"List Clusters GraphQL error: {gql_err}", file=sys.stderr)
        except Exception as err:
            print(f"List Clusters error: {err}", file=sys.stderr)
        exit(1)
    
    project_id = get_project_id(config["AUTH_API_URL"], auth_token)
    cluster_id = get_cluster_id(config["SERVER_API_URL"], auth_token, project_id, config["CLUSTER_TYPE"])
    
  4. Parse the custom Chaos Scenario YAML into JSON format

    def parse_scenario_yaml(absolute_path: str) -> dict:
        # Parse the custom Chaos Scenario YAML manifest into a Python dict
        with open(absolute_path) as yaml_in:
            return yaml.safe_load(yaml_in)
    
    scenario_json = parse_scenario_yaml(config["SCENARIO_YAML_PATH"])
  5. Use all the above data to build the launch-scenario request object (i.e. ChaosWorkflowRequest)

    def create_workflow_request(project_id: str, cluster_id: str, scenario_json: dict, exec_id_value: str) -> ChaosWorkflowRequest:
        try:
            # Append execution_id to .metadata.generateName to create a unique but static ".metadata.name", as the API doesn't accept "generateName"
            generated_name = (scenario_json["metadata"]["generateName"] + exec_id_value)
    
            # Validate generated scenario name
            # Note: Litmus API currently allows the creation of scenarios with invalid names, which causes errors (see https://github.com/litmuschaos/litmus/issues/3857)
            pattern = re.compile(r'^[a-zA-Z0-9-]{1,54}$')
            if not pattern.match(generated_name):
                raise Exception(f"Invalid scenario name: {generated_name}")
            scenario_json["metadata"]["name"] = generated_name
            print(f"Processing scenario: {generated_name}")
    
            scenario_params = scenario_json["spec"]["arguments"]["parameters"]
            scenario_id = next(param for param in scenario_params if param["name"] == "scenario_id")
            # Replace placeholder execution_id value with the passed-in value
            for param_dict in scenario_params:
                if param_dict["name"] == "execution_id":
                    param_dict["value"] = exec_id_value
                    break
            exec_id = next(param for param in scenario_params if param["name"] == "execution_id")
            description = f"Scenario: {scenario_id['value']} | Execution: {exec_id['value']}"
    
            scenario_json_str = json.dumps(scenario_json)
    
            return ChaosWorkflowRequest(scenario_json_str, generated_name, description, project_id, cluster_id)
        except Exception as err:
            print(f"Error processing scenario manifest: {err}", file=sys.stderr)
            exit(1)
    
    scenario_payload = create_workflow_request(project_id, cluster_id, scenario_json, config["EXECUTION_ID"])

    For reference, the ChaosWorkflowRequest construct used in this example looks like:

    class ChaosWorkflowRequest:
        def __init__(self, scenario_json_str: str, name: str, desc: str, project_id: str, cluster_id: str):
            self.workflow_manifest = scenario_json_str
            self.workflow_name = name
            self.workflow_description = desc
            self.cron_syntax = ""
            self.weightages = [{"experimentName": "", "weightage": 0}]
            self.is_custom_workflow = True
            self.project_id = project_id
            self.cluster_id = cluster_id
  6. Send the API request to launch the scenario

    def launch_scenario(server_api: str, auth_token: str, scenario: ChaosWorkflowRequest) -> str:
        try:
            transport = RequestsHTTPTransport(
                url=f"{server_api}/query",
                cookies={"litmus-cc-token": auth_token}
            )
    
            client = Client(transport=transport, fetch_schema_from_transport=True)
    
            query = gql(
                """
                mutation createChaosWorkFlow($input: ChaosWorkFlowRequest!) {
                    createChaosWorkFlow(request: $input) {
                        workflowID
                        cronSyntax
                        workflowName
                        workflowDescription
                        isCustomWorkflow
                    }
                }
                """
            )
    
            params = {
                "input": {
                    "workflowManifest": scenario.workflow_manifest,
                    "workflowName": scenario.workflow_name,
                    "workflowDescription": scenario.workflow_description,
                    "cronSyntax": scenario.cron_syntax,
                    "weightages": scenario.weightages,
                    "isCustomWorkflow": scenario.is_custom_workflow,
                    "projectID": scenario.project_id,
                    "clusterID": scenario.cluster_id
                }
            }
    
            res = client.execute(query, variable_values=params)
            return res["createChaosWorkFlow"]["workflowID"]
        except GraphQLError as gql_err:
            print(f"Launch GraphQL error: {gql_err}", file=sys.stderr)
        except Exception as err:
            print(f"Launch error: {err}", file=sys.stderr)
        exit(1)
    
    litmus_workflow_id = launch_scenario(
        config["SERVER_API_URL"],
        auth_token,
        scenario_payload
    )
  7. Use APIs to wait for the scenario to run to completion before reporting the result (success or error exit code)

    def wait_for_scenario_completion(server_api: str, auth_token: str, refresh_access_token, project_id: str, litmus_workflow_id: str, backoff_minutes: int, timeout_minutes: int) -> None:
        start_time = time.time()
        timeout = start_time + (60 * timeout_minutes)
        is_consecutive_auth_error = False
        is_running = True
        is_descending = True
    
        # Backoff once to allow Litmus to initialize the scenario run
        print(f"Created Chaos Scenario: {litmus_workflow_id}")
        time.sleep(backoff_minutes * 60)
    
        while is_running:
            try:
                if time.time() > timeout:
                    raise TimeoutError(timeout_minutes)
    
                transport = RequestsHTTPTransport(
                    url=f"{server_api}/query",
                    cookies={"litmus-cc-token": auth_token}
                )
    
                client = Client(transport=transport, fetch_schema_from_transport=True)
    
                query = gql(
                    """
                    query listWorkflowRuns($request: ListWorkflowRunsRequest!) {
                        listWorkflowRuns(request: $request) {
                            totalNoOfWorkflowRuns
                            workflowRuns {
                                workflowRunID
                                workflowName
                                phase
                                lastUpdated
                                executionData
                            }
                        }
                    }
                    """
                )
    
                params = {
                    "request": {
                        "projectID": project_id,
                        "workflowIDs": [litmus_workflow_id],
                        "sort": {
                            "field": "TIME",
                            "descending": is_descending
                        }
                    }
                }
    
                # Send request for workflow run
                res = client.execute(query, variable_values=params)
    
                # Reset flag for when we refresh an expired token
                is_consecutive_auth_error = False
    
                # Extract the scenario status
                scenario_runs = res["listWorkflowRuns"]["workflowRuns"]
                if len(scenario_runs) == 0:
                    raise Exception("No scenario runs found")
                run_phase = scenario_runs[0]["phase"]
                execution_data_string = scenario_runs[0]["executionData"]
                execution_data_json = json.loads(execution_data_string)
                finished_at = execution_data_json["finishedAt"]
    
                # Record elapsed time for console output
                elapsed_seconds = int((time.time()-start_time))
                elapsed_minutes, elapsed_seconds = divmod(elapsed_seconds, 60)
                elapsed_hours, elapsed_minutes = divmod(elapsed_minutes, 60)
    
                # Evaluate scenario status
                if len(finished_at) == 0:
                    if run_phase == "Terminated":
                        raise Exception(run_phase)
                    else: # run_phase is either Running or Failed, but not yet completed
                        print(f"{scenario_runs[0]['workflowName']} Running with status: {run_phase} [ {elapsed_hours}h {elapsed_minutes}m {elapsed_seconds}s elapsed ]")
                        time.sleep(60 * backoff_minutes)
                        continue
                else:
                    if run_phase == "Succeeded":
                        is_running = False
                    else: # run_phase is "Failed"
                        raise Exception(run_phase)
    
            except TransportQueryError as transport_err:
                # Try to refresh the access token once; if we have consecutive errors, exit with errors
                error_msg = transport_err.errors[0]['message']
                if error_msg == "Invalid Token" and not is_consecutive_auth_error:
                    print("Invalid access token. Retrying once with a new token...")
                    auth_token = refresh_access_token()
                    is_consecutive_auth_error = True
                    continue
                print(f"Scenario status transport error: {transport_err}", file=sys.stderr)
                exit(1)
            except GraphQLError as gql_err:
                print(f"Scenario status GraphQL error: {gql_err}", file=sys.stderr)
                exit(1)
            except TimeoutError as timeout_err:
                print(f"Scenario status Timeout error: {timeout_err}", file=sys.stderr)
                exit(1)
            except Exception as err:
                print(f"Scenario completed with status: {err}", file=sys.stderr)
                exit(1)
    
        print("Scenario completed with status: Succeeded")
    
    # Wait for scenario completion; the function exits with a non-zero code on failure
    wait_for_scenario_completion(
        config["SERVER_API_URL"],
        auth_token,
        refresh_access_token,
        project_id,
        litmus_workflow_id,
        config["BACKOFF_MINUTES"],
        config["TIMEOUT_MINUTES"]
    )

In this Python 3 example, these were the dependencies we used (i.e. the contents of requirements.txt):

python-dotenv==0.21.0
pyyaml==6.0
requests==2.28.1
gql[requests]==3.4.0

Running the Script in a Pipeline

To integrate our script into our Azure DevOps Pipeline, we first apply any Kubernetes templates referenced in the Litmus scenario to the cluster where Litmus is installed. Then, we install all script dependencies (e.g. pip install -r requirements.txt) and set the required inputs as environment variables. Lastly, we run the Python script in our pipeline to kick off the desired Litmus Chaos scenario and wait for its completion.

Alternate Approach: using litmusctl

An alternative method for launching our scenarios programmatically is to use litmusctl. Created by the Litmus team, litmusctl is a command-line tool allowing users to manage their chaos delegate control plane. As the official documentation explains, it allows users to manage chaos delegates, scenarios, projects, and accounts all from the CLI. This includes the ability to create and kick off Litmus Chaos scenarios from the command-line.

While litmusctl is a relatively user-friendly and straightforward CLI tool, we decided to use the Litmus APIs instead, as they provided us with more flexibility and control.

Summary

Litmus Chaos can be complex and comes with some challenges, but it’s also a powerful tool. In this article, we provided one method of using Litmus Chaos programmatically in an Azure DevOps Pipeline. To get started on your own, please check out the Litmus docs on running your first scenario. We hope the strategies and learnings we shared in this post can help you integrate Litmus Chaos into your own pipelines.

Author

Jeremy De La Cruz
Software Engineer II