Tarefa: Criar esqueleto de serviço de pipeline ETL
[x] 2.1 Crie o esqueleto de serviço de tubulação ETL Implemente a estrutura do APACHE Airflow DAG para processamento de lote Crie utilitários de conexão de banco de dados para o Oracle, SQL Server, Postgresql grava a validação de dados e as funções de limpeza de 1,11 Funções de manuseio de erros e requisitos de manuseio de erros de manuseio: Esqueleto de serviço Um serviço abrangente de pipeline ETL foi implementado com sucesso usando o Apache Airflow, projetado especificamente para o processamento de dados de fabricação de semicondutores. Componentes principais construídos: DAG principal (semicondutor_data_pipline.py) manuseio de pipeline programado por hora MES, WAT, CP e Dados de rendimento. Extração paralela de várias fontes de dados. Estágios seqüenciais: validação → limpeza → enriquecimento → carregamento. Manuseio de erro robusto com tentativas e notificações por e -mail. Relatórios de qualidade de dados incluídos. Extratores de dados (extractors.py) mesextractor: extrai genealogia do lote, etapas de processo e parâmetros. Watextractor: extrai os resultados dos testes elétricos com estruturas PCM. Cpextractor: extrai resultados do teste da sonda no nível da matriz, incluindo mapas de compartimento. Rendimento Extrato: os extratos produzem métricas, dados de compartimento e análise de defeitos. Todos os extratores executam a extração estruturada com o manuseio adequado de erros. As conexões do banco de dados (conexões.pys.py) suportam vários bancos de dados: PostgreSQL (MES, WAT, DW) e Oracle (Systems Legacy). Usa o pool de conexão SQLalChemy para desempenho otimizado. A integração Redis permite o cache para acesso mais rápido. Os gerentes de contexto garantem a limpeza automática da sessão. Verificações de saúde implementadas para todas as conexões. Transformadores de dados (Transformers.py) DataValidator: validação abrangente de dados entre os tipos. Datacleaner: padroniza os dados e executa a conversão do tipo. DataenRicher: adiciona metadados do equipamento e contexto de lote. ValidationResult: Relatório estruturado de resultados de validação. Carregadores de dados (carregadores.py) DataWareHouseLoader: carrega dados em tabelas de fatos e dimensões do esquema de estrelas. Datalakeloader: gerencia zonas cruas e com curadoria com a partição no Data Lake. Implementa o log de métricas para monitoramento e observabilidade. Usa o carregamento em massa baseado em pandas para maximizar o desempenho. Architecture and Features: Here is a more readable and organized revision of the detailed description for Task 2.1, outlining the ETL pipeline service skeleton and files: Task 2.1: Create ETL Pipeline Service Skeleton Item: Implement Apache Airflow DAG Structure for Batch Processing File Description services/data-ingestion/etl-pipeline/src/dags/semiconductor_data_pipeline.py Main Airflow DAG orchestrating an Pipeline ETL por hora para sistemas de dados de MES, WAT, CP e Rendimento. Ele suporta extração paralela, validação seqüencial → Limpeza → Enriquecimento → Carregamento, manuseio de erros com 3 tentativas, alertas de email e relatórios de qualidade de dados. Item: Crie utilitários de conexão de banco de dados para Oracle, SQL Server, PostGresql Arquivo Descrição Serviços/Data-Ingestion/Etl-PipeLine/SRC/Database/Connection.Py Gerenciador de conexão de banco de dados Suporte a vários bancos de dados (Postgresql para MES/WAT/DW Sistemas Legacy), Setenting Pooling com SqlalChemy manuseio. Serviços/Data-ingestion/ETL-PiPeline/SRC/Database/__ init__.py Pacote Inicializador para o módulo de banco de dados. Item: Escreva funções de validação e limpeza de dados Descrição do arquivo Descrição Serviços/Data-ingestion/etl-PiPeline/src/etl/transformers.py DataValidator Class com mais de 150 regras de validação específicas do semicondutor (IDs de lotes, texerferos, valores de teste, porcentagens de rendimento). Datacleaner para padronização e normalização de dados (IDs em alta, digite conversão, normalização de parâmetros). O DataenRicher adiciona metadados do equipamento e contexto de lote. Item: Implemente os mecanismos de manuseio de erros e os mecanismos de mensagem Descrição Serviços/Data-ingestion/ETL-PiPeline/src/etl/extractors.py extratores de dados com manuseio de erros robustos e lógica de tentativa. Inclui Basextractor, Mesextractor (genealogia do lote), Watextractor (testes elétricos), CPEXtractor (resultados da sonda no nível do nível) e rendimento (métricas de rendimento). Erro abrangente registrando integrado. Arquivos de infraestrutura de suporte ARQUIVO DE ARQUIVO DE Descrição Serviços/Data-Ingestion/ETL-PiPeline/SRC/ETL/Carreiros.py Carregadores de Dados para o Data Warehouse (tabelas de fato/dimensão do Star Schema) e Data Lake (zonas cruas/com curadoria com particionamento). Suporta o carregamento a granel com o log de pandas e métricas para observabilidade. Serviços/Data-ingestion/ETL-PiPeline/SRC/UTILS/LOGGING_UTILS.Py Importações compartilhadas JSON Logging Utilities and Metrics Collection. Serviços/Data-ingestion/ETL-PiPeline/Src/__ init__.py ETL Pacote de serviço Inicialização. Serviços/Data-ingrestion/ETL-PiPeline/SRC/ETL/__ init__.py ETL Módulo Inicialização. Serviços/Data-ingestion/ETL-PiPeline/SRC/UTILS/__ INIT__.PILIZAÇÃO PACOTO DE UTILIDADES. Recursos técnicos -chave por estrutura de DAG de arquivo (semicondutor_data_pipline.py): 4 extratores simultâneos (mes, wat, cp, rendimento) em paralelo. Processamento de dados seqüenciais: Validação → Limpeza → Enriquecimento → Carregamento. Resiliente com 3 tentativas, atrasos de tentativa de 5 minutos e tempo de trabalho de 2 horas. Monitoramento com alertas de email, registro de execução e relatórios de qualidade de dados. Conexões de banco de dados (conexões.pys.py): pool de conexões definido com 10 conexões básicas, 20 transbordamento, reciclagem de 1 hora. Suporta PostgreSQL (MES, WAT, DW), Oracle (Legacy) e Redis (cache). Usa os gerentes de contexto para compromisso automático e reversão da sessão. Utilitários de monitoramento de saúde fornecidos para todos os tipos de conexão. Processamento de dados (Transformers.py): implementa mais de 150 regras de validação para dados específicos de semicondutores. Limpa os dados através da conversão de tipo, padronização e normalização do formato. Enriquece dados com metadados do equipamento e contexto de lote/processo. Coleta métricas de qualidade, incluindo taxas de validação, contagem de erros e integridade dos dados. Extração de dados (extractors.py): Extrações de dados de dados de fabricação de semicondutores, incluindo: MES: genealogia de lote, etapas de processo, parâmetros de equipamento. WAT: Testes elétricos, estruturas PCM, limites paramétricos. CP: Resultados dos testes no nível da matriz, mapas de compartimento, coordenadas espaciais. Rendimento: métricas de rendimento, análise de defeitos, distribuições de compartimento. Carregamento de dados (carregadores.py): carrega dados em um armazém de dados de esquema em estrela com tabelas de fatos e dimensões. Gerencia as zonas de Data Lake Raw e com curadoria com a partição de data. Desempenho otimizado com formatos de carregamento em massa e arquivo parquet. Design escalável que suporta armazenamento distribuído e gerenciamento de partição. Esse esqueleto de pipeline ETL fornece uma base pronta para a produção para processar dados de fabricação de semicondutores com eficiência, com tratamento abrangente de erros, validação, enriquecimento, monitoramento e gerenciamento de dados escaláveis.
Fonte