Category : apache-kafka

Am new to Heroku and Apache Kafka. One of my requirement is to connect heroku salesforce / heroku postgre through kafka uisng python. In our previous post i can refer the usage of "kafka-helper" python code for SSL Connection. when i try to use the below code getting error as below. But i have KAFKA_URL ..

Read more

import pyspark import copy import numpy as np from collections import namedtuple import json import sklearn from confluent_kafka import Consumer,TopicPartition from pyspark.sql import SQLContext sqlContext = SQLContext(sc) conf = {} topic = "price-data" conf[‘bootstrap.servers’] = ‘pkc-4yyd6.us-east1.gcp.confluent.cloud:9092’ conf[‘security.protocol’] = ‘SASL_SSL’ conf[‘sasl.mechanisms’] = ‘PLAIN’ conf[‘sasl.username’] = ‘****’ conf[‘sasl.password’] = ‘****’ conf[‘auto.offset.reset’] = ‘earliest’ conf[‘group.id’] = ‘python_example_group_1’ conf[‘default.topic.config’] ..

Read more

I am trying to understand how Kafka producer works .Below is the python producer code I wrote to send a message. I started the Kafka console consumer first and then I am running the python code from confluent_kafka import Producer from Product import Product from faker import Faker if __name__ == ‘__main__’: config = { ..

Read more

I’m tring to load and assign randomly to topics(5 topics) information from csv file. This is my current code. How do get this working? pconf = { ‘bootstrap.servers’: brokers, ‘partitioner’: ‘random’, ‘key.serializer’: StringSerializer(‘utf_8’) } producer = SerializingProducer(pconf) now = datetime.now() current_time = now.strftime("%d/%m/%Y %H:%M:%S") f = open(‘data.csv’, ‘r’) with f: reader = csv.reader(f) for row ..

Read more