21 points par hiddenest 2020-12-24 | 2 commentaires | Partager sur WhatsApp

Dans un environnement où le nombre mensuel moyen d’événements dépasse les 10 milliards, il a fallu analyser rapidement les données pour réaliser des analyses de comportement utilisateur de type cohortes.

(ex. femmes dans la trentaine ayant dépensé plus de 100 000 wons par mois dans notre app au cours des 6 derniers mois → leur taux de revisite)

Cet article raconte comment un datastore, que les développeurs se contentaient jusqu’ici d’utiliser, a finalement été implémenté en interne.

Pour implémenter des requêtes d’analyse du comportement utilisateur…

  • Il faut pouvoir interroger des métriques qui n’ont pas été pré-calculées à l’avance (+ permettre de nouveaux types d’analyse sans re-indexing)

  • Lors d’un Group By des données d’événements par utilisateur, il faut réduire le goulet d’étranglement du High Cardinality Shuffle

Utiliser une solution existante ou créer la nôtre ?

  • Druid était déjà utilisé ailleurs, mais ses limites liées au Pre-Aggregation (lecture de valeurs déjà calculées) le rendaient inadapté à cette fonctionnalité

  • Il est possible d’exploiter à grande échelle des data warehouses comme Snowflake ou Redshift, mais leur caractère très généraliste impose d’opérer un cluster trop important par rapport à l’objectif, donc trop coûteux

  • Pour couvrir des besoins variés comme le funnel ou le matching d’ID, les bases de données SQL ont leurs limites

Finalement, créer directement le datastore

  • Luft = un datastore optimisé dès le départ pour exécuter rapidement des requêtes d’analyse du comportement utilisateur regroupées par ID utilisateur

  • Construit sur Golang

  • Analyse des données utilisateur de plusieurs dizaines de To avec moins de 5 nœuds, en moyenne en 3 secondes et au maximum en 10 secondes

  • Contrairement à un SGBDR classique, il est immutable (et peut, si nécessaire, écraser les données d’une même période) → design de cluster simple, hautes performances sans implémenter de gestionnaire de pages complexe, et liberté de concevoir le format de stockage souhaité

Décomposer la base technique

  • TrailDB (moteur de stockage) - un rowstore d’événements time series optimisé pour le partitionnement par ID utilisateur

→ dictionnarise les valeurs et ne stocke que leur ID

→ trie les événements utilisateur par ordre chronologique et ne stocke que l’écart de temps par rapport à l’événement précédent ainsi que les colonnes modifiées (car la plupart des attributs utilisateur ne changent pas)

→ pas d’index, full scan obligatoire

→ mais avec un taux de compression étonnamment élevé (CSV 13GB → ~TrailDB 300mb)

→ comme la complexité temporelle est en O(n), l’idée a été de réduire la complexité spatiale

  • LLVM (moteur de requête)

→ mais TrailDB ne fournit que des equals de type OR-AND, et il faut transmettre les requêtes parsées en Go vers C/C++

→ découverte que PostgreSQL compile les requêtes via LLVM JIT

→ comme les fonctionnalités des requêtes évoluent souvent, cela évite d’augmenter le coût de développement en écrivant en C/C++ (il suffit de générer l’IR LLVM en Golang puis de l’exécuter en JIT côté C/C++)

  • Créer directement la couche de calcul

→ MapReduce est largement utilisé, mais inutilisable ici à cause du choix de Golang

→ Spark/Hadoop sont optimisés pour les Long-running Job, donc même en les raccordant les performances ne sont pas au rendez-vous

→ cela aussi a été développé en interne → https://github.com/ab180/lrmr

→ combinaison de gRPC + Protobuf + etcd, avec reprise de nombreux éléments du design familier de Spark

→ abandon de la resiliency → en poussant les performances à l’extrême, même en cas d’incident, relancer depuis le début prend moins de 10 secondes

→ face aux fréquents buffer overflows dus au traitement de gros volumes de données (Backpressure), passage à un Pull-based Event Stream (adopté aussi par Kafka, Armeria, etc.)

  • Implémenter directement le sharding

→ shard = historical node

→ et si on utilisait la plage de dates d’une partition comme clé de sharding ?

→ toutes les requêtes comportent une dimension temporelle → filtrage facile

→ sur une même plage temporelle, les volumes de données sont similaires → répartition facile

→ les environnements distribués ne sont pas élégants…

→ que faire si un nœud tombe ou si un nouveau nœud est ajouté ?

→ que faire si l’espace de stockage est saturé ?

→ que faire si, à cause d’une panne, la charge se concentre sur un seul nœud ?

→ personnalisation de la Cost Function de Druid pour augmenter le coût lorsque les plages de dates des partitions sont proches ou se chevauchent

→ pour assurer la disponibilité des shards, les actions suivantes ont été mises en place

→ ajout d’un TTL aux informations de shard et rafraîchissement périodique (etcd)

→ stockage des partitions sur S3 et gestion de leur liste via DynamoDB

Situation actuelle en production

  • Scan de 500GB de données en moins de 15 secondes avec seulement 4 instances c5.2xlarge

Objectifs à venir (ou travaux à faire)

  • Réaliser des analyses de funnel en temps réel avec un cluster de moins de 10 machines

  • Ajouter la prise en charge de Spark pour permettre l’intégration ML, entre autres

  • Développement en cours d’un column store maison (Ziegel) pour remplacer TrailDB

→ optimisation SIMD et multicœur

→ filtrage préalable basé sur les attributs utilisateur via Bitmap Index

2 commentaires

 
gera1d 2020-12-24

traildb est intéressant. https://www.youtube.com/watch?v=-oPFxSwn0lM C’est intéressant. Même si la vidéo est ancienne, traildb n’a probablement pas changé entre-temps.

 
hiddenest 2020-12-24

Maintenant que je regarde, il y a aussi un article de blog de l’équipe de développement,

https://engineering.ab180.co/stories/introducing-luft

Je n’avais jamais entendu parler de TrailDB, mais c’est ce genre de projet...

https://github.com/traildb/traildb