{"id":14697,"date":"2023-05-11T00:00:00","date_gmt":"2023-05-11T07:00:00","guid":{"rendered":"https:\/\/devblogs.microsoft.com\/cse\/?p=14697"},"modified":"2024-11-03T23:42:35","modified_gmt":"2024-11-04T07:42:35","slug":"launching-litmus-chaos-scenarios-programmatically","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/ise\/launching-litmus-chaos-scenarios-programmatically\/","title":{"rendered":"Launching Litmus Chaos Scenarios Programmatically"},"content":{"rendered":"<p>Litmus Chaos is a CNCF-hosted open-source Chaos Engineering platform. It allows teams to conduct controlled chaos tests on Kubernetes applications, which reveals infrastructure weaknesses or potential outages.\nIn a recent engagement, our team was tasked with using Litmus to conduct a variety of complex experiments on Kubernetes clusters in a repeatable way.<\/p>\n<p>Typically, users leverage the Litmus UI (<a href=\"https:\/\/docs.litmuschaos.io\/docs\/2.13.0\/getting-started\/resources\/#chaoscenter\">ChaosCenter<\/a>) to organize, launch, and monitor scenarios.\nHowever, we wanted to automate our simulations by running our Litmus Chaos scenarios through an Azure DevOps Pipeline. 
Because of this, we needed to find a way to kick off our scenarios programmatically.\nIn this blog post, we describe how we utilized the Litmus APIs to programmatically launch and track the progress of Litmus scenarios in our pipeline.<\/p>\n<h2>Python Script Utilizing Litmus APIs<\/h2>\n<p>We took advantage of two APIs that are available with the standard Litmus Chaos installation:<\/p>\n<ul>\n<li>The <a href=\"https:\/\/litmuschaos.github.io\/litmus\/auth\/v2.0.0\/api.html\">Authentication API (REST)<\/a>\n<ul>\n<li>Authenticate users<\/li>\n<li>Update profiles<\/li>\n<li>Reset credentials<\/li>\n<li>Update projects<\/li>\n<li>Create new users, etc.<\/li>\n<\/ul>\n<\/li>\n<li>The <a href=\"https:\/\/litmuschaos.github.io\/litmus\/graphql\/v2.9.0\/api.html\">Server API (GraphQL)<\/a>\n<ul>\n<li>Perform operations on agents\/delegates<\/li>\n<li>Perform operations on scenarios<\/li>\n<li>Perform operations on ChaosHub experiments<\/li>\n<li>Perform operations relevant to analytics\/monitoring, etc.<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n<p>With the Litmus frontend service exposed (e.g. <code>http:\/\/localhost:9091<\/code>), we sent requests to the Auth REST API at <code>\/auth<\/code> and to the Server GraphQL API at <code>\/api\/query<\/code>.<\/p>\n<blockquote><p><strong>Note:<\/strong> As of February 2023, the latest published docs for the Auth API only cover <code>v2.0.0<\/code> and the latest published docs for the Server API only cover <code>v2.9.0<\/code>. While these are considerably out of date, they are the versions linked above for reference.<\/p>\n<p>For the current stable release of Litmus (<code>v2.14.0<\/code>), the Auth API docs are still mostly accurate but the Server API docs are not.<\/p>\n<p>As a workaround for up-to-date Server API docs, we used the &#8220;GraphQL Playground&#8221;, available at <code>\/api<\/code> by default. 
The playground is always up-to-date and has two useful tabs: one listing all supported GraphQL queries and mutations, and one listing all relevant defined data types.<\/p><\/blockquote>\n<p>These API calls are integrated into the Python script that we walk through below.\nOur team chose Python for its simplicity and the wide availability of helpful packages (e.g. <a href=\"https:\/\/github.com\/graphql-python\/gql\"><code>gql<\/code><\/a> to help work with GraphQL).\nThe same logic can be translated to any other language using comparable packages.<\/p>\n<p>In the script, we authenticate with Litmus and then launch and track a scenario.\nThis is done by the following sequence of events:<\/p>\n<ol>\n<li>Load and validate required inputs\n<pre><code class=\"language-python\"># Imports used throughout the script\r\nimport json\r\nimport os\r\nimport re\r\nimport sys\r\nimport time\r\n\r\nimport requests\r\nimport yaml\r\nfrom dotenv import load_dotenv\r\nfrom gql import Client, gql\r\nfrom gql.transport.exceptions import TransportQueryError\r\nfrom gql.transport.requests import RequestsHTTPTransport\r\nfrom graphql import GraphQLError\r\nfrom requests.exceptions import HTTPError\r\n\r\n# Load environment variables (if using a .env file)\r\nload_dotenv(override=True)\r\n\r\n# Validate required env variables\r\nconfig = {\r\n    \"EXECUTION_ID\": os.getenv(\"EXECUTION_ID\"), # EXECUTION_ID was used as a correlation ID\r\n    \"AUTH_API_URL\": os.getenv(\"AUTH_API_URL\"),\r\n    \"LITMUS_USERNAME\": os.getenv(\"LITMUS_USERNAME\"),\r\n    \"LITMUS_PASSWORD\": os.getenv(\"LITMUS_PASSWORD\"),\r\n    \"SERVER_API_URL\": os.getenv(\"SERVER_API_URL\"),\r\n    \"SCENARIO_YAML_PATH\": os.getenv(\"SCENARIO_YAML_PATH\"),\r\n    \"CLUSTER_TYPE\": os.getenv(\"CLUSTER_TYPE\"),\r\n    \"BACKOFF_MINUTES\": os.getenv(\"BACKOFF_MINUTES\"),\r\n    \"TIMEOUT_MINUTES\": os.getenv(\"TIMEOUT_MINUTES\")\r\n}\r\nempty_env_variables = []\r\nfor key, value in config.items():\r\n    if value is None:\r\n        empty_env_variables.append(key)\r\nif len(empty_env_variables) &gt; 0:\r\n    print(f\"Error: Missing the following required environment variables {empty_env_variables}\", file=sys.stderr)\r\n    exit(1)\r\n\r\n# Convert non-string env variables\r\nconfig[\"BACKOFF_MINUTES\"] = int(config[\"BACKOFF_MINUTES\"])\r\nconfig[\"TIMEOUT_MINUTES\"] = 
int(config[\"TIMEOUT_MINUTES\"])<\/code><\/pre>\n<\/li>\n<li>Log in to retrieve a valid access token\n<pre><code class=\"language-python\">def get_auth_token(auth_api: str, username: str, password: str) -&gt; str:\r\n    payload = {\r\n        \"username\": username,\r\n        \"password\": password\r\n    }\r\n    try:\r\n        res = requests.post(f\"{auth_api}\/login\", json = payload)\r\n        res.raise_for_status()\r\n        jsonRes = res.json()\r\n        return jsonRes[\"access_token\"]\r\n    except HTTPError as http_err:\r\n        print(f\"Auth API HTTP error: {http_err}\", file=sys.stderr)\r\n    except Exception as err:\r\n        print(f\"Auth API error: {err}\", file=sys.stderr)\r\n    exit(1)        \r\n\r\n# Handle authentication\r\nrefresh_access_token = lambda: get_auth_token(\r\n    config[\"AUTH_API_URL\"],\r\n    config[\"LITMUS_USERNAME\"],\r\n    config[\"LITMUS_PASSWORD\"]\r\n)\r\nauth_token = refresh_access_token()<\/code><\/pre>\n<\/li>\n<li>Use APIs to get additional required config values\n<pre><code class=\"language-python\"># Note: this function assumes one Litmus Chaos project\r\ndef get_project_id(auth_api: str, auth_token: str) -&gt; str:\r\n    try:\r\n        res = requests.get(f\"{auth_api}\/list_projects\", headers={\"Authorization\": f\"Bearer {auth_token}\"})\r\n        res.raise_for_status()\r\n        jsonRes = res.json()\r\n        project = jsonRes[\"data\"][0]\r\n        return project[\"ID\"]\r\n    except HTTPError as http_err:\r\n        print(f\"Auth API HTTP error: {http_err}\", file=sys.stderr)\r\n    except Exception as err:\r\n        print(f\"Auth API error: {err}\", file=sys.stderr)\r\n    exit(1)\r\n\r\n# Note: this function assumes one Litmus Chaos cluster delegate\r\ndef get_cluster_id(server_api: str, auth_token: str, project_id: str, cluster_type: str) -&gt; str:\r\n    try:\r\n        transport = RequestsHTTPTransport(\r\n            url=f\"{server_api}\/query\",\r\n            
cookies={\"litmus-cc-token\": auth_token}\r\n        )\r\n\r\n        client = Client(transport=transport, fetch_schema_from_transport=True)\r\n\r\n        query = gql(\r\n            \"\"\"\r\n            query listClusters($projectID: String!, $clusterType: String){\r\n                listClusters(projectID: $projectID, clusterType: $clusterType){\r\n                    clusterID\r\n                }\r\n            }\r\n            \"\"\"\r\n        )\r\n\r\n        params = {\r\n            \"projectID\": project_id,\r\n            \"clusterType\": cluster_type\r\n        }\r\n\r\n        res = client.execute(query, variable_values=params)\r\n        cluster = res[\"listClusters\"][0]\r\n        return cluster[\"clusterID\"]\r\n    except GraphQLError as gql_err:\r\n        print(f\"List Clusters GraphQL error: {gql_err}\", file=sys.stderr)\r\n    except Exception as err:\r\n        print(f\"List Clusters error: {err}\", file=sys.stderr)\r\n    exit(1)\r\n\r\nproject_id = get_project_id(config[\"AUTH_API_URL\"], auth_token)\r\ncluster_id = get_cluster_id(config[\"SERVER_API_URL\"], auth_token, project_id, config[\"CLUSTER_TYPE\"])\r\n<\/code><\/pre>\n<\/li>\n<li>Parse the <a href=\"https:\/\/docs.litmuschaos.io\/docs\/2.14.0\/user-guides\/construct-workflow\/\">custom Chaos Scenario YAML<\/a> into JSON format\n<pre><code class=\"language-python\">def parse_scenario_yaml(absolute_path: str) -&gt; any:\r\n    with open(absolute_path) as yaml_in:\r\n        scenario_json = yaml.safe_load(yaml_in)\r\n        return scenario_json\r\n\r\nscenario_json = parse_scenario_yaml(config[\"SCENARIO_YAML_PATH\"])<\/code><\/pre>\n<\/li>\n<li>Use all the above data to build the launch-scenario request object (i.e. 
<code>ChaosWorkflowRequest<\/code>)\n<pre><code class=\"language-python\">def create_workflow_request(project_id: str, cluster_id: str, scenario_json: any, exec_id_value: str) -&gt; ChaosWorkflowRequest:\r\n    try:\r\n        # Append execution_id to .metadata.generateName to create a unique but static \".metadata.name\", as the API doesn't accept \"generateName\"\r\n        generated_name = (scenario_json[\"metadata\"][\"generateName\"] + exec_id_value)\r\n\r\n        # Validate generated scenario name\r\n        # Note: Litmus API currently allows the creation of scenarios with invalid names, which causes errors (see https:\/\/github.com\/litmuschaos\/litmus\/issues\/3857)\r\n        pattern = re.compile(r'^[a-zA-Z0-9-]{1,54}$')\r\n        if not pattern.match(generated_name):\r\n            raise Exception(f\"Invalid scenario name: {generated_name}\")\r\n        scenario_json[\"metadata\"][\"name\"] = generated_name\r\n        print(f\"Processing scenario: {generated_name}\")\r\n\r\n        scenario_params = scenario_json[\"spec\"][\"arguments\"][\"parameters\"]\r\n        scenario_id = next(param for param in scenario_params if param[\"name\"] == \"scenario_id\")\r\n        # Replace placeholder execution_id value with passed in value\r\n        for param_dict in scenario_params:\r\n            if param_dict[\"name\"] == \"execution_id\":\r\n                param_dict.update({\"name\": param_dict[\"name\"], \"value\": exec_id_value})\r\n                break\r\n        exec_id = next(param for param in scenario_params if param[\"name\"] == \"execution_id\")\r\n        description = f\"Scenario: {scenario_id['value']} | Execution: {exec_id['value']}\"\r\n\r\n        scenario_json_str = json.dumps(scenario_json)\r\n\r\n        return ChaosWorkflowRequest(scenario_json_str, generated_name, description, project_id, cluster_id)\r\n    except Exception as err:\r\n        print(f\"Error processing scenario manifest: {err}\", file=sys.stderr)\r\n        
exit(1)\r\n\r\nscenario_payload = create_workflow_request(project_id, cluster_id, scenario_json, config[\"EXECUTION_ID\"])<\/code><\/pre>\n<p>For reference, the <code>ChaosWorkflowRequest<\/code> construct used in this example looks like:<\/p>\n<pre><code class=\"language-python\">class ChaosWorkflowRequest:\r\n    def __init__(self, scenario_json_str: str, name: str, desc: str, project_id: str, cluster_id: str):\r\n        self.workflow_manifest = scenario_json_str\r\n        self.workflow_name = name\r\n        self.workflow_description = desc\r\n        self.cron_syntax = \"\"\r\n        self.weightages = [{\"experimentName\": \"\", \"weightage\": 0}]\r\n        self.is_custom_workflow = True\r\n        self.project_id = project_id\r\n        self.cluster_id = cluster_id<\/code><\/pre>\n<\/li>\n<li>Send the API request to launch the scenario\n<pre><code class=\"language-python\">def launch_scenario(server_api: str, auth_token: str, scenario: ChaosWorkflowRequest) -&gt; str:\r\n    try:\r\n        transport = RequestsHTTPTransport(\r\n            url=f\"{server_api}\/query\",\r\n            cookies={\"litmus-cc-token\": auth_token}\r\n        )\r\n\r\n        client = Client(transport=transport, fetch_schema_from_transport=True)\r\n\r\n        query = gql(\r\n            \"\"\"\r\n            mutation createChaosWorkFlow($input: ChaosWorkFlowRequest!) 
{\r\n                createChaosWorkFlow(request: $input) {\r\n                    workflowID\r\n                    cronSyntax\r\n                    workflowName\r\n                    workflowDescription\r\n                    isCustomWorkflow\r\n                }\r\n            }\r\n            \"\"\"\r\n        )\r\n\r\n        params = {\r\n            \"input\": {\r\n                \"workflowManifest\": scenario.workflow_manifest,\r\n                \"workflowName\": scenario.workflow_name,\r\n                \"workflowDescription\": scenario.workflow_description,\r\n                \"cronSyntax\": scenario.cron_syntax,\r\n                \"weightages\": scenario.weightages,\r\n                \"isCustomWorkflow\": scenario.is_custom_workflow,\r\n                \"projectID\": scenario.project_id,\r\n                \"clusterID\": scenario.cluster_id\r\n            }\r\n        }\r\n\r\n        res = client.execute(query, variable_values=params)\r\n        return res[\"createChaosWorkFlow\"][\"workflowID\"]\r\n    except GraphQLError as gql_err:\r\n        print(f\"Launch GraphQL error: {gql_err}\", file=sys.stderr)\r\n    except Exception as err:\r\n        print(f\"Launch error: {err}\", file=sys.stderr)\r\n    exit(1)\r\n\r\nlitmus_workflow_id = launch_scenario(\r\n    config[\"SERVER_API_URL\"],\r\n    auth_token,\r\n    scenario_payload\r\n)<\/code><\/pre>\n<\/li>\n<li>Use APIs to wait for the scenario to run to completion before reporting the result (success or error exit code)\n<pre><code class=\"language-python\">def wait_for_scenario_completion(server_api: str, auth_token: str, refresh_access_token, project_id: str, litmus_workflow_id: str, backoff_minutes: int, timeout_minutes: int) -&gt; any:\r\n    start_time = time.time()\r\n    timeout = start_time + (60 * timeout_minutes)\r\n    is_consecutive_auth_error = False\r\n    is_running = True\r\n    is_descending = True\r\n\r\n    # Backoff once to allow Litmus to initialize the scenario run\r\n    
print(f\"Created Chaos Scenario: {litmus_workflow_id}\")\r\n    time.sleep(config[\"BACKOFF_MINUTES\"]*60)\r\n\r\n    while(is_running):\r\n        try:\r\n            if time.time() &gt; timeout:\r\n                raise TimeoutError(timeout_minutes)\r\n\r\n            transport = RequestsHTTPTransport(\r\n                url=f\"{server_api}\/query\",\r\n                cookies={\"litmus-cc-token\": auth_token}\r\n            )\r\n\r\n            client = Client(transport=transport, fetch_schema_from_transport=True)\r\n\r\n            query = gql(\r\n                \"\"\"\r\n                query listWorkflowRuns($request: ListWorkflowRunsRequest!) {\r\n                    listWorkflowRuns(request: $request) {\r\n                        totalNoOfWorkflowRuns\r\n                        workflowRuns {\r\n                            workflowRunID\r\n                            workflowName\r\n                            phase\r\n                            lastUpdated\r\n                            executionData\r\n                        }\r\n                    }\r\n                }\r\n                \"\"\"\r\n            )\r\n\r\n            params = {\r\n                \"request\": {\r\n                    \"projectID\": project_id,\r\n                    \"workflowIDs\": [litmus_workflow_id],\r\n                    \"sort\": {\r\n                        \"field\": \"TIME\",\r\n                        \"descending\": is_descending\r\n                    }\r\n                }\r\n            }\r\n\r\n            # Send request for workflow run\r\n            res = client.execute(query, variable_values=params)\r\n\r\n            # Reset flag for when we refresh an expired token\r\n            is_consecutive_auth_error = False\r\n\r\n            # Extract the scenario status\r\n            scenario_runs = res[\"listWorkflowRuns\"][\"workflowRuns\"]\r\n            if len(scenario_runs) == 0:\r\n                raise Exception(\"No scenario runs found\")\r\n       
     run_phase = scenario_runs[0][\"phase\"]\r\n            execution_data_string = scenario_runs[0][\"executionData\"]\r\n            execution_data_json = json.loads(execution_data_string)\r\n            finished_at = execution_data_json[\"finishedAt\"]\r\n\r\n            # Record elapsed time for console output\r\n            elapsed_seconds = int((time.time()-start_time))\r\n            elapsed_minutes, elapsed_seconds = divmod(elapsed_seconds, 60)\r\n            elapsed_hours, elapsed_minutes = divmod(elapsed_minutes, 60)\r\n\r\n            # Evaluate scenario status\r\n            if len(finished_at) == 0:\r\n                if run_phase == \"Terminated\":\r\n                    raise Exception(run_phase)\r\n                else: # run_phase is either Running or Failed, but not yet completed\r\n                    print(f\"{scenario_runs[0]['workflowName']} Running with status: {run_phase} [ {elapsed_hours}h {elapsed_minutes}m {elapsed_seconds}s elapsed ]\")\r\n                    time.sleep(60 * backoff_minutes)\r\n                    continue\r\n            else:\r\n                if run_phase == \"Succeeded\":\r\n                    is_running = False\r\n                else: # run_phase is \"Failed\"\r\n                    raise Exception(run_phase)\r\n\r\n        except TransportQueryError as transport_err:\r\n            # Try to refresh the access token once; if we have consecutive errors, exit with errors\r\n            error_msg = transport_err.errors[0]['message']\r\n            if error_msg == \"Invalid Token\" and is_consecutive_auth_error == False:\r\n                print(f\"Invalid access token. 
Retrying once with new token...\")\r\n                auth_token = refresh_access_token()\r\n                is_consecutive_auth_error = True\r\n                continue\r\n            print(f\"Scenario status transport error: {transport_err}\", file=sys.stderr)\r\n            exit(1)\r\n        except GraphQLError as gql_err:\r\n            print(f\"Scenario status GraphQL error: {gql_err}\", file=sys.stderr)\r\n            exit(1)\r\n        except TimeoutError as timeout_err:\r\n            print(f\"Scenario status Timeout error: {timeout_err}\", file=sys.stderr)\r\n            exit(1)\r\n        except Exception as err:\r\n            print(f\"Scenario completed with status: {err}\", file=sys.stderr)\r\n            exit(1)\r\n\r\n        print(f\"Scenario completed with status: Succeeded\")\r\n\r\n# Wait for scenario completion and evaluate the run status to determine exit code\r\nscenario_run = wait_for_scenario_completion(\r\n    config[\"SERVER_API_URL\"],\r\n    auth_token,\r\n    refresh_access_token,\r\n    project_id,\r\n    litmus_workflow_id,\r\n    config[\"BACKOFF_MINUTES\"],\r\n    config[\"TIMEOUT_MINUTES\"]\r\n)<\/code><\/pre>\n<\/li>\n<\/ol>\n<p>In this Python 3 example, these were the dependencies used (i.e. <code>requirements.txt<\/code>):<\/p>\n<pre><code class=\"language-txt\">python-dotenv==0.21.0\r\npyyaml==6.0\r\nrequests==2.28.1\r\ngql[requests]==3.4.0<\/code><\/pre>\n<h2>Running the Script in a Pipeline<\/h2>\n<p>To integrate our script into our Azure DevOps Pipeline, we first apply any Kubernetes templates referenced in the Litmus Scenario to the cluster where Litmus is installed.\nThen, we install all script dependencies (e.g. 
<code>pip install -r requirements.txt<\/code>) and set the required inputs as environment variables.\nLastly, we run the Python script in our pipeline to kick off the desired Litmus Chaos Scenario and wait for its completion.<\/p>\n<h2>Alternate Approach: Using <code>litmusctl<\/code><\/h2>\n<p>An alternative method for launching our scenarios programmatically is to use <a href=\"https:\/\/docs.litmuschaos.io\/docs\/2.14.0\/litmusctl\/installation\/\">litmusctl<\/a>.\nCreated by the Litmus team, <code>litmusctl<\/code> is a command-line tool allowing users to manage their chaos delegate control plane.\nAs the official documentation explains, it allows users to manage chaos delegates, scenarios, projects, and accounts all from the CLI.\nThis includes the ability to create and kick off Litmus Chaos scenarios from the command line.<\/p>\n<p>While it&#8217;s a relatively user-friendly and straightforward CLI tool, we decided to use the Litmus APIs instead, as they provided us with more flexibility and control.<\/p>\n<h2>Summary<\/h2>\n<p>Litmus Chaos can be complex and comes with some challenges, but it&#8217;s also a powerful tool.\nIn this article, we provided one method of using Litmus Chaos programmatically in an Azure DevOps Pipeline.\nTo get started on your own, please check out the Litmus docs on <a href=\"https:\/\/docs.litmuschaos.io\/docs\/2.14.0\/getting-started\/run-your-first-workflow\/\">running your first scenario<\/a>. 
We hope the strategies and learnings we shared in this post can help you integrate Litmus Chaos into your own pipelines.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Guidance on kicking off Litmus Chaos scenarios programmatically in an Azure DevOps Pipeline.<\/p>\n","protected":false},"author":118448,"featured_media":14703,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[1],"tags":[3371,3407,151,229,3364,300],"class_list":["post-14697","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-cse","tag-azure-pipelines","tag-chaos-engineering","tag-devops","tag-kubernetes","tag-open-source","tag-python"],"acf":[],"blog_post_summary":"<p>Guidance on kicking off Litmus Chaos scenarios programmatically in an Azure DevOps Pipeline.<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/posts\/14697","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/users\/118448"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/comments?post=14697"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/posts\/14697\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/media\/14703"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/media?parent=14697"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/categories?post=14697"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/dev
blogs.microsoft.com\/ise\/wp-json\/wp\/v2\/tags?post=14697"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}