PySpark Structured Streaming: the window function in tumbling mode

# The input files are generated by the post "PySpark Structured Streaming file source, example 1".
# The code below explores the window function.
# Spark offers three kinds of window: tumbling, sliding, and session.
# Passing only windowDuration to window() yields a tumbling window.
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import split, window, from_unixtime, lit

PATH = '/opt/software/tmp/data/'

if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()

    # The text source loads each line into a single `value` column;
    # the space-delimited fields are split out below.
    lines = spark.readStream.format("text").load(PATH)
    userinfo = lines.select(split(lines.value, " ").alias("info"))
    user = userinfo.select(
        from_unixtime(userinfo['info'][0]).alias('eventtime'),
        userinfo['info'][1].alias('name'),
        userinfo['info'][2].alias('province'),
        userinfo['info'][3].alias('action'))

    """
    Count the logins per province once per minute; slideDuration and startTime are not set.
    Parameters of the window function:
        timeColumn:     the timestamp column
        windowDuration: the length of each window
        slideDuration:  the slide interval
        startTime:      the offset at which windows start
    """
    windowDuration = "1 minutes"
    userCounts = user.groupBy("province", window("eventtime", windowDuration)).count()

    # The result has three fields: province, window, count.
    # window is a struct with two fields, start and end, which can be read
    # with userCounts['window']['start']. Use printSchema() to inspect the details.

    # The output mode can be complete or update.
    # With complete, every batch outputs the results for all windows,
    # so batch 1 includes the results of batch 0.
    # With update, batch 1 outputs only the rows that changed in that batch
    # and does not repeat the unchanged rows from batch 0.
    userCounts = userCounts.select(
        userCounts['province'],
        userCounts['window']['start'].alias("start"),
        userCounts['window']['end'].alias("end"),
        userCounts['count'].alias('sl'))

    userCounts.writeStream.outputMode("complete").format("console") \
        .trigger(processingTime="20 seconds").start().awaitTermination()

    """
    Alternative: write the result to a MySQL database.

    def insert_into_mysql_batch(df: DataFrame, batch):
        print("{} is start".format(batch))
        if df.count() > 0:
            # Add the batch id to df as a constant column using lit
            data = df.withColumn("batch", lit(batch))
            (data.write.format("jdbc")
                .option("driver", "com.mysql.jdbc.Driver")
                .option("url", "jdbc:mysql://localhost:3306/spark")
                .option("user", "root")
                .option("password", "root")
                .option("dbtable", "dm_user_province")
                .option("batchsize", 1000)
                .mode("append")
                .save())

    userCounts.writeStream.outputMode("complete") \
        .foreachBatch(insert_into_mysql_batch) \
        .trigger(processingTime="20 seconds").start().awaitTermination()
    """
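
To make the bucketing concrete, here is a minimal pure-Python sketch of how a one-minute tumbling window could assign events to half-open intervals [start, end) aligned to the Unix epoch, and then count events per (province, window). The names `tumbling_window`, `count_by_province`, and `start_offset` are hypothetical and exist only for illustration. This is not Spark's implementation, and it assumes UTC timestamps.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

# Assumed window length, matching windowDuration = "1 minutes" in the script.
WINDOW = timedelta(minutes=1)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(event_time, window=WINDOW, start_offset=timedelta(0)):
    """Return the (start, end) of the tumbling window that contains event_time.

    Windows are half-open, [start, end), and aligned to EPOCH + start_offset.
    """
    # Time elapsed since the most recent window boundary.
    elapsed = (event_time - EPOCH - start_offset) % window
    start = event_time - elapsed
    return start, start + window


def count_by_province(events, window=WINDOW):
    """Count events per (province, window), like groupBy(province, window).count().

    `events` is an iterable of (epoch_seconds, province) pairs.
    """
    counts = Counter()
    for epoch_seconds, province in events:
        ts = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
        counts[(province, tumbling_window(ts, window))] += 1
    return counts


if __name__ == "__main__":
    sample = [(0, "A"), (59, "A"), (60, "A"), (61, "B")]
    for (province, (start, end)), n in sorted(count_by_province(sample).items()):
        print(province, start.isoformat(), end.isoformat(), n)
```

With the sample events, the ones at seconds 0 and 59 share the first window, while the event at second 60 opens the next one, because the end of a window is exclusive.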