Documentação dos Notebooks Databricks¶
Esta documentação apresenta os notebooks que fazem o processo de ingestão e tratamento dos dados em três camadas: Bronze, Silver e Gold .
Notebook Bronze¶
Objetivo¶
Ingerir os arquivos CSV da landing zone no armazenamento Azure Data Lake (ADLS), montar os containers, carregar os dados em DataFrames Spark, aplicar algumas transformações básicas e salvar no formato Delta na camada Bronze.
Configurações e Montagem¶
storageAccountName = ""
storageAccountAccessKey = ""
sasToken = ""
def mount_adls(blobContainerName):
try:
dbutils.fs.mount(
source = "wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
mount_point = f"/mnt/{storageAccountName}/{blobContainerName}",
extra_configs = {'fs.azure.sas.' + blobContainerName + '.' + storageAccountName + '.blob.core.windows.net': sasToken}
)
print("OK!")
except Exception as e:
print("Falha", e)
mount_adls('landing-zone')
mount_adls('bronze')
mount_adls('silver')
mount_adls('gold')
Ingestão dos Dados CSV¶
Leitura dos arquivos CSV da landing zone para os DataFrames Spark e definição dos nomes das colunas.
Exemplo para o arquivo vendedores:
df_vendedores_raw = spark.read.option("header", "false").csv(f"/mnt/{storageAccountName}/landing-zone/ecommerce/vendedores.csv")
colunas_vendedores = ["id", "nome", "email", "telefone", "data_cadastro"]
df_vendedores = df_vendedores_raw.toDF(*colunas_vendedores)
Processo similar para os demais arquivos:
- clientes.csv
- categorias.csv
- estoque.csv
- pagamentos.csv
- entregas.csv
- avaliacoes.csv
- pedidos.csv
- enderecos_cliente.csv
- transportadoras.csv
- formas_pagamento.csv
- itens_pedido.csv
- produtos.csv
Enriquecimento dos DataFrames¶
Adição das colunas data_hora_bronze
(timestamp da ingestão) e nome_arquivo
para rastreamento.
from pyspark.sql.functions import current_timestamp, lit
df_vendedores = df_vendedores.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("vendedores.csv"))
# Mesmo para os demais DataFrames...
Sanitização dos nomes das colunas¶
Função para padronizar os nomes das colunas:
def sanitize_columns(df):
for col_name in df.columns:
new_name = (
col_name.strip()
.lower()
.replace(" ", "_")
.replace("(", "")
.replace(")", "")
.replace("-", "_")
.replace(",", "")
.replace(";", "")
.replace("{", "")
.replace("}", "")
.replace("=", "")
.replace("\n", "")
.replace("\t", "")
)
df = df.withColumnRenamed(col_name, new_name)
return df
Salvamento dos dados na camada Bronze¶
dfs = [
(df_vendedores, "vendedores"),
(df_clientes, "clientes"),
(df_categorias, "categorias"),
(df_estoque, "estoque"),
(df_pagamentos, "pagamentos"),
(df_entregas, "entregas"),
(df_avaliacoes, "avaliacoes"),
(df_formas_pagamento, "formas_pagamento"),
(df_transportadoras, "transportadoras"),
(df_pedidos, "pedidos"),
(df_enderecos_cliente, "enderecos_cliente"),
(df_itens_pedido, "itens_pedido"),
(df_produtos, "produtos"),
]
for df, name in dfs:
df_sanitized = sanitize_columns(df)
path = f"/mnt/{storageAccountName}/bronze/ecommerce/{name}"
df_sanitized.write.format("delta").save(path)
Notebook Silver¶
Objetivo¶
Ler os dados da camada Bronze, aplicar tratamentos e renomeações, adicionar metadados, e salvar os dados processados na camada Silver.
Montagem dos containers (mesma função do notebook Bronze)¶
storageAccountName = ""
storageAccountAccessKey = ""
sasToken= ""
def mount_adls(blobContainerName):
try:
dbutils.fs.mount(
source = "wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
mount_point = f"/mnt/{storageAccountName}/{blobContainerName}",
extra_configs = {'fs.azure.sas.' + blobContainerName + '.' + storageAccountName + '.blob.core.windows.net': sasToken}
)
print("OK!")
except Exception as e:
print("Falha", e)
Leitura dos dados Bronze em formato Delta¶
df_avaliacoes = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/avaliacoes")
df_categorias = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/categorias")
df_clientes = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/clientes")
df_enderecos_cliente = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/enderecos_cliente")
df_entregas = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/entregas")
df_estoque = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/estoque")
df_formas_pagamento = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/formas_pagamento")
df_itens_pedidos = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/itens_pedido")
df_pagamentos = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/pagamentos")
df_pedidos = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/pedidos")
df_produtos = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/produtos")
df_transportadoras = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/transportadoras")
df_vendedores = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/vendedores")
Adição de colunas de metadados para Silver¶
from pyspark.sql.functions import current_timestamp, lit
df_avaliacoes = df_avaliacoes.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("avaliacoes"))
# Repetir para os demais DataFrames
Renomeação das colunas para padrão maiúsculo e tratamento de sufixos¶
Função para renomear as colunas:
from pyspark.sql.functions import lit, current_timestamp
def renomear_colunas(diretorio):
df = spark.read.format('delta').load(diretorio)
tabela = diretorio.split('/')[-2]
novos_nomes = {}
for coluna in df.columns:
novo_nome = coluna.upper()
if novo_nome.endswith("_ID"):
prefixo = novo_nome[:-3]
novo_nome = f"CODIGO_{prefixo}"
else:
novo_nome = novo_nome.replace("ID", "CODIGO")
novos_nomes[coluna] = novo_nome
for antigo, novo in novos_nomes.items():
df = df.withColumnRenamed(antigo, novo)
for col_drop in ["DATA_HORA_BRONZE", "NOME_ARQUIVO"]:
if col_drop in df.columns:
df = df.drop(col_drop)
df = df.withColumn("NOME_ARQUIVO_BRONZE", lit(tabela))
df = df.withColumn("DATA_ARQUIVO_SILVER", current_timestamp())
df.write.format('delta').mode("overwrite").save(f"/mnt/{storageAccountName}/silver/ecommerce/{tabela}")
def renomear_arquivos_delta(diretorio):
arquivos = dbutils.fs.ls(diretorio)
for arquivo in arquivos:
renomear_colunas(arquivo.path)
diretorio = f'/mnt/{storageAccountName}/bronze/ecommerce'
renomear_arquivos_delta(diretorio)
Notebook Gold¶
Objetivo¶
Ler os dados da camada silver, fazer algumas agregações entre tabelas e disponibilizar um modelo dimensional, com tabelas de dimensão e tabelas fato, ja com todos os dados em formatos delta prontos para serem consumidos por Power BI ou outra ferramenta de visualização de dados.
Montagem dos containers (mesma função do notebook Silver)¶
storageAccountName = ""
storageAccountAccessKey = ""
sasToken= ""
def mount_adls(blobContainerName):
try:
dbutils.fs.mount(
source = "wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
mount_point = f"/mnt/{storageAccountName}/{blobContainerName}",
extra_configs = {'fs.azure.sas.' + blobContainerName + '.' + storageAccountName + '.blob.core.windows.net': sasToken}
)
print("OK!")
except Exception as e:
print("Falha", e)
Leitura em dos dados da Silver em formato Delta¶
df_avaliacoes = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/avaliacoes")
df_categorias = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/categorias")
df_clientes = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/clientes")
df_enderecos_cliente = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/enderecos_cliente")
df_entregas = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/entregas")
df_estoque = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/estoque")
df_formas_pagamento = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/formas_pagamento")
df_itens_pedidos = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/itens_pedido")
df_pagamentos = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/pagamentos")
df_pedidos = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/pedidos")
df_produtos = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/produtos")
df_transportadoras = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/transportadoras")
df_vendedores = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/ecommerce/vendedores")
Adiciona metadados para os dataframes utiizados no notebook¶
from pyspark.sql.functions import current_timestamp, lit
df_avaliacoes = df_avaliacoes.withColumn("data_hora_gold", current_timestamp()).withColumn("nome_arquivo", lit("avaliacoes"))
df_categorias = df_categorias.withColumn("data_hora_gold", current_timestamp()).withColumn("nome_arquivo", lit("categorias"))
df_clientes = df_clientes.withColumn("data_hora_gold", current_timestamp()).withColumn("nome_arquivo", lit("clientes"))
df_entregas = df_entregas.withColumn("data_hora_gold", current_timestamp()).withColumn("nome_arquivo", lit("entregas"))
df_estoque = df_estoque.withColumn("data_hora_gold", current_timestamp()).withColumn("nome_arquivo", lit("estoque"))
df_formas_pagamento = df_formas_pagamento.withColumn("data_hora_gold", current_timestamp()).withColumn("nome_arquivo", lit("formas_pagamento"))
df_itens_pedidos = df_itens_pedidos.withColumn("data_hora_gold", current_timestamp()).withColumn("nome_arquivo", lit("itens_pedidos"))
df_pagamentos = df_pagamentos.withColumn("data_hora_gold", current_timestamp()).withColumn("nome_arquivo", lit("pagamentos"))
df_pedidos = df_pedidos.withColumn("data_hora_gold", current_timestamp()).withColumn("nome_arquivo", lit("pedidos"))
df_produtos = df_produtos.withColumn("data_hora_gold", current_timestamp()).withColumn("nome_arquivo", lit("produtos"))
df_transportadoras = df_transportadoras.withColumn("data_hora_gold", current_timestamp()).withColumn("nome_arquivo", lit("transportadoras"))
df_vendedores = df_vendedores.withColumn("data_hora_gold", current_timestamp()).withColumn("nome_arquivo", lit("vendedores"))
Cria as tabelas dimensionais¶
Tabela Dimensão clientes¶
%sql
CREATE TABLE IF NOT EXISTS dim_clientes (
ID BIGINT GENERATED ALWAYS AS IDENTITY,
CODIGO_CLIENTE INT,
NOME STRING,
EMAIL STRING,
TELEFONE STRING,
CIDADE STRING,
ESTADO STRING,
DATA_CADASTRO TIMESTAMP
)
USING delta
LOCATION '/mnt/datalake922b9abd80170b5b/gold/ecommerce/dim_clientes';
Tabela Dimensão produtos¶
%sql
CREATE TABLE IF NOT EXISTS dim_produtos (
ID BIGINT GENERATED ALWAYS AS IDENTITY, -- Chave substituta (SK)
CODIGO_PRODUTO INT, -- Chave natural do sistema transacional
NOME STRING,
DESCRICAO STRING,
PRECO DECIMAL(10,2),
CATEGORIA STRING,
DATA_CADASTRO TIMESTAMP
)
USING delta
LOCATION '/mnt/datalake922b9abd80170b5b/gold/ecommerce/dim_produtos';
Tabela Dimensão vendedores¶
%sql
CREATE TABLE IF NOT EXISTS dim_vendedores (
ID BIGINT GENERATED ALWAYS AS IDENTITY, -- Chave substituta (SK)
CODIGO_VENDEDOR INT, -- Chave natural do sistema de origem
NOME STRING,
EMAIL STRING,
TELEFONE STRING,
DATA_CADASTRO TIMESTAMP
)
USING delta
LOCATION '/mnt/datalake922b9abd80170b5b/gold/ecommerce/dim_vendedores';
Tabela Dimensão formas de pagamento¶
%sql
CREATE TABLE IF NOT EXISTS dim_formas_pagamento (
ID BIGINT GENERATED ALWAYS AS IDENTITY, -- Chave substituta (SK)
CODIGO_FORMA INT, -- Chave natural (caso exista)
DESCRICAO STRING
)
USING delta
LOCATION '/mnt/datalake922b9abd80170b5b/gold/ecommerce/dim_formas_pagamento';
Tabela Dimensão tempo¶
from pyspark.sql.functions import expr, date_format, year, month, dayofmonth, dayofweek, when, col
data_inicial = "2022-01-01"
data_final = "2025-12-31"
num_dias = spark.sql(f"SELECT datediff('{data_final}', '{data_inicial}')").collect()[0][0]
df_calendario = spark.range(0, num_dias + 1) \
.selectExpr(f"date_add(to_date('{data_inicial}'), CAST(id AS INT)) AS Data")
# Criar colunas nome mês e dia da semana com when (case when)
df_tempo = df_calendario.select(
col("Data"),
year("Data").alias("Ano"),
month("Data").alias("Mes"),
when(month("Data") == 1, "JANEIRO")
.when(month("Data") == 2, "FEVEREIRO")
.when(month("Data") == 3, "MARCO")
.when(month("Data") == 4, "ABRIL")
.when(month("Data") == 5, "MAIO")
.when(month("Data") == 6, "JUNHO")
.when(month("Data") == 7, "JULHO")
.when(month("Data") == 8, "AGOSTO")
.when(month("Data") == 9, "SETEMBRO")
.when(month("Data") == 10, "OUTUBRO")
.when(month("Data") == 11, "NOVEMBRO")
.when(month("Data") == 12, "DEZEMBRO")
.alias("NomeMes"),
dayofmonth("Data").alias("Dia"),
when(dayofweek("Data") == 1, "DOMINGO")
.when(dayofweek("Data") == 2, "SEGUNDA-FEIRA")
.when(dayofweek("Data") == 3, "TERCA-FEIRA")
.when(dayofweek("Data") == 4, "QUARTA-FEIRA")
.when(dayofweek("Data") == 5, "QUINTA-FEIRA")
.when(dayofweek("Data") == 6, "SEXTA-FEIRA")
.when(dayofweek("Data") == 7, "SABADO")
.alias("NomeDiaSemana"),
dayofweek("Data").alias("NumeroDiaSemana"),
date_format("Data", "yyyyMMdd").cast("int").alias("ID")
)
df_tempo.display()
df_tempo.write.mode("overwrite")\
.option("path", f"/mnt/{storageAccountName}/gold/ecommerce/dim_tempo")\
.saveAsTable("dim_tempo")
Tabela Dimensão entregas¶
%sql
CREATE TABLE IF NOT EXISTS dim_entregas (
ID BIGINT GENERATED ALWAYS AS IDENTITY, -- Chave substituta (SK)
CODIGO_ENTREGA INT, -- Chave natural (caso exista)
TRANSPORTADORA STRING,
STATUS STRING,
DATA_ENVIO TIMESTAMP,
DATA_ENTREGA TIMESTAMP
)
USING delta
LOCATION '/mnt/datalake922b9abd80170b5b/gold/ecommerce/dim_entregas';
Cria os merges e faz os inserts nas tabelas a partir de tabelas temporárias criadas com os dataframes¶
df_clientes.createOrReplaceTempView("silver_clientes")
df_enderecos_cliente.createOrReplaceTempView("silver_enderecos_cliente")
df_produtos.createOrReplaceTempView("silver_produtos")
df_categorias.createOrReplaceTempView("silver_categorias")
df_vendedores.createOrReplaceTempView("silver_vendedores")
df_formas_pagamento.createOrReplaceTempView("silver_formas_pagamento")
df_entregas.createOrReplaceTempView("silver_entregas")
df_transportadoras.createOrReplaceTempView("silver_transportadoras")
df_pedidos.createOrReplaceTempView("pedidos")
df_itens_pedidos.createOrReplaceTempView("itens_pedidos")
df_pagamentos.createOrReplaceTempView("pagamentos")
df_avaliacoes.createOrReplaceTempView("avaliacoes")
df_produtos.createOrReplaceTempView("produtos")
Tabela Dimensão clientes¶
%sql
-- Criar TEMP VIEW com os dados de origem
CREATE OR REPLACE TEMP VIEW dim_clientes_temp AS
SELECT
c.CODIGO AS CODIGO_CLIENTE,
c.NOME,
c.EMAIL,
c.TELEFONE,
ec.CIDADE,
ec.ESTADO,
c.DATA_CADASTRO
FROM silver_clientes c
LEFT JOIN silver_enderecos_cliente ec ON c.CODIGO = ec.CODIGO_CLIENTE;
-- MERGE na tabela de dimensão usando a chave natural
MERGE INTO dim_clientes AS target
USING dim_clientes_temp AS source
ON target.CODIGO_CLIENTE = source.CODIGO_CLIENTE
WHEN MATCHED THEN
UPDATE SET
target.NOME = source.NOME,
target.EMAIL = source.EMAIL,
target.TELEFONE = source.TELEFONE,
target.CIDADE = source.CIDADE,
target.ESTADO = source.ESTADO,
target.DATA_CADASTRO = source.DATA_CADASTRO
WHEN NOT MATCHED THEN
INSERT (CODIGO_CLIENTE, NOME, EMAIL, TELEFONE, CIDADE, ESTADO, DATA_CADASTRO)
VALUES (source.CODIGO_CLIENTE, source.NOME, source.EMAIL, source.TELEFONE, source.CIDADE, source.ESTADO, source.DATA_CADASTRO);
Tabela Dimensão produtos¶
%sql
-- Criar ou substituir a TEMP VIEW com os dados transformados
CREATE OR REPLACE TEMP VIEW dim_produtos_temp AS
SELECT
p.CODIGO AS CODIGO_PRODUTO,
p.NOME,
p.DESCRICAO,
p.PRECO,
c.NOME AS CATEGORIA,
p.DATA_CADASTRO
FROM silver_produtos p
LEFT JOIN silver_categorias c ON p.CODIGO_CATEGORIA = c.CODIGO;
-- Realizar o MERGE na tabela de dimensão
MERGE INTO dim_produtos AS target
USING dim_produtos_temp AS source
ON target.CODIGO_PRODUTO = source.CODIGO_PRODUTO
WHEN MATCHED THEN
UPDATE SET
target.NOME = source.NOME,
target.DESCRICAO = source.DESCRICAO,
target.PRECO = source.PRECO,
target.CATEGORIA = source.CATEGORIA,
target.DATA_CADASTRO = source.DATA_CADASTRO
WHEN NOT MATCHED THEN
INSERT (CODIGO_PRODUTO, NOME, DESCRICAO, PRECO, CATEGORIA, DATA_CADASTRO)
VALUES (source.CODIGO_PRODUTO, source.NOME, source.DESCRICAO, source.PRECO, source.CATEGORIA, source.DATA_CADASTRO);
Tabela Dimensão vendedores¶
%sql
-- Criar ou substituir a TEMP VIEW com os dados transformados
CREATE OR REPLACE TEMP VIEW dim_vendedores_temp AS
SELECT
CODIGO AS CODIGO_VENDEDOR,
NOME,
EMAIL,
TELEFONE,
DATA_CADASTRO
FROM silver_vendedores;
-- Realizar o MERGE na tabela de dimensão
MERGE INTO dim_vendedores AS target
USING dim_vendedores_temp AS source
ON target.CODIGO_VENDEDOR = source.CODIGO_VENDEDOR
WHEN MATCHED THEN
UPDATE SET
target.NOME = source.NOME,
target.EMAIL = source.EMAIL,
target.TELEFONE = source.TELEFONE,
target.DATA_CADASTRO = source.DATA_CADASTRO
WHEN NOT MATCHED THEN
INSERT (CODIGO_VENDEDOR, NOME, EMAIL, TELEFONE, DATA_CADASTRO)
VALUES (source.CODIGO_VENDEDOR, source.NOME, source.EMAIL, source.TELEFONE, source.DATA_CADASTRO);
Tabela Dimensão formas de pagamento¶
%sql
-- Criar ou substituir a TEMP VIEW com dados da origem
CREATE OR REPLACE TEMP VIEW dim_formas_pagamento_temp AS
SELECT
CODIGO AS CODIGO_FORMA,
DESCRICAO
FROM silver_formas_pagamento;
-- MERGE na dimensão dim_formas_pagamento
MERGE INTO dim_formas_pagamento AS target
USING dim_formas_pagamento_temp AS source
ON target.CODIGO_FORMA = source.CODIGO_FORMA
WHEN MATCHED THEN
UPDATE SET
target.DESCRICAO = source.DESCRICAO
WHEN NOT MATCHED THEN
INSERT (CODIGO_FORMA, DESCRICAO)
VALUES (source.CODIGO_FORMA, source.DESCRICAO);
Tabela Dimensão entregas¶
%sql
-- Criar ou substituir a TEMP VIEW com os dados de origem
CREATE OR REPLACE TEMP VIEW dim_entregas_temp AS
SELECT
e.CODIGO AS CODIGO_ENTREGA,
t.NOME AS TRANSPORTADORA,
e.STATUS,
e.DATA_ENVIO,
e.DATA_ENTREGA
FROM silver_entregas e
LEFT JOIN silver_transportadoras t ON e.CODIGO_TRANSPORTADORA = t.CODIGO;
-- Executar o MERGE na tabela de dimensão
MERGE INTO dim_entregas AS target
USING dim_entregas_temp AS source
ON target.CODIGO_ENTREGA = source.CODIGO_ENTREGA
WHEN MATCHED THEN
UPDATE SET
target.TRANSPORTADORA = source.TRANSPORTADORA,
target.STATUS = source.STATUS,
target.DATA_ENVIO = source.DATA_ENVIO,
target.DATA_ENTREGA = source.DATA_ENTREGA
WHEN NOT MATCHED THEN
INSERT (CODIGO_ENTREGA, TRANSPORTADORA, STATUS, DATA_ENVIO, DATA_ENTREGA)
VALUES (source.CODIGO_ENTREGA, source.TRANSPORTADORA, source.STATUS, source.DATA_ENVIO, source.DATA_ENTREGA);
Criação da tabela Fato vendas¶
%sql
CREATE TABLE IF NOT EXISTS fato_vendas (
ID BIGINT GENERATED ALWAYS AS IDENTITY, -- SK automática
CLIENTE_SK INT,
PRODUTO_SK INT,
VENDEDOR_SK INT,
TEMPO_SK INT,
FORMA_PAGAMENTO_SK INT,
ENTREGA_SK INT,
QUANTIDADE INT,
PRECO_UNITARIO DECIMAL(10,2),
VALOR_TOTAL DECIMAL(12,2),
TEMPO_ENTREGA_DIAS INT,
NOTA_AVALIACAO INT
)
USING delta
LOCATION '/mnt/datalake922b9abd80170b5b/gold/ecommerce/fato_vendas';
Depois criação de views temporária
spark.read.format("delta").load("/mnt/datalake922b9abd80170b5b/gold/ecommerce/dim_clientes").createOrReplaceTempView("dim_clientes")
spark.read.format("delta").load("/mnt/datalake922b9abd80170b5b/gold/ecommerce/dim_produtos").createOrReplaceTempView("dim_produtos")
spark.read.format("delta").load("/mnt/datalake922b9abd80170b5b/gold/ecommerce/dim_vendedores").createOrReplaceTempView("dim_vendedores")
spark.read.format("delta").load("/mnt/datalake922b9abd80170b5b/gold/ecommerce/dim_tempo").createOrReplaceTempView("dim_tempo")
spark.read.format("delta").load("/mnt/datalake922b9abd80170b5b/gold/ecommerce/dim_formas_pagamento").createOrReplaceTempView("dim_formas_pagamento")
spark.read.format("delta").load("/mnt/datalake922b9abd80170b5b/gold/ecommerce/dim_entregas").createOrReplaceTempView("dim_entregas")
e por fim adiciona os dados na tabela fato
%sql
MERGE INTO fato_vendas AS target
USING (
SELECT
dc.ID AS CLIENTE_SK,
dp.ID AS PRODUTO_SK,
dv.ID AS VENDEDOR_SK,
dt.ID AS TEMPO_SK,
dfp.ID AS FORMA_PAGAMENTO_SK,
de.ID AS ENTREGA_SK,
ip.QUANTIDADE,
ip.PRECO_UNITARIO,
ip.QUANTIDADE * ip.PRECO_UNITARIO AS VALOR_TOTAL,
DATEDIFF(de.DATA_ENTREGA, de.DATA_ENVIO) AS TEMPO_ENTREGA_DIAS,
av.NOTA AS NOTA_AVALIACAO
FROM pedidos p
INNER JOIN itens_pedidos ip ON ip.CODIGO_PEDIDO = p.CODIGO
INNER JOIN dim_clientes dc ON dc.CODIGO_CLIENTE = p.CODIGO_CLIENTE
INNER JOIN dim_produtos dp ON dp.CODIGO_PRODUTO = ip.CODIGO_PRODUTO
INNER JOIN produtos pr ON pr.CODIGO = ip.CODIGO_PRODUTO
INNER JOIN dim_vendedores dv ON dv.CODIGO_VENDEDOR = pr.CODIGO_VENDEDOR
INNER JOIN dim_tempo dt ON dt.Data = CAST(p.DATA_PEDCODIGOO AS DATE)
LEFT JOIN pagamentos pg ON pg.CODIGO_PEDIDO = p.CODIGO
LEFT JOIN dim_formas_pagamento dfp ON dfp.CODIGO_FORMA = pg.CODIGO
LEFT JOIN dim_entregas de ON de.CODIGO_ENTREGA = p.CODIGO_ENDERECO_ENTREGA
LEFT JOIN (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY CODIGO_PRODUTO, CODIGO_CLIENTE ORDER BY DATA_AVALIACAO DESC) AS rn
FROM avaliacoes
) av ON av.CODIGO_PRODUTO = ip.CODIGO_PRODUTO
AND av.CODIGO_CLIENTE = p.CODIGO_CLIENTE
AND av.rn = 1
) AS source
ON
target.CLIENTE_SK = source.CLIENTE_SK
AND target.PRODUTO_SK = source.PRODUTO_SK
AND target.VENDEDOR_SK = source.VENDEDOR_SK
AND target.TEMPO_SK = source.TEMPO_SK
AND target.FORMA_PAGAMENTO_SK = source.FORMA_PAGAMENTO_SK
AND target.ENTREGA_SK = source.ENTREGA_SK
AND target.QUANTIDADE = source.QUANTIDADE
AND target.PRECO_UNITARIO = source.PRECO_UNITARIO
WHEN NOT MATCHED THEN
INSERT (
CLIENTE_SK,
PRODUTO_SK,
VENDEDOR_SK,
TEMPO_SK,
FORMA_PAGAMENTO_SK,
ENTREGA_SK,
QUANTIDADE,
PRECO_UNITARIO,
VALOR_TOTAL,
TEMPO_ENTREGA_DIAS,
NOTA_AVALIACAO
)
VALUES (
source.CLIENTE_SK,
source.PRODUTO_SK,
source.VENDEDOR_SK,
source.TEMPO_SK,
source.FORMA_PAGAMENTO_SK,
source.ENTREGA_SK,
source.QUANTIDADE,
source.PRECO_UNITARIO,
source.VALOR_TOTAL,
source.TEMPO_ENTREGA_DIAS,
source.NOTA_AVALIACAO
);