数据应用OneID:Spark GraphX实现

发布时间:2025-05-17 11:27:22 作者:益华网络 来源:undefined 浏览量(1) 点赞(1)
摘要:来源:大数据左右手 前言 说明 以用户实体为例,ID 类型包含 user_id 和 device_id。当然还有其他类型id。不同id可以获取到的阶段、生命周期均不相同。 device_id 生命周期通常指的是一个设备从首次被识别

来源:大数据左右手

前言

说明

以用户实体为例,ID 类型包含 user_id 和 device_id。当然还有其他类型id。不同id可以获取到的阶段、生命周期均不相同。

device_id 生命周期通常指的是一个设备从首次被识别到不再活跃的整个时间段。

user_id是用户登录之后系统分配的唯一标识,即使不同的设备只要user_id相同就会识别为一个用户,但 user_id 只能在登录后获取到,所以会损失用户登录前的行为数据。

单体应用单独使用user_id或者device_id都不能完整地表达一个用户,多应用多类id又有差异性。如果可以将不同 ID 进行关联映射,最终通过唯一的 ID 标识用户。所以需要一个解决方案来映射。

用户渠道

手机、平板电脑安卓手机、ios手机有PC、APP和小程序

标识情况

(1)cookieid:PC站存在用户cookies中的ID,会被清理电脑时重生成。

(2)unionid:微信提供的唯一身份认证。

(3)mac:手机网卡物理地址。

(4)imei(入网许可证序号):安卓系统可取到。

(5)imsi(手机SIM卡序号):安卓系统可取到。

(6)androidid :安卓系统id。

(7)openid (app自己生成的序号) :卸载重装app就会变更。

(8)idfa(广告跟踪码):用户可重置。

(9)deviceid(app日志采集埋点开发人员自己定义一种逻辑id,可能取自 android,imei,openudid等):逻辑上的id。

还有其他不同应用设定标识用户的ID. . . . . . 

设备与登录用户分析

device_id 作为唯一

场景

适用登录率比较低的应用。

缺点不同用户登录一个设备,会识别为一个用户。同一个用户使用不同设备,会识别为多个用户。

一个device_id关联一个user_id

场景

同一个设备登陆前(device_id) 和登录后(user_id) 可以绑定。

缺点一个未被绑定的设备登录前的用户和登录后的用户不同,这个时候会被错误地识别为同一个用户。一个被绑定的设备后续被其他用户在未登录状态下使用,也会被错误地识别为之前被绑定的用户。一个被绑定了的用户使用其他设备时,未登录状态下的数据不会标识为该用户数据。

多个device_id关联一个user_id

场景

只要登录后的 user_id 相同,其多个设备上登录前后的数据都可以连通起来。

缺点

一个 device_id只能绑定到一个用户,当其他用户使用同一个已被绑定的设备时,其登录前数据还是会被识别成已绑定到该设备的用户。

多个应用间的不同ID进行关联

场景

当存在多个应用,实现应用间 ID 映射和数据相通时。比如,通过手机号,邮箱号,微信号等等可以统一为一个 ID。

缺点

复杂性高。

行业内方案

网易ID-Mapping

网易产品线:网易云音乐,邮箱,新闻,严选等等,不同的应用有不同的ID,比如:phone,email,yanxuan_id,music_id 等等

思路与方案结合各种应用账号,各种设备型号之间的关系,以及设备使用规律,比如时间和频次。采用规则过滤 和 数据挖掘,判断账号是否属于同一个人。存在问题和方案用户有多个设备信息:使用一定时间 和 频次才进行关联。设备以后从来不用:设定设备未使用衰减函数。

其他

美团采用手机号、微信、微博、美团账号的登录方式;大众点评采用的手机号、微信、QQ、微博的登录方式;其交集为手机号、微信、微博。最终,对于注册用户账户体系,美团采用了手机号作为用户的唯一标识。

图计算

图计算的核心思想:将数据表达成“点”,点和点之间可以通过某种业务含义建立“边”。然后,我们就可以从点、边上找出各种类型的数据关系。

在GraphX中,图由顶点(Vertices)和边(Edges)组成:

顶点(Vertices):图中的点,代表实体,例如人、商品或事件。边(Edges):连接两个顶点的线,代表实体之间的关系,例如朋友关系、购买行为或网络连接。边的属性(Edge Attributes):边的附加信息,可以是权重、成本或其他相关数据。顶点的属性(Vertex Attributes):顶点的附加信息,可以是标签、状态或其他相关数据。

首先通过一个案例先认识下图计算。

案例:朋友关系的连通性

首先,需要将这些数据转换为Vertex和Edge对象

假设有以下数据:

user_id: A, friend_id: Buser_id: B, friend_id: Cuser_id: C, friend_id: Duser_id: D, friend_id: Euser_id: E, friend_id: Fuser_id: F, friend_id: Guser_id: G, friend_id: Huser_id: H, friend_id: Iuser_id: I, friend_id: Jorg.apache.spark._ org.apache.spark.._val conf = new SparkConf

()

  .setAppName("Graph Example"

)

  .setMaster("local[*]"

)

val sc = new SparkContext

(conf)

// 将原始数据转换为Vertex和Edge对象val vertices: RDD[(VertexIdString)] = sc.parallelize(Seq

(

  (1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"), (5L, "E"

),

  (6L, "F"), (7L, "G"), (8L, "H"), (9L, "I"), (10L, "J"

)

  )

)

val edges: RDD[Edge[String]] = sc.parallelize(Seq

(

  Edge(1L, 2L,"friend"), Edge(2L, 3L,"friend"), Edge(3L, 4L,"friend"

),

  Edge(4L, 5L,"friend"), Edge(5L, 6L,"friend"), Edge(6L, 7L,"friend"

),

  Edge(7L, 8L,"friend"), Edge(8L, 9L,"friend"

),

  Edge(9L, 10L,"friend"), Edge(10L, 1L,"friend"

)

))

