Schema Merging é uma forma de evoluir schemas adicionando novas colunas através da junção de DataFrames. Imagine que você possui diferentes arquivos parquets com diferentes schemas, e que exista a necessidade em criar um novo schema a partir de todas as colunas destes variados parquets. Podemos resolver este problema em uma simples linha de código, conforme será mostrado a seguir.
A seguir vamos criar arquivos parquets com diferentes schemas através de arquivos JSON e em seguida, faremos o merge destes arquivos transformando em um único schema consolidado.
Arquivos JSON que usaremos como DataSource:
Arquivo user.json
{"id":1, "login": "Jonh", "age": 24}
{"id":2, "login": "Lucas", "age": 24}
{"id":3, "login": "Miesha", "age": 25}
{"id":4, "login": "Suzie", "age": 22}
Arquivo address.json
{"id":1, "city": "Los Angeles", "country": "USA"}
{"id":2, "city": "New York", "country": "USA"}
{"id":3, "city": "San Louis Obispo", "country": "USA"}
Criando o SparkSession
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[1]");
sparkConf.setAppName("app");
SparkSession sparkSession =
new SparkSession.Builder()
.config(sparkConf)
.getOrCreate();
Criando os DataFrames
Dataset<Row> dfUser =
sparkSession.read().json("user.json");
Dataset<Row> dfAddress =
sparkSession.read().json("address.json");
Gerando os Parquets para cada DataFrame
No código a seguir, estamos criando os parquets no diretório data/table/ para a partição chamada partition de valor 1
dfUser.write().parquet("data/table/partition=1");
No código a seguir, estamos criando os parquets no diretório data/table/ para a partição chamada partition de valor 2
dfAddress.write().parquet("data/table/partition=2");
Agora, criaremos um novo DataFrame com base nos parquets criados anteriormente executando o Schema Merging
Dataset<Row> dfMerge = sparkSession
.read().option("mergeSchema", true)
.parquet("data/table");
O Schema Merging é executado no seguinte trecho
option("mergeSchema", true)
Por fim, podemos ver o novo schema mergeado através do mergeSchema
dfMerge.printSchema();
Resultado
root
|-- age: long (nullable = true)
|-- id: long (nullable = true)
|-- login: string (nullable = true)
|-- city: string (nullable = true)
|-- country: string (nullable = true)
|-- partition: integer (nullable = true)
Veja que a partir dos DataFrames dfUser e dfAddress foi criado um novo schema consolidando as colunas entre eles.
Código completo
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[1]");
sparkConf.setAppName("app");
SparkSession sparkSession =
new SparkSession.Builder()
.config(sparkConf)
.getOrCreate();
Dataset<Row> dfUser =
sparkSession.read().json("user.json");
dfUser.write().parquet("data/table/partition=1");
Dataset<Row> dfAddress =
sparkSession.read().json("address.json");
dfAddress.write().parquet("data/table/partition=2");
Dataset<Row> dfMerge = sparkSession
.read().option("mergeSchema", true)
.parquet("data/table");
dfMerge.printSchema();
Curtiu? Espero que sim, até mais!
Comments