Handle business exceptions (for example, invalid data)

One more scenario to handle: what to do with invalid data? We should at least log a message in case we have some data that doesn't pass our validate_traffic_data() check.

@task def consume_traffic_data(): """ Inhuman Insurance, Inc. Artificial Intelligence System robot. Consumes traffic data work items. """ process_traffic_data() def process_traffic_data(): for item in workitems.inputs: traffic_data = item.payload["traffic_data"] if len(traffic_data["country"]) == 3: status, return_json = post_traffic_data_to_sales_system(traffic_data) if status == 200: item.done() else: item.fail( exception_type="APPLICATION", code="TRAFFIC_DATA_POST_FAILED", message=return_json["message"], ) else: item.fail( exception_type="BUSINESS", code="INVALID_TRAFFIC_DATA", message=item.payload, ) def post_traffic_data_to_sales_system(traffic_data): url = "https://robocorp.com/inhuman-insurance-inc/sales-system-api" response = requests.post(url, json=traffic_data) return response.status_code, response.json()
  • You added an extra else, to the check regarding the validity of the data, moved the status related logic under the previous if to take care of dealing with invalid traffic data. It logs a message, including the invalid data. The work item is released as FAILED. This time the exception type is BUSINESS. The code can be any custom code. You choose to use INVALID_TRAFFIC_DATA. The exception type and the code help filter the failed work items in Control Room.

After running the consumer task, you see the following log entry regarding the invalid data (due to the country code not matching the validation rule):

Releasing item '1' with FAILED state and exception: {'type': 'BUSINESS', 'code': 'INVALID_TRAFFIC_DATA', 'message': {'traffic_data': {'country': 'SWError', 'year': 2019, 'rate': 3.13947}}}

You notice that the entire logic is in process_traffic_data() and in our @task function we only call this one function. You know what you have to do. That's right, some more refactoring!

Your consumer logic is now complete!

The consumer:

  • loops all the work items one by one.
  • validates the data.
  • posts the data to the sales system API.
  • handles successful responses.
  • handles application exceptions.
  • handles business exceptions.

Your complete code now looks like:

import requests from robocorp import workitems from robocorp.tasks import task from RPA.HTTP import HTTP from RPA.JSON import JSON from RPA.Tables import Tables http = HTTP() json = JSON() table = Tables() TRAFFIC_JSON_FILE_PATH = "output/traffic.json" # JSON data keys COUNTRY_KEY = "SpatialDim" YEAR_KEY = "TimeDim" RATE_KEY = "NumericValue" GENDER_KEY = "Dim1" @task def produce_traffic_data(): """ Inhuman Insurance, Inc. Artificial Intelligence System automation. Produces traffic data work items. """ http.download( url="https://github.com/robocorp/inhuman-insurance-inc/raw/main/RS_198.json", target_file=TRAFFIC_JSON_FILE_PATH, overwrite=True, ) traffic_data = load_traffic_data_as_table() filtered_data = filter_and_sort_traffic_data(traffic_data) filtered_data = get_latest_data_by_country(filtered_data) payloads = create_work_item_payloads(filtered_data) save_work_item_payloads(payloads) @task def consume_traffic_data(): """ Inhuman Insurance, Inc. Artificial Intelligence System automation. Consumes traffic data work items. """ for item in workitems.inputs: traffic_data = item.payload["traffic_data"] if len(traffic_data["country"]) == 3: status, return_json = post_traffic_data_to_sales_system(traffic_data) if status == 200: item.done() else: item.fail( exception_type="APPLICATION", code="TRAFFIC_DATA_POST_FAILED", message=return_json["message"], ) else: item.fail( exception_type="BUSINESS", code="INVALID_TRAFFIC_DATA", message=item.payload, ) def post_traffic_data_to_sales_system(traffic_data): url = "https://robocorp.com/inhuman-insurance-inc/sales-system-api" response = requests.post(url, json=traffic_data) return response.status_code, response.json() def load_traffic_data_as_table(): json_data = json.load_json_from_file(TRAFFIC_JSON_FILE_PATH) return table.create_table(json_data["value"]) def filter_and_sort_traffic_data(data): max_rate = 5.0 both_genders = "BTSX" table.filter_table_by_column(data, RATE_KEY, "<", max_rate) table.filter_table_by_column(data, GENDER_KEY, "==", both_genders) table.sort_table_by_column(data, YEAR_KEY, False) return data def get_latest_data_by_country(data): data = table.group_table_by_column(data, COUNTRY_KEY) latest_data_by_country = [] for group in data: first_row = table.pop_table_row(group) latest_data_by_country.append(first_row) return latest_data_by_country def create_work_item_payloads(traffic_data): payloads = [] for row in traffic_data: payload = dict( country=row[COUNTRY_KEY], year=row[YEAR_KEY], rate=row[RATE_KEY], ) payloads.append(payload) return payloads def save_work_item_payloads(payloads): for payload in payloads: variables = dict(traffic_data=payload) workitems.outputs.create(variables)

All done. The producer produces, and the consumer consumes. Bravissimo! ๐Ÿ‘