Spark Examples
Getting Started with Azure Databricks - A Census Data Example
This notebook is a simple example of working with data in Azure Databricks.
If you are reading this on the wiki, you can find the working Notebook at the following path: Getting Started with Azure Databricks - A Census Data Example
Explore the ADLS folders
You can explore filesystems directly through notebooks by using the dbutils.fs client.
Below we explore the CSVs for the night population census:
display(dbutils.fs.ls("/"))
path | name | size |
---|---|---|
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/data/ | data/ | 0 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/ | tables/ | 0 |
display(dbutils.fs.ls("abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/"))
path | name | size |
---|---|---|
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/data/ | data/ | 0 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/ | tables/ | 0 |
display(dbutils.fs.ls("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz"))
path | name | size |
---|---|---|
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv | Data8317.csv | 857219761 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupAge8317.csv | DimenLookupAge8317.csv | 2720 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupArea8317.csv | DimenLookupArea8317.csv | 65400 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupEthnic8317.csv | DimenLookupEthnic8317.csv | 272 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupSex8317.csv | DimenLookupSex8317.csv | 74 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupYear8317.csv | DimenLookupYear8317.csv | 67 |
As you can see above, the ADLS Gen2 container for the project (abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/) is set as the default filesystem for clusters associated with that project (in this case sandbox).
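Because the container is the default filesystem, relative and fully-qualified paths are interchangeable. A quick sketch, reusing the account and container names from the listings above:

```python
# These two calls list the same directory: "/" resolves against the
# project's default filesystem, the sandbox ADLS Gen2 container.
display(dbutils.fs.ls("/data/"))
display(dbutils.fs.ls("abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/data/"))
```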
Read in the CSVs and explore the data
We can read the source CSVs into Spark as DataFrames and explore the data.
We use the DataFrame API to transform the DataFrames.
df = spark.read.csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv")
display(df.limit(10))
_c0 | _c1 | _c2 | _c3 | _c4 | _c5 |
---|---|---|---|---|---|
Year | Age | Ethnic | Sex | Area | count |
2018 | 000 | 1 | 1 | 01 | 807 |
2018 | 000 | 1 | 1 | 02 | 5109 |
2018 | 000 | 1 | 1 | 03 | 2262 |
2018 | 000 | 1 | 1 | 04 | 1359 |
2018 | 000 | 1 | 1 | 05 | 180 |
2018 | 000 | 1 | 1 | 06 | 741 |
2018 | 000 | 1 | 1 | 07 | 633 |
2018 | 000 | 1 | 1 | 08 | 1206 |
2018 | 000 | 1 | 1 | 09 | 2184 |
The datatypes and headers look incorrect: every column has been read as a string and the header row has been treated as data. We can fix this by passing reader options.
df = spark.read.option("header", True).option("inferSchema", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv")
display(df.limit(10))
Year | Age | Ethnic | Sex | Area | count |
---|---|---|---|---|---|
2018 | 000 | 1 | 1 | 01 | 807 |
2018 | 000 | 1 | 1 | 02 | 5109 |
2018 | 000 | 1 | 1 | 03 | 2262 |
2018 | 000 | 1 | 1 | 04 | 1359 |
2018 | 000 | 1 | 1 | 05 | 180 |
2018 | 000 | 1 | 1 | 06 | 741 |
2018 | 000 | 1 | 1 | 07 | 633 |
2018 | 000 | 1 | 1 | 08 | 1206 |
2018 | 000 | 1 | 1 | 09 | 2184 |
df.count()
Let’s take a look at a couple of the dimension tables too.
df = spark.read.option("header", True).option("inferSchema", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupAge8317.csv")
display(df.limit(10))
Code | Description | SortOrder |
---|---|---|
999999 | Total people - age group | 1 |
888 | Median age | 2 |
1 | Under 15 years | 3 |
2 | 15-29 years | 4 |
3 | 30-64 years | 5 |
4 | 65 years and over | 6 |
1 | 0-4 years | 7 |
2 | 5-9 years | 8 |
3 | 10-14 years | 9 |
4 | 15-19 years | 10 |
df = spark.read.option("header", True).option("inferSchema", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupEthnic8317.csv")
display(df.limit(10))
Code | Description | SortOrder |
---|---|---|
9999 | Total people | 1 |
1 | European | 2 |
2 | Maori | 3 |
3 | Pacific Peoples | 4 |
4 | Asian | 5 |
5 | Middle Eastern/Latin American/African | 6 |
6 | Other ethnicity | 7 |
61 | New Zealander | 10 |
69 | Other ethnicity nec | 11 |
77 | Total people stated | 8 |
Observation: the dimension tables seem to have a consistent schema.
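We can spot-check that observation programmatically; a minimal sketch, assuming the same folder layout as above:

```python
# Print the header of each dimension CSV; they should all be
# Code, Description, SortOrder.
base = "/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz"
for dim in ["Age", "Area", "Ethnic", "Sex", "Year"]:
    cols = spark.read.option("header", True).csv(f"{base}/DimenLookup{dim}8317.csv").columns
    print(dim, cols)
```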
Denormalise the source tables
Since we are using Apache Spark, we want to limit the number of joins and the amount of data transferred between nodes. Filters and aggregations suit the architecture better, and because the data will be stored in columnar, compressed files, it makes sense to denormalise it.
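For example, the access pattern a denormalised table favours is a single filter plus aggregation with no joins at query time. A sketch (it assumes the denorm_df built below):

```python
from pyspark.sql import functions as F

# Shape of a typical query: one filter plus one aggregation, no joins at
# read time. (Sketch only: the data mixes totals and detail rows, so a real
# query would also pick one level of each hierarchy, and the string `count`
# column is cast, turning suppressed values such as "..C" into null.)
by_ethnic = (
    denorm_df
    .filter(F.col("Year") == "2018")
    .groupBy("Ethnic")
    .agg(F.sum(F.col("count").cast("double")).alias("people"))
)
display(by_ethnic)
```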
Let’s join all the data into one large DataFrame:
```python
from pyspark.sql.functions import col

denorm_df = spark.read.option("header", True).option("inferSchema", True).csv(
    "/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv"
)

for dim in ["Age", "Area", "Ethnic", "Sex", "Year"]:
    dim_df = spark.read.option("header", True).option("inferSchema", True).csv(
        f"/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookup{dim}8317.csv"
    )
    denorm_df = (
        denorm_df.join(dim_df, col(dim) == col("Code"))
        .drop("Code", dim)
        .withColumnRenamed("Description", dim)
        .withColumnRenamed("SortOrder", f"{dim}SortOrder")
    )
```
display(denorm_df.limit(10))
count | Age | AgeSortOrder | Area | AreaSortOrder | Ethnic | EthnicSortOrder | Sex | SexSortOrder | Year | YearSortOrder |
---|---|---|---|---|---|---|---|---|---|---|
807 | Less than one year | 28 | Northland Region | 4 | European | 2 | Male | 2 | 2018 | 3 |
5109 | Less than one year | 28 | Auckland Region | 5 | European | 2 | Male | 2 | 2018 | 3 |
2262 | Less than one year | 28 | Waikato Region | 6 | European | 2 | Male | 2 | 2018 | 3 |
1359 | Less than one year | 28 | Bay of Plenty Region | 7 | European | 2 | Male | 2 | 2018 | 3 |
180 | Less than one year | 28 | Gisborne Region | 8 | European | 2 | Male | 2 | 2018 | 3 |
741 | Less than one year | 28 | Hawke's Bay Region | 9 | European | 2 | Male | 2 | 2018 | 3 |
633 | Less than one year | 28 | Taranaki Region | 10 | European | 2 | Male | 2 | 2018 | 3 |
1206 | Less than one year | 28 | Manawatu-Wanganui Region | 11 | European | 2 | Male | 2 | 2018 | 3 |
2184 | Less than one year | 28 | Wellington Region | 12 | European | 2 | Male | 2 | 2018 | 3 |
177 | Less than one year | 28 | West Coast Region | 16 | European | 2 | Male | 2 | 2018 | 3 |
denorm_df.count()
Investigate duplicates 🕵️♀️
Pre-join count: 34959673
Post-join count: 48585735
The counts look incorrect: the dimension joins are only lookups and should not produce additional rows.
Let’s look into why this has happened.
Hypothesis: the code column shouldn’t be inferred as an integer column.
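One way to test this hypothesis programmatically (a sketch using the same file paths) is to look for codes that collide once inference strips their leading zeros:

```python
# After inference, "01" and "1" both become 1, so the inferred dimension
# tables should contain duplicated Code values.
base = "/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz"
for dim in ["Age", "Area", "Ethnic", "Sex", "Year"]:
    dim_df = spark.read.option("header", True).option("inferSchema", True).csv(f"{base}/DimenLookup{dim}8317.csv")
    dupes = dim_df.groupBy("Code").count().filter("count > 1").count()
    print(dim, "duplicated codes:", dupes)
```

We can also compare the two reads side by side.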
With schema inference:
df = spark.read.option("header", True).option("inferSchema", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupAge8317.csv")
display(df.limit(10))
Code | Description | SortOrder |
---|---|---|
999999 | Total people - age group | 1 |
888 | Median age | 2 |
1 | Under 15 years | 3 |
2 | 15-29 years | 4 |
3 | 30-64 years | 5 |
4 | 65 years and over | 6 |
1 | 0-4 years | 7 |
2 | 5-9 years | 8 |
3 | 10-14 years | 9 |
4 | 15-19 years | 10 |
Without schema inference:
df = spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupAge8317.csv")
display(df.limit(10))
Code | Description | SortOrder |
---|---|---|
999999 | Total people - age group | 1 |
888 | Median age | 2 |
1 | Under 15 years | 3 |
2 | 15-29 years | 4 |
3 | 30-64 years | 5 |
4 | 65 years and over | 6 |
01 | 0-4 years | 7 |
02 | 5-9 years | 8 |
03 | 10-14 years | 9 |
04 | 15-19 years | 10 |
Schema inference shot us in the foot! 🦶🔫
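An alternative to turning inference off entirely would be to supply an explicit schema, for example (a sketch; this is not what the notebook does):

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Keep Code as a string so leading zeros survive; SortOrder is safely numeric.
dim_schema = StructType([
    StructField("Code", StringType()),
    StructField("Description", StringType()),
    StructField("SortOrder", IntegerType()),
])
df = spark.read.option("header", True).schema(dim_schema).csv(
    "/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupAge8317.csv"
)
```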
Try again without inferring any datatypes (we can manually cast later!):
```python
from pyspark.sql.functions import col

denorm_df = spark.read.option("header", True).csv(
    "/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv"
)

for dim in ["Age", "Area", "Ethnic", "Sex", "Year"]:
    dim_df = spark.read.option("header", True).csv(
        f"/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookup{dim}8317.csv"
    )
    denorm_df = (
        denorm_df.join(dim_df, col(dim) == col("Code"))
        .drop("Code", dim)
        .withColumnRenamed("Description", dim)
        .withColumnRenamed("SortOrder", f"{dim}SortOrder")
    )
```
denorm_df.count()
Pre-join count: 34959673
Post-join count: 34885323
Closer, but it looks like we lost a few rows this time.
Let’s try a left join:
```python
from pyspark.sql.functions import col

denorm_df = spark.read.option("header", True).csv(
    "/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv"
)

for dim in ["Age", "Area", "Ethnic", "Sex", "Year"]:
    dim_df = spark.read.option("header", True).csv(
        f"/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookup{dim}8317.csv"
    )
    denorm_df = (
        denorm_df.join(dim_df, col(dim) == col("Code"), how="left")
        .drop("Code", dim)
        .withColumnRenamed("Description", dim)
        .withColumnRenamed("SortOrder", f"{dim}SortOrder")
    )
```
denorm_df.count()
Bingo! The counts match. However, this implies something doesn't join to its dimension lookup.
Let’s hunt for nulls:
display(denorm_df.limit(10))
count | Age | AgeSortOrder | Area | AreaSortOrder | Ethnic | EthnicSortOrder | Sex | SexSortOrder | Year | YearSortOrder |
---|---|---|---|---|---|---|---|---|---|---|
807 | Less than one year | 28 | Northland Region | 4 | European | 2 | Male | 2 | 2018 | 3 |
5109 | Less than one year | 28 | Auckland Region | 5 | European | 2 | Male | 2 | 2018 | 3 |
2262 | Less than one year | 28 | Waikato Region | 6 | European | 2 | Male | 2 | 2018 | 3 |
1359 | Less than one year | 28 | Bay of Plenty Region | 7 | European | 2 | Male | 2 | 2018 | 3 |
180 | Less than one year | 28 | Gisborne Region | 8 | European | 2 | Male | 2 | 2018 | 3 |
741 | Less than one year | 28 | Hawke's Bay Region | 9 | European | 2 | Male | 2 | 2018 | 3 |
633 | Less than one year | 28 | Taranaki Region | 10 | European | 2 | Male | 2 | 2018 | 3 |
1206 | Less than one year | 28 | Manawatu-Wanganui Region | 11 | European | 2 | Male | 2 | 2018 | 3 |
2184 | Less than one year | 28 | Wellington Region | 12 | European | 2 | Male | 2 | 2018 | 3 |
177 | Less than one year | 28 | West Coast Region | 16 | European | 2 | Male | 2 | 2018 | 3 |
denorm_df_nulls = denorm_df.filter(col("Age").isNull() | col("Area").isNull() | col("Ethnic").isNull() | col("Sex").isNull() | col("Year").isNull())
denorm_df_nulls.count()
display(denorm_df_nulls.limit(10))
count | Age | AgeSortOrder | Area | AreaSortOrder | Ethnic | EthnicSortOrder | Sex | SexSortOrder | Year | YearSortOrder |
---|---|---|---|---|---|---|---|---|---|---|
50 | Median age | 2 | null | null | null | null | Male | 2 | 2018 | 3 |
48.5 | Median age | 2 | null | null | null | null | Male | 2 | 2018 | 3 |
49.2 | Median age | 2 | null | null | null | null | Male | 2 | 2018 | 3 |
29.5 | Median age | 2 | null | null | null | null | Female | 3 | 2018 | 3 |
42.3 | Median age | 2 | null | null | null | null | Female | 3 | 2018 | 3 |
36 | Median age | 2 | null | null | null | null | Female | 3 | 2018 | 3 |
21.2 | Median age | 2 | null | null | null | null | null | null | 2018 | 3 |
24.9 | Median age | 2 | null | null | null | null | null | null | 2018 | 3 |
23.3 | Median age | 2 | null | null | null | null | null | null | 2018 | 3 |
26.6 | Median age | 2 | null | null | null | null | null | null | 2018 | 3 |
denorm_df_nulls.filter(col("Age") == "Median age").count()
All the unmatched rows come from the "Median age" category.
We should take some time to understand our data!
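A left anti join is a handy way to list which fact-table codes have no match in a given dimension; a sketch for the Area lookup (paths as above):

```python
# Rows of the fact table whose Area code is absent from the Area lookup.
base = "/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz"
fact_df = spark.read.option("header", True).csv(f"{base}/Data8317.csv")
area_df = spark.read.option("header", True).csv(f"{base}/DimenLookupArea8317.csv")
missing = fact_df.join(area_df, fact_df["Area"] == area_df["Code"], "left_anti")
display(missing.select("Area").distinct())
```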
display(spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupAge8317.csv").limit(10))
Code | Description | SortOrder |
---|---|---|
999999 | Total people - age group | 1 |
888 | Median age | 2 |
1 | Under 15 years | 3 |
2 | 15-29 years | 4 |
3 | 30-64 years | 5 |
4 | 65 years and over | 6 |
01 | 0-4 years | 7 |
02 | 5-9 years | 8 |
03 | 10-14 years | 9 |
04 | 15-19 years | 10 |
Let’s filter the fact table by the top two codes as they look odd:
df = spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv").filter(col("Age").isin(["999999", "888"]))
display(df.limit(10))
Year | Age | Ethnic | Sex | Area | count |
---|---|---|---|---|---|
2018 | 999999 | 1 | 1 | 01 | 65307 |
2018 | 999999 | 1 | 1 | 02 | 418347 |
2018 | 999999 | 1 | 1 | 03 | 169422 |
2018 | 999999 | 1 | 1 | 04 | 110733 |
2018 | 999999 | 1 | 1 | 05 | 13566 |
2018 | 999999 | 1 | 1 | 06 | 60591 |
2018 | 999999 | 1 | 1 | 07 | 49086 |
2018 | 999999 | 1 | 1 | 08 | 92655 |
2018 | 999999 | 1 | 1 | 09 | 186054 |
2018 | 999999 | 1 | 1 | 12 | 15735 |
display(df.filter(col("Age") == "888").limit(10))
Year | Age | Ethnic | Sex | Area | count |
---|---|---|---|---|---|
2018 | 888 | 1 | 1 | 01 | 46.4 |
2018 | 888 | 1 | 1 | 02 | 38.5 |
2018 | 888 | 1 | 1 | 03 | 40 |
2018 | 888 | 1 | 1 | 04 | 43.4 |
2018 | 888 | 1 | 1 | 05 | 41.4 |
2018 | 888 | 1 | 1 | 06 | 43.6 |
2018 | 888 | 1 | 1 | 07 | 40.6 |
2018 | 888 | 1 | 1 | 08 | 40.8 |
2018 | 888 | 1 | 1 | 09 | 38.4 |
2018 | 888 | 1 | 1 | 12 | 47.7 |
It’s getting a little hard to trace columns and codes, so let’s denormalise whilst retaining the code columns:
```python
from pyspark.sql.functions import col

denorm_df = spark.read.option("header", True).csv(
    "/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv"
)

for dim in ["Age", "Area", "Ethnic", "Sex", "Year"]:
    dim_df = spark.read.option("header", True).csv(
        f"/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookup{dim}8317.csv"
    )
    denorm_df = (
        denorm_df.join(dim_df, col(dim) == col("Code"), how="left")
        .drop("Code")
        .withColumnRenamed(dim, f"{dim}Code")
        .withColumnRenamed("Description", dim)
        .withColumnRenamed("SortOrder", f"{dim}SortOrder")
    )
```
denorm_df_nulls = denorm_df.filter(col("Age").isNull() | col("Area").isNull() | col("Ethnic").isNull() | col("Sex").isNull() | col("Year").isNull())
display(denorm_df_nulls.limit(10))
YearCode | AgeCode | EthnicCode | SexCode | AreaCode | count | Age | AgeSortOrder | Area | AreaSortOrder | Ethnic | EthnicSortOrder | Sex | SexSortOrder | Year | YearSortOrder |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2018 | 888 | 100100 | 1 | 1 | 50 | Median age | 2 | null | null | null | null | Male | 2 | 2018 | 3 |
2018 | 888 | 100100 | 1 | 2 | 48.5 | Median age | 2 | null | null | null | null | Male | 2 | 2018 | 3 |
2018 | 888 | 100100 | 1 | 9 | 49.2 | Median age | 2 | null | null | null | null | Male | 2 | 2018 | 3 |
2018 | 888 | 100100 | 2 | 1 | 29.5 | Median age | 2 | null | null | null | null | Female | 3 | 2018 | 3 |
2018 | 888 | 100100 | 2 | 2 | 42.3 | Median age | 2 | null | null | null | null | Female | 3 | 2018 | 3 |
2018 | 888 | 100100 | 2 | 9 | 36 | Median age | 2 | null | null | null | null | Female | 3 | 2018 | 3 |
2018 | 888 | 100100 | 3 | 1 | 21.2 | Median age | 2 | null | null | null | null | null | null | 2018 | 3 |
2018 | 888 | 100100 | 3 | 2 | 24.9 | Median age | 2 | null | null | null | null | null | null | 2018 | 3 |
2018 | 888 | 100100 | 3 | 9 | 23.3 | Median age | 2 | null | null | null | null | null | null | 2018 | 3 |
2018 | 888 | 100100 | 4 | 1 | 26.6 | Median age | 2 | null | null | null | null | null | null | 2018 | 3 |
The area code doesn’t match.
Let’s dig deeper:
display(spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupArea8317.csv").filter(col("Code").isin(["1", "01"])))
Code | Description | SortOrder |
---|---|---|
01 | Northland Region | 4 |
Seems like some of the area codes don't look up correctly.
Let’s also look at sex:
display(denorm_df_nulls.limit(10))
YearCode | AgeCode | EthnicCode | SexCode | AreaCode | count | Age | AgeSortOrder | Area | AreaSortOrder | Ethnic | EthnicSortOrder | Sex | SexSortOrder | Year | YearSortOrder |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2018 | 888 | 100100 | 1 | 1 | 50 | Median age | 2 | null | null | null | null | Male | 2 | 2018 | 3 |
2018 | 888 | 100100 | 1 | 2 | 48.5 | Median age | 2 | null | null | null | null | Male | 2 | 2018 | 3 |
2018 | 888 | 100100 | 1 | 9 | 49.2 | Median age | 2 | null | null | null | null | Male | 2 | 2018 | 3 |
2018 | 888 | 100100 | 2 | 1 | 29.5 | Median age | 2 | null | null | null | null | Female | 3 | 2018 | 3 |
2018 | 888 | 100100 | 2 | 2 | 42.3 | Median age | 2 | null | null | null | null | Female | 3 | 2018 | 3 |
2018 | 888 | 100100 | 2 | 9 | 36 | Median age | 2 | null | null | null | null | Female | 3 | 2018 | 3 |
2018 | 888 | 100100 | 3 | 1 | 21.2 | Median age | 2 | null | null | null | null | null | null | 2018 | 3 |
2018 | 888 | 100100 | 3 | 2 | 24.9 | Median age | 2 | null | null | null | null | null | null | 2018 | 3 |
2018 | 888 | 100100 | 3 | 9 | 23.3 | Median age | 2 | null | null | null | null | null | null | 2018 | 3 |
2018 | 888 | 100100 | 4 | 1 | 26.6 | Median age | 2 | null | null | null | null | null | null | 2018 | 3 |
display(spark.read.option("header", True).csv("/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookupSex8317.csv"))
Code | Description | SortOrder |
---|---|---|
9 | Total people - sex | 1 |
1 | Male | 2 |
2 | Female | 3 |
Conclusion: the data looks a little odd and needs further investigation. For now, let's continue with the denormalisation; since we keep the raw CSVs, we can fix these issues later.
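If we do come back to fix the joins, one option (a sketch only; it assumes the two-character regional codes seen above) is to zero-pad the short fact-table codes before joining:

```python
from pyspark.sql.functions import col, length, lpad, when

base = "/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz"
fact_df = spark.read.option("header", True).csv(f"{base}/Data8317.csv")

# Pad one-character Area codes to two characters ("1" -> "01") so they match
# the lookup table; longer codes such as "120300" are left untouched.
fact_df = fact_df.withColumn(
    "Area",
    when(length(col("Area")) == 1, lpad(col("Area"), 2, "0")).otherwise(col("Area")),
)
```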
Creating output files, Hive databases and tables
When writing out DataFrames, we can choose from various formats and optionally add a Hive table over the files.
Hive tables are 'virtual' SQL tables over stored data (in this case, data stored on ADLS). We can use either:
- External tables: create tables over existing data
- Hive-managed tables: create tables and data at the same time
Both options produce the same end result and differ only subtly (for example, dropping a managed table also deletes its underlying data, whereas dropping an external table does not).
When we create Hive tables, we are really writing out the DataFrame to ADLS and adding a schema and file path reference to Hive.
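To make the two options concrete, here is a sketch (it assumes the denorm_df built below; the table names suffixed _managed/_external are hypothetical, while the paths match this notebook):

```python
# Managed: Spark writes the data files and registers the table in one step.
denorm_df.write.mode("overwrite").saveAsTable("sandbox.census_denorm_managed")  # hypothetical name

# External: write the files first, then lay a table definition over them.
denorm_df.write.mode("overwrite").parquet(
    "/data/derived/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz_denorm/"
)
spark.sql("""
    CREATE TABLE IF NOT EXISTS sandbox.census_denorm_external  -- hypothetical name
    USING PARQUET
    LOCATION '/data/derived/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz_denorm/'
""")
```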
Let’s create a Hive database:
```sql
%sql
create database if not exists sandbox;
use sandbox;
```
Now, let’s create our final DataFrame we would like to write out:
```python
denorm_df = spark.read.option("header", True).csv(
    "/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/Data8317.csv"
)

for dim in ["Age", "Area", "Ethnic", "Sex", "Year"]:
    dim_df = spark.read.option("header", True).csv(
        f"/data/raw/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/DimenLookup{dim}8317.csv"
    )
    denorm_df = (
        denorm_df.join(dim_df, col(dim) == col("Code"), how="left")
        .drop("Code")
        .withColumnRenamed(dim, f"{dim}Code")
        .withColumnRenamed("Description", dim)
        .withColumnRenamed("SortOrder", f"{dim}SortOrder")
    )
```
We can write the files out directly as Parquet (with no Hive table):
denorm_df.write.mode("overwrite").parquet("/data/derived/stats_nz_census/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz_denorm/")
Or we can write the files out (by default in Parquet) and create a Hive table:
denorm_df.write.mode("overwrite").saveAsTable("sandbox.age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz")
We can now query the Hive table using SQL:
```sql
%sql
select * from sandbox.age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz
limit 10
```
YearCode | AgeCode | EthnicCode | SexCode | AreaCode | count | Age | AgeSortOrder | Area | AreaSortOrder | Ethnic | EthnicSortOrder | Sex | SexSortOrder | Year | YearSortOrder |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2018 | 110 | 61 | 9 | 120300 | 0 | 110 years | 138 | West Harbour Clearwater Cove | 317 | New Zealander | 10 | Total people - sex | 1 | 2018 | 3 |
2018 | 110 | 69 | 1 | 120300 | ..C | 110 years | 138 | West Harbour Clearwater Cove | 317 | Other ethnicity nec | 11 | Male | 2 | 2018 | 3 |
2018 | 110 | 69 | 2 | 120300 | ..C | 110 years | 138 | West Harbour Clearwater Cove | 317 | Other ethnicity nec | 11 | Female | 3 | 2018 | 3 |
2018 | 110 | 69 | 9 | 120300 | 0 | 110 years | 138 | West Harbour Clearwater Cove | 317 | Other ethnicity nec | 11 | Total people - sex | 1 | 2018 | 3 |
2018 | 110 | 61 | 1 | 120400 | ..C | 110 years | 138 | Unsworth Heights East | 318 | New Zealander | 10 | Male | 2 | 2018 | 3 |
2018 | 110 | 61 | 2 | 120400 | ..C | 110 years | 138 | Unsworth Heights East | 318 | New Zealander | 10 | Female | 3 | 2018 | 3 |
2018 | 110 | 61 | 9 | 120400 | 0 | 110 years | 138 | Unsworth Heights East | 318 | New Zealander | 10 | Total people - sex | 1 | 2018 | 3 |
2018 | 110 | 69 | 1 | 120400 | ..C | 110 years | 138 | Unsworth Heights East | 318 | Other ethnicity nec | 11 | Male | 2 | 2018 | 3 |
2018 | 110 | 69 | 2 | 120400 | ..C | 110 years | 138 | Unsworth Heights East | 318 | Other ethnicity nec | 11 | Female | 3 | 2018 | 3 |
2018 | 110 | 69 | 9 | 120400 | 0 | 110 years | 138 | Unsworth Heights East | 318 | Other ethnicity nec | 11 | Total people - sex | 1 | 2018 | 3 |
And we have three ways of opening the Hive table as a DataFrame:
```python
df_s = spark.sql("select * from sandbox.age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz")
df_h = spark.read.table("sandbox.age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz")
df_p = spark.read.parquet("/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/")
```
The underlying Hive-backed table metadata and data files look like the following:
```sql
%sql
describe formatted sandbox.age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz;
```
col_name | data_type | comment |
---|---|---|
YearCode | string | null |
AgeCode | string | null |
EthnicCode | string | null |
SexCode | string | null |
AreaCode | string | null |
count | string | null |
Age | string | null |
AgeSortOrder | string | null |
Area | string | null |
AreaSortOrder | string | null |
Ethnic | string | null |
EthnicSortOrder | string | null |
Sex | string | null |
SexSortOrder | string | null |
Year | string | null |
YearSortOrder | string | null |
# Detailed Table Information | ||
Database | sandbox | |
Table | age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz | |
Owner | root | |
Created Time | Thu Nov 12 02:32:15 UTC 2020 | |
Last Access | UNKNOWN | |
Created By | Spark 3.0.0 | |
Type | MANAGED | |
Provider | parquet | |
Location | abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz | |
Serde Library | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | |
InputFormat | org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | |
OutputFormat | org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat |
display(dbutils.fs.ls("/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/"))
path | name | size |
---|---|---|
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/_SUCCESS | _SUCCESS | 0 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/_committed_5626976569612752113 | _committed_5626976569612752113 | 724 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/_started_5626976569612752113 | _started_5626976569612752113 | 0 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/part-00000-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-247-1-c000.snappy.parquet | part-00000-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-247-1-c000.snappy.parquet | 5614085 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/part-00001-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-248-1-c000.snappy.parquet | part-00001-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-248-1-c000.snappy.parquet | 5036655 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/part-00002-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-249-1-c000.snappy.parquet | part-00002-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-249-1-c000.snappy.parquet | 9889226 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/part-00003-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-250-1-c000.snappy.parquet | part-00003-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-250-1-c000.snappy.parquet | 8736544 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/part-00004-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-251-1-c000.snappy.parquet | part-00004-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-251-1-c000.snappy.parquet | 9473689 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/part-00005-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-252-1-c000.snappy.parquet | part-00005-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-252-1-c000.snappy.parquet | 9039443 |
abfss://sandbox@aueprddlsnzlh001.dfs.core.windows.net/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/part-00006-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-253-1-c000.snappy.parquet | part-00006-tid-5626976569612752113-51097d54-ac44-42e0-b210-cf5d6502cbf9-253-1-c000.snappy.parquet | 2995091 |
Summary
We now have a virtual Hive table we can query using SQL, and we can create a DataFrame from a SQL query, from a reference to the Hive table, or by reading the underlying Parquet files.
And because the underlying files are Snappy-compressed Parquet, the footprint has gone from an ~800 MB CSV (~100 MB compressed) to ~50 MB of Parquet files, even though we denormalised!
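We can verify that footprint directly; a minimal sketch summing the part-file sizes listed above:

```python
# Sum the sizes reported by dbutils.fs.ls (FileInfo.size is in bytes).
files = dbutils.fs.ls("/tables/sandbox.db/age_and_sex_by_ethnic_group_census_night_population_counts_2006_2013_2018_nz/")
print(round(sum(f.size for f in files) / (1024 * 1024), 1), "MB")
```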