ETL leve com AWS Lambda, DuckDB e Delta-RS
Artigo japonês original: AWS Lambda × DuckDB × Delta-RS による ETL の実装 Introdução I’m Aki, um construtor comunitário da AWS (@jitepengin). Nos meus artigos anteriores, concentrei -me principalmente no Apache iceberg: enquanto o Iceberg está rapidamente se tornando o padrão de fato dos formatos de tabela aberta (OTF), o Delta Lake ainda é a primeira opção ao usar o Databricks. Neste artigo, percorrerei uma abordagem para reduzir os custos descarregando o pré-processamento para o ETL leve antes de entregar a carga de trabalho a Databricks.A idéia é construir a camada de bronze com ETL baseado em Lambda, enquanto alavancando os bancos de dados para camadas de prata e ouro (agregação e análise). O que é o lago Delta? O lago Delta é um formato de data lago compatível com ácido construído em cima do Apache Parquet. Principais características: transações ácidas: garante consistência mesmo com gravações simultâneas. Viagem no tempo: consulta versões históricas dos dados. Evolução do esquema: Adicionar ou modificar facilmente colunas. Otimizações de desempenho: cluster de ordem z e pular dados. É o formato de armazenamento primário nos bancos de dados e amplamente utilizado para pipelines ETL, BI e ML. A arquitetura usada neste artigo aqui está a arquitetura que discutiremos: o estágio de pré -processamento (pouso → bronze) é tratado por lambda com saída de formato delta. O bronze → prata e os estágios de ouro, que requerem computação mais pesada, são tratados por bancos de dados. Foco deste artigo: The Red Box (Landing → Bronze). Landing → Bronze: ETL leve com Lambda (formato delta) Bronze → Prata: Bancos de Databricks (agregação, limpeza, modelagem) Prata → Gold: Databricks (KPIs de negócios, MARTs de dados) Bibliotecas usadas dentro de Lambda: Duckdb: In-Memory OLAP Engine para Sql Spells SQUERIAS. Pyarrow: formato de dados de alto desempenho para transferência/conversão eficientes. Delta-RS: Biblioteca Rust/Python para operações de leitura/gravação de Delta Lake. O DuckDB DuckDB é um mecanismo de banco de dados OLAP incorporado. É leve e funciona totalmente na memória, tornando-o um ótimo ajuste para lambda.duckdb é especialmente poderoso para consultas analíticas e cargas de trabalho de ETL em lote. O Pyarrow Pyarrow é a ligação do Python do Apache Arrow.arrow fornece um formato de memória colunar rápido otimizado para análise de dados e computação distribuída.pyarrow permite operações de dados eficientes, manuseio de esquemas e interoperabilidade em diferentes sistemas. Delta-RS Delta-RS é uma implementação de ferrugem do lago Delta, com encadernas para Python e outros idiomas. Suporta transações ácidas, evolução do esquema e viagens no tempo. Em python, está disponível como o pacote Deltalake. Configurando o Databricks com a AWS Crie um local externo Escolha a opção de início rápido da AWS e insira os detalhes do seu balde S3. Execute a CloudFormation Isso é executado automaticamente ao criar o local externo. Crie um catálogo com essas etapas, o Databricks e a integração S3 são concluídos por meio da GUI. Você pode criar e gerenciar tabelas com SQL simples. Embalagem para Lambda Como as dependências são grandes, usei uma imagem de contêiner. Dockerfile de public.ecr.aws/lambda/python:3.12 workdir/var/cópia de tarefa requisitos.txt. Execute o PIP Install -R requisitos.txt – -Target “$ {lambda_task_root}” copie lambda_function.py. Cmd [“lambda_function.lambda_handler”]
Digite os requisitos do modo de tela cheia de tela cheia.TXT DuckDB PYARROW Deltalake> = 1.1.4 Digite o modo de tela cheia Sair da tela cheia código de amostra Código de amostra Importar DuckDB Importar pyarrow como PA de DelTalake import Write_DelTalake Def lambda_handler (Evento): Try: DUTMB_CONNECTIONS = DATANCONECTB.CONNETN = DATATENCONECT). Duckdb_Connection.execute (“Set home_directory = ‘/tmp'”) DUCKDB_CONNECTION.EXECUTE (“Instalar httpfs;”) Duckdb_Connection.execute (“Carregar httpfs;”) # Obter arquivo de entrada do s3 Event S3_BUCT = Event = Event = Evento[‘Records’][0][‘s3’][‘bucket’][‘name’]
s3_key = evento[‘Records’][0][‘s3’][‘object’][‘key’]
s3_input_path = f “s3: // {s3_bucket}/{s3_key}” print (f “s3_input_path: {s3_input_path}”) query = f “” selecione * de read_parquet (‘{s3_inin_path}’) Duckdb_Connection.Execute (Query) .Fetch_arrow_Table () print (F “contagem de linha: {Result_arrow_table.num_Rows}”) Print (f “Schema: {Result_arrow_table.Schema}) # —- Converte colunas de Timestamp —- Schema = []
Para o campo em resultado_arrow_table.schema: se pa.types.is_timestamp (field.type) e field.type.tz não é: # timestamp timestamp[us] → Timestamp[ns, tz=”UTC”]
schema.append (pa.field (field.name, pa.timestamp (“ns”, tz = “utc”))) else: schema.append (field) new_schema = pa.schema (schema) result_arrow_table = resultado_arrow_table.cast.cast (new_schema) (scroms) (schema) prost_table = result_arrow_table.cast.cast. (new_schema) (schaned_table = resultado_trow_table.cast.cast. {Result_arrow_table.schema} “) # Tabela Delta Caminho de saída S3_OUTPUT_PATH =” S3: // Your-Bucket ” # Escreva para delta lake write_deltalake (s3_output_path, result_arrow_table, mode =” Append “, # ou” Subrite “) (F” Scectenting para delta, para delta, para delta. Imprimir (Ocorrer Erro F “Ocorreu: {e}”) Digite Modo de tela de tela cheia Modo de tela cheia NOTE IMPORTANTE NOTA: WRITERVERSÃO> = 7 Se uma tabela Delta tiver WriterVersion> = 7 e contém o Timestamps de Timestamps de Timestrons de Timestrono: Modo de Timestão de Erro: o Writer deve ser especificado para WriterSversion> = 7, Especificar o TimestOngOmToTTimezona[us] → Timestamp[ns, tz=”UTC”] antes de escrever. Certifique -se de estar usando o Deltalake> = 1.1.4. REFERÊNCIAS: Exemplo de execução Dados de amostra: dados de táxi de Nova York (AWS Marketplace) Lambda acionada via S3: escreveu com sucesso dados no formato Delta Lake (OTF)! Além do bronze de bronze → Prata (Databricks): normalização, junções, agregação, verificações de qualidade de dados, modelagem de esquema. Prata → Gold (Databricks): KPIs de negócios, dados de dados, conjuntos de dados prontos para BI/ML. Exemplo de responsabilidades de fluxo de trabalho: Notas da ferramenta Etapa Notas → Bronze Lambda / DuckDB / Delta-Rs Formato delta de custo eficiente em Bronze S3 → Bancos de Dabricks de Prata, Modelagem, Modelamento, Prata de Prata → Bancos de Dados de Goldes NO BUSTING, NO, BI / ML Proady Pros e Cons PRO PRÓSSIMANTE ENCFIFIFÍFICA EFFIIFIFICÁVEL: Desenvolvimento fácil com bibliotecas Lambda + Python. Baseado em SQL (sintaxe do tipo Postgresql). Processamento na memória eficiente (DuckDB). ETL em tempo real com gatilhos S3. Limite de memória Lambda CONS (10 GB máx). Limite de execução de 15 minutos (não é ideal para conjuntos de dados muito grandes). Pode exigir computação alternativa (por exemplo, ECS/Fargate) para escala. Conclusão Nós exploramos como usar o AWS Lambda + DuckDB + Delta-Rs para um oleoduto ETL leve no lago Delta. Os bancos de dados são poderosos, mas podem ser caros para o pré-processamento de cargas de trabalho que não requerem computação pesada. Com a descarga de aterrissagem → ETL de bronze no lambda, você pode obter: menor custos em tempo real (através de gatilhos de sinicentos de que você é um passo mais simples e mais flexível.
Fonte
Publicar comentário