PySpark - Module 5.2
Python-Based Producers and Consumers
Our Apache Kafka service is now accessible from the outside world, and we've successfully run basic tests to confirm that it works. The next step is to access Kafka programmatically, specifically from Python. We'll use the Databricks Community Edition for this purpose, just as we have in previous examples. However, it's important to note that this approach is not limited to Databricks: you should be able to replicate these steps on other platforms (Google Colab, Amazon SageMaker, Deepnote), which keeps the workflow flexible and adaptable across environments.
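Before building the notebooks, it can help to confirm that the broker is reachable from a notebook cell. Here is a minimal sketch using the kafka-python client; the package choice and the broker address (<BROKER_HOST>:9092) are assumptions, so substitute your own externally reachable Kafka endpoint.

```python
# Install the Python Kafka client into the notebook environment first, e.g.:
#   %pip install kafka-python

from kafka import KafkaConsumer

# Hypothetical broker address -- replace with your Kafka endpoint.
BOOTSTRAP_SERVERS = "<BROKER_HOST>:9092"

# A consumer created without a topic subscription can still query broker
# metadata; listing the topics confirms the service is reachable.
consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS)
print(consumer.topics())
consumer.close()
```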
Reading from and Writing to a Specific Topic
To prepare your environment with the default configuration, follow these steps:

- Spin Up the Cluster: Launch a cluster using your platform's default settings, and make sure it meets the requirements for your tasks.
- Create a Folder in Databricks: In your Databricks workspace, create a new folder named "Mod 5". This will be the designated space for all the resources and scripts related to Module 5.

These steps prepare your environment for the development and organization work in the rest of Module 5.
Import this notebook, rename it to Producer, and follow the instructions.
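The Producer notebook carries the full instructions; as a rough sketch of what such a producer looks like with kafka-python, something along these lines should work. The broker address is a placeholder, and the JSON value_serializer is an assumption that matches the serialization examples further down.

```python
import json
from kafka import KafkaProducer

# Hypothetical broker address -- replace with your Kafka endpoint.
producer = KafkaProducer(
    bootstrap_servers="<BROKER_HOST>:9092",
    # Serialize every value to JSON bytes before it is sent to the broker.
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Publish a small JSON event to the module's topic.
producer.send("buas-data-n-ai-events", value={"event": "hello", "value": 42})

# flush() blocks until all buffered messages have been delivered.
producer.flush()
```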
In a new window in the same folder, import this notebook, rename it to Consumer, and follow the instructions.
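Again, the Consumer notebook has the authoritative steps; a minimal consumer sketch, under the same assumptions (placeholder broker address, JSON-encoded values), might look like this:

```python
import json
from kafka import KafkaConsumer

# Hypothetical broker address -- replace with your Kafka endpoint.
consumer = KafkaConsumer(
    "buas-data-n-ai-events",
    bootstrap_servers="<BROKER_HOST>:9092",
    # Start from the beginning of the topic if no committed offset exists.
    auto_offset_reset="earliest",
    # Mirror the producer: decode JSON bytes back into Python objects.
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

# Iterating over the consumer blocks and yields messages as they arrive.
for message in consumer:
    print(f"offset={message.offset} value={message.value}")
```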
Explain what is going on here. Why does the Consumer have an entry? (See the bottom-right pane of the image below.)
Kafka Producer Expects JSON-Serializable Data:
The Kafka producer takes JSON-serializable data as input, which can lead to errors if the provided value cannot be serialized properly. Below are a few examples of the data types the Kafka producer accepts, along with explanations of how to format and serialize the data correctly to avoid common errors.
Sending an integer value:

```python
producer.send('buas-data-n-ai-events', value=12)
```

Or sending type information as a string:

```python
producer.send('buas-data-n-ai-events', value=str(type(12)))
```

Or sending a JSON object with both the type information and the value:

```python
producer.send(
    'buas-data-n-ai-events',
    value={"type": str(type(12)), "value": 12}
)
```
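Conversely, a value that json.dumps cannot handle fails at send time, because kafka-python applies the value_serializer inside send(). A small sketch of this failure mode, assuming the JSON value_serializer shown earlier:

```python
# With the JSON value_serializer, non-serializable values fail at send time.
try:
    # A raw type object is not JSON-serializable, so json.dumps raises TypeError.
    producer.send("buas-data-n-ai-events", value=type(12))
except TypeError as exc:
    print(f"Serialization failed: {exc}")
```

This is why the earlier example wraps the type in str(): the string "<class 'int'>" is serializable, while the type object itself is not.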
Create a topic
Import this notebook to the same folder, rename it to Topic Management and Verification in Databricks, and follow the instructions.
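The notebook walks through topic management in detail; as an illustrative sketch, kafka-python's admin client can create a topic and then verify it exists. The broker address and the topic name buas-demo-topic here are hypothetical.

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Hypothetical broker address -- replace with your Kafka endpoint.
admin = KafkaAdminClient(bootstrap_servers="<BROKER_HOST>:9092")

# Create a topic with one partition and replication factor 1
# (the topic name is a made-up example).
admin.create_topics(
    new_topics=[NewTopic(name="buas-demo-topic", num_partitions=1, replication_factor=1)]
)

# Verify the topic now appears when listing all topics on the broker.
print(admin.list_topics())
admin.close()
```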