Schema inferring

pyspark-data-mocker lets you define the schema of each table as you please. You can enable automatic schema inferring by setting the schema.infer option in the configuration file, or you can manually specify the type of each column you want using another yaml file. By default, pyspark-data-mocker will consider all columns to be string columns.
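
For reference, this is how the default behavior can be checked. The following is only an illustrative sketch (marked with +skip so it is not executed here), assuming the basic_datalake example from the welcome page:

>>> from pyspark_data_mocker import DataLakeBuilder  # byexample: +skip
>>> builder = DataLakeBuilder.load_from_dir("./tests/data/basic_datalake")  # byexample: +skip
>>> # Without any schema configuration, every data_type listed here is 'string'.
>>> builder.spark.sql("DESCRIBE TABLE school.students").select("col_name", "data_type").show()  # byexample: +skip
>>> builder.cleanup()  # byexample: +skip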

Automatic inferring

This is the simplest configuration. Let's revisit the example from the welcome page, this time with automatic schema inferring enabled.

$ cat ./tests/data/config/infer_schema.yaml
spark_configuration:
  app_name: "test"
  number_of_cores: 1
schema:
  infer: True

You only need to set the boolean schema.infer to True and that is it! Once you load the builder, the column types will vary depending on their values.

>>> from pyspark_data_mocker import DataLakeBuilder
>>> builder = DataLakeBuilder.load_from_dir("./tests/data/basic_datalake", "./tests/data/config/infer_schema.yaml")  # byexample: +timeout=20 +pass
>>> spark = builder.spark
>>> spark.sql("DESCRIBE TABLE school.students").select("col_name", "data_type").show()
+----------+---------+
|  col_name|data_type|
+----------+---------+
|        id|      int|
|first_name|   string|
| last_name|   string|
|     email|   string|
|    gender|   string|
|birth_date|   string|
+----------+---------+


>>> spark.sql("DESCRIBE TABLE school.courses").select("col_name", "data_type").show()
+-----------+---------+
|   col_name|data_type|
+-----------+---------+
|         id|      int|
|course_name|   string|
+-----------+---------+


>>> spark.sql("DESCRIBE TABLE grades.exams").select("col_name", "data_type").show()
+----------+---------+
|  col_name|data_type|
+----------+---------+
|        id|      int|
|student_id|      int|
| course_id|      int|
|      date|   string|
|      note|      int|
+----------+---------+

>>> builder.cleanup()

Schema configuration file

This yaml file needs to be located in the folder where you place the datalake definition (the root path you pass to the DataLakeBuilder class). By default, the config file used is called schema_config.yaml, but it can be overridden through the config_file option in the application configuration file.

$ cat ./pyspark_data_mocker/config/schema.py
<...>schema.Optional("config_file", default="schema_config.yaml")<...>

That yaml needs to be a file where each key is the full table name (database included), and each value is a dictionary with the column names as keys and the column's Spark DDL type as value.

Example

Let's consider this datalake definition.

$ tree tests/data/datalake_with_config_schema -n --charset=ascii  # byexample: +rm=~ +skip
tests/data/datalake_with_config_schema
|-- grades
|   `-- exams.csv
|-- schema_config.yaml
`-- school
    |-- courses.csv
    `-- students.csv
~
2 directories, 4 files

Notice how this example, unlike the one seen previously in the Home section, contains a file schema_config.yaml. The content of this file will define the types of each column of the tables.

$ cat tests/data/datalake_with_config_schema/schema_config.yaml
school.courses:
  id: int
  course_name: string
school.students:
  id: int
  first_name: string
  last_name: string
  email: string
  gender: string
  birth_date: date

Take a moment to digest the structure of the file: each key of the yaml dictionary is the full name of the table that will be created, and its value is another dictionary mapping each column of the table to its type. Let's build up the datalake.

>>> builder = DataLakeBuilder.load_from_dir("./tests/data/datalake_with_config_schema")  # byexample: +timeout=20 +pass
>>> spark = builder.spark

>>> spark.sql("SHOW TABLES IN school").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|   school|  courses|      false|
|   school| students|      false|
+---------+---------+-----------+


>>> spark.sql("SHOW TABLES IN grades").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|   grades|    exams|      false|
+---------+---------+-----------+

Now that the tables are loaded, we can take a look at the schema of each table.

>>> spark.sql("DESCRIBE TABLE school.students").select("col_name", "data_type").show()
+----------+---------+
|  col_name|data_type|
+----------+---------+
|        id|      int|
|first_name|   string|
| last_name|   string|
|     email|   string|
|    gender|   string|
|birth_date|     date|
+----------+---------+


>>> spark.sql("DESCRIBE TABLE school.courses").select("col_name", "data_type").show()
+-----------+---------+
|   col_name|data_type|
+-----------+---------+
|         id|      int|
|course_name|   string|
+-----------+---------+


>>> spark.sql("DESCRIBE TABLE grades.exams").select("col_name", "data_type").show()
+----------+---------+
|  col_name|data_type|
+----------+---------+
|        id|   string|
|student_id|   string|
| course_id|   string|
|      date|   string|
|      note|   string|
+----------+---------+

>>> builder.cleanup()

Now the column types changed! We have birth_date as a date type and the ids as int. Notice also that the table grades.exams (for which we didn't define any custom schema) has the default type string for each column (because it's the fallback type, as we saw before).

Combining both schema inferring configurations

We can combine this file with the automatic infer option to manually configure only the schemas that we need.

>>> builder = DataLakeBuilder.load_from_dir("./tests/data/datalake_with_config_schema", "./tests/data/config/infer_schema.yaml")  # byexample: +timeout=20 +pass
>>> spark = builder.spark
>>> spark.sql("DESCRIBE TABLE school.students").select("col_name", "data_type").show()
+----------+---------+
|  col_name|data_type|
+----------+---------+
|        id|      int|
|first_name|   string|
| last_name|   string|
|     email|   string|
|    gender|   string|
|birth_date|     date|
+----------+---------+


>>> spark.sql("DESCRIBE TABLE school.courses").select("col_name", "data_type").show()
+-----------+---------+
|   col_name|data_type|
+-----------+---------+
|         id|      int|
|course_name|   string|
+-----------+---------+


>>> spark.sql("DESCRIBE TABLE grades.exams").select("col_name", "data_type").show()
+----------+---------+
|  col_name|data_type|
+----------+---------+
|        id|      int|
|student_id|      int|
| course_id|      int|
|      date|   string|
|      note|      int|
+----------+---------+

>>> builder.cleanup()

Now the grades.exams table schema also changed! But take into consideration that Spark's automatic schema inferring is not magic: note that the date column of grades.exams was not inferred as a date type. Sometimes the manual schema definition is needed to get the type we want.

NOTE: This behavior is fixed starting from pyspark 3.3. From that version onwards, date columns are inferred, but Spark considers all date-like values as datetime.
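
To see where this comes from, here is a hypothetical sketch (marked with +skip so it is not executed) that reads the exams csv directly with plain Spark instead of pyspark-data-mocker; an active spark session is assumed:

>>> # pyspark < 3.3 leaves the date-like column as string even with inferSchema enabled;
>>> # pyspark >= 3.3 infers it, but as a timestamp rather than a date.
>>> df = spark.read.csv("./tests/data/datalake_with_config_schema/grades/exams.csv", header=True, inferSchema=True)  # byexample: +skip
>>> df.printSchema()  # byexample: +skip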

Column types

You can define columns of any type that Spark supports! You don't have any restriction whatsoever (well, kind of, but more on that later).

Example

Consider these files and schema definitions.

$ tree tests/data/datalake_different_files_and_schemas -n --charset=ascii  # byexample: +rm=~ +skip
tests/data/datalake_different_files_and_schemas
|-- schema_config.yaml
`-- school
    |-- courses.json
    `-- students.csv
~
1 directory, 3 files

$ cat tests/data/datalake_different_files_and_schemas/school/courses.json
{"id": 1, "course_name": "Algorithms 1", "flags": {"acitve": true}, "correlative_courses": []}
{"id": 2, "course_name": "Algorithms 2", "flags": {"acitve": true}, "correlative_courses": [1]}

$ cat tests/data/datalake_different_files_and_schemas/schema_config.yaml
school.courses:
  id: int
  course_name: string
  flags: map<string, boolean>
  correlative_courses: array<int>
school.students:
  id: long
  name: string
  birth_date: date

pyspark-data-mocker does not require all files to be in the same file format: it infers the format of each file from its extension (the only limitation is that the format must be a valid Spark source). Let's see the schemas and data in each table.

>>> builder = DataLakeBuilder.load_from_dir("./tests/data/datalake_different_files_and_schemas")  # byexample: +timeout=20 +pass
>>> spark = builder.spark
>>> spark.sql("DESCRIBE TABLE school.students").select("col_name", "data_type").show()
+----------+---------+
|  col_name|data_type|
+----------+---------+
|        id|   bigint|
|      name|   string|
|birth_date|     date|
+----------+---------+

>>> spark.table("school.students").show()
+---+----------------+----------+
| id|            name|birth_date|
+---+----------------+----------+
|  1|Shirleen Dunford|1978-08-01|
|  2|    Niko Puckrin|2000-11-28|
|  3|   Sergei Barukh|1992-01-20|
|  4|     Sal Maidens|2003-12-14|
|  5|Cooper MacGuffie|2000-03-07|
+---+----------------+----------+



>>> spark.sql("DESCRIBE TABLE school.courses").select("col_name", "data_type").show()
+-------------------+-------------------+
|           col_name|          data_type|
+-------------------+-------------------+
|                 id|                int|
|        course_name|             string|
|              flags|map<string,boolean>|
|correlative_courses|         array<int>|
+-------------------+-------------------+

>>> spark.table("school.courses").show()
+---+------------+----------------+-------------------+
| id| course_name|           flags|correlative_courses|
+---+------------+----------------+-------------------+
|  1|Algorithms 1|{acitve -> true}|                 []|
|  2|Algorithms 2|{acitve -> true}|                [1]|
+---+------------+----------------+-------------------+


>>> builder.cleanup()

Limitations

The limitation is that you cannot define certain column types for some file formats. An example of that is defining a map column type in a csv file.

$ cat tests/data/column_type_not_supported/schema_config.yaml
foo.bar:
  id: int
  invalid_col: map<string, string>

$ tree tests/data/column_type_not_supported -n --charset=ascii  # byexample: +rm=~ +skip
tests/data/column_type_not_supported
|-- foo
|   `-- bar.csv
`-- schema_config.yaml
~
1 directory, 2 files

If we try to set up this datalake, it will fail with this exception message.

>>> builder = DataLakeBuilder.load_from_dir("./tests/data/column_type_not_supported")  # byexample: +timeout=20
<...>AnalysisException: CSV data source does not support map<string,string> data type.
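
This restriction comes from Spark itself rather than from pyspark-data-mocker. As a hypothetical sketch (marked with +skip so it is not executed, assuming an active spark session; the output path is arbitrary), the same error can be reproduced with plain Spark:

>>> df = spark.createDataFrame([(1, {"a": "b"})], "id int, invalid_col map<string,string>")  # byexample: +skip
>>> # Writing (or reading) a map column as csv raises the same AnalysisException.
>>> df.write.csv("/tmp/column_type_not_supported")  # byexample: +skip

If you need a column like this, choose a file format that supports it, such as json (as in the school.courses example above).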