ETL mit Apache Spark

Spark als ETL Werkzeug nutzen und von Big Data Performance profitieren

Die In-Memory Big Data Plattform Apache Spark dominiert die Big Data Welt. Natürlich lässt sich Spark sehr gut für ETL-Prozesse einsetzen und somit lassen sich täglich enorme Datenmengen bewegen, filtern und transformieren. Große Firmen wie Facebook machen es vor und zeigen mit einem produktiven Spark ETL-Prozess von 60 TB, der im Vergleich bis zu x6 schneller ist als der Apache Hive Job, die extreme Stärke der Plattform auf.

Mit den unterschiedlichen Daten und Datentypen, die täglich in unsere Data Lakes geladen werden, müssen wir hinsichtlich der Aufbereitung der Daten extrem flexibel sein. Hier kann Spark helfen die Datenaufbereitung durch ETL (Extract, Transform, Load) effizient zu gestalten. Durch das Spark DataSource API ist man mit der Technologie in der Lage die verschiedensten Quellen zu lesen.

Apache Spark Data Soucre API Quelle: www.databricks.com
Apache Spark Data Soucre API Quelle: www.databricks.com

Diese Vielfalt im Big Data Umfeld kann kaum eine Plattform bieten und viele traditionelle Datenbanken sind sehr starr auf bestimmte Datenquellen ausgerichtet und adaptieren nur sehr langsam neue Quellen, wie bspw. das spaltenorientierte Format Parquet.

Spark als zentrale ETL Engine für Data Lakes

Folgend zeige ich euch eine mögliche Architektur, in der Apache Spark eine zentrale Rolle für die Datenverarbeitung spielt. Gezeigt wird hierbei ein hybrider Architekturansatz mit bestehender DWH Architektur und Data Lake. Apache Spark nimmt dabei die Position als zentrale ETL Engine ein. So können bspw. große Log Daten, wie Trackingdaten, von Spark über Spark ETL Prozesse in die gewünschte (gefiltert & aggregiert) Form gebracht werden und zurück ins DWH gespielt werden.

ETL mit Spark
ETL mit Spark

Die Grafik zeigt ebenfalls schön, wie das schon beschriebene Data Source API eingesetzt werden kann. Als Beispiel kann auch das HDFS Filesystem ohne Probleme mit Spark angesprochen werden, somit integriert sich Spark in viele Umgebungen aus der Big Data Landschaft. Neben den verschiedenen Dateitypen, ist Spark natürlich auch in der Lage, Streaming-Daten in Real-Time zu verarbeiten. Apache Kafka übergibt z.B. im Stream Dateien im JSON-Format, welche anschließend von Spark Streaming gelesen werden.

Vorteile von Spark als ETL Engine

In der Einführung habe ich kurz über die Vorteile von Apache Spark als ETL Engine berichtet. Besonders im Big Data Umfeld sind die Vorteile von Spark ETL Prozessen deutlich:

  1. In-Memory Verarbeitung beschleunigt ETL Prozesse mit Spark. Die grundlegende Technologie von Spark ermöglicht eine schnelle Verarbeitung von großen Datenmengen im Arbeitsspeicher. Somit kannst du viel Zeit sparen und Daten schneller bereitstellen.
  2. Das DataSource API stellt eine große Anzahl von Standard-Dateiformaten bereit und ist in der Lage alle gängigen Formate zu lesen und schreiben.
  3. Funktionale Schreibweise hilft dabei, den Code wiederverwendbar zu machen. So sind Prozesse schnell übertragen und ggf. angepasst auf neue Anforderungen.
  4. Streaming ETLs sind mit Spark möglich. Im Stream verarbeiten wir die Daten nicht mehr im Batch, sondern im kontinuierlichen Stream. Streaming ETLs stellen einen sehr interessanten Bereich dar und es wird derzeit viel in diese Richtung entwickelt, was etliche Beiträge auf dem Spark Summit belegen.
  5. Machine Learning Pipelines laufen in der gleichen Umgebung ab. Das hat den großen Vorteil, dass du keine störenden PMMLs exportieren und importieren musst, wenn du ein ML Scoring durchführen willst.

[elementor-template id=“5035″]

Wie baue ich einen ETL in Apache Spark? Spark ETL Beispiel

