Spark RDD case: Statistics of daily new users
2022-06-29 07:16:00 【Yushijuj】
1. The Task
Below is a file of historical user-access data. The first column is the date the user visited the website, and the second column is the user name:
2022-02-07,Mike
2022-02-07,Alice
2022-02-07,Brown
2022-04-02,Mike
2022-04-02,Alice
2022-04-02,Green
2022-07-12,Alice
2022-07-12,Smith
2022-07-12,Brian

From this data, count the number of new users per day. The expected result is:
2022-02-07: 3 new users (Mike, Alice, Brown)
2022-04-02: 1 new user (Green)
2022-07-12: 2 new users (Smith, Brian)
2. Implementation Approach
Use an inverted index. If the same user appears under several access dates, the earliest date is the user's registration date, i.e. the date on which the user is new; all later dates are repeat visits and must not be counted. So for each user, only the minimum access date matters. Swap each record so that the user name becomes the key, group the records by user, and move each user's minimum access date into the first column. The first column is then the valid data: counting how many times each date occurs in it gives the number of new users on that date. The sketch below walks the sample data through these steps.
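As a minimal sketch of these steps, here is the same pipeline written with plain Scala collections (no Spark), run on the sample records above; every name and date comes straight from the task data:

// Sketch: the inverted-index idea on plain Scala collections
val records = Seq(
  ("2022-02-07", "Mike"), ("2022-02-07", "Alice"), ("2022-02-07", "Brown"),
  ("2022-04-02", "Mike"), ("2022-04-02", "Alice"), ("2022-04-02", "Green"),
  ("2022-07-12", "Alice"), ("2022-07-12", "Smith"), ("2022-07-12", "Brian")
)
val newUsersPerDay = records
  .map { case (date, name) => (name, date) }      // invert: the user becomes the key
  .groupBy(_._1)                                  // all access dates per user
  .map { case (_, pairs) => pairs.map(_._2).min } // earliest date = sign-up date
  .groupBy(identity)                              // bucket the sign-up dates
  .map { case (date, ds) => (date, ds.size) }     // new users per date
// newUsersPerDay: Map(2022-02-07 -> 3, 2022-04-02 -> 1, 2022-07-12 -> 2)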

- Start HDFS and Spark on the cluster

- Prepare the data file users.txt on HDFS

3. Completing the Task in the Spark Shell
(1) Read the file to get an RDD
val rdd1 = sc.textFile("hdfs://master:9000/input/users.txt")

(2) Invert: swap the order of the elements in each tuple of the RDD
val rdd2 = rdd1.map(
  line => {
    val fields = line.split(",")
    (fields(1), fields(0))
  }
)

(3) Group the inverted RDD by key
val rdd3 = rdd2.groupByKey()
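For the sample data, the grouped RDD should look roughly like this (CompactBuffer is simply the Iterable implementation Spark prints; record and element order may vary):

rdd3.collect.foreach(println)
// (Mike,CompactBuffer(2022-02-07, 2022-04-02))
// (Alice,CompactBuffer(2022-02-07, 2022-04-02, 2022-07-12))
// (Brown,CompactBuffer(2022-02-07))
// (Green,CompactBuffer(2022-04-02))
// (Smith,CompactBuffer(2022-07-12))
// (Brian,CompactBuffer(2022-07-12))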

(4) Take the minimum date of each group and map it to a count of 1
val rdd4 = rdd3.map(line => (line._2.min, 1))
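Each user now contributes exactly one (date, 1) pair, so rdd4 should contain, ordering aside:

rdd4.collect.foreach(println)
// (2022-02-07,1)   from Mike
// (2022-02-07,1)   from Alice
// (2022-02-07,1)   from Brown
// (2022-04-02,1)   from Green
// (2022-07-12,1)   from Smith
// (2022-07-12,1)   from Brian

Since only the minimum date per user is needed, rdd2.reduceByKey((a, b) => if (a < b) a else b) would yield the same dates without materializing each user's full date list; groupByKey is used here because it mirrors the inverted-index idea most directly.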

(5) Count by key to get the number of new users per day
val result = rdd4.countByKey()
result.keys.foreach(key => println(key + "," + result(key)))
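With the sample data, this prints three lines (map iteration order may vary):

2022-02-07,3
2022-04-02,1
2022-07-12,2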

4. Completing the Task in IntelliJ IDEA
(1) Create a Maven project – CountNewUsers


Rename the java directory to scala

(2) Add the required dependencies and build plugins
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>net.cb.rdd</groupId>
    <artifactId>CountNewUsers</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- Refresh Maven so the dependencies are downloaded

(3) Create the log properties file
- Create log4j.properties in the resources folder
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spark.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

(4) Create the singleton object that counts daily new users
- Create the CountNewUsersBySQL singleton object in the net.cb.rdd package

package net.cb.rdd

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Function: count daily new users
 * Author: Chen Biao
 * Date: 2022-06-18
 */
object CountNewUsersBySQL {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration and context
    val spark: SparkConf = new SparkConf()
      .setAppName("CountNewUsers")
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(spark)
    val lines = sc.textFile("hdfs://master:9000/input/users.txt")
    //lines.collect.foreach(println)
    // Invert: turn each (date, name) record into (name, date)
    val value = lines.map(
      line => {
        val fields = line.split(",")
        val name = fields(1)
        val time = fields(0)
        (name, time)
      }
    )
    // Group dates by name and keep only each user's earliest (sign-up) date
    val nameGB = value.groupByKey().map(
      line => {
        val time = line._2.min
        val name = line._1
        (time, name)
      }
    )
    // Count by key to get the number of new users per day
    val CB = nameGB.countByKey()
    CB.keys.foreach(key => println(key + "," + CB(key)))
    // Stop the SparkContext
    sc.stop()
  }
}
(5) Run the program locally and view the results
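If everything is configured correctly, the console prints the same result as the spark-shell session in section 3:

2022-02-07,3
2022-04-02,1
2022-07-12,2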
