What data transfer from legacy systems has in common with Spotify

innFactory GmbH
4 min read · Nov 24, 2020

Spotify summarizes the last 10 years for over 248 million users in personalized playlists.

Spotify created a personalized year-in-review for its users for the first time in 2016. The challenge in 2019 was far greater: the number of music streams to be analyzed had increased fivefold compared to 2018 alone. Nevertheless, by the end of 2019 Spotify managed to analyze the complete listening history of the last 10 years for over 248 million users and turn it into a personalized yearly review with personalized playlists for each user.

Technically, Spotify has been running on Google Cloud since 2016 and is one of the largest consumers of virtual computing resources there. For the first three years after the move to GCP alone, Spotify budgeted over $450 million for the cloud to handle the sheer volume of data. Together with Google's engineering teams, the 2019 review once again became the largest Dataflow analysis job ever run in the Google Cloud. With data volumes this large, such computations are also a financial burden, so the process has to be planned very carefully and cannot simply be kicked off on a whim. Google bills GCP usage by compute hours or by data volume, depending on the pricing model, and in this case both were gigantic.

The GCP service mentioned above, Dataflow, is based on the open-source project Apache Beam. Apache Beam is a framework, or rather a programming model, for defining and building data processing pipelines. Even before moving to the Google Cloud, Spotify had started developing a Scala framework called "Scio" that wraps the Apache Beam API in Scala. Apache Beam, and therefore Scio, provides all the tools needed to run gigantic ETL workloads as batch or streaming jobs. Jobs written in Scio can then be executed serverlessly on Google Dataflow.
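To give a first impression of how little ceremony such a job needs, here is a minimal word-count-style Scio sketch. It is not taken from Spotify or from our project, and the input and output arguments are placeholders; depending on the runner option, the very same code runs locally on the Direct Runner or serverless on Dataflow.

import com.spotify.scio._

object MinimalScioJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    // ContextAndArgs parses the standard Beam/Dataflow options (runner, project, region, ...)
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.textFile(args("input"))                        // read lines, e.g. from GCS or a local file
      .flatMap(_.split("""\W+""")).filter(_.nonEmpty)
      .countByValue                                   // (word, count) pairs
      .map { case (word, count) => s"$word: $count" }
      .saveAsTextFile(args("output"))                 // write the result back out

    sc.run().waitUntilFinish()                        // hand the pipeline to the chosen runner
  }
}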

Scala, Scio and Dataflow as a serverless "secret weapon" for processing legacy data

Since most of our customer projects and our own products also run in the Google Cloud, Scio is an ideal tool for us to bring data from legacy systems into the cloud. The jobs can run once as a batch or continuously stream new data into the new system while the legacy system is temporarily operated in parallel. Since Google manages the infrastructure and everything runs serverless, this is a very cost-efficient way for us to connect legacy systems. We also use Scio jobs for big data analyses whose results are written to a BigQuery data warehouse (a small sketch of this follows after the list below). The advantages for us at innFactory are obvious:
- Effortless scaling across all workload sizes
- Low development effort
- Scala as our favorite backend language
- Very stable jobs that can also be tested on local machines
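For the BigQuery case mentioned above, a minimal sketch could look like the following. The project, dataset and case class are made-up placeholders, the @BigQueryType macro needs macro annotations enabled in the build, and the exact annotations and save methods differ slightly between Scio versions.

import com.spotify.scio._
import com.spotify.scio.bigquery._

object BigQueryExportJob {
  // The schema of the target table is derived from the case class
  @BigQueryType.toTable
  case class DailyStat(userId: String, playCount: Long)

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.parallelize(Seq(DailyStat("42", 1337L)))   // stands in for the real analysis result
      .saveAsTypedBigQueryTable(Table.Spec("my-project:analytics.daily_stats"))

    sc.run().waitUntilFinish()
  }
}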

Scio 101 — Data transformation from and to Postgres

The following example is intended to give a first impression of how data from a legacy system, or rather its database, can be transferred to a new database in the cloud. The complete example can also be found in the innFactory GitHub account.
The example shows how user data from Postgres database 1 is transformed and written to a new Postgres database 2.
At the beginning of our job we want to extract the database credentials from the command-line arguments / Dataflow parameters. Later we need them to build the Scio context and to read and write the data via JDBC.

def initializeFromCmdLineArgs(cmdlineArgs: Array[String]): (JdbcConnectionOptions, JdbcConnectionOptions, ScioContext) = {
  // Parse the source and target database settings from the command line / Dataflow parameters
  val (readOptions, _)  = ScioContext.parseArguments[ReadSqlOptions](cmdlineArgs)
  val (writeOptions, _) = ScioContext.parseArguments[WriteSqlOptions](cmdlineArgs)

  // The Scio context later drives the whole pipeline
  val sc = ScioContext(readOptions)

  // Build the JDBC connection options for reading and writing
  val readConnectionOptions  = getReadConnectionOptions(readOptions)
  val writeConnectionOptions = getWriteConnectionOptions(writeOptions)

  (readConnectionOptions, writeConnectionOptions, sc)
}

Scio gives us the ability to stream all data consistently out of the old database and map the rows to our case classes. The case classes can then be processed in the next step and written to the new database as a side output. Since, much like in Spark, we define a DAG (directed acyclic graph) in Scio using higher-order functions, Apache Beam can optimize the execution of the whole flow before running it. Furthermore, Dataflow makes sure that neither the source nor the target is overloaded with too much data. Finally, our example counts the processed people and prints the number to the console.

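The pipeline itself then looks roughly like the sketch below; the complete version is in the linked GitHub repository. The Person case class, the table layout and the example transformation are simplified placeholders, and the exact fields of JdbcReadOptions and JdbcWriteOptions vary slightly between Scio versions.

import java.sql.{PreparedStatement, ResultSet}
import com.spotify.scio.jdbc._

case class Person(id: Long, name: String, email: String)   // simplified row model

def main(cmdlineArgs: Array[String]): Unit = {
  val (readConn, writeConn, sc) = initializeFromCmdLineArgs(cmdlineArgs)

  // Stream the rows out of the legacy database and map them to case classes
  val people = sc.jdbcSelect(
    JdbcReadOptions(
      connectionOptions = readConn,
      query = "SELECT id, name, email FROM person",
      rowMapper = (rs: ResultSet) => Person(rs.getLong(1), rs.getString(2), rs.getString(3))
    )
  )

  // Transform the case classes and write them to the new database
  people
    .map(p => p.copy(email = p.email.toLowerCase))          // example transformation
    .saveAsJdbc(
      JdbcWriteOptions(
        connectionOptions = writeConn,
        statement = "INSERT INTO person (id, name, email) VALUES (?, ?, ?)",
        preparedStatementSetter = (p: Person, ps: PreparedStatement) => {
          ps.setLong(1, p.id)
          ps.setString(2, p.name)
          ps.setString(3, p.email)
        }
      )
    )

  // Count the processed people and print the number to the console
  people.count.debug(prefix = "persons processed: ")

  sc.run().waitUntilFinish()
}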

The following command can be used to start the example:

sbt "runMain scio.example.jdbc.Dataflow --readSqlPassword=test --readSqlDb=test --readSqlInstanceConnectionName=localhost:5434 --readSqlUsername=test --writeSqlUsername=test --writeSqlPassword=test --writeSqlInstanceConnectionName=localhost:5432 --writeSqlDb=test"
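To run the same job serverless on Dataflow instead of on the local machine, the usual Beam runner options are added to the command. The project, region and bucket below are placeholders, and the SQL connection parameters then have to point at databases reachable from Dataflow (for example Cloud SQL instance connection names instead of localhost):

sbt "runMain scio.example.jdbc.Dataflow --runner=DataflowRunner --project=my-gcp-project --region=europe-west1 --tempLocation=gs://my-bucket/temp <read/write parameters as above>"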

Conclusion

Since Apache Beam takes care of threading and distributing our job, we can concentrate on the actual ETL logic. Transforming data quickly at any point in time becomes very easy. Thanks to the serverless Google Dataflow service, we don't have to worry about scaling multiple servers and pay no fees when no job is running: Dataflow scales down to zero.

Originally published at https://innfactory.de on November 24, 2020. Translated with DeepL from German to English.

