Streaming scikit-learn with PySpark - Ben Weber

By Ben Weber


Chapter 8 excerpt of Data Science in Production


This chapter explores model pipelines where records are evaluated as they arrive in a streaming ML framework. The book covers streaming workflows for both Spark with Kafka on AWS and PubSub with Dataflow on GCP, while this blog post covers the former. The source code for all of the examples in the chapter is available on GitHub.

Chapter 8 Streaming Model Workflows

One of the most popular streaming platforms is Apache Kafka, which is an open-source solution for providing message streaming across public and private clouds. Kafka is a self-hosted solution that requires provisioning and managing a cluster of machines in order to scale. GCP provides a fully-managed streaming platform called PubSub and AWS provides a managed solution called Kinesis. The best option to use depends on your cloud platform, throughput and latency requirements, and DevOps concerns.

With a streaming platform, you can pass data between different components in a cloud environment, as well as external systems. For example, many game companies are now using these platforms to collect gameplay events from mobile games, where an event is transmitted from the game client to a game server, and then passed to the data platform as a stream. The message producer in this case is the game server passing the message to the consumer, which is the data platform that transforms and stores the event.

The connection to data science in production is that streaming platforms can be used to apply ML models as a transform step in a streaming pipeline. For example, you can set up a Python process that reads in messages from a topic, applies a sklearn model, and outputs the prediction to a new topic. This process can be part of a larger workflow that provides real-time ML predictions for users, such as item recommendations in a mobile game. For the model application step to scale to large volumes of messages, we’ll need to use distributed systems such as Spark and Cloud Dataflow rather than a single Python process.
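As a preview of this pattern, the sketch below shows a minimal single-process version. The topic names are hypothetical and the model is a stand-in trained on random data; the distributed PySpark version of this workflow is built later in the chapter.

from kafka import KafkaConsumer, KafkaProducer
from json import loads, dumps
import numpy as np
from sklearn.linear_model import LogisticRegression

# stand-in model trained on random data; a real pipeline would load a trained model
model = LogisticRegression()
model.fit(np.random.rand(100, 10), np.random.randint(0, 2, 100))

# read feature messages from one (hypothetical) topic and publish scores to another
consumer = KafkaConsumer('features',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: loads(x.decode('utf-8')))
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: dumps(x).encode('utf-8'))

for message in consumer:
    features = message.value                 # expects a list of 10 numeric features
    pred = model.predict_proba([features])[0][1]
    producer.send('predictions', {'pred': float(pred)})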

While the model application step in a streaming model pipeline is similar to setting up a Lambda or Cloud Function, which already provides near real-time predictions, a key difference is the ease of integrating with other components in the cloud platform. For example, with Cloud Dataflow you can route the event to BigQuery for storage as well as the model application step, which may push the output to a new message consumer. Another benefit is that it enables using distributed tools such as PySpark to handle requests for model application, versus the endpoint based approaches that service requests in isolation.

One of the benefits of using messaging systems in a cloud platform is that they enable different tools and different programming languages to communicate using standardized interfaces. We’ll focus on Python and PySpark in this book, but Java, Go, and many other languages are supported by these platforms. In this chapter, we’ll first use Apache Kafka to pass messages between different Python processes and then consume, transform, and produce new messages using PySpark Streaming. Next, we’ll use PubSub on GCP to provide near real-time model predictions using Cloud Dataflow in streaming mode.

8.1 Spark Streaming

In this section, we’ll first set up a Kafka instance and then produce and consume messages on the same machine using Kafka. Next, we’ll show how to consume messages from Kafka using the readStream function in PySpark, and then build a streaming pipeline that applies a sklearn model.

8.1.1 Apache Kafka

To show how Kafka can be integrated into a streaming workflow, we’ll use a single-node setup to get up and running. For a production environment, you’ll want to set up a multi-node cluster for redundancy and improved latency. Since the focus of this chapter is model application, we won’t dig into the details of setting up Kafka for high availability, and instead recommend managed solutions for small teams getting started. To install Kafka, it’s useful to browse to the Kafka website and find the most recent release; we first need to install Java, and then download and extract the Kafka release. The steps needed to set up a single-node Kafka instance on an EC2 machine are shown in the snippet below. We’ll also install a library for working with Kafka in Python called kafka-python.

sudo yum install -y java
pip install --user kafka-python

wget http://mirror.reverse.net/pub/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -xzf kafka_2.12-2.4.0.tgz
cd kafka_2.12-2.4.0
bin/zookeeper-server-start.sh config/zookeeper.properties

# new terminal
bin/kafka-server-start.sh config/server.properties

# new terminal
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 1 --topic dsp

# output
[2019-12-18 10:50:25] INFO Log partition=dsp-0, dir=/tmp/kafka-logs
  Completed load of log with 1 segments, log start offset 0 and
  log end offset 0 in 56 ms (kafka.log.Log)

When setting up Kafka, we’ll need to spawn three separate processes to run dependencies, start the Kafka service, and create a new topic for publishing messages. The snippet above runs the following processes:

  • Zookeeper: An Apache project that provides configuration and service discovery for distributed systems.
  • Kafka: Launches the bootstrap service that enables setting up Kafka topics and using streaming APIs.
  • Topics: Creates a new topic called “dsp”.

The Zookeeper and Kafka tasks are long-running processes that will continue to execute until terminated, while the Topics process will shut down once the new Kafka topic is set up. The output at the bottom of the snippet shows the log lines from running this command, which are displayed in the terminal running the Kafka process. In this configuration, we are setting up a single partition for the topic with no replication. We now have a single-node Kafka cluster set up for testing message streaming.
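To confirm that the topic was created, one quick check (not part of the original setup, but using the same Kafka CLI) is to list the topics registered with the broker:

# optional sanity check: list the topics registered with the broker
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# expected output: dsp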

The first API that we’ll explore is the Producer API, which enables processes to publish a message to a topic. To publish a message to our Kafka server, we create a producer object by passing in an address and a serialization function, which specifies how to encode Python objects into strings that can be passed to the Kafka server. The Python snippet below shows how to create the producer and send a dictionary object as a message to the server, publishing the message to the dsp topic. The dict object contains hello and time keys. If we run this code, the message should be successfully transmitted to the server, but there will not yet be a consumer to process the message.

from kafka import KafkaProducer
from json import dumps
import time

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
          value_serializer=lambda x: dumps(x).encode('utf-8'))

data = {'hello' : 'world', 'time': time.time()}
producer.send('dsp', data)

To set up a process for consuming the message, we’ll explore the Consumer API, which is used to read in streams of data. The Python snippet below shows how to create a consumer object that connects to the Kafka server and subscribes to the dsp topic. The consumer object returned is iterable and can be used in combination with a for loop in order to process messages. In the example below, the for loop will suspend execution until the next message arrives and continue iterating until the process is terminated. The value object will be the Python dictionary that we passed from the producer, while the deserializer function defines how to transform strings to Python objects. This approach works fine for small-scale streams, but with a larger data volume we’d also want to distribute the message processing logic, which we’ll demonstrate with PySpark in the next section.

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer('dsp',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: loads(x.decode('utf-8')))

for x in consumer:
    print(x.value)

Now that we have Python scripts for producing and consuming messages, we can test message streaming with Kafka. First, run the Consumer script in a Jupyter notebook, and then run the Producer script in a separate notebook. After running the producer cell multiple times, you should see output from the consumer cell similar to the results shown below.

{'hello': 'world', 'time': 1576696313.876075}
{'hello': 'world', 'time': 1576696317.435035}
{'hello': 'world', 'time': 1576696318.219239}

We can now use Kafka to reliably pass messages between different components in a cloud deployment. While this section used a test configuration for spinning up a Kafka service, the APIs we explored apply to production environments with much larger data volumes. In the next section, we’ll explore the Streams API which is used to process streaming data, such as applying an ML model.

8.1.2 Sklearn Streaming

In order to get Kafka to work with Databricks, we’ll need to edit the Kafka configuration to allow external connections, since Databricks runs in a separate VPC, and potentially a separate cloud, from the Kafka service. Also, we previously used the bootstrap approach to refer to brokers using localhost as the IP. On AWS, the Kafka startup script will use the internal IP to listen for connections, and in order to enable connections from remote machines we’ll need to update the configuration to use the external IP, as shown below.

vi config/server.properties 
advertised.listeners=PLAINTEXT://{external_ip}:9092

After making this configuration change, you’ll need to restart the Kafka process in order to receive inbound connections from Databricks. You’ll also need to enable inbound connections from remote machines by modifying the security group, which is covered in Section 1.4.1. Port 9092 needs to be open for the Spark nodes that will be making connections to the Kafka service.
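For reference, restarting the broker on this single-node setup amounts to stopping and starting the server scripts from the Kafka directory; the snippet below is a sketch assuming the same paths used earlier.

# restart the broker so the new advertised listener takes effect
bin/kafka-server-stop.sh
bin/kafka-server-start.sh config/server.properties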

We’ll also set up a second topic, which is used to publish the results of the model application step. The PySpark workflow we set up will consume messages from a topic, apply a sklearn model, and then write the results to a separate topic, called preds. One of the key benefits of this workflow is that you can swap out the pipeline that makes predictions without impacting other components in the system. This is similar to components in a cloud workflow calling out to an endpoint for predictions, but instead of repointing those components at new endpoints, we can seamlessly swap in new backend logic without impacting other components in the workflow.

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 1 --topic preds

It’s a good practice to start with a basic workflow that simply consumes messages before worrying about how to build out a predictive modeling pipeline, especially when working with streaming data. To make sure that we’ve correctly set up Kafka for remote connections with Databricks, we can author a minimal script that consumes messages from the stream and outputs the results, as shown in the PySpark snippet below. Databricks will refresh the output on a regular interval and show new data in the output table as it arrives. Setting the startingOffsets value to earliest means that the stream will begin reading from the oldest available offsets in the topic, backloading prior messages. Removing this setting means that only new messages are displayed in the table visualization.

df = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "{external_ip}:9092")
    .option("subscribe", "dsp")
    .option("startingOffsets", "earliest")
    .load())

display(df)

Getting Databricks to communicate with the Kafka service can be one of the main challenges in getting this sample pipeline to work, which is why I recommend starting with a minimal PySpark script. It’s also useful to author simple UDFs that process the value field of the received messages to ensure that the decoded message in PySpark matches the encoded data from the Python process. Once we can consume messages, we’ll use a UDF to apply a sklearn model, where UDF refers to a Python function and not a Pandas UDF. As a general practice, it’s good to add checkpoints to a Spark workflow, and the snippet above is a good example of checking whether the data received matches the data transmitted.
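As a sketch of that kind of check (my own example, not a listing from the chapter), a trivial UDF can echo the keys parsed from each message, which makes it easy to compare the decoded payload against what the producer sent; it reuses the df defined in the snippet above.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import json

# debug UDF: parse the message value and echo the keys it contains
def echo_keys(value):
    parsed = json.loads(value)
    return ",".join(sorted(parsed.keys()))

echo_udf = udf(echo_keys, StringType())
debug_df = df.selectExpr("CAST(value AS STRING)") \
             .select(echo_udf("value").alias("keys"))
display(debug_df)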

For the Spark streaming example, we’ll again use the Games data set, which has ten attributes and a label column. In this workflow, we’ll send the feature vector to the streaming pipeline as input and receive an additional prediction column as output. We’ll also append a unique identifier, as shown in the Python snippet below, in order to track the model applications in the pipeline. The snippet below shows how to create a Python dict with the ten attributes needed for the model, append a GUID to the dictionary, and send the object to the streaming model topic.

from kafka import KafkaProducer
from json import dumps
import uuid

producer = KafkaProducer(bootstrap_servers=['{external_ip}:9092'],
          value_serializer=lambda x: dumps(x).encode('utf-8'))

data = {'G1': 1, 'G2': 0, 'G3': 0, 'G4': 0, 'G5': 0,
        'G6': 0, 'G7': 0, 'G8': 0, 'G9': 0, 'G10': 0,
        'User_ID': str(uuid.uuid1())}

result = producer.send('dsp', data)
result.get()

To implement the streaming model pipeline, we’ll use PySpark with a Python UDF to apply model predictions as new elements arrive. A Python UDF operates on a single row, while a Pandas UDF operates on a partition of rows. The code for this pipeline is shown in the PySpark snippet below, which first trains a model on the driver node, sets up a streaming data source from Kafka, defines a UDF for applying an ML model, and then publishes the scores to a new topic as the pipeline output.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import json
import pandas as pd
from sklearn.linear_model import LogisticRegression

# build a logistic regression model
gamesDF = pd.read_csv("https://github.com/bgweber/Twitch/raw/master/Recommendations/games-expand.csv")
model = LogisticRegression()
model.fit(gamesDF.iloc[:, 0:10], gamesDF['label'])

# define the UDF for scoring users
def score(row):
    d = json.loads(row)
    p = pd.DataFrame.from_dict(d, orient="index").transpose()
    pred = model.predict_proba(p.iloc[:, 0:10])[0][0]
    result = {'User_ID': d['User_ID'], 'pred': pred}
    return str(json.dumps(result))

# read from Kafka
df = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "{external_ip}:9092")
    .option("subscribe", "dsp")
    .load())

# select the value field and apply the UDF
df = df.selectExpr("CAST(value AS STRING)")
score_udf = udf(score, StringType())
df = df.select(score_udf("value").alias("value"))

# write results to Kafka
query = (df.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "{external_ip}:9092")
    .option("topic", "preds")
    .option("checkpointLocation", "/temp")
    .start())

The script first trains a logistic regression model using data fetched from GitHub. The model object is created on the driver node, but is copied to the worker nodes when used by the UDF. The next step is to define a UDF that we’ll apply to streaming records in the pipeline. The Python UDF takes a string as input, converts the string to a dictionary using the json library, and then converts the dictionary into a Pandas dataframe. The dataframe is passed to the model object and the UDF returns a string representation of a dictionary object with User_ID and pred keys, where the prediction value is the propensity of the user to purchase a specific game.
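Before wiring the UDF into the stream, it can be sanity-checked on the driver with a hand-built JSON string; the check below is my own addition and assumes the model and score function defined in the snippet above.

# local sanity check of the scoring function with a sample payload
sample = json.dumps({'G1': 1, 'G2': 0, 'G3': 0, 'G4': 0, 'G5': 0,
                     'G6': 0, 'G7': 0, 'G8': 0, 'G9': 0, 'G10': 0,
                     'User_ID': 'test-user'})
print(score(sample))
# expect a JSON string containing User_ID and pred keys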

The next three steps in the pipeline define the PySpark streaming workflow. The readStream call sets up the connection to the Kafka broker and subscribes to the dsp topic. Next, a select statement casts the value column of streaming records to a string, passes the value to the UDF, and creates a new dataframe from the result of the Python UDF. The last step writes the output dataframe to the preds topic, using a local directory as a checkpoint location for Kafka. These three steps run as part of a continuous processing workflow, where the steps do not complete, but instead suspend execution until new data arrives. The result is a streaming DAG of operations that processes data as it arrives.

When running a streaming pipeline, Databricks will show details about the workflow below the cell, as shown in Figure 8.1. The green icon identifies that this is a streaming operation that will continue to execute until terminated. There are also charts that visualize data throughput and latency. For a production pipeline, it’s useful to run the code with orchestration tools such as Airflow and the Databricks operator (a sketch follows Figure 8.1), but the notebook environment is a convenient place to run and debug streaming pipelines.

FIGURE 8.1: Visualizing stream processing in Databricks.
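As a rough sketch of what that orchestration could look like (my own illustration; the DAG name, cluster settings, and notebook path are assumptions, and the operator import path depends on the Airflow version), an Airflow task can submit the notebook as a Databricks run:

from datetime import datetime
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

# hypothetical DAG that submits the streaming notebook as a Databricks run
with DAG(dag_id="streaming_model_pipeline",
         start_date=datetime(2019, 12, 1),
         schedule_interval=None) as dag:

    run_notebook = DatabricksSubmitRunOperator(
        task_id="run_streaming_notebook",
        databricks_conn_id="databricks_default",
        new_cluster={"spark_version": "6.1.x-scala2.11",
                     "node_type_id": "i3.xlarge",
                     "num_workers": 2},
        notebook_task={"notebook_path": "/Users/me/streaming_pipeline"})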

Now that we are streaming model predictions to a new topic, we’ll need to create a new consumer for these messages. The Python snippet below shows how to consume messages from the broker for the new predictions topic. The only changes from the prior consumer are the IP address and the deserializer function, which no longer decodes the payload before converting the string to a dictionary.

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer('preds',
        bootstrap_servers=['{external_ip}:9092'],
        value_deserializer=lambda x: loads(x))

for x in consumer:
    print(x.value)

We now have everything in place in order to test out the streaming pipeline with Spark streaming and Kafka. First, run the PySpark pipeline in a Databricks cell. Next, run the consumer script in a Jupyter notebook. To complete the workflow, run the producer script in a separate Jupyter notebook to pass a message to the pipeline. The result should be a prediction dictionary printed to the console of the consumer notebook, as shown below.

{'User_ID': '4be94cd4-21e7-11ea-ae04-8c8590b3eee6', 
'pred': 0.9325488640736544}

We now have a PySpark streaming pipeline that applies model predictions with near real-time performance. There’s additional tuning that we can perform to get this latency to within one millisecond, which is useful for a variety of model and web applications. The benefit of using Spark to perform model application is that we can scale the cluster to match demand and can swap in new pipelines as needed to provide model updates. Spark streaming was initially a bit tricky to get up and running, but the recent enhancements have made it much easier to get working with model application pipelines. In this pipeline we used a simple regression model, but the streaming workflows can also be used for deep learning tasks, such as image classification.
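One knob for that kind of tuning (an assumption on my part, not something covered in the chapter) is the trigger mode on the write stream; structured streaming’s continuous processing mode targets millisecond-scale end-to-end latency, though compatibility of the Python UDF in this pipeline with continuous mode would need to be verified.

# hypothetical variant of the writeStream step using a continuous trigger
query = (df.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "{external_ip}:9092")
    .option("topic", "preds")
    .option("checkpointLocation", "/temp")
    .trigger(continuous="1 second")
    .start())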

8.3 Conclusion

We first explored Kafka as a streaming message platform and built a real-time pipeline using structured streaming and PySpark. Next, we built a streaming Dataflow pipeline reusing components from the prior chapter that now interface with the PubSub streaming service. Kafka will typically provide the best latency of these two message brokers, but it takes significantly more resources to maintain this type of infrastructure versus using a managed solution. For small teams getting started, PubSub or Kinesis provide great options for scaling to match demand while reducing DevOps overhead.