Python Spark Streaming with Kafka – submitting via spark-submit

Step 1

Install Kafka (omitted here).

I installed Kafka through CDH (CDH 6.2).

Step 2: Verify Kafka and network connectivity

Verify that the Kafka cluster's producer and consumer both work.

Producer

kafka-console-producer --broker-list example.cn-north-1.compute.amazonaws.com.cn:9092 --topic my-topic

Consumer

kafka-console-consumer  --bootstrap-server example.cn-north-1.compute.amazonaws.com.cn:9092  --topic my-topic --from-beginning

Verify locally that the port is reachable: telnet example.cn-north-1.compute.amazonaws.com.cn 9092

Enable external (public network) access for Kafka.
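
On the broker side this usually means advertising an externally reachable address. A minimal sketch of the relevant broker settings (in CDH they are normally set through Cloudera Manager; the hostname is just the example address used above):

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://example.cn-north-1.compute.amazonaws.com.cn:9092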

Step 3: Local code test

consumer.py

from kafka import KafkaConsumer

# connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('my-topic', bootstrap_servers=['example.cn-north-1.compute.amazonaws.com.cn:9092'])

for msg in consumer:
    print(msg)

producer.py

import time

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['example.cn-north-1.compute.amazonaws.com.cn:9092'])
# Assign a topic
topic = 'my-topic'

def test():
    print('begin')
    n = 1
    while n <= 100:
        producer.send(topic, str(n).encode('utf-8'))
        print("send" + str(n))
        n += 1
        time.sleep(1)
    print('done')


if __name__ == '__main__':
    test()

Step 4: Spark Streaming consuming Kafka locally

spark-streaming-consumer.py (createStream)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


class KafkaMessageParse:

    def extractFromKafka(self, kafkainfo):
        print(kafkainfo)
        if type(kafkainfo) is tuple and len(kafkainfo) == 2:
            return kafkainfo[1]


if __name__ == '__main__':
    zkQuorum = 'ip-1:2181,ip-2:2181,ip-3:2181'
    groupid = "rokid-trace-rt1"
    appName = "SparkStreamingKafka"
    timecell = 5  # batch interval: pull Kafka messages every 5 seconds
    sc = SparkContext(appName=appName)
    ssc = StreamingContext(sc, timecell)
    kvs = KafkaUtils.createStream(ssc, zkQuorum, groupid, {"my-topic": 1})
    kmp = KafkaMessageParse()
    lines = kvs.map(lambda x: kmp.extractFromKafka(x))
    lines.pprint()
    ssc.start()
    ssc.awaitTermination()

Low-level API: spark-streaming-consumer.py (createDirectStream)

# TODO: to be added
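
The full direct-stream example is still to be written up; as a stopgap, here is a minimal sketch based on the spark-streaming-kafka-0-8 Python API, assuming the same broker address and topic as above. The offsets are only printed; a real job would persist them (e.g. to ZooKeeper or a database) for recovery.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def store_offset_ranges(rdd):
    # The direct stream's RDDs expose their Kafka offset ranges; print them here
    # (in a real job they would be saved externally for recovery).
    for o in rdd.offsetRanges():
        print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
    return rdd


if __name__ == '__main__':
    sc = SparkContext(appName="SparkStreamingKafkaDirect")
    ssc = StreamingContext(sc, 5)
    brokers = "example.cn-north-1.compute.amazonaws.com.cn:9092"
    # No receiver and no ZooKeeper consumer group: the direct stream reads
    # from the brokers itself, one RDD partition per Kafka partition.
    kvs = KafkaUtils.createDirectStream(ssc, ["my-topic"],
                                        {"metadata.broker.list": brokers})
    lines = kvs.transform(store_offset_ranges).map(lambda x: x[1])
    lines.pprint()
    ssc.start()
    ssc.awaitTermination()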

Local spark-submit

$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 data/spark-streaming-kafka.py

spark-submit in the CDH environment

Note: install the required Python packages on the nodes first (pip install xx).

client mode

$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 --master yarn --deploy-mode client kafka.py

cluster mode

$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 --master yarn --deploy-mode cluster /root/test-python/t-kafka.py

Note: in cluster mode the driver logs are not returned to the console; you have to verify yourself whether the job actually succeeded.
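
One way to check the result, assuming YARN log aggregation is enabled, is to fetch the aggregated logs with the application id that spark-submit prints (the id below is only a placeholder):

yarn logs -applicationId application_1558000000000_0001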

Kafka data-loss scenario with the high-level createStream API

Scenario 1: the Kafka consumer has already fetched data, but the program exits unexpectedly or crashes while processing it.

Result: the data is lost, and the consumption cannot be rolled back.

Solution 1: use StorageLevel.MEMORY_AND_DISK and call ssc.stop(True, True) at the end of the program (see the sketch after the note below).

Solution 2: use the low-level API (createDirectStream) and manage the Kafka offsets yourself.

Note: this approach only guards against data loss when the program exits; unforeseeable failures such as a machine losing power can still lose data.
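
A minimal sketch of Solution 1, assuming the same ZooKeeper quorum, group id, and topic used in Step 4: the receiver stores blocks with MEMORY_AND_DISK, and ssc.stop(True, True) stops the context gracefully so that already-received batches are processed before shutdown.

from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="SparkStreamingKafkaGraceful")
ssc = StreamingContext(sc, 5)

# Spill received blocks to disk when memory is tight instead of dropping them.
kvs = KafkaUtils.createStream(ssc, 'ip-1:2181,ip-2:2181,ip-3:2181',
                              "rokid-trace-rt1", {"my-topic": 1},
                              storageLevel=StorageLevel.MEMORY_AND_DISK)
kvs.map(lambda x: x[1]).pprint()

ssc.start()
try:
    ssc.awaitTermination()
finally:
    # stopSparkContext=True, stopGraceFully=True: finish processing the data
    # already received before the program exits.
    ssc.stop(True, True)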
