Structured Streaming + Kafka + JSON

Official documentation: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#structured-streaming-programming-guide

A simple example that reads JSON records from a Kafka topic, parses them with an explicit schema, and prints the result to the console:

import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, lit
from pyspark.sql.types import StructType, StringType

# Create (or reuse) the SparkSession that drives the streaming query.
spark = SparkSession.builder.appName("appName").getOrCreate()

# Read the Kafka topic "test2" as an unbounded streaming DataFrame; each record
# arrives with binary key/value columns plus topic/partition/offset metadata.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "ip-172-31-25-230:9092,ip-172-31-31-34:9092,ip-172-31-31-34:9092") \
    .option("subscribe", "test2") \
    .load()

# The Kafka value column is binary, so cast it to a string before parsing.
df = df.selectExpr("CAST(value AS STRING)")

# Expected layout of each JSON message.
schema = StructType() \
    .add("name", StringType()) \
    .add("age", StringType()) \
    .add("sex", StringType())

# Parse the JSON payload into columns and add a "flag" column holding a constant
# timestamp (time.time() is evaluated once, on the driver, when the query is defined).
res = df.select(from_json("value", schema).alias("data")) \
    .select("data.*") \
    .withColumn("flag", lit(time.time()))

# Print each appended row to the console (a debugging sink) and block until the
# query is stopped or fails.
query = res \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

query.awaitTermination()
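
The Kafka source is not bundled with a plain PySpark installation, so the job normally has to be started with the matching connector on the classpath, for example via spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<your Spark version> (adjust the Scala and Spark versions to your cluster).

To exercise the query, JSON messages have to land in the test2 topic. The snippet below is a minimal sketch, not part of the original example: it assumes the third-party kafka-python package is installed and that the first broker address is reachable from where it runs.

from kafka import KafkaProducer
import json

# Serialize each dict to a UTF-8 JSON string before sending it to Kafka.
producer = KafkaProducer(
    bootstrap_servers="ip-172-31-25-230:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# One test message matching the name/age/sex schema used above.
producer.send("test2", {"name": "tom", "age": "18", "sex": "male"})
producer.flush()

Every message sent after the streaming query has started should then show up as one parsed row in the console output.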

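In a real deployment the console sink is usually replaced with a durable one. The sketch below, again not part of the original post, swaps the console sink for a Kafka sink on the same res DataFrame: every column is re-serialized into a single JSON value column, the output topic name test2_out and the checkpoint directory are placeholders, and the checkpointLocation option is required by the Kafka sink.

# Re-serialize every column into one JSON string column named "value",
# which is the payload the Kafka sink writes out.
out = res.selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "ip-172-31-25-230:9092,ip-172-31-31-34:9092") \
    .option("topic", "test2_out") \
    .option("checkpointLocation", "/tmp/ss_kafka_json_checkpoint") \
    .start()

out.awaitTermination()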