Search
86 itens encontrados para ""
- Converting Parquet table to Delta Table
For this post we're going to create examples to how convert parquet table to Delta table. First, we'll create a parquet table from scratch through a Spark Dataframe and then converting to Delta table. Using Delta table has some benefits comparing to a Parquet table. Delta enables to restore versions of your table through time travel function, ACID supports and more. Creating a Parquet table First of all, let's create a parquet table to be converted later to Delta Table. I'll prefer create a parquet table from scratch to bring a better understanding. The following code will be executed once, just to create a parquet table. We're going to use a Spark Dataframe that will be loaded from a JSON file containing semi-structured records. public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setAppName("spark-delta-table"); conf.setMaster("local[1]"); SparkSession session = SparkSession.builder() .config(conf) .getOrCreate(); Dataset dataFrame = session.read().json("product.json"); dataframe.write().format("parquet").save("table/product"); } The above example, we start creating a SparkSession object to create and manage a Spark Dataframe that was loaded from the product.json file content. Alter load, the Dataframe creates (or write) a table in parquet format in the table/product directory. JSON content File represented by product.json file that contains semi-structured records. {"id":1, "name":"rice", "price":12.0, "qty": 2} {"id":2, "name":"beans", "price":7.50, "qty": 5} {"id":3, "name":"coke", "price":5.50, "qty": 2} {"id":4, "name":"juice", "price":3.80, "qty": 1} {"id":5, "name":"meat", "price":1.50, "qty": 1} {"id":6, "name":"ice-cream", "price":6.0, "qty": 2} {"id":7, "name":"potato", "price":3.70, "qty": 10} {"id":8, "name":"apple", "price":5.60, "qty": 5} After running the code above, parquet files will be generated in the table/product directory containing the files below. Converting Parquet table to Delta Table Now that we have a Parquet table already created, we can convert easily to Delta Table, let's do this. public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setAppName("spark-delta-table"); conf.setMaster("local[1]"); SparkSession session = SparkSession.builder() .config(conf) .getOrCreate(); DeltaTable.convertToDelta(session, "parquet.`table/product`"); } DeltaTable.convertToDelta method is responsible to convert parquet table to Delta table. Note that we had to use SparkSession as a parameter and also specify the path of parquet table using this format "parquet.``" . The result after execution you can see in the picture below. After conversion running, Delta creates the famous _delta_log directory containing commit info and checkpoint files. Well that's it, I hope you enjoyed it!
- Entendendo o AWS SNS - Simple Notification Service
O SNS (Simple Notification Service), provê um serviço de notificação utilizando o paradigma Pub/Sub. É uma forma de publicar mensagens destinadas a um ou mais inscritos na forma de endpoints. Confuso? Vamos aprofundar um pouco mais sobre o assunto. O termo Pub/Sub é um tema bastante relacionado em arquiteturas guiada a eventos, conhecida tecnicamente como event-driven architecture. Nesta arquitetura a publicação de mensagens podem ser feitas através de notificações para um ou mais destinos já conhecidos, criando uma arquitetura mais assíncrona. Para que um destino se torna conhecido, deve haver uma forma de sinalizar que aquele destino seja um candidato a receber qualquer mensagem da origem, ou seja, o destino é um subscriber (sub) ou inscrito. Mas inscrito aonde? Todo subscriber pode ser inscrito em um ou mais publicadores, no contexto do SNS, seria Tópicos, no qual falaremos mais adiante. Dessa forma, para cada publicação feita, um inscrito naquela publicação, receberá uma mensagem. Um exemplo, é quando recebemos notificações de algum aplicativo instalado no nosso Smartphone via push, ou seja, na instalação daquele aplicativo nos tornamos um inscrito (sub ou assinante), ou seja, para que qualquer publicação feita pelo aplicativo, seremos notificados. Provavelmente este serviço pode utilizar SNS como solução. O exemplo anterior é uma visão de mais alto nível como forma de introdução. O tema é um pouco mais amplo e será abordado a seguir. O SNS é dividido em Tópicos e Assinaturas, ambos trabalham de forma conjunta e oferecem diversos recursos através do próprio console da AWS ou de APIs. 1. Tópicos Os Tópicos são pontos de acesso que funciona como interface entre o Publisher (publicador) e o Subscriber (inscrito). Todo aplicativo deve estar inscrito a um Tópico para que receba notificações, ou seja, é o único ponto de acesso para a comunicação. Um Tópico é dividido entre o tipo Fifo e o Padrão: Fifo: O tipo Fifo permite um controle mais rigoroso de ordenação das mensagens (first in/first out), possui um limite de throughput de até 300 publicações por segundo, garante a entrega da mensagem uma única vez e por fim, fornece suporte somente ao protocolo de assinatura SQS. Padrão: O tipo padrão possui algumas diferenças que o torna mais flexível, porém menos rigoroso se comparado ao Fifo. Começando pela ordenação de mensagens. Este padrão visa uma ordenação de mensagens da maneira mais apropriada, ou seja, não possui uma regra que visa ordenar as mensagens por chegada. O throughput de publicações/segundo é maior que a do tipo Fifo e fornece suporte de protocolos de assinaturas para SQS, Lambda, HTTP, SMS, E-mail e endpoints de aplicativos móveis. Limite de tópicos Por conta da AWS, é permitido criar até 100.000 tópicos 2. Assinaturas A Assinatura é a forma de conectar ou inscrever um endpoint para um Tópico específico. Ou seja, cada Assinatura deve-se especificar um Tópico (existente) e o endpoint em que deseja receber as notificações publicadas pelo Tópico que será assinado. O endpoint é representado por diferentes tipos: AWS SQS HTTP HTTPS AWS Kinesis Data Firehose E-mail SMS AWS Lambda Resumindo, cada endpoint acima, são formatos de entrega/transporte para recebimento de notificações. Limite de Assinaturas A AWS permite até 10 milhões de assinaturas por tópico. 3. Limite de tamanho da mensagem O SNS possui um limite de tamanho de mensagem de até 256 KB. Já as mensagens para SMS são de 140 bytes. 4. Tipos de mensagens O SNS possui suporte para diferentes tipos de mensagens, como por exemplo texto, XML, JSON e texto sem formato. 5. SNS X SQS O SNS e o SQS são coisas diferentes, mas que possuem relação. Como falamos anteriormente, o SQS pode ser utlizado como endpoint, ou seja, um protocolo SQS que assina um Tópico SNS passa a receber qualquer mensagem publicada no Tópico tornando um processo de integração assíncrona. Na imagem acima descreve o contexto do SNS junto aos Tópicos e algumas SQS (subscribers) simulando as assinaturas. Após assinadas, todas estas SQS receberão mensagens publicadas do(s) Tópico(s). A SQS 1 receberá notificações dos Tópicos 1 e 2, a SQS 2 receberá notificações dos Tópicos 2 e 3 e por fim, a SQS 3 receberá somente do Tópico 3. Em breve será liberado um post com alguns exemplos de códigos que te ajudará entender ainda mais sobre o SNS mais a fundo. Caso queira saber mais detalhes, recomendo ler a documentação oficial através deste link. Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): Amazon AWS: Descomplicando a computação na nuvem é um livro para aqueles que estão começando na AWS e querem entender o funcionamento e a dinâmicas dos serviços como S3, EC2, ElasticCache, Route 53, SNS, Cloudwatch e muito mais. AWS Cookbook (Versão Inglês) é um guia prático contendo 70 receitas detalhadas sobre os recursos da AWS e como resolver diferentes desafios. É um livro bem escrito e de fácil entendimento cobrindo os principais serviços da AWS através de exemplos práticos. A AWS ou Amazon Web Services é o serviço de nuvem mais utilizando atualmente em todo o mundo, caso queira entender mais sobre o tema para ficar bem posicionado no mercado, recomendo fortemente o estudo. É isso, curtiu? Até mais!
- First steps with DBT - Data Build Tool
DBT has been used by a lot of companies on Data area and I believe that we can extract good insights in this post about it. That's going to be a practical post showing how DBT works it and hope you guys enjoy it. What's DBT? DBT means Data Build Tool and enables teams to transform data already loaded in their warehouse with simple select statements. DBT does the T in ELT processes, in the other words, he doesn't work to extract and load data but he's useful to transform it. Step 1: Creating a DBT Project Now, we're assume that DBT is already installed but if not, I recommend see this link. After DBT installed you can create a new project using CLI or you can clone this project from the DBT Github repository. Here for this post we're going to use CLI mode to create our project and also to complete the next steps. To create a new project, run the command below. dbt init After running this command, you need to type the project's name and which warehouse or database you're going to use like the image below. For this post, we're going to use postgres adapter. It's very important that you have a postgres database already installed or you can up a postgres image using docker. About adapters, DBT supports different of them and you can check here. I created a table structure and also loaded it with data simulating data from a video platform called wetube and we're going to use them to understand how DBT works it. Follow the structure: Step 2: Structure and more about DBT After running dbt init command to create the project, a structure of folders and files below will be created. I won't talk about the whole directories of project but I'd like to focus in two of them. Sources Sources are basically the data already loaded into your warehouse. In DBT process, sources have the same meaning of raw data. There's no folders representing source data for this project but you need to know about this term because we're going to set up tables already created as sources for the next sections. Seeds Seeds is an interesting and useful mechanism to load static data into your warehouse through CSV files. If you want to load these data you need to create a CSV file on this directory and run the command below. dbt seed For each field on CSV file, DBT will infer their types and create a table into warehouse or database. Models DBT works with Model paradigm, the main idea is that you can create models through the transformation using SQL statements based on tables sources or existing models Every SQL file located in your model folder will create a model into your warehouse when the command below runs. dbt run Remember that a model can be created through a source or another model and don't worry about this, I'll show you more details about it. Step 3: Setting up database connection After project already created, we need to set up our database's connection and here at this post, we're going to use postgres as database. After initialize the project a bunch of files are created and one of them is called profiles.yml. profiles.yml file is responsible to control the different profiles to the different database's connection like dev and production environment. If you've noticed, we can't see this file on the image above because this file is created outside of project to avoid sensitive credentials. You can find this file in ~/.dbt/ directory. If you note, we have one profile named dbt_blog and a target called dev, by default the target refer to dev with the database's connection settings. Also, It's possible to create one or more profiles and targets, it enables working with different environments. Another important detail is that dbt_blog profile should be specified on dbt_project.yml file as a default profile. For the next sections, we'll discuss what and how dbt_project.yml file works it. Step 4: Creating dbt_project.yml file Every DBT project has a dbt_project.yml file, you can set up informations like project name, directories, profiles and materialization type. name: 'dbt_blog' version: '1.0.0' config-version: 2 profile: 'dbt_blog' model-paths: ["models"] analysis-paths: ["analyses"] test-paths: ["tests"] seed-paths: ["seeds"] macro-paths: ["macros"] snapshot-paths: ["snapshots"] target-path: "target" # directory which will store compiled SQL files clean-targets: # directories to be removed by `dbt clean` - "target" - "dbt_packages" models: dbt_blog: # Config indicated by + and applies to all files under models/example/ mart: +materialized: table Note that profile field was set up as the same profile specified on profiles.yml file and another important detail is about materialized field. Here was set up as a "table" value but by default, is a "view". Materialized fields allows you to create models as a table or view on each run. There are others type of materialization but we won't discuss here and I recommend see dbt docs. Step 5: Creating our first model Creating first files Let's change a little and let's going to create a sub-folder on model directory called mart and inside this folder we're going to create our .SQL files and also another important file that we don't discuss yet called schema.yml. Creating schema file Schema files are used to map sources and to document models like model's name, columns and more. Now you can create a file called schema.yml e fill up with these informations below. version: 2 sources: - name: wetube tables: - name: account - name: city - name: state - name: channel - name: channel_subs - name: video - name: video_like - name: user_address models: - name: number_of_subs_by_channel description: "Number of subscribers by channel" columns: - name: id_channel description: "Channel's ID" tests: - not_null - name: channel description: "Channel's Name" tests: - not_null - name: num_of_subs description: "Number of Subs" tests: - not_null Sources: At sources field you can include tables from your warehouse or database that's going to be used on model creation. models: At models field you can include the name's model, columns and their description Creating a model This part is where we can create SQL scripts that's going to result in our first model. For the first model, we're going to create a SQL statement to represent a model that we can see the numbers of subscribers by channel. Let's create a file called number_of_subs_by_channel.sql and fill up with these scripts below. with source_channel as ( select * from {{ source('wetube', 'channel') }} ), source_channel_subs as ( select * from {{ source('wetube','channel_subs') }} ), number_of_subs_by_channel as ( select source_channel.id_channel, source_channel.name, count(source_channel_subs.id_subscriber) num_subs from source_channel_subs inner join source_channel using (id_channel) group by 1, 2 ) select * from number_of_subs_by_channel Understanding model creation Note that we have multiple scripts separated by common table expression (CTE) that becomes useful to understand the code. DBT enables using Jinja template {{ }} bringing a better flexibility to our code. The usage of keyword source inside Jinja template means that we're referring source tables. To refer a model you need to use ref keyword. The last SELECT statement based on source tables generates the model that will be created as table in the database. Running our first model Run the command below to create our first model dbt run Output: Creating another model Imagine that we need to create a model containing account information and it's channels. Let's get back to schema.yml file to describe this new model. - name: account_information description: "Model containing account information and it's channels" columns: - name: id_account description: "Account ID" tests: - not_null - name: first_name description: "First name of user's account" tests: - not_null - name: last_name description: "Last name of user's account" tests: - not_null - name: email description: "Account's email" tests: - not_null - name: city_name description: "city's name" tests: - not_null - name: state_name description: "state's name" tests: - not_null - name: id_channel description: "channel's Id" tests: - not_null - name: channel_name description: "channel's name" tests: - not_null - name: channel_creation description: "Date of creation name" tests: - not_null Now, let's create a new SQL file and name it as account_information.sql and put scripts below: with source_channel as ( select * from {{ source('wetube', 'channel') }} ), source_city as ( select * from {{ source('wetube','city') }} ), source_state as ( select * from {{ source('wetube','state') }} ), source_user_address as ( select * from {{ source('wetube','user_address') }} ), source_account as ( select * from {{ source('wetube','account') }} ), account_info as ( select account.id_user as id_account, account.first_name, account.last_name, account.email, city.name as city_name, state.name as state_name, channel.id_channel, channel.name as channel, channel.creation_date as channel_creation FROM source_account account inner join source_channel channel on (channel.id_account = account.id_user) inner join source_user_address user_address using (id_user) inner join source_state state using (id_state) inner join source_city city using (id_city) ) select * from account_info Creating our last model For our last model, we going to create a model about how many likes has a video. Let's change again the schema.yml to describe and to document our future and last model. - name: total_likes_by_video description: "Model containing total of likes by video" columns: - name: id_channel description: "Channel's Id" tests: - not_null - name: channel description: "Channel's name" tests: - not_null - name: id_video description: "Video's Id" tests: - not_null - name: title description: "Video's Title" tests: - not_null - name: total_likes description: "Total of likes" tests: - not_null Name it a file called total_likes_by_video.sql and put the code below: with source_video as ( select * from {{ source('wetube','video') }} ), source_video_like as ( select * from {{ source('wetube','video_like') }} ), source_account_info as ( select * from {{ ref('account_information') }} ), source_total_like_by_video as ( select source_account_info.id_channel, source_account_info.channel, source_video.id_video, source_video.title, count(*) as total_likes FROM source_video_like inner join source_video using (id_video) inner join source_account_info using (id_channel) GROUP BY source_account_info.id_channel, source_account_info.channel, source_video.id_video, source_video.title ORDER BY total_likes DESC ) select * from source_total_like_by_video Running DBT again After creation of our files, let's run them again to create the models dbt run Output The models were created in the database and you can run select statements directly in your database to check it. Model: account_information Model: number_of_subs_by_channel Model: total_likes_by_video Step 6: DBT Docs Documentation After generated our models, now we're going to generate docs based on these models. DBT generates a complete documentation about models and sources and their columns and also you can see through a web page. Generating docs dbt docs generate Running docs on webserver After docs generated you can run command below to start a webserver on port 8080 and see the documentation locally. dbt docs serve Lineage Another detail about documentation is that you can see through of a Lineage the models and it's dependencies. Github code You can checkout this code through our Github page. Cool? I hope you guys enjoyed it!
- Differences between FAILFAST, PERMISSIVE and DROPMALFORED modes in Dataframes
There's a bit differences between them and we're going to find out in this post. The parameter mode is a way to handle with corrupted records and depending of the mode, allows validating Dataframes and keeping data consistent. In this post we'll create a Dataframe with PySpark and comparing the differences between these three types of mode: PERMISSIVE DROPMALFORMED FAILFAST CSV file content This content below simulates some corrupted records. There are String types for the engines column that we'll define as an Integer type in the schema. "type","country","city","engines","first_flight","number_built" "Airbus A220","Canada","Calgary",2,2013-03-02,179 "Airbus A220","Canada","Calgary","two",2013-03-02,179 "Airbus A220","Canada","Calgary",2,2013-03-02,179 "Airbus A320","France","Lyon","two",1986-06-10,10066 "Airbus A330","France","Lyon","two",1992-01-02,1521 "Boeing 737","USA","New York","two",1967-08-03,10636 "Boeing 737","USA","New York","two",1967-08-03,10636 "Boeing 737","USA","New York",2,1967-08-03,10636 "Airbus A220","Canada","Calgary",2,2013-03-02,179 Let's start creating a simple Dataframe that will load data from a CSV file with the content above, let's supposed that the content above it's from a file called airplanes.csv. To modeling the content, we're also creating a schema that will allows us to Data validate. Creating a Dataframe using PERMISSIVE mode The PERMISSIVE mode sets to null field values when corrupted records are detected. By default, if you don't specify the parameter mode, Spark sets the PERMISSIVE value. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "PERMISSIVE") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Result of PERMISSIVE mode Creating a Dataframe using DROPMALFORMED mode The DROPMALFORMED mode ignores corrupted records. The meaning that, if you choose this type of mode, the corrupted records won't be list. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "DROPMALFORMED") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Result of DROPMALFORMED mode After execution it's possible to realize that the corrupted records aren't available at Dataframe. Creating a Dataframe using FAILFAST mode Different of DROPMALFORMED and PERMISSIVE mode, FAILFAST throws an exception when detects corrupted records. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("spark-app") \ .getOrCreate() schema = StructType([ StructField("TYPE", StringType()), StructField("COUNTRY", StringType()), StructField("CITY", StringType()), StructField("ENGINES", IntegerType()), StructField("FIRST_FLIGHT", StringType()), StructField("NUMBER_BUILT", IntegerType()) ]) read_df = spark.read \ .option("header", "true") \ .option("mode", "FAILFAST") \ .format("csv") \ .schema(schema) \ .load("airplanes.csv") read_df.show(10) Result of FAILFAST mode ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. Books to study and read If you want to learn more about and reach a high level of knowledge, I strongly recommend reading the following book(s): Spark: The Definitive Guide: Big Data Processing Made Simple is a complete reference for those who want to learn Spark and about the main Spark's feature. Reading this book you will understand about DataFrames, Spark SQL through practical examples. The author dives into Spark low-level APIs, RDDs and also about how Spark runs on a cluster and how to debug and monitor Spark clusters applications. The practical examples are in Scala and Python. Beginning Apache Spark 3: With Dataframe, Spark SQL, Structured Streaming, and Spark Machine Library with the new version of Spark, this book explores the main Spark's features like Dataframes usage, Spark SQL that you can uses SQL to manipulate data and Structured Streaming to process data in real time. This book contains practical examples and code snippets to facilitate the reading. High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark is a book that explores best practices using Spark and Scala language to handle large-scale data applications, techniques for getting the most out of standard RDD transformations, how Spark SQL's new interfaces improve performance over SQL's RDD data structure, examples of Spark MLlib and Spark ML machine learning libraries usage and more. Python Crash Course, 2nd Edition: A Hands-On, Project-Based Introduction to Programming covers the basic concepts of Python through interactive examples and best practices. Learning Scala: Practical Functional Programming for the Jvm is an excellent book that covers Scala through examples and exercises. Reading this bool you will learn about the core data types, literals, values and variables. Building classes that compose one or more traits for full reusability, create new functionality by mixing them in at instantiation and more. Scala is one the main languages in Big Data projects around the world with a huge usage in big tech companies like Twitter and also the Spark's core language. Cool? I hope you enjoyed it!
- Working with Schemas in Spark Dataframes using PySpark
What's a schema in the Dataframes context? Schemas are metadata that allows working with a standardized Data. Well, that was my definition about schemas but we also can understanding schemas as a structure that represents a data context or a business model. Spark enables using schemas with Dataframes and I believe that is a good point to keep data quality, reliability and we also can use these points to understand the data and connect to the business. But if you know a little more about Dataframes, working with schema isn't a rule. Spark provides features that we can infer to a schema without defined schemas and reach to the same result, but depending on the data source, the inference couldn't work as we expect. In this post we're going to create a simple Dataframe example that will read a CSV file without a schema and another one using a defined schema. Through examples we'll can see the advantages and disadvantages. Let's to the work! CSV File content "type","country","engines","first_flight","number_built" "Airbus A220","Canada",2,2013-03-02,179 "Airbus A320","France",2,1986-06-10,10066 "Airbus A330","France",2,1992-01-02,1521 "Boeing 737","USA",2,1967-08-03,10636 "Boeing 747","USA",4,1969-12-12,1562 "Boeing 767","USA",2,1981-03-22,1219 If you noticed in the content above, we have different data types. We have string, numeric and date column types. The content above will be represented by airliners.csv in the code. Writing a Dataframe without Schema from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() air_liners_df = spark.read \ .option("header", "true") \ .format("csv") \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Dataframe/Print schema result It seems that worked fine but if you look with attention, you'll realize that in the schema structure there are some field types that don't match with their values, for example fields like number_built, engines and first_flight. They aren't string types, right? We can try to fix it adding the following parameter called "inferSchema" and setting up to "true". from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() air_liners_df = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .format("csv") \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Dataframe/Print schema result Even inferring the schema, the field first_flight keeping as a string type. Let's try to use Dataframe with a defined schema to see if this details will be fixed. Writing a Dataframe with Schema Now it's possible to see the differences between the codes. We're adding an object that represents the schema. This schema describes the content in CSV file, you can note that we have to describe the column name and type. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType, DateType, StructField if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() StructSchema = StructType([ StructField("type", StringType()), StructField("country", StringType()), StructField("engines", IntegerType()), StructField("first_flight", DateType()), StructField("number_built", IntegerType()) ]) air_liners_df = spark.read \ .option("header", "true") \ .format("csv") \ .schema(StructSchema) \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Dataframe/Print schema result After we defined the schema, all the field types match with their values. This shows how important is to use schemas with Dataframes. Now it's possible to manipulate the data according to the type with no concerns. Books to study and read If you want to learn more about and reach a high level of knowledge, I strongly recommend reading the following book(s): Spark: The Definitive Guide: Big Data Processing Made Simple is a complete reference for those who want to learn Spark and about the main Spark's feature. Reading this book you will understand about DataFrames, Spark SQL through practical examples. The author dives into Spark low-level APIs, RDDs and also about how Spark runs on a cluster and how to debug and monitor Spark clusters applications. The practical examples are in Scala and Python. Beginning Apache Spark 3: With Dataframe, Spark SQL, Structured Streaming, and Spark Machine Library with the new version of Spark, this book explores the main Spark's features like Dataframes usage, Spark SQL that you can uses SQL to manipulate data and Structured Streaming to process data in real time. This book contains practical examples and code snippets to facilitate the reading. High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark is a book that explores best practices using Spark and Scala language to handle large-scale data applications, techniques for getting the most out of standard RDD transformations, how Spark SQL's new interfaces improve performance over SQL's RDD data structure, examples of Spark MLlib and Spark ML machine learning libraries usage and more. Python Crash Course, 2nd Edition: A Hands-On, Project-Based Introduction to Programming covers the basic concepts of Python through interactive examples and best practices. Learning Scala: Practical Functional Programming for the Jvm is an excellent book that covers Scala through examples and exercises. Reading this bool you will learn about the core data types, literals, values and variables. Building classes that compose one or more traits for full reusability, create new functionality by mixing them in at instantiation and more. Scala is one the main languages in Big Data projects around the world with a huge usage in big tech companies like Twitter and also the Spark's core language. Cool? I hope you enjoyed it!
- Criando Schemas com Spark Dataframes usando PySpark
O que é um schema no contexto de Dataframes? Os schemas são metadados que permitem trabalhar com dados padronizados. Bem, essa foi minha definição sobre esquemas, mas também podemos entender os schemas como uma estrutura que representa um contexto de dados ou um modelo de negócio. O Spark possibilita a definição de schemas em Dataframes visando manter a qualidade dos dados, confiabilidade e manter um catalogo de metadados. Por mais que seja possível não utilizar schemas, somente utilizando métodos do Spark para inferir o Dataframe e extrair o metadado, a utilização de schemas é uma boa prática quando se trabalha com governança de dados. Neste post vamos criar um exemplo de Dataframe simples que irá ler um arquivo CSV sem um schema e outro exemplo usando um schema definido. Através de exemplos veremos as vantagens e desvantagens. Conteúdo do arquivo CSV "type","country","engines","first_flight","number_built" "Airbus A220","Canada",2,2013-03-02,179 "Airbus A320","France",2,1986-06-10,10066 "Airbus A330","France",2,1992-01-02,1521 "Boeing 737","USA",2,1967-08-03,10636 "Boeing 747","USA",4,1969-12-12,1562 "Boeing 767","USA",2,1981-03-22,1219 Se você percebeu no conteúdo acima, temos diferentes tipos de dados. Temos tipos de coluna string, numérica e date. O conteúdo acima será representado pelo arquivo airliners.csv no código. Criando um Dataframe sem Schema definido from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() air_liners_df = spark.read \ .option("header", "true") \ .format("csv") \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Resultado Dataframe Parece que funcionou bem, mas se você olhar com atenção, perceberá que na estrutura do esquema existem alguns tipos de campos que não correspondem aos seus valores, por exemplo, campos como number_built, engines e first_flight. Eles não são tipos de string, certo? Podemos tentar corrigir essa inconsistência adicionando no Dataframe o seguinte parâmetro chamado inferSchema e adicionando o valor para true. from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() air_liners_df = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .format("csv") \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Resultado Dataframe utilizando o parâmetro inferSchema Mesmo inferindo o schema, o campo first_flight manteve-se como tipo string. Agora, vamos tentar definir um schema para o Dataframe, para que tenhamos certeza de que todos os tipos sejam padronizados de acordo com o seu valor. Criando um Dataframe com Schema definido No código abaixo temos um complemento do código anterior, neste caso, estamos de fato criando um schema para ser utilizado no Dataframe. Perceba que é possível utilizar a classes StructSchema para definição do schema e StructField para os campos e seus tipos. from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType, DateType, StructField if __name__ == "__main__": spark = SparkSession.builder \ .master("local[1]") \ .appName("schema-app") \ .getOrCreate() StructSchema = StructType([ StructField("type", StringType()), StructField("country", StringType()), StructField("engines", IntegerType()), StructField("first_flight", DateType()), StructField("number_built", IntegerType()) ]) air_liners_df = spark.read \ .option("header", "true") \ .format("csv") \ .schema(StructSchema) \ .load("airliners.csv") air_liners_df.show() air_liners_df.printSchema() Resultado Dataframe utilizando o Schema definido Depois de definirmos o schema, todos os tipos de campos correspondem aos seus valores. Isso mostra o quão importante é usar schemas com Dataframes. Agora é possível manipular os dados de acordo com o tipo e sem preocupações de se ter dados inconsistente e visando a qualidade dos dados para o consumo. Material de estudo Se quer aprender mais sobre o assunto e alcançar um alto nível de conhecimento, recomendo fortemente a leitura do(s) seguinte(s) livro(s): Spark: The Definitive Guide: Big Data Processing Made Simple (Versão Inglês) é uma referência completa para quem quer aprender o Spark e sobre as suas principais funcionalidades. Lendo esse livro, você vai aprender sobre DataFrames, Spark SQL através de exemplos práticos. O autor mergulha nas APIs de baixo nível do Spark, RDDs e também sobre como o Spark é executado em um cluster e como depurar e monitorar os aplicativos de clusters do Spark. Os exemplos práticos estão em Scala e Python. Beginning Apache Spark 3: With Dataframe, Spark SQL, Structured Streaming, and Spark Machine Library (Versão Inglês) com a nova versão do Spark, este livro explora os principais recursos do Spark, como o uso de Dataframes, Spark SQL no qual você pode usar SQL para manipular dados e Structured Streaming para processar dados em tempo real. Este livro contém exemplos práticos e trechos de código para facilitar a leitura. High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark (Versão Inglês) é um livro que explora as melhores práticas usando a linguagem Spark e Scala para lidar com aplicações de dados em larga escala, técnicas para transformações utilizando RDD, e também mostra como as novas interfaces do Spark SQL melhoram o desempenho sobre a estrutura de dados RDD do SQL, exemplos de Spark MLlib e o uso de bibliotecas de aprendizado de máquina de ML e muito mais. Python Crash Course, 2nd Edition: A Hands-On, Project-Based Introduction to Programming (Versão Inglês) abrange os conceitos básicos do Python por meio de exemplos interativos e práticas recomendadas. Learning Scala: Practical Functional Programming for the Jvm (Versão Inglês) é um excelente livro que aborda a linguagem Scala através de exemplos e exercícios práticos. Lendo este livro, você aprenderá sobre os principais tipos de dados, literais, valores e variáveis. Construir classes que compõem uma ou mais características para total reutilização, criar novas funcionalidades misturando-as na instanciação e muito mais. Scala é uma das principais linguagens em projetos de Big Data em todo o mundo, com grande uso em grandes empresas de tecnologia como o Twitter e também a linguagem principal do Spark. Bom é isso, espero que tenham gostado!
- Differences between External and Internal tables in Hive
There are two ways to create tables in the Hive context and this post we'll show the differences, advantages and disadvantages. Internal Table Internal tables are known as Managed tables and we'll understand the reason in the following. Now, let's create an internal table using SQL in the Hive context and see the advantages and disadvantages. create table coffee_and_tips_table (name string, age int, address string) stored as textfile; Advantages To be honest I wouldn't say that it's an advantage but Internal tables are managed by Hive Disadvantages Internal tables can't access remote storage services for example in clouds like Amazon AWS, Microsoft Azure and Google Cloud. Dropping Internal tables all the data including metadata and partitions will be lost. External Table External tables has some interesting features compared to Internal tables and it's a good and recommended approach when we need to create tables. In the script below you can see the difference between Internal table creation and External table related to the last section. We just added the reserved word external in the script. create external table coffee_and_tips_external (name string, age int, address string) stored as textfile; Advantages The data and metadata won't be lost if drop table External tables can be accessed and managed by external process External tables allows access to remote storage service as a source location Disadvantages Again, I wouldn't say that it's a disadvantage but if you need to change schema or dropping a table, probably you'll need to run a command to repair the table as shown below. msck repair table Depending on the volume, this operation may take some time to complete. To check out a table type, run the following command below and you'll see at the column table_type the result. hive> describe formatted That's it, I hope you guys enjoy it! References: https://hive.apache.org/
- How to generate random Data using Datafaker lib
Sometimes in our projects we have to fill Java objects for unit tests or even to create a database dump with random data to test a specific feature and etc. We need to be creative trying to create names, street names, cities or documents. There's an interesting and helpful Java library called Datafaker that allows to create random data with a large number of providers. Providers are objects based on a context, for example: If you want to generate data about a Person object, there's a specific provider for this context that will generate name, last name and etc. If you need to create a unit test that you need data about address, you'll find it. In this post we'll create some examples using Maven but the library also provides support for Gradle projects. Maven net.datafaker datafaker 1.1.0 Generating Random Data Let's create a simple Java class that contains some properties like name, last name, address, favorite music genre and food. public class RandomPerson { public String firstName; public String lastName; public String favoriteMusicGenre; public String favoriteFood; public String streetAddress; public String city; public String country; @Override public String toString() { return "firstName=" + firstName + "\n" + "lastName=" + lastName + "\n" + "favoriteMusicGenre="+favoriteMusicGenre + "\n" + "favoriteFood=" + favoriteFood + "\n" + "streetAddress=" + streetAddress + "\n" + "city=" + city + "\n" + "country=" + country ; } static void print(RandomPerson randomPerson){ System.out.println( randomPerson ); } } In the next step we'll fill an object using the providers that we quote in the first section. First of all, we create an object called randomData that represents Faker class. This class contains all the providers in the example below. public static void main(String[] args) { Faker randomData = new Faker(); RandomPerson randomPerson = new RandomPerson(); randomPerson.firstName = randomData.name().firstName(); randomPerson.lastName = randomData.name().lastName(); randomPerson.favoriteMusicGenre = randomData.music().genre(); randomPerson.favoriteFood = randomData.food().dish(); randomPerson.streetAddress = randomData.address().streetAddress(); randomPerson.city = randomData.address().city(); randomPerson.country = randomData.address().country(); print(randomPerson); } After the execution, we can see the results like this at the console: Result firstName=Dorthy lastName=Jones favoriteMusicGenre=Electronic favoriteFood=Cauliflower Penne streetAddress=7411 Darin Gateway city=Gutkowskifort country=Greece Every execution will be a new result because of providers are randoms. Another interesting feature is that we can set up the Locale when instantiate an object. Faker randomData = new Faker(Locale.JAPANESE); See the results based on Local.JAPANESE: Result firstName=航 lastName=横山 favoriteMusicGenre=Non Music favoriteFood=French Fries with Sausages streetAddress=418 美桜Square city=南斉藤区 country=Togo Books to study and read If you want to learn more about and reach a high level of knowledge, I strongly recommend reading the following book(s): Unit Testing Principles, Practices, and Patterns: Effective Testing Styles, Patterns, and Reliable Automation for Unit Testing, Mocking, and Integration Testing with Examples in C# is a book that covers Unit Testing Principles, Patterns and Practices teaches you to design and write tests that target key areas of your code including the domain model. In this clearly written guide, you learn to develop professional-quality tests and test suites and integrate testing throughout the application life cycle. Junit em Ação (Portuguese version) is a popular testing book that covers techniques such as hands-on testing of your code, using techniques from completed tests, unit testing, basic form testing, and much more. It is through the practice of tests that we guarantee a quality delivery of the software to the end customer, avoiding breaks in the Build and Deploy phase, delivering full functionalities and following up according to the project requirements Mastering Unit Testing Using Mockito and JUnit is a book that covers JUnit practices using one of the most famous testing libraries called Mockito. This book teaches how to create and maintain automated unit tests using advanced features of JUnit with the Mockito framework, continuous integration practices (famous CI) using market tools like Jenkins along with one of the largest dependency managers in Java projects, Maven. For you who are starting in this world, it is an excellent choice. Isn't a cool library!? See you!
- Criando Kinesis Firehose via Terraform e enviando eventos com Java
Neste post, vamos fazer um passo a passo de como criar um AWS Kinesis Firehose (Delivery Stream) via Terraform e enviar alguns eventos via aplicação Java. Antes de começarmos, precisamos entender o que é o Kinesis Firehose e como é o seu funcionamento. O Kinesis Firehose é um dos recursos do Amazon Kinesis, que se resume ao serviço de streaming de dados da AWS. O Kinesis Firehose possibilita capturar um grande volume de dados através de uma origem, processar em tempo real e entregar os dados para um destino. Para entender melhor, vamos pensar no cenário a seguir: Imagine que você e seu time esteja trabalhando em um processo que será necessário enviar um grande volume de dados em tempo real para o S3. Estes objetos são arquivos no formato JSON representados por dados transacionais de compras e vendas de uma plataforma de e-commerce. A ideia de enviar ao S3 é que, o time está planejando criar um Datalake no futuro, e já pensa em organizar os dados em um repositório único de dados, neste caso, o S3. O time criou uma API que faz o PUT dos objetos no S3, e foram percebendo que aquele processo era limitado, custoso, não muito escalável pelo fato do volume de dados ser alto e o mais importante, não era resiliente. Com base nas limitações, o time entendeu que, o mais certo seria trabalhar em cima de uma ferramenta que pudesse fazer toda a parte de streaming dos dados e que a utilização do Kinesis Firehose seria uma melhor abordagem. Pensando neste novo cenário, arquitetura foi mudada para a seguinte: Certo, mas qual é o benefício de ter adicionado o Kinesis Firehose? O Kinesis possibilita gerenciar todo o volume de dados, independente do volume. Não será mais necessário acessar o S3 de forma contínua e gerenciar todo o fluxo de PUT dos objetos utilizando SDK Client, tornando o processo mais resiliente e simples. O Kinesis Firehose possibilita entregar as mensagens para diferentes destinos, como o S3, Redshift, Elastic Search e dentre outros. É um ótimo recurso para um processo analítico de dados e não necessita de uma administração contínua, pois o recurso já é gerenciado pela AWS. Resumindo, se você e seu time precisa processar grandes volumes de dados em tempo real e entregar os dados para diferentes destinos, o Kinesis Firehose pode ser uma boa solução. O cenário acima descrito, foi apenas para que a gente pudesse entender o contexto em qual cenário aplicar o Kinesis Firehose, existem vários outros cenários em que podemos aplicar. Na próxima etapa, vamos criar um Kinesis utilizando uma ferramenta IAC, neste caso, o Terraform. O Terraform já foi abordado em outros posts e vale a pena dar uma olhada caso você seja novo por aqui, pois o foco não é falar sobre Terraform, combinado? Após a criação do Kinesis Firehose utilizando Terraform, vamos criar um código Java que vai nos permitir enviar alguns eventos para o Kinesis Firehose processar e salvar estes mesmos dados em um Bucket no S3, finalizando o fluxo de streaming. Lembre-se que, para executar os passos seguintes é necessário ter uma conta na AWS e ter as credencias já configuradas, ok? Criando o Terraform Crie um arquivo no seu projeto chamado main.tf e adicione o código abaixo provider "aws" { region = "${var.region}" } Entendendo o código acima provider "aws": É no provider que definimos qual é a cloud que utilizaremos, no nosso caso a AWS. region: É onde definimos a região em que o serviço será executado. Agora, crie o arquivo vars.tf . É neste arquivo que vamos definir a variáveis e os seus valores. variable "region" { default = "us-east-1" type = "string" } variable "bucket" { type = "string" } Entendendo o código acima variable "region": Declaração da variável region. Esta mesma variável é utilizado no arquivo main.tf. default: Definição padrão do valor da variável, neste caso us-east-1. Esta será a região em que o serviço será executado. type: Tipo da variável variable "bucket": Declaração da variável bucket. E por último, crie o arquivo kinesis.tf, este nome é sugestivo, combinado? resource "aws_kinesis_firehose_delivery_stream" "kinesis_firehose" { destination = "extended_s3" name = "kns-delivery-event" extended_s3_configuration { bucket_arn = "${aws_s3_bucket.bucket.arn}" role_arn = "${aws_iam_role.firehose_role.arn}" prefix = "ingest/year=!{timestamp:yyyy}/ month=!{timestamp:MM}/ day=!{timestamp:dd}/ hour=!{timestamp:HH}/" error_output_prefix = "ingest/ !{firehose:error-output-type}/ year=!{timestamp:yyyy} /month=!{timestamp:MM}/ day=!{timestamp:dd}/hour=!{timestamp:HH}/" } } resource "aws_s3_bucket" "bucket" { bucket = "${var.bucket}" acl = "private" } resource "aws_iam_role" "firehose_role" { name = "firehose_test_role" assume_role_policy = < com.amazonaws aws-java-sdk-kinesis 1.12.70 org.projectlombok lombok 1.18.20 provided Criando o modelo Classe Java chamada Event.java que representará o modelo do evento. import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; import java.util.UUID; @Data public class Event { @JsonProperty("event_date") private String eventDate; @JsonProperty("event_id") private UUID eventId; @JsonProperty("provider") private String provider; @JsonProperty("blog") private String blog; @JsonProperty("post_id") private UUID postId; } Criando o Service A classe KinesisService.java será a responsável por parte da lógica e envio dos eventos. public class KinesisService { static String PROVIDER = "AWS KINESIS FIREHOSE"; static String BLOG = "Coffee and Tips"; static String KNS_DELIVERY_NAME = "kns-delivery-event"; static String RECORD_ID = "Record ID "; static String EVENT = "Event "; public static AmazonKinesisFirehose kinesisFirehoseClient(){ AmazonKinesisFirehose amazonKinesisFirehose = AmazonKinesisFirehoseClient.builder() .withRegion(Regions.US_EAST_1.getName()) .build(); return amazonKinesisFirehose; } @SneakyThrows public static void sendDataWithPutRecordBatch(int maxRecords){ PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest(); putRecordBatchRequest .setDeliveryStreamName(KNS_DELIVERY_NAME); String line = ""; List records = new ArrayList<>(); while(maxRecords > 0){ line = getData(); String data = line + "\n"; Record record = new Record() .withData(ByteBuffer.wrap(data.getBytes())); records.add(record); maxRecords --; } putRecordBatchRequest.setRecords(records); PutRecordBatchResult putRecordResult = kinesisFirehoseClient() .putRecordBatch(putRecordBatchRequest); putRecordResult .getRequestResponses() .forEach(result -> System.out .println(RECORD_ID + result.getRecordId())); } @SneakyThrows public static void sendDataWithPutRecord(int maxRecords){ PutRecordRequest PutRecordRequest = new PutRecordRequest(); PutRecordRequest .setDeliveryStreamName(KNS_DELIVERY_NAME); String line = ""; while(maxRecords > 0){ line = getData(); String data = line + "\n"; System.out.println(EVENT + data); Record record = new Record() .withData(ByteBuffer .wrap(data.getBytes())); PutRecordRequest.setRecord(record); PutRecordResult putRecordResult = kinesisFirehoseClient() .putRecord(PutRecordRequest); System.out.println(RECORD_ID + putRecordResult.getRecordId()); maxRecords --; } } @SneakyThrows public static String getData(){ Event event = new Event(); event.setEventId(UUID.randomUUID()); event.setPostId(UUID.randomUUID()); event.setBlog(BLOG); event.setEventDate(LocalDateTime.now().toString()); event.setProvider(PROVIDER); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(event); } } Entendendo o código acima O código acima possui algumas adaptações visando o funcionamento do Kinesis para o Post. Para um ambiente de produção, recomendo utilizar técnicas de padrões de projeto ou adotar melhores práticas visando um código mais organizado, GG? 1. O método kinesisFirehoseClient( ) é o responsável por configurar e manter uma interação entre os serviços do Kinesis Firehose e a nossa aplicação através da interface AmazonKinesisFirehose. Esta interface provê os seguintes contratos: putRecordBatch(PutRecordBatchRequest var1) putRecord(PutRecordRequest var1) Todo o código de envio foi baseado nos contratos acima e falaremos mais sobre eles nos próximos passos. Por último, perceba que estamos utilizando a região US-EAST-1, a mesma região definida no Terraform. 2. O método sendDataWithPutRecordBatch(int maxRecords) será o nosso método responsável por enviar os dados em Batch a partir do contrato putRecordBatch(PutRecordBatchRequest var1). A ideia de usar o envio em Batch, é a possibilidade de, em um único request, enviarmos uma grande quantidade de dados. Este cenário é bem comum para aplicações que trabalham com um grande volume de dados. Um ponto importante a ser lembrado é que, esta operação suporta até 4 MB de dados ou até 500 registros por requisição. Para o método sendDataWithPutRecordBatch(int maxRecords), estamos utilizando o parâmetro maxRecords, que simulará uma quantidade específica de dados dentro do loop para serem enviados via Batch ao fim da interação, ou seja, o seu uso será apenas para a simulação. 3. Para entendermos sobre o contrato putRecord(PutRecordRequest var1) fornecido pela interface AmazonKinesisFirehose, criamos o método sendDataWithPutRecord(int maxRecords). Este cenário difere bastante do envio em Batch, pois o tipo de envio será do tipo single record, ou seja, para cada dado enviado, um novo request deve ser criado e para cada envio, existe um limite de dado de até 1.000 KiB de tamanho. 4. O método getData( ) será o responsável por montar o objeto do tipo Event com valores aleatórios antes do envio e transformar em um JSON através do ObjectMapper. Executando o código Para a execução do código, vamos criar um método main( ) que fará a execução dos métodos acima. Executando o método sendDataWithPutRecord(int maxRecords) public static void main(String[] args) { sendDataWithPutRecord(2000); } No código acima estamos invocando o método sendDataWithPutRecord(int maxRecords) e passando no parâmetro o valor 2000, ou seja, iremos enviar 2000 eventos ou 2000 requests. Resposta do envio no console Para cada envio, o campo putRecordResult.getRecordId() retorna um ID. Executando o método sendDataWithPutRecordBatch(int maxRecords) public static void main(String[] args) { sendDataWithPutRecordBatch(500); } Já no método acima, especificamos o valor 500 para o parâmetro maxRecords, pois é o valor máximo de dados permitidos para envio em cada requisição. Lembrando que este parâmetro simula a quantidade de dados fictícios a serem enviados. Destino final Após a execução dos envios, os dados serão entregues ao Bucket do S3 definido na execução do Terraform e serão organizados conforme a imagem abaixo. Na imagem acima, os dados estão organizados entre ano, mês, dia e hora de envio. Os arquivos vão estar disponíveis dentro das pastas relacionadas a hora (hour=), e perceba a diferença de 5 minutos entre eles na coluna Última modificação, ou seja, seguindo a configuração de Buffer interval conforme falamos nos passos anteriores. Pronto, os dados estão sendo entregues! Destruindo os recursos Caso você esteja acompanhando este Post de forma experimental, creio que seja necessário apagar toda a stack em que criamos para evitar uma conta inesperada da AWS. Podemos executar o seguinte comando via Terraform para destruir os recursos em que criamos. terraform destroy -var 'bucket=digite o nome do seu bucket' Mas antes, apague todos os arquivos e pastas do Bucket, pois caso o contrário, o destroy não vai funcionar, GG? Após a execução do comando acima, você deverá confirmar a destruição, é só digitar yes para confirmar. Caso queria entender mais a fundo sobre Kinesis Firehose, recomendo a leitura na documentação oficial. Código completo no GitHub. É isso, curtiu? Até mais!
- Entendendo o Presto
O Presto é uma ferramenta bem conhecido no mundo Big Data, especificamente na área de Engenharia de dados. O Presto funciona como uma SQL Engine para análise de grandes volumes de dados distribuídos e heterogêneos. É um conceito um pouco subjetivo que vamos entender mais a frente. Presto não substitui MySQL, Oracle, SQL Server e etc, muitas pessoas se confundem com esta ideia. O Presto é um contexto composto por uma engine robusta que possibilita processar um input no padrão ANSI-SQL em que possibilitará um resultado final ao consumidor. Hoje no mercado temos algumas ferramentas/recursos baseados no Presto. Um deles é o conhecido AWS Athena. O Athena é um recurso que permite executar consultas interativas visando análise de dados. Entender o funcionamento do Athena, vai nos ajudar a entender também o funcionamento do Presto. No Athena, um Datasource é dividido por um catalogo de dados externo ou um connector, ou seja, onde os dados estão alocados (Por ser um contexto da AWS, os times escolhem algum bucket do S3). E por fim, um com catalogo de metadados, mais uma vez, naturalmente utiliza-se o Glue, mas também é possível o Apache Hive. Contexto Athena x Presto O que descrevemos anteriormente sobre a um Datasource no Athena é basicamente o contexto do Presto. O Presto é composto por diversos componentes que possibilitam processar uma entrada criando um processo de Query gerenciado por um componente que coordena todo o fluxo, chamado de Coordinator. Este Coordinator tem um papel em gerenciar os nós workers que trabalham em conjunto para entregar o resultado final ao cliente. Da mesma forma que o Athena funciona, o Presto também possui suporte para diferentes catálogos de dados, incluindo o S3 e entre outros. Possui também uma lista extensa de catalogo de metadados e conectores. Uma outra forma de entender o funcionamento e a sua finalidade, seria o seguinte cenário: Imagine um cenário em que é necessário executar uma análise em cima de dados representados por uma grande quantidade de arquivos semi-estruturados. Para uma melhor análise, será utilizado o padrão ANSI-SQL para uma análise mais precisa. Para este contexto, poderíamos adotar o Presto para conectar aos dados, organizar os dados em um catalogo e tabelas e por fim, criar um ecossistema que possibilita analisar os dados. Dentro deste contexto, o usuário poderia executar análises utilizando SQL baseadas nos arquivos em um repositório único, neste caso, um catálogo de dados mais transparente e possibilitando trabalhar com dados heterogêneos. Entendendo melhor o funcionamento do Presto Aqui vamos descrever os principais componentes do Presto, recomendo a leitura da sua documentação original. Overview do contexto Presto 1. Coordinator O Coordinator é o principal servidor e é o responsável por fazer os parses dos statements, queries, retornar o resultado final para o cliente e por gerenciar os workers. O Coordinator é o core de cada instalação, ou seja, uma peça chave para o funcionamento. A comunicação entre o Coordinator e os nós workers são através de API REST. 2. Workers Os Workers são os nós responsáveis por executar as tasks e processar os dados estimulados pelo Coordinator. As comunicações entre os Workers e o Coordinator são feitas através de API REST. 3. Connector O Connector é uma forma de adaptar Presto a um Datasource. É possível utilizar um Connector para ser utilizado junto ao Hive, MySQL, Redshift e entre vários outros. Resumidamente, o Connector funciona como um driver para um banco de dados. 4. Query Execution Model Presto executa SQL statements que são convertidos em Queries que por fim, são executados através de clusters distribuídos e gerenciados pelo coordinator e workers. 5. Statement Uma Statement é basicamente um texto SQL. Presto trabalha com statements seguindo os padrões ANSI-SQL, ou seja, possui expressões, cláusulas e etc. É interessante não confundir uma Statement com Query, existe uma diferença na qual falaremos no próximo tópico. 6. Query A Query é um processo criado a partir de uma Statement, ou seja, uma Statement é passada ao Presto e em seguida convertida para uma Query. A Query é um conjunto de fatores executados através de estágios interconectados e gerenciados pelos Workers. Perceba que existe uma diferença entre Statement (Texto SQL) e Query. A Query é um processo criado através de uma Statement envolvendo alguns componentes. Já a Statement é a representação textual de um script SQL que segue o padrão ANSI-SQL que será passado e processado pelo Presto para um resultado final. Para maiores informações sobre o Presto, recomendo bastante a leitura da doc Este foi um post foi um pouco mais simples onde a ideia foi fazer um overview sobre o contexto do Presto, espero ter ajudado. Até mais.
- Mensageria com Google Cloud Pub/Sub
A comunicação assíncrona via mensagens é um dos modelos mais utilizados atualmente entre microsserviços. Vamos fazer um exemplo bem simples usando a plataforma do Google. Vou começar mostrando o código e depois algumas configurações que precisaremos fazer. OK? Vamos lá!!! Para começar, vamos adicionar as seguintes dependências no projeto: (Caso tenha dúvidas em como iniciar um projeto spring boot, veja nesse post) org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.springframework.cloud spring-cloud-gcp-starter 1.1.1.RELEASE org.springframework.cloud spring-cloud-gcp-starter-pubsub 1.1.1.RELEASE org.springframework.boot spring-boot-starter-test test Agora, vamos para o código de fato: Primeiramente vamos criar um VO que vamos usar para representar o nosso evento. public class EventVO { private String name; private String email; [...] Getters/Setters and Constructor } Agora , vamos disponibilizar um endpoint para facilitar a publicação da mensagem. import com.coffee.tipsgcp.vo.EventVO; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.gcp.pubsub.core.publisher.PubSubPublisherTemplate; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/producer-event") public class ProducerEventController { @Autowired PubSubPublisherTemplate pubsubPublisher; @Autowired ObjectMapper objectMapper; @PostMapping() public void sendEvent(@RequestBody EventVO event) throws JsonProcessingException { pubsubPublisher.publish("topico-test", objectMapper.writeValueAsString(event)); System.out.println("Mensagem enviada para o tópico !!!"); } } No exemplo acima, nós injetamos a classe PubSubPublisherTemplate que é responsável por publicar a mensagem com o método publish(). Nele informamos o nome do tópico e transformamos o nosso objeto EventVO em json para ser enviado. E finalizando a parte do código, vamos criar um método para consumir as mensagens que publicamos. Lembrando que como estamos falando de microsserviços, o produtor e consumidor ficam em aplicações diferentes, aqui vamos colocar na mesma aplicação para facilitar o exemplo. import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate; import org.springframework.stereotype.Component; @Component public class TopicSubscriber implements ApplicationRunner { @Autowired PubSubTemplate pubSubTemplate; @Override public void run(ApplicationArguments args) { pubSubTemplate.subscribe("topico-test-assinatura", msg -> { System.out.println("Consumindo mensagem \n" + msg.toString()); msg.ack(); }); } } A classe acima implementa ApplicationRunner para que o código no método run() seja executado assim que a aplicação comece a rodar. Além disso, injetamos a classe PubSubTemplate para consumir as mensagens que chegam ao tópico especificado por meio do método subscribe(). Por fim, o metodo ack(), notifica o serviço do google que a mensagem foi recebida. Agora que finalizamos a parte do código, vamos a algumas configurações que teremos que fazer no Google Cloud Console. Isso pode ser feito por um client diretamento do terminal ou no próprio google console. Vou mostrar pelo console. Após criar um projeto no google console, precisaremos ativar uma chave para autenticação. Para isso, procure por Credentials na barra de busca: Após entrar nessa sessão, você encontrará uma listas com as contas que estão disponíveis para o seu usuário. Escolha a conta que deseja usar e clique em editar. Na próxima tela, será exibida a aba CHAVES. Nela terá a opção de adicionar uma nova chave. Clique na opção e será gerado um arquivo com a chave para autenticação no serviço. Faça dowload do arquivo. Após baixar o arquivo de autenticação, vamos criar uma variável de ambiente com o path do arquivo. Isso pode ser feito de várias formas, inclusive pode ser feito via código ao invés de variável de ambiente. Aqui, vou mostrar como foi feito na IDE Intellij. Clique em Edit Configurations e em, Environment Variables adicione: GOOGLE_APPLICATION_CREDENTIALS=[file_path]. Caso não seja especificado um arquivo de autenticação no google ou se a configuração estiver errada, a aplicação não vai nem conseguir iniciar. Além do arquivo de autenticação, vamos configurar agora o tópico e subscrição que vamos utilizar. Na barra de busca do Google Console, procure por Pub/Sub: Após abrir essa seção, clique no botão Criar Tópico e crie um tópico com o nome que você especificou no código. Depois de criado o tópico, clique em mais ações (3 pontinhos) no lado direito, e crie a assinatura com o mesmo nome especificado no código. Ótimo, agora sim nossa aplicação já está pronta!! Vamos subir a aplicação e fazer um POST para o nosso controller: Após fazer o POST, a saída do console será essa: Note que após a mensagem ser enviada, imediatamente ela é consumida pela outra ponta. É isso galera, espero ter contribuído. Até a próxima!
- Spring Data REST: Um CRUD pronto em 5 minutos
O módulo data rest do spring nos ajuda a disponibilizar operações CRUD com o mínimo de esforço possível. Imagine que você precise implementar um cadastro de produtos com regras complexas, mas que, antes disso, também tenha que ter um cadastro de “categoria de produtos”, que simplesmente insere uma nova categoria sem nenhuma regra complexa para se preocupar. Nesse contexto, o spring data rest pode facilitar bastante o trabalho, disponibilizando todas as operações no padrão REST para isso. Vamos lá!!! A primeira coisa que vamos precisar é incluir a dependência no projeto. org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-data-rest org.springframework.boot spring-boot-starter-data-jpa com.h2database h2 org.springframework.boot spring-boot-starter-test test Feito isso, vamos criar a entidade PERSON, que será utilizada para o exemplo: @Entity public class Person { @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; private String name; private String document; [...] //Getters and Setters } E para finalizar(SIM, já estamos finalizando), criamos a classe Repository da entidade, onde vamos ter a anotação responsável por disponibilizar as operações rest. @RepositoryRestResource(collectionResourceRel = "persons", path = "persons") public interface PersonRepository extends JpaRepository { } Pronto!!! Já temos várias operações REST funcionando. Vamos testar? Lembrando que foi usado o banco H2 para facilitar a implementação. Vamos fazer um POST para a url http://localhost:8080/persons, com o seguinte payload: { "name": "Steve Rogers", "document": "0000000001" } Nesse caso temos como response um 201 Created e o body: { "name": "Steve Rogers", "document": "0000000001", "_links": { "self": { "href": "http://localhost:8080/persons/1 " }, "person": { "href": "http://localhost:8080/persons/1 " } } } Note que o retorno já faz referência a url GET do próprio registro. Para fazer update dos dados, podemos fazer um POST para a url http://localhost:8080/persons/1 com o seguinte payload: { "name": "Tony Stark", "document": "0000000001" } E para deletar fazemos DELETE para a mesma url, e nesse caso receberemos como retorno o código 204 No Content. Após isso, se fizermos um GET para a url acima, passaremos a receber um 404 Not Found, já que o registro foi deletado. Então é isso pessoal, exemplo simples, mas que pode ajudar no ganho de produtividade no dia a dia. Até a próxima!!