Monday, October 23, 2023

AWS Athena - Using Merge + Iceberg tables to store only changed records

This post is based on my GitHub page: mypublicnotes/AWS/Athena/iceberg_tracking_changes.md in rajrao/mypublicnotes (github.com)

Change data capture using Athena and Iceberg

Many times in a data lake, you have a source that doesn't provide information about which records changed. Another common case is an ETL query with multiple tables and columns taking part in it, where it is traditionally difficult to track which records changed. This post shows one method for tracking those changes and inserting only the records that are new or updated (at the end, I also show how to handle deletes). The method leverages Iceberg tables in Athena (Athena engine version 3) and the upsert mechanism provided by the MERGE INTO statement.

TL;DR: Check out the merge statement below, which updates only those records that had changes.

Setup: A CTE for source data

I am using a CTE to simulate source data; in practice, you would typically use another Athena table as your source, or a query that brings data together from multiple tables (aka ETL). A key part of this method is a hash function that can be used to determine when a record has changed. I use xxhash64.

with cte(id, value1, value2) as
    (
    select 1,'a1','b' union all
    select 4,'raj','rao' union all
    select 2,'c2','d2' 
    )
    select *, xxhash64(to_utf8(value1 || value2)) as hash from cte

Note 1: You can use murmur3 instead of xxhash64 using the following code: murmur3(to_utf8(value1 || value2)).

Note 2: Here are the other hashing functions available: https://trino.io/docs/current/functions/binary.html

Setup: Create an iceberg table

The Iceberg table is your final table; it tracks the data that changed. Here, id is the primary key; you can have more columns take part in the key used for the update.

CREATE TABLE
  test_db.hash_test (
  id int,
  value1 string,
  value2 string,
  hash string,
  last_updated_on timestamp)
  LOCATION 's3://my_test_bucket/hash_test'
  TBLPROPERTIES ( 'table_type' ='ICEBERG')

The MERGE statement

Here is a merge statement that inserts new records and updates existing records only when there are changes. It uses a variant of the CTE described above (with an extra value3 column) as its source data; you can manipulate the CTE to test various scenarios. The hash column determines when to insert or update.

MERGE INTO hash_test as tgt
USING (
    with cte(id, value1, value2, value3) as
    (
    select 1,'a1','b',100 union all
    select 4,'rao','raj',200 union all
    select 2,'c2','d2',300 
    )
    select *, xxhash64(to_utf8(concat_ws('::',coalesce(value1,'-'),coalesce(value2,'-'),coalesce(cast(value3 as varchar),'-')))) as hash from cte
) as src
ON tgt.id = src.id
WHEN MATCHED and src.hash <> tgt.hash
    THEN UPDATE SET  
    value1 = src.value1,
    value2 = src.value2,
    hash = src.hash,
    last_updated_on = current_timestamp
WHEN NOT MATCHED 
THEN INSERT (id, value1, value2, hash, last_updated_on)
      VALUES (src.id, src.value1, src.value2, src.hash, current_timestamp)	  
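The update-only-on-change rule the merge implements can be sketched outside SQL as well. Below is a minimal Python model of the same logic (this is an illustration, not Athena's implementation: hashlib.sha256 stands in for xxhash64, and a plain dict stands in for the Iceberg table):

```python
import hashlib
from datetime import datetime, timezone

def row_hash(*values):
    # Mirrors concat_ws('::', coalesce(col, '-')) from the SQL above;
    # hashlib.sha256 stands in for Athena's xxhash64.
    joined = "::".join("-" if v is None else str(v) for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

def merge(target, source_rows):
    """target: {id: row dict}; source_rows: iterable of (id, value1, value2)."""
    now = datetime.now(timezone.utc)
    for id_, v1, v2 in source_rows:
        h = row_hash(v1, v2)
        row = target.get(id_)
        if row is None:
            # WHEN NOT MATCHED THEN INSERT
            target[id_] = {"value1": v1, "value2": v2,
                           "hash": h, "last_updated_on": now}
        elif row["hash"] != h:
            # WHEN MATCHED and src.hash <> tgt.hash THEN UPDATE
            row.update(value1=v1, value2=v2, hash=h, last_updated_on=now)
        # an unchanged row is left untouched: no rewrite, no new timestamp
    return target
```

The key point, in SQL and in the sketch alike, is that an unchanged row never gets a new last_updated_on, which is what makes the timestamp usable for finding the latest changes later.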

If you need to deal with deletes, you can add one of the following as your first MATCHED clause (hard delete, or archive):

WHEN MATCHED and src.IsDeleted = 1
  THEN DELETE

or

WHEN MATCHED and src.IsDeleted = 1
  THEN UPDATE SET  
    is_archived = 1,
    last_updated_on = current_timestamp

Finally, some example queries to view the data:

-- see the history of changes
select * from test_db."hash_test$history" order by made_current_at desc

-- use a snapshot_id from above as your value for xxxxx
select * from test_db.hash_test for version as of xxxxx

-- get only the latest records from the table
select * from test_db.hash_test
where last_updated_on in (select max(last_updated_on) from test_db.hash_test)
order by last_updated_on

Reference:

  1. Athena Functions
  2. Query Delta Lake Tables: https://docs.aws.amazon.com/athena/latest/ug/delta-lake-tables.html
  3. Using Apache Iceberg tables

Testing Hashing Behavior

When hashing, you need to make sure that null values are handled appropriately.

For example, (null, a, null) and (a, null, null) should be treated as changes; if they generate the same hash, you will miss the change. Also, the hash functions take varbinary input (hence the to_utf8 call), so data that is not already a string needs to be cast first. For this reason, the computation of the hash gets complicated, and I have not found a simpler way around this.
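The collision risk is easy to demonstrate. Here is a minimal Python sketch (hashlib.sha256 standing in for xxhash64) comparing naive concatenation with the separator-plus-placeholder scheme used in the merge above:

```python
import hashlib

def naive_hash(*values):
    # Bare concatenation that skips nulls: ('a', None) and (None, 'a')
    # both reduce to 'a' and therefore collide.
    joined = "".join(v for v in values if v is not None)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

def safe_hash(*values):
    # Separator plus placeholder, mirroring concat_ws('::', coalesce(value, '-')):
    # ('a', None) -> 'a::-' and (None, 'a') -> '-::a' hash differently.
    joined = "::".join("-" if v is None else v for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()
```

One remaining caveat: a literal '-' value is indistinguishable from null under this scheme, so pick a placeholder that cannot occur in your data.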

with cte(id,note, value1, value2,value3) as
(
    select 1,null,'a1','b',1 union all
    select 4,null,'raj','rao',2 union all
    select 5,'both null',null,null,null union all
    select 6,'empty & null','',null,null union all
    select 7,'null & empty',null,'',1 union all
    select 8,'empty-empty','','',2 union all
    select 9,'str-null','a',null,3 union all
    select 10,'null-str',null,'a',4 union all
    select 100,null,'c2','d2',5 
)
select *
,concat_ws('::',coalesce(value1,'-'),coalesce(value2,'-'),coalesce(cast(value3 as varchar),'-')) as concatenated
, murmur3(to_utf8(concat_ws('::',coalesce(value1,'-'),coalesce(value2,'-'),coalesce(cast(value3 as varchar),'-')))) as hash1
, xxhash64(to_utf8(concat_ws('::',coalesce(value1,'-'),coalesce(value2,'-'),coalesce(cast(value3 as varchar),'-')))) as hash2
from cte
order by id




Tuesday, July 18, 2023

PowerBi/PowerQuery: Dealing with errors in Excel files

When you have errors in your Excel file, they sometimes leak through, and adding "Table.ReplaceErrorValues" or "Table.RemoveRowsWithErrors" later in the query doesn't really work. What I have found is to add the error-fixing step right after the Navigation step that loads the sheet.

In the screenshot below, I have used "Table.RemoveRowsWithErrors" after the Navigation step and it fixed the error.





Sunday, May 28, 2023

Applying for US passport in 2023

We needed a new passport for our daughter, as her passport would expire 5 months and 3 weeks from our date of travel (the destination country requires 6 months of validity).

Panicking, we emailed our senators and representatives. We got a call from one of them, and they advised us to call the passport phone number and tell them that the country we were traveling to required a visa (an urgent passport appointment is provided 28 days out for countries requiring a visa).


Time line

May 5: figured out we needed new passport. Called passport agency, was told to call back 2 weeks prior to travel.

May 6: emailed senators and representatives

May 8: got call back from one of the representatives' staff advising about calling back and telling them that we needed a visa

May 9: called the passport agency and got an interview date of May 23 at the Colorado office in Aurora. Lucky for us, this is a 30-minute drive.

May 23: appointment was for 8am. Should have lined up 30 minutes early. The line was long, but efficiently managed. Had flight tickets, birth certificate (as the passport was for a kid and this is considered a new application, not a renewal), and paperwork about needing a visa. The entire appointment lasted less than 60 minutes. Was told to return after 2pm on the 25th to pick up the passport.

May 25: got passport (took 15 minutes)


Reflections:

1. Everyone we spoke to, from the phone staff to the people in the Colorado passport office, was extremely helpful, efficient, and great to work with.

2. Didn't really need help from the senators/representatives, but the tip they gave us about the visa provision was the breakthrough we needed.

3. Next time we will apply for a passport 12 months prior to expiration, as many countries require 6 months of validity on a passport for travel.

4. Kids need a DS-11, and you are applying for a new passport rather than renewing (I believe until the age of 16). Their passports are valid for only 5 years.

Friday, May 12, 2023

AWS CloudFormation error: Properties validation failed for resource LAMBDAXXXX with message: #/Code: expected type: JSONObject, found: String

During an AWS CloudFormation deployment I got the following error:

Properties validation failed for resource LAMBDAXXXX with message: #/Code: expected type: JSONObject, found: String

After struggling for over 4 hours, it turned out that the issue was that the Code path in the CF template was not correct!

  LAMBDAXXXX:
    Type: "AWS::Lambda::Function"
    Properties:
      FunctionName: LambdaIngestXXXX
      Code: "../lambda/lambda_function_folderpath_xxxx/"
      Handler: "lambda_function.lambda_handler"
      Layers:
        - !ImportValue LAYERPYREQUESTS
        - !ImportValue LAYERAWSWRANGLER

The error "#/Code: expected type: JSONObject, found: String", it turns out, translates to "your path ../lambda/lambda_function_folderpath_xxxx/ is wrong"! (In a raw template, Code must be a JSON object, such as one with S3Bucket/S3Key or ZipFile; a local path string only works when a packaging step resolves it to one, which fails silently when the path doesn't exist.)

How I hate CloudFormation!!!

Hope this saves someone else time!

Monday, May 01, 2023

Add an image into a PowerBi report via PowerBi.com/PowerBi Service

Currently, there is no way to insert an image into a Power BI report via PowerBi.com (Power BI service).

One workaround, though, is to insert a "Blank" button and then set an image as the background under the button's "Fill" options.

Friday, April 28, 2023

PowerBi Deep Links (URL query string parameter filtering) is not working

If you find that your deep links into a Power BI report are not working, please make sure you are using a "?" in front of the "filter" parameter.

More info: Filter a report using query string parameters in the URL - Power BI | Microsoft Learn

If you are coming from a Reporting Server environment, the filter URL used to have a "&" in front of it, which will not work here.

When testing, one easy way to see whether your query parameter is working is to check your filter pane: a filter that is correctly picked up will show up there in italics. The following filters the report by a field named "District" in a table named "District". The table and column names are case sensitive; the value used for filtering is not. Another telltale sign that your URL is not correct is the filter getting dropped from the URL once the page loads.

?filter=District/District eq 'FD - 01'


Example of an old Report Server link, which will not work because of the "&": ?rs:embed=true&filter=District/District eq 'FD - 01'
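If you build these links programmatically, URL-encoding the filter expression avoids issues with spaces and quotes. A small Python sketch (the report URL here is a placeholder, not a real report):

```python
from urllib.parse import quote

# Placeholder report URL -- substitute your own workspace/report path.
base_url = "https://app.powerbi.com/groups/me/reports/REPORT_ID/ReportSection"

# Table and column names are case sensitive; the filter value is not.
filter_expr = "District/District eq 'FD - 01'"

# Note the leading '?' (not '&') -- filter is the first query parameter here.
deep_link = base_url + "?filter=" + quote(filter_expr)
```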

Thursday, April 06, 2023

Power BI + Denodo: FailedWithUnknownOrUnsupportedDataSources

We started getting an error with our Power BI datasets that use a Denodo connector. This seems to be happening only with new files, not existing datasets.

The error we were getting is:

Unable to determine the data source. In order to determine details about custom connector based data sources, Power BI service needs to connect to a gateway, however, no gateway was available/reachable. Details: Static analysis failed in gateway. gatewayObjectId:e884ca97-b4d9-45bc-8560-ad3d2d4eaa56, resultCode:FailedWithUnknownOrUnsupportedDataSources Query contains unsupported function. Function name: Denodo.Contents .

  • Data source for Query1
  • Locally the Power BI file works fine; this error shows up in the Power BI service after you publish the dataset and look at the dataset's settings.

It looks like this might be a bug in Power BI: when you add a new query that uses the Denodo connector, it adds an empty list to the arguments, and that is what causes the error.

    Source = Denodo.Contents("MyDenodoSourceName", null, []),

The problem is caused by the []. If you change the line to remove the empty [], then everything works fine!

    Source = Denodo.Contents("MyDenodoSourceName", null),

Saturday, February 11, 2023

Honda Service Codes

Recently my car was blinking the maintenance minder B167 code.

Turns out each character has a specific meaning.

Main Honda Service Codes