← Tilbage til koncepter

ETL (Extract, Transform, Load)

Arkitektur

Proces til at udtrække data fra kilder, transformere det, og loade det ind i et destination system.

Beskrivelse

ETL er en fundamental dataintegrations-proces bestående af tre faser: Extract (udtræk data fra flere kilder), Transform (rens, konverter, aggreger data), og Load (indsæt i destinationsdatabase eller data warehouse). ETL er kernen i moderne data-pipelines og bruges til at konsolidere data fra forskellige systemer til analyse. Ekstraktion kan være fra databaser, API'er, CSV-filer eller realtids-streams. Transformation inkluderer datarensning (fjern dubletter, ret formater), berigelse (join med referencedata), aggregering (sum, gennemsnit) og normalisering. Loading kan være fuld load (hele datasættet) eller inkrementel load (kun ændringer). Moderne varianter inkluderer ELT (load først, transformer i destination) og realtids-streaming ETL. Værktøjer som Apache Airflow, Talend, Informatica og AWS Glue automatiserer ETL-workflows. God ETL sikrer datakvalitet, konsistens og tilgængelighed for business intelligence.

Problem

Organisationer har data spredt på tværs af mange systemer - CRM, ERP, databases, APIs, filer. Data har forskellige formater, kvalitet, og opdateringsfrekvenser. Hvordan konsoliderer man dette til et samlet, rent, analyseklart dataset?

Løsning

ETL pipelines automatisk udtrækker data fra alle kilder, transformerer det til et konsistent format og kvalitet, og loader det ind i et central data warehouse hvor analysts kan lave queries og rapporter uden at bekymre sig om data sources.

Eksempel

// Simpelt ETL eksempel med Node.js

// ============ EXTRACT ============
async function extractFromSources() {
  // Udtræk fra MySQL database
  const dbCustomers = await mysqlClient.query(`
    SELECT customer_id, name, email, created_at
    FROM customers
    WHERE updated_at > '2024-01-01'
  `);
  
  // Udtræk fra REST API
  const apiOrders = await fetch('https://api.shop.dk/orders')
    .then(res => res.json());
  
  // Udtræk fra CSV fil
  const csvProducts = await fs.readFile('products.csv', 'utf-8')
    .then(data => parseCSV(data));
  
  return { customers: dbCustomers, orders: apiOrders, products: csvProducts };
}

// ============ TRANSFORM ============
function transformData({ customers, orders, products }) {
  // 1. Data Cleaning
  const cleanCustomers = customers
    .filter(c => c.email && c.email.includes('@')) // Remove invalid emails
    .map(c => ({
      ...c,
      email: c.email.toLowerCase().trim(), // Normalize
      name: c.name.trim()
    }));
  
  // 2. Data Enrichment - Join orders with customers
  const enrichedOrders = orders.map(order => {
    const customer = cleanCustomers.find(c => c.customer_id === order.customer_id);
    return {
      order_id: order.id,
      customer_name: customer?.name || 'Unknown',
      customer_email: customer?.email,
      order_total: order.total,
      order_date: new Date(order.created_at)
    };
  });
  
  // 3. Data Aggregation
  const customerMetrics = cleanCustomers.map(customer => {
    const customerOrders = enrichedOrders.filter(
      o => o.customer_email === customer.email
    );
    return {
      customer_id: customer.customer_id,
      customer_email: customer.email,
      total_orders: customerOrders.length,
      total_spent: customerOrders.reduce((sum, o) => sum + o.order_total, 0),
      avg_order_value: customerOrders.length > 0 
        ? customerOrders.reduce((sum, o) => sum + o.order_total, 0) / customerOrders.length
        : 0,
      first_order_date: customerOrders.length > 0
        ? new Date(Math.min(...customerOrders.map(o => o.order_date)))
        : null
    };
  });
  
  return { customerMetrics, enrichedOrders, products };
}

// ============ LOAD ============
async function loadToWarehouse(transformedData) {
  const { customerMetrics, enrichedOrders, products } = transformedData;
  
  // Load til data warehouse (PostgreSQL)
  await warehouseDb.transaction(async (trx) => {
    // Truncate staging tables
    await trx.raw('TRUNCATE staging.customer_metrics');
    
    // Bulk insert
    await trx('staging.customer_metrics').insert(customerMetrics);
    await trx('staging.enriched_orders').insert(enrichedOrders);
    
    // Merge til production tables (upsert)
    await trx.raw(`
      INSERT INTO prod.customers 
      SELECT * FROM staging.customer_metrics
      ON CONFLICT (customer_id) 
      DO UPDATE SET
        total_orders = EXCLUDED.total_orders,
        total_spent = EXCLUDED.total_spent,
        updated_at = NOW()
    `);
  });
  
  console.log(`ETL completed: Loaded ${customerMetrics.length} customers`);
}

// ============ ORCHESTRATION ============
async function runETLPipeline() {
  try {
    console.log('Starting ETL pipeline...');
    const rawData = await extractFromSources();
    const transformedData = transformData(rawData);
    await loadToWarehouse(transformedData);
    console.log('ETL pipeline completed successfully');
  } catch (error) {
    console.error('ETL failed:', error);
    // Send alert, log to monitoring
  }
}

// Schedule: Run every day at 2 AM
cron.schedule('0 2 * * *', runETLPipeline);

Fordele

  • Centraliseret data til analyse
  • Konsistent datakvalitet og format
  • Automatisering af data-workflows
  • Historisk datasporing
  • Adskillelse af ansvar (OLTP vs OLAP)

Udfordringer

  • Kompleksitet ved mange datakilder
  • Datakvalitetsproblemer fra kildesystemer
  • Ydeevne ved store datamængder
  • Skemaændringer i kildesystemer
  • Overvågning og fejlhåndtering

Anvendelsesområder

  • Befolkning af data warehouse
  • Business intelligence-rapporter
  • Datamigrering mellem systemer
  • Aggregering af data fra flere kilder
  • Master Data Management (MDM)

Eksempler fra den virkelige verden

  • E-handel: Kombiner web, CRM, lagerdata til salgsrapporter
  • Bankvirksomhed: Aggreger transaktioner fra alle filialer til central rapportering
  • Sundhedsvæsen: Konsolider patientdata fra flere systemer
  • Detailhandel: Kombiner point-of-sale, lager, kundedata
  • Marketing: Samle data fra annoncer, website, CRM til attributionsanalyse