// 创建图val graph: Graph[String,String] = Graph

(vertices, edges)

// triplets同时存储了边属性和对应顶点信息

graph.triplets.foreach(println)

((4,D),(5,E

),friend)

((5,E),(6,F

),friend)

((9,I),(10,J

),friend)

((10,J),(1,A

),friend)

......

// 连通性:可以将每个顶点都关联到连通图里的最小顶点val

 value = graph.connectedComponents()

value.vertices.map(tp => (tp._2, tp._1))

  .groupByKey()

  .collect()

  .foreach(println)

结果:(1,CompactBuffer(81910234567

))

如果修改:Edge(5L, 1L,"friend"Edge(10L, 5L,"friend"

)

val edges: RDD[Edge[String]] = sc.parallelize(Seq

(

  Edge(1L, 2L,"friend"), Edge(2L, 3L,"friend"), Edge(3L, 4L,"friend"

),

  Edge(4L, 5L,"friend"

), 

  Edge(5L, 1L,"friend"), Edge(6L, 7L,"friend"

),

  Edge(7L, 8L,"friend"), Edge(8L, 9L,"friend"

),

  Edge(9L, 10L,"friend"), Edge(10L, 5L,"friend"

)

))

结果:

(1,CompactBuffer(1234

))

(5,CompactBuffer(8910567

))

ID-Mapping 简单实现

val conf = new SparkConf

()

  .setAppName("Graph Example"

)

  .setMaster("local[*]"

)

val sc = new SparkContext

(conf)

// 假设我们有三个数据集val userMappingData = sc.parallelize(Seq

(

  (11L,111L), //  phone,device_id  (22L,222

L)

))

val userInfoData = sc.parallelize(Seq

(

  (11L, 1111L), // phone,open_id  (22L, 2222

L)

))

val userLoginData = sc.parallelize(Seq

(

  (1111L, 11111L, 111111L), // open_id,idfa,idfy  (2222L, 22222L, 222222

L)

))

    // 为每个数据集创建顶点RDD//    val userVertices = userMappingData.flatMap(item =>{//      for (element <- item.productIterator)//        yield (element,element)//    })val phoneVertices = userMappingData.map { case (phone, _) => (phone, "phone"

) }

val deviceVertices = userMappingData.map { case (_, deviceId) => (deviceId, "deviceId"

) }

val userPhoneVertices = userInfoData.map { case (phone,_) => (phone, "phone"

) }

val openidVertices = userInfoData.map { case (_, openId) => (openId, "openId"

) }

val idfaVertices = userLoginData.flatMap { case (openId, idfa, _) => Seq((openId, "openid"), (idfa, "idfa"

)) }

val idfvVertices = userLoginData.flatMap { case (openId, _, idfv) => Seq((openId, "openid"), (idfv, "idfv"

)) }

// 合并所有顶点RDDval

 allVertices = phoneVertices.union(deviceVertices)

                  .union(userPhoneVertices).union(openidVertices)

                  .union(idfaVertices).union(idfvVertices)

// 创建边RDDval mappingEdges = userMappingData.map { case (phone, deviceId) => Edge(phone, deviceId, "maps_to"

) }

val infoEdges = userInfoData.map { case (phone, openid) => Edge(phone, openid, "linked_to"

) }

val loginEdges = userLoginData.flatMap { case

 (openid, idfa, idfv) =>

  Seq(Edge(openid, idfa, "logins_with"), Edge(openid, idfv, "logins_with"

))

}

// 合并所有边RDDval

 allEdges = mappingEdges.union(infoEdges).union(loginEdges)

val graph = Graph

(allVertices, allEdges)

graph.triplets.map(item=> "点 and 边:"

+item).foreach(println)

点 and 边:((22,phone),(222

,deviceId),maps_to)

点 and 边:((11,phone),(111

,deviceId),maps_to)

点 and 边:((11,phone),(1111

,openId),linked_to)

点 and 边:((22,phone),(2222

,openId),linked_to)

点 and 边:((1111,openId),(11111

,idfa),logins_with)

点 and 边:((1111,openId),(111111

,idfv),logins_with)

点 and 边:((2222,openId),(22222

,idfa),logins_with)

点 and 边:((2222,openId),(222222

,idfv),logins_with)

val

 value = graph.connectedComponents()

value.vertices.map(tp => (tp._2, tp._1))

  .groupByKey()

  .collect()

  .foreach(println)

(11,CompactBuffer(11111111111111111111

))

(22,CompactBuffer(22222222222222222222

))

数据连通后,生成统一ID。

说明

真实的数据可能不会都是Long型,需要你特殊处理计算,计算出结果再转换为明文。案例中,数据连通性后,可以生成统一ID。

上面只是简单案例,在最上面分析过会出现不同的情况,更复杂的需要更复杂的逻辑处理。

除了图计算,直接SQL JOIN也即可。

再接再厉,实战才能涨经验,才能更合理地应对业务需求。

二维码

扫一扫,关注我们

声明:本文由【益华网络】编辑上传发布,转载此文章须经作者同意,并请附上出处【益华网络】及本页链接。如内容、图片有任何版权问题,请联系我们进行处理。

感兴趣吗?

欢迎联系我们,我们愿意为您解答任何有关网站疑难问题!

您身边的【网站建设专家】

搜索千万次不如咨询1次

主营项目:网站建设,手机网站,响应式网站,SEO优化,小程序开发,公众号系统,软件开发等

立即咨询 15368564009
在线客服
嘿,我来帮您!