Litmus Chaos is a CNCF-hosted open-source Chaos Engineering platform. It allows teams to run controlled chaos experiments against Kubernetes applications, revealing infrastructure weaknesses that could otherwise lead to outages. In a recent engagement, our team was tasked with using Litmus to conduct a variety of complex experiments on Kubernetes clusters in a repeatable way.
Typically, users leverage the Litmus UI (ChaosCenter) to organize, launch, and monitor scenarios. However, we wanted to automate our simulations by running our Litmus Chaos scenarios through an Azure DevOps Pipeline. Because of this, we needed to find a way to kick off our scenarios programmatically. In this blog post, we describe how we utilized the Litmus APIs to programmatically launch and track the progress of Litmus scenarios in our pipeline.
Python Script Utilizing Litmus APIs
A standard Litmus Chaos installation exposes two APIs that we took advantage of:

- The Authentication API (REST)
  - Authenticate users
  - Update profiles
  - Reset credentials
  - Update projects
  - Create new users, etc.
- The Server API (GraphQL)
  - Perform operations on agents/delegates
  - Perform operations on scenarios
  - Perform operations on ChaosHub experiments
  - Perform operations relevant to analytics/monitoring, etc.
With the Litmus frontend service exposed (e.g. `http://localhost:9091`), we sent requests to the Auth REST API at `/auth` and to the Server GraphQL API at `/api/query`.
Note: As of February 2023, the latest published docs for the Auth API only cover v2.0.0, and the latest published docs for the Server API only cover v2.9.0. While these are considerably out-of-date, they are what have been linked above for reference. For the current stable release of Litmus (v2.14.0), the Auth API docs are still mostly accurate, but the Server API docs are not. As a workaround for up-to-date Server API docs, we leveraged the “GraphQL Playground”, available at `/api` by default. The playground is always up-to-date and has two useful tabs: one listing all supported GraphQL queries and mutations, and one listing all relevant defined data types.
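Queries found in the playground can also be spot-checked with plain HTTP POSTs, without the `gql` client. The sketch below builds the JSON body that a GraphQL endpoint like `/api/query` expects, using the `listClusters` query from later in this post; the `"my-project"` project ID is a placeholder, and the commented-out `requests.post` call assumes a live Litmus server and valid token.

```python
import json

def build_graphql_payload(query: str, variables: dict) -> str:
    """Build the JSON body that a GraphQL endpoint such as /api/query expects."""
    return json.dumps({"query": query, "variables": variables})

# The listClusters query from the script below, expressed as a raw payload
list_clusters_query = """
query listClusters($projectID: String!, $clusterType: String) {
  listClusters(projectID: $projectID, clusterType: $clusterType) {
    clusterID
  }
}
"""
payload = build_graphql_payload(
    list_clusters_query,
    {"projectID": "my-project", "clusterType": None}  # placeholder values
)
print(payload)

# Sending it would look roughly like (requires a live server and token):
# requests.post("http://localhost:9091/api/query",
#               data=payload,
#               headers={"Content-Type": "application/json"},
#               cookies={"litmus-cc-token": auth_token})
```

This is handy for debugging from a terminal or a pipeline step before committing to the full client setup.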
These API calls are integrated into the Python script that we will walk through below. Our team decided to use Python for its simplicity and for the wide availability of helpful packages (e.g. `gql` for working with GraphQL). The same logic can be translated to any other language using comparable packages.
In the script, we authenticate with Litmus and then launch and track a scenario. This is done by the following sequence of events:
- Load and validate required inputs
```python
# Load environment variables (if using a .env file)
load_dotenv(override=True)

# Validate required env variables
config = {
    "EXECUTION_ID": os.getenv("EXECUTION_ID"),  # EXECUTION_ID was used as a correlation ID
    "AUTH_API_URL": os.getenv("AUTH_API_URL"),
    "LITMUS_USERNAME": os.getenv("LITMUS_USERNAME"),
    "LITMUS_PASSWORD": os.getenv("LITMUS_PASSWORD"),
    "SERVER_API_URL": os.getenv("SERVER_API_URL"),
    "SCENARIO_YAML_PATH": os.getenv("SCENARIO_YAML_PATH"),
    "CLUSTER_TYPE": os.getenv("CLUSTER_TYPE"),
    "BACKOFF_MINUTES": os.getenv("BACKOFF_MINUTES"),
    "TIMEOUT_MINUTES": os.getenv("TIMEOUT_MINUTES")
}

empty_env_variables = []
for key, value in config.items():
    if value is None:
        empty_env_variables.append(key)

if len(empty_env_variables) > 0:
    print(f"Error: Missing the following required environment variables {empty_env_variables}", file=sys.stderr)
    exit(1)

# Convert non-string env variables
config["BACKOFF_MINUTES"] = int(config["BACKOFF_MINUTES"])
config["TIMEOUT_MINUTES"] = int(config["TIMEOUT_MINUTES"])
```
- Log in to retrieve a valid access token
```python
def get_auth_token(auth_api: str, username: str, password: str) -> str:
    payload = {
        "username": username,
        "password": password
    }
    try:
        res = requests.post(f"{auth_api}/login", json=payload)
        res.raise_for_status()
        jsonRes = res.json()
        return jsonRes["access_token"]
    except HTTPError as http_err:
        print(f"Auth API HTTP error: {http_err}", file=sys.stderr)
    except Exception as err:
        print(f"Auth API error: {err}", file=sys.stderr)
    exit(1)

# Handle authentication
refresh_access_token = lambda: get_auth_token(
    config["AUTH_API_URL"],
    config["LITMUS_USERNAME"],
    config["LITMUS_PASSWORD"]
)
auth_token = refresh_access_token()
```
- Use APIs to get additional required config values
```python
# Note: this function assumes one Litmus Chaos project
def get_project_id(auth_api: str, auth_token: str) -> str:
    try:
        res = requests.get(
            f"{auth_api}/list_projects",
            headers={"Authorization": f"Bearer {auth_token}"}
        )
        res.raise_for_status()
        jsonRes = res.json()
        project = jsonRes["data"][0]
        return project["ID"]
    except HTTPError as http_err:
        print(f"Auth API HTTP error: {http_err}", file=sys.stderr)
    except Exception as err:
        print(f"Auth API error: {err}", file=sys.stderr)
    exit(1)

# Note: this function assumes one Litmus Chaos cluster delegate
def get_cluster_id(server_api: str, auth_token: str, project_id: str, cluster_type: str) -> str:
    try:
        transport = RequestsHTTPTransport(
            url=f"{server_api}/query",
            cookies={"litmus-cc-token": auth_token}
        )
        client = Client(transport=transport, fetch_schema_from_transport=True)
        query = gql(
            """
            query listClusters($projectID: String!, $clusterType: String){
              listClusters(projectID: $projectID, clusterType: $clusterType){
                clusterID
              }
            }
            """
        )
        params = {"projectID": project_id, "clusterType": cluster_type}
        res = client.execute(query, variable_values=params)
        cluster = res["listClusters"][0]
        return cluster["clusterID"]
    except GraphQLError as gql_err:
        print(f"List Clusters GraphQL error: {gql_err}", file=sys.stderr)
    except Exception as err:
        print(f"List Clusters error: {err}", file=sys.stderr)
    exit(1)

project_id = get_project_id(config["AUTH_API_URL"], auth_token)
cluster_id = get_cluster_id(config["SERVER_API_URL"], auth_token, project_id, config["CLUSTER_TYPE"])
```
- Parse the custom Chaos Scenario YAML into JSON format
```python
def parse_scenario_yaml(absolute_path: str) -> any:
    with open(absolute_path) as yaml_in:
        scenario_json = yaml.safe_load(yaml_in)
    return scenario_json

scenario_json = parse_scenario_yaml(config["SCENARIO_YAML_PATH"])
```
- Use all the above data to build the launch-scenario request object (i.e. `ChaosWorkflowRequest`)

```python
def create_workflow_request(project_id: str, cluster_id: str, scenario_json: any, exec_id_value: str) -> ChaosWorkflowRequest:
    try:
        # Append execution_id to .metadata.generateName to create a unique but static
        # ".metadata.name", as the API doesn't accept "generateName"
        generated_name = scenario_json["metadata"]["generateName"] + exec_id_value

        # Validate generated scenario name
        # Note: Litmus API currently allows the creation of scenarios with invalid names,
        # which causes errors (see https://github.com/litmuschaos/litmus/issues/3857)
        pattern = re.compile(r'^[a-zA-Z0-9-]{1,54}$')
        if not pattern.match(generated_name):
            raise Exception(f"Invalid scenario name: {generated_name}")
        scenario_json["metadata"]["name"] = generated_name
        print(f"Processing scenario: {generated_name}")

        scenario_params = scenario_json["spec"]["arguments"]["parameters"]
        scenario_id = next(param for param in scenario_params if param["name"] == "scenario_id")

        # Replace placeholder execution_id value with passed in value
        for param_dict in scenario_params:
            if param_dict["name"] == "execution_id":
                param_dict.update({"name": param_dict["name"], "value": exec_id_value})
                break
        exec_id = next(param for param in scenario_params if param["name"] == "execution_id")

        description = f"Scenario: {scenario_id['value']} | Execution: {exec_id['value']}"
        scenario_json_str = json.dumps(scenario_json)
        return ChaosWorkflowRequest(scenario_json_str, generated_name, description, project_id, cluster_id)
    except Exception as err:
        print(f"Error processing scenario manifest: {err}", file=sys.stderr)
        exit(1)

scenario_payload = create_workflow_request(project_id, cluster_id, scenario_json, config["EXECUTION_ID"])
```
For reference, the `ChaosWorkflowRequest` construct used in this example looks like:

```python
class ChaosWorkflowRequest:
    def __init__(self, scenario_json_str: str, name: str, desc: str, project_id: str, cluster_id: str):
        self.workflow_manifest = scenario_json_str
        self.workflow_name = name
        self.workflow_description = desc
        self.cron_syntax = ""
        self.weightages = [{"experimentName": "", "weightage": 0}]
        self.is_custom_workflow = True
        self.project_id = project_id
        self.cluster_id = cluster_id
```
- Send the API request to launch the scenario
```python
def launch_scenario(server_api: str, auth_token: str, scenario: ChaosWorkflowRequest) -> str:
    try:
        transport = RequestsHTTPTransport(
            url=f"{server_api}/query",
            cookies={"litmus-cc-token": auth_token}
        )
        client = Client(transport=transport, fetch_schema_from_transport=True)
        query = gql(
            """
            mutation createChaosWorkFlow($input: ChaosWorkFlowRequest!) {
              createChaosWorkFlow(request: $input) {
                workflowID
                cronSyntax
                workflowName
                workflowDescription
                isCustomWorkflow
              }
            }
            """
        )
        params = {
            "input": {
                "workflowManifest": scenario.workflow_manifest,
                "workflowName": scenario.workflow_name,
                "workflowDescription": scenario.workflow_description,
                "cronSyntax": scenario.cron_syntax,
                "weightages": scenario.weightages,
                "isCustomWorkflow": scenario.is_custom_workflow,
                "projectID": scenario.project_id,
                "clusterID": scenario.cluster_id
            }
        }
        res = client.execute(query, variable_values=params)
        return res["createChaosWorkFlow"]["workflowID"]
    except GraphQLError as gql_err:
        print(f"Launch GraphQL error: {gql_err}", file=sys.stderr)
    except Exception as err:
        print(f"Launch error: {err}", file=sys.stderr)
    exit(1)

litmus_workflow_id = launch_scenario(
    config["SERVER_API_URL"],
    auth_token,
    scenario_payload
)
```
- Use APIs to wait for the scenario to run to completion before reporting the result (success or error exit code)
```python
def wait_for_scenario_completion(server_api: str, auth_token: str, refresh_access_token,
                                 project_id: str, litmus_workflow_id: str,
                                 backoff_minutes: int, timeout_minutes: int) -> any:
    start_time = time.time()
    timeout = start_time + (60 * timeout_minutes)
    is_consecutive_auth_error = False
    is_running = True
    is_descending = True

    # Backoff once to allow Litmus to initialize the scenario run
    print(f"Created Chaos Scenario: {litmus_workflow_id}")
    time.sleep(backoff_minutes * 60)

    while is_running:
        try:
            if time.time() > timeout:
                raise TimeoutError(timeout_minutes)

            transport = RequestsHTTPTransport(
                url=f"{server_api}/query",
                cookies={"litmus-cc-token": auth_token}
            )
            client = Client(transport=transport, fetch_schema_from_transport=True)
            query = gql(
                """
                query listWorkflowRuns($request: ListWorkflowRunsRequest!) {
                  listWorkflowRuns(request: $request) {
                    totalNoOfWorkflowRuns
                    workflowRuns {
                      workflowRunID
                      workflowName
                      phase
                      lastUpdated
                      executionData
                    }
                  }
                }
                """
            )
            params = {
                "request": {
                    "projectID": project_id,
                    "workflowIDs": [litmus_workflow_id],
                    "sort": {
                        "field": "TIME",
                        "descending": is_descending
                    }
                }
            }

            # Send request for workflow run
            res = client.execute(query, variable_values=params)

            # Reset flag for when we refresh an expired token
            is_consecutive_auth_error = False

            # Extract the scenario status
            scenario_runs = res["listWorkflowRuns"]["workflowRuns"]
            if len(scenario_runs) == 0:
                raise Exception("No scenario runs found")
            run_phase = scenario_runs[0]["phase"]
            execution_data_string = scenario_runs[0]["executionData"]
            execution_data_json = json.loads(execution_data_string)
            finished_at = execution_data_json["finishedAt"]

            # Record elapsed time for console output
            elapsed_seconds = int(time.time() - start_time)
            elapsed_minutes, elapsed_seconds = divmod(elapsed_seconds, 60)
            elapsed_hours, elapsed_minutes = divmod(elapsed_minutes, 60)

            # Evaluate scenario status
            if len(finished_at) == 0:
                if run_phase == "Terminated":
                    raise Exception(run_phase)
                else:
                    # run_phase is either Running or Failed, but not yet completed
                    print(f"{scenario_runs[0]['workflowName']} Running with status: {run_phase} "
                          f"[ {elapsed_hours}h {elapsed_minutes}m {elapsed_seconds}s elapsed ]")
                    time.sleep(60 * backoff_minutes)
                    continue
            else:
                if run_phase == "Succeeded":
                    is_running = False
                else:
                    # run_phase is "Failed"
                    raise Exception(run_phase)
        except TransportQueryError as transport_err:
            # Try to refresh the access token once; if we have consecutive errors, exit with errors
            error_msg = transport_err.errors[0]['message']
            if error_msg == "Invalid Token" and not is_consecutive_auth_error:
                print("Invalid access token. Retrying once with new token...")
                auth_token = refresh_access_token()
                is_consecutive_auth_error = True
                continue
            print(f"Scenario status transport error: {transport_err}", file=sys.stderr)
            exit(1)
        except GraphQLError as gql_err:
            print(f"Scenario status GraphQL error: {gql_err}", file=sys.stderr)
            exit(1)
        except TimeoutError as timeout_err:
            print(f"Scenario status Timeout error: {timeout_err}", file=sys.stderr)
            exit(1)
        except Exception as err:
            print(f"Scenario completed with status: {err}", file=sys.stderr)
            exit(1)

    print("Scenario completed with status: Succeeded")

# Wait for scenario completion and evaluate the run status to determine exit code
scenario_run = wait_for_scenario_completion(
    config["SERVER_API_URL"],
    auth_token,
    refresh_access_token,
    project_id,
    litmus_workflow_id,
    config["BACKOFF_MINUTES"],
    config["TIMEOUT_MINUTES"]
)
```

Note that the initial backoff now uses the `backoff_minutes` parameter rather than reaching back into the global `config` dict.
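The manifest-processing rules above (the scenario-name regex and the `execution_id` substitution) are easy to get wrong, so it can be worth exercising them in isolation. Below is a minimal, self-contained sketch of just that logic; the sample parameter list and names are invented for illustration.

```python
import re

def make_scenario_name(generate_name: str, exec_id: str) -> str:
    """Append the execution ID to .metadata.generateName and validate the
    result against the pattern Litmus expects for scenario names."""
    name = generate_name + exec_id
    if not re.fullmatch(r'[a-zA-Z0-9-]{1,54}', name):
        raise ValueError(f"Invalid scenario name: {name}")
    return name

def inject_execution_id(parameters: list, exec_id: str) -> list:
    """Replace the placeholder execution_id value in .spec.arguments.parameters."""
    for param in parameters:
        if param["name"] == "execution_id":
            param["value"] = exec_id
            break
    return parameters

# Hypothetical manifest fragment
params = [{"name": "scenario_id", "value": "pod-delete"},
          {"name": "execution_id", "value": "placeholder"}]

name = make_scenario_name("chaos-scenario-", "run-42")
inject_execution_id(params, "run-42")
print(name)                # -> chaos-scenario-run-42
print(params[1]["value"])  # -> run-42
```

Names containing characters outside `[a-zA-Z0-9-]` (e.g. underscores) are rejected before the API ever sees them, avoiding the invalid-name issue linked above.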
In this Python 3 example, these were the dependencies used (i.e. `requirements.txt`):

```
python-dotenv==0.21.0
pyyaml==6.0
requests==2.28.1
gql[requests]==3.4.0
```
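The script only assumes a few specific fields in the scenario YAML: `.metadata.generateName` and `.spec.arguments.parameters`. A toy manifest, invented for illustration, run through the same `yaml.safe_load` call shows the structure the code relies on:

```python
import yaml

# Minimal illustrative manifest containing only the fields the script reads
manifest = yaml.safe_load("""
metadata:
  generateName: chaos-scenario-
spec:
  arguments:
    parameters:
      - name: scenario_id
        value: pod-delete
      - name: execution_id
        value: placeholder
""")

print(manifest["metadata"]["generateName"])  # -> chaos-scenario-
print(manifest["spec"]["arguments"]["parameters"][1]["name"])  # -> execution_id
```

A real scenario manifest (an Argo Workflow) carries many more fields, but they pass through `json.dumps` untouched.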
Running the Script in a Pipeline
To integrate our script into our Azure DevOps Pipeline, we first apply any Kubernetes templates referenced by the Litmus Scenario to the cluster where Litmus is installed. Then, we install all script dependencies (e.g. `pip install -r requirements.txt`) and set the required inputs as environment variables.
Lastly, we run the Python script in our pipeline to successfully kick off the desired Litmus Chaos Scenario and wait for its completion.
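As an illustration, the pipeline steps described above might look roughly like the following in an `azure-pipelines.yml`. The script name, template path, and variable names are all hypothetical; secrets such as the Litmus password should come from a secret variable or key vault rather than plain pipeline variables.

```yaml
# Sketch only -- adapt names, paths, and variable groups to your project
steps:
  - script: kubectl apply -f ./k8s-templates/
    displayName: Apply Kubernetes templates referenced by the scenario

  - script: pip install -r requirements.txt
    displayName: Install script dependencies

  - script: python launch_scenario.py
    displayName: Launch Litmus scenario and wait for completion
    env:
      EXECUTION_ID: $(Build.BuildId)
      AUTH_API_URL: $(AuthApiUrl)
      LITMUS_USERNAME: $(LitmusUsername)
      LITMUS_PASSWORD: $(LitmusPassword)  # store as a secret variable
      SERVER_API_URL: $(ServerApiUrl)
      SCENARIO_YAML_PATH: $(ScenarioYamlPath)
      CLUSTER_TYPE: $(ClusterType)
      BACKOFF_MINUTES: "1"
      TIMEOUT_MINUTES: "30"
```

Because the script exits non-zero on failure or timeout, the pipeline step fails automatically when a scenario does not succeed.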
Alternate Approach: Using litmusctl
An alternative method for launching our scenarios programmatically is `litmusctl`. Created by the Litmus team, `litmusctl` is a command-line tool that allows users to manage their chaos delegate control plane. As the official documentation explains, it lets users manage chaos delegates, scenarios, projects, and accounts from the CLI, including the ability to create and kick off Litmus Chaos scenarios. While it is a relatively user-friendly and straightforward tool, we decided to use the Litmus APIs instead, as they provided us with more flexibility and control.
Summary
Litmus Chaos can be complex and comes with some challenges, but it’s also a powerful tool. In this article, we provided one method of using Litmus Chaos programmatically in an Azure DevOps Pipeline. To get started on your own, please check out the Litmus docs on running your first scenario. We hope the strategies and learnings we shared in this post can help you integrate Litmus Chaos into your own pipelines.