Planfy: Sistema de Classificação de Recorrência de Despesas
Visão Geral
Este projeto faz parte da plataforma de finanças pessoais Planfy, com foco na identificação e categorização de despesas como recorrentes ou não recorrentes. Esse recurso é crucial para ajudar os clientes a definirem metas de gastos e gerenciar melhor seus controles financeiros.
Definição do Problema
O principal desafio abordado por este projeto é a classificação precisa das despesas. Isso é necessário para auxiliar no orçamento, na previsão e no gerenciamento das atividades financeiras dos clientes do Planfy.
Desenvolvimento e Execução
O processo de engenharia de dados para disponibilizar os dados de entrada para o algoritmo e fornecer os resultados para a Planfy envolve várias ferramentas, como serviços da AWS (API Gateway, SNS e SQS) e Databricks (Spark e Delta Lake). A análise exploratória de dados e o desenvolvimento do algoritmo de classificação foram realizados usando o framework Kedro em conjunto com a biblioteca Pandas. O algoritmo desenvolvido foi então convertido para execução em clusters Spark dentro do Databricks, permitindo sua operação em um ambiente de Big Data.
Estrutura do Pipeline
Um esquema simples do pipeline desenvolvido pode ser visto na Figura abaixo.
Para classificar novas despesas diariamente, também foi desenvolvida uma infraestrutura para receber eventos de atualizações da fonte de dados, a Pluggy. Assim, quando esses eventos são recebidos, os novos dados são ingeridos no Data Lake, por meio do Apache NiFi, e posteriormente processados, por meio do Databricks, para serem disponibilizados ao algoritmo de classificação. Um esquema simples dessa infraestrutura pode ser visto na Figura abaixo.
Estágios Desenvolvidos
- A etapa inicial do projeto começou com o desenvolvimento da infraestrutura para receber eventos do webhook da Pluggy, a fonte de dados para as contas e transações bancárias dos clientes do Planfy. Dentro dessa infraestrutura, foi criada uma API usando o AWS API Gateway para receber esses eventos. Posteriormente, foi estabelecida uma integração da API com o AWS SNS para publicar todos os eventos em um tópico. Em seguida, foi criada uma fila no AWS SQS para receber esses eventos e armazená-los temporariamente até serem lidos pelo Apache NiFi.
- A segunda etapa envolveu o desenvolvimento de um pipeline no Apache NiFi para ler os eventos da fila do AWS SQS, solicitar dados de contas e transações da Pluggy e, em seguida, enviar esses dados para o Data Lake.
- O terceiro estágio envolveu o desenvolvimento do job
update_pluggy_tables
, que, por meio das tasksupdate_accounts
eupdate_transactions
, extrai os dados brutos no formato JSON, executa transformações e limpeza, aplica verificações de qualidade e carrega os dados em tabelas no formato Delta (accounts
etransactions
). Para a extração dos dados brutos, o Spark Structured Streaming foi usado em conjunto com o Databricks Auto Loader. - A quarta etapa do projeto foi a análise exploratória dos dados nos notebooks intitulados
v1.0_data_description
,v1.1_variable_filtering
ev1.2_feature_engineering
. - O quinto estágio envolveu a conversão do algoritmo de classificação, inicialmente desenvolvido usando o Kedro e o Pandas, para o Spark e o salvamento dos resultados em uma tabela Delta chamada
fixed_expenses_classification
no catálogo analytics. Além disso, foram adicionadas verificações de qualidade aos dados de resultados. Essa etapa está presente no scriptclassify_fixed_expensures.py
. - A sexta e última etapa, realizada por meio do script
send_to_planfy.py
, envolveu a captura das despesas recém-classificadas e a disponibilização desses dados para o Planfy.
Visão Geral dos Scripts
update_accounts.py
e update_transactions.py
Ambos os scripts lidam com o processo de ETL para os dados de contas e transações, movendo-os da camada raw para a camada stage. Esse processo inclui:
- Carregamento de dados da camada raw do Data Lake usando o Spark Structured Streaming e o Auto Loader (Databricks).
- Verificação da qualidade dos dados com base em algumas restrições.
- Inserir os dados na tabela Delta.
classify_fixed_expensures.py
Esse script inclui as etapas de engenharia de features necessárias para classificar as despesas. Ele descreve o processo de carregamento dos dados, aplicando a engenharia de features para categorizar as despesas e, em seguida, atualizando uma tabela na camada analytics com os resultados. As principais operações incluem:
- Carregamento dos dados da área de stage.
- Classificação de despesas fixas e variáveis com base em categorias predefinidas.
- Detecção de despesas recorrentes usando window functions.
- Atualização da tabela Delta com novas classificações de despesas.
send_to_planfy.py
O segundo script lida com o processo de envio dos dados classificados para a plataforma Planfy. Isso envolve:
- Rastrear e atualizar a versão dos dados lidos da tabela Delta.
- Buscar novas classificações de despesas com base na última versão lida.
- Enviar os novos dados de despesas classificadas para o banco de dados do Planfy usando o Spark e um recurso do Delta Lake, chamado Change Data Feed (CDF).
Uso
Não é necessária nenhuma configuração adicional da estrutura do Kedro. Todo o processo é gerenciado por meio de notebooks do Databricks, com cada etapa claramente definida e encapsulada em scripts Python para manutenção e escalabilidade.
Próximas Etapas
- Finalizar o método de transferência de dados para a plataforma Planfy (inserção direta no banco de dados ou via API).
- Implantar o pipeline usando Databricks Asset Bundles.