In diesem Spark Beispiel zeige ich auf, wie man aus einer Inputdatei mit Apache Spark einen ETL Prozess aufsetzt, der das File filtert, transformiert und in einem anderen Format abspeichert. Für den Beispiel ETL nutze ich die native Spark API, mit der ich einfach ETL Prozesse in Scala schreibe, was eine elegante ETL Logik ohne Erweiterungen erlaubt.

Extract

Wir gehen davon aus, dass wir in unserem Data Lake für jeden Tag neue CSV-Dateien mit Online-Shop Interaktionen und den dazugehörigen Produktmetadaten geliefert bekommen. Die Daten liegen auf einem Amazon S3 Bucket und wir wollen die geklickten Kategorieinformationen zukünftig in unseren Machine Learning Modellen nutzen, daher müssen wir die Daten aufbereiten und in einen ML Feature Store schreiben.

val dataLakeCSV = sqlContext.read.format("csv")
                                 .option("header", "true")
                                 .option("inferSchema", "true")
                                 .load("s3a://new-interactions/")

val filteredDF  = dataLakeCSV.filter($"shop_id" === "s_1qs_de")
                             .filter($"interaction_type"==="view")

val shopMetadata = sqlContext.read.format("csv")
                             .option("delimiter", ";")
                             .option("header", "true")
                             .option("inferSchema", "true")
                             .load("/FileStore/tables/Shop_metadata.csv") 




Transform

Nachdem wir die Views von unserem Shop herausgefiltert haben, wollen wir nun pro Kunde die Anzahl der Produktdetailansichten der letzten 5 Tage zu einem Feature berechnen.

Um das zu tun, können wir unser gefiltertes DataFrame filteredDF nehmen und eine Transformation darauf anwenden, das Ergebnis ist ein neues transformiertes DataFrame.

Durch die funktionale Schreibweise, wird der Code für verschiedene Transformationen nutzbar und lässt sich leicht innerhalb von anderen Prozessen wiederverwenden.

Die Transformation aggViews() aggregiert unserer E-Commerce Produktdetailansichten über die Kategoriemetadaten der letzten 5 Tage:

def aggViews(df: DataFrame): DataFrame = {
  df.filter(((current_timestamp().cast("long") - $"timestamp".cast("long"))
              / (24D * 3600D)) <= 5.0 )
    .groupBy($"user";, $"category")
    .agg(count("*").alias("sum_views"))
}

def technicalTS()(df: DataFrame): DataFrame = {
  df.withColumn("db_user", lit("mlguide_user"))
    .withColumn("input_timestamp", current_timestamp())

}


Die zweite Transformation technicalTS() soll zudem ein paar technische Daten schreiben, so dass wir den Prozess nachvollziehen können.

Load

Um unseren ETL in Spark auszuführen und die Transformieren Daten zuschreiben, brauchen wir eine Funktion etlWriter(), die die Daten in unserem FileStore persistiert. In diesem Fall schreiben wir die Daten in Parquet auf das Dateisystem. Ein anderer Weg ist die Daten in ein Hive Warehouse zu schreiben.

 def etlWriter()(df: DataFrame): DataFrame = {
  val path = "/FileStore/output/shop_interactions"
  df.write.mode(SaveMode.Overwrite).parquet(path)

  return null
}

Um unser Beispiel ETL jetzt auszuführen, müssen wir die einzelnen Funktionen aneinanderhängen und diese ausführen. Dafür schreiben wir uns eine kleine Funktion ETL().

def ETL(views: DataFrame, meta: DataFrame): DataFrame = {
   views.join(meta, Seq("item_id"), "left")
        .transform(aggViews())
        .transform(technicalTS())
        .transform(etlWriter())
}

Die ETL Funktion führt alle unsere Schritte zusammen: ein Left-Join mit den Produktmetadaten, die Aggregation, technische Informationen und das Schreiben der Ergebnisse in einem Parquet File.

Als letztes müssen wir unsere ETL()Funktion nur noch mit den entsprechenden Parametern aufrufen. Die ETL() Funktion erwartet zwei DataFrames mit unseren Shopviews und Produktmetadaten. Das Ganze wird so aufgerufen:

ETL(filteredDF,shopMetadata)

Fazit Spark als ETL Engine

