Desvendando o Change Data Feed (CDF) no Delta Lake: + Spark Streaming
Explore o Futuro da Gestão de Dados Delta com CDF e Spark Streaming! Descubra como rastrear, transmitir e analisar alterações em tempo real para uma eficiência incomparável. Potencialize suas ...
Explore o Futuro da Gestão de Dados Delta com CDF e Spark Streaming! Descubra como rastrear, transmitir e analisar alterações em tempo real para uma eficiência incomparável. Potencialize suas operações ETL/ELT, integre-se perfeitamente a sistemas downstream e mantenha uma trilha de auditoria abrangente. Não perca a oportunidade de transformar a maneira como gerencia dados com Delta Lake.
Aproveitando o Poder do Change Data Feed (CDF) no Delta Lake
O recurso Change Data Feed (CDF) no Delta Lake é uma ferramenta poderosa que permite às tabelas Delta rastrearem alterações em nível de linha entre diferentes versões. Quando ativado em uma tabela Delta, o CDF registra eventos de alteração, incluindo dados da linha e metadados indicando se a linha foi inserida, excluída ou atualizada.
Casos de Uso:
Tabelas Silver e Gold: Aprimore o desempenho Delta processando apenas alterações ao nível de linha após operações iniciais MERGE, UPDATE ou DELETE. Isso acelera e simplifica operações ETL e ELT.
Transmissão de Alterações: Envie um feed de dados alterados para sistemas downstream, como Kafka ou RDBMS, para processamento incremental em estágios posteriores de pipelines de dados.
Tabela de Trilha de Auditoria:Capture o feed de dados alterados como uma tabela Delta, proporcionando armazenamento perpétuo e capacidade de consulta eficiente para visualizar todas as alterações ao longo do tempo, incluindo exclusões e atualizações.
Ativando CDF para a Tabela de Leitura:
ALTER TABLE nome_tabela SET TBLPROPERTIES(delta.enableChangeDataFeed=true)
Como fica a estrutura desse dado no Lake:
Será criado uma pasta “_change_data” para o processo após habilitar ChangeDataFeed.
Após este processo na sua tabela aparecerá 3 novas colunas de dados.
Lendo os Dados:
dfReadStream = spark.readStream.format('delta')\
.option("readChangeFeed", "true")\
.option("enableChangeDataFeed", "true")\
.option('cloudFiles.inferColumnTypes', 'true')\
.option('cloudFiles.schemaLocation', schemalocal)\
.option('cloudFiles.schemaEvolutionMode', 'addNewColumns')\
.table(nome_tabela)
O método dfReadStream.writeStream segue normalmente seu fluxo, porém caso tenha necessidade de utilizar o foreachBatch no processo, segue abaixo um código ninja que vc não vai encontrar em lugar algum hehehehe.
Caso tenha necessidade de utilizar o foreachBatch no processo:
#CDF
streamQuery = (dfReadStream.writeStream
.format('delta')
.outputMode('append')
.foreachBatch(upsertToDeltaLive(
nome_banco = nome_banco
,nome_tabela = nome_tabela
,caminho_Gravacao = caminho_Gravacao
,merge_condition = '''s.chave_tbl = t.chave_tbl'''
,table_id=['chave_tbl']
,dataAtualizacao = 'lastupdate'))
.queryName(nome_tabela)
.option('checkpointLocation', checkpoint)
.trigger(once=True)
.start())
#Função micro-batch para cdc no delta
def upsertToDeltaLive(nome_banco, nome_tabela, caminho_Gravacao, merge_condition, table_id, dataAtualizacao):
def _do_work(microbatchOutputDF, batchId):
spark.catalog.clearCache()
for (id, rdd) in spark.sparkContext._jsc.getPersistentRDDs().items():
rdd.unpersist()
print("Unpersisted {} rdd".format(id))
if(spark._jsparkSession.catalog().tableExists(f'{nome_banco}.{nome_tabela}')):
#add ingestion date foreach microbatch
microbatchOutputDF = add_ingestion_date(microbatchOutputDF)
#remove duplicadaas em caso de acumulos de batchs
microbatchOutputDF = microbatchOutputDF.withColumn("rn2", row_number()\
.over(Window.partitionBy([col(x) for x in table_id])\
.orderBy(microbatchOutputDF[dataAtualizacao].desc())))
microbatchOutputDF = microbatchOutputDF.filter('rn2 = 1')
microbatchOutputDF = microbatchOutputDF.drop(col('rn2'))
#upsert merge
deltadf = DeltaTable.forName(spark, f'{nome_banco}.{nome_tabela}')
(deltadf.alias('t')
.merge(
microbatchOutputDF.alias('s'),
merge_condition)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
else:
microbatchOutputDF = add_ingestion_date(microbatchOutputDF)
microbatchOutputDF = microbatchOutputDF.withColumn("rn2", row_number()\
.over(Window.partitionBy([col(x) for x in table_id])\
.orderBy(microbatchOutputDF[dataAtualizacao].desc())))
microbatchOutputDF = microbatchOutputDF.filter('rn2 = 1')
microbatchOutputDF = microbatchOutputDF.drop(col('rn2'))
print('Primeira Ingestão')
microbatchOutputDF.write.mode('overwrite').option('path', f'{caminho_Gravacao}')\
.option("Mergeschema", "True")\
.format('delta').saveAsTable(f'{nome_banco}.{nome_tabela}')
return _do_work
Alterar armazenamento de dados
Delta Lake registra dados alterados para operações UPDATE, DELETE e MERGE na changedata dentro da pasta no diretório da tabela Delta. Esses registros podem ser ignorados quando o Delta Lake detecta que pode calcular com eficiência o feed de dados alterados diretamente do log de transações. Em particular, operações somente de inserção e exclusões completas de partição não gerarão dados no changedata diretório.
Os arquivos da changedata seguem a política de retenção da tabela. Portanto, se você executar o comando VACUUM , os dados do feed de dados alterados também serão excluídos.
Vou deixar o FAQ direto do fornecedor com as perguntas e notebook de exemplo.
Qual é a sobrecarga de habilitar o feed de dados de alterações?
Não há nenhum impacto significativo. Os registros de dados de alterações são gerados em linha durante o processo de execução da consulta e geralmente são muito menores do que o tamanho total dos arquivos reescritos.
Qual é a política de retenção para os registros de alteração?
Os registros de alteração seguem a mesma política de retenção das versões de tabela desatualizadas e serão limpos por meio de VACUUM se estiverem fora do período de retenção especificado.
Quando novos registros ficam disponíveis no feed de dados de alterações?
Os dados de alterações são confirmados com a transação do Delta Lake e serão disponibilizados ao mesmo tempo que os novos dados ficam disponíveis na tabela.
Exemplo de notebook: propagar alterações com o feed de dados de alteração Delta
Esse notebook mostra como propagar as alterações feitas em uma tabela Silver de número absoluto de vacinações para uma tabela Gold de taxas de vacinação.