Wie wir gesehen haben ist Apache Spark als ETL Engine sehr unterschiedlich einzusetzen und bietet einige Vorteile. Nicht ohne Grund setzen sehr viele von den großen Tech-Unternehmen auf diese Technologie.

Weiterlesen

Big Data Vor 2 Wochen

Streaming Daten: Einführung und Überblick wichtiger Frameworks

Streaming-Daten sind Daten, die mit einem Streaming-Framework in „Echtzeit“ verarbeitet werden. Der Unterschied zur reinen Nachrichtenverarbeitung besteht darin, dass komplexe Operationen (Aggregationen, Joins, etc.) auf den Datenströmen durchgeführt werden können. […]
Big Data Word Cloud
Big Data Vor 2 Wochen

Was ist Big Data? Definition, 4 V’s und Technologie

Big Data ist ein großer Trend in der Geschäftswelt und schafft für viele Unternehmen einen enormen Mehrwert. Big Data ermöglicht die Speicherung, Verarbeitung und Analyse großer Datenmengen. Spezielle Technologien ermöglichen […]
Big Data Vor 2 Monaten

Hadoop einfach erklärt!

Apache Hadoop ist eine verteilte Big Data Plattform, die von Google basierend auf dem Map-Reduce Algorithmus entwickelt wurde, um rechenintensive Prozesse bis zu mehreren Petabytes zu erledigen. Hadoop ist eines […]
Grafik zeigt vier Säulen des Data Mesh: Domain Ownership, Data as a Product, Self-Service Datenplattform, Federated Governance.
Big Data Vor 2 Monaten

Was bedeutet Data Mesh? – Data Mesh-Architektur erklärt

Was ist eigentlich Data Mesh und wofür wird es verwendet? In diesem Artikel erläutern wir Ihnen die Grundprinzipien von Data Mesh und wie diese in der Praxis umgesetzt werden. Anhand […]
Big DataDatabricks Vor 5 Monaten

Der Data Lakehouse Ansatz – eine moderne Cloud-Datenplattform 

Ein Data Lakehouse-Ansatz bietet die Flexibilität und Skalierbarkeit eines Data Lakes in Kombination mit hoher Datenqualität, Transaktionssicherheit und vereinfachter und vereinheitlichter Verwaltung bzw. Governance des Data Warehouse. Der Data Lakehouse […]
Big Data Vor 10 Monaten

Data Warehouse vs. Data Lake: Gemeinsamkeiten und Unterschiede

Was unterscheidet eigentlich ein Data Warehouse von einem Data Lake? Diese Frage soll hier beantwortet werden. Beide Konzepte stellen Speichermöglichkeiten für die verschiedenen Daten eines Unternehmens dar, sind aber grundsätzlich […]
Die Datenplattform besteht aus einem Data Lake, welcher Rohdaten und durch den ETL-Prozess in verarbeitete Daten umwandelt. Dafür werden bestimmte Tools verwendet. Einmal verarbeitet, kann Mehrwert aus den Daten gezogen werden.
Big Data Vor 10 Monaten

Was ist ein Data Lake?

Ein Data Lake ist ein zentrales Repository für eine unbegrenzte Menge von strukturierten, semistrukturierten und unstrukturierten Daten in ihrer Rohform. In diesem Beitrag erklären wir, was der Data Lake ist, […]
Big DataData Engineering Grundlagen Vor 1 Jahr

Auto Loader von Databricks

Databricks Auto Loader ist eine optimierte Dateischnittstelle, die automatisch inkrementelle Datenladungen aus Ihrem Cloud-Speicher durchführen kann, um die geladenen Daten dann in Delta Lake Tables abzuspeichern. Wie genau der Databricks […]
Big Data Vor 1 Jahr

Customer Data Platform – Funktionen, Vorteile und Unterschiede

Für erfolgreiches Marketing, braucht ein Unternehmen viele Daten und Informationen über die Kunden. Eine Customer Data Platforms (kurz: CDP) stellt die jede Kundeninformation an einer zentralen Stelle bereit. Andere Systeme und analytische Funktionen […]
Newsletter und Updates

Sie sehen gerade einen Platzhalterinhalt von HubSpot. Um auf den eigentlichen Inhalt zuzugreifen, klicken Sie auf die Schaltfläche unten. Bitte beachten Sie, dass dabei Daten an Drittanbieter weitergegeben werden.

Mehr Informationen
Termin buchen