如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
|
import java.text.DecimalFormat import com.alibaba.fastjson.JSON import com.donews.data.AppConfig import com.typesafe.config.ConfigFactory import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext} import org.slf4j.LoggerFactory / * * * Created by silentwolf on 2016 / 6 / 3. * / case class UserTag(SUUID: String, MAN: Float , WOMAN: Float , AGE10_19: Float , AGE20_29: Float , AGE30_39: Float , AGE40_49: Float , AGE50_59: Float , GAME: Float , MOVIE: Float , MUSIC: Float , ART: Float , POLITICS_NEWS: Float , FINANCIAL: Float , EDUCATION_TRAINING: Float , HEALTH_CARE: Float , TRAVEL: Float , AUTOMOBILE: Float , HOUSE_PROPERTY: Float , CLOTHING_ACCESSORIES: Float , BEAUTY: Float , IT: Float , BABY_PRODUCT: Float , FOOD_SERVICE: Float , HOME_FURNISHING: Float , SPORTS: Float , OUTDOOR_ACTIVITIES: Float , MEDICINE: Float ) object UserTagTable { val LOG = LoggerFactory.getLogger(UserOverviewFirst.getClass) val REP_HOME = s "${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}" def main(args: Array[String]) { var startTime = System.currentTimeMillis() val conf: com.typesafe.config.Config = ConfigFactory.load() val sc = new SparkContext() val sqlContext = new SQLContext(sc) var df1: DataFrame = null if (args.length = = 0 ) { println( "请输入: appkey , StartTime : 2016-04-10 ,StartEnd :2016-04-11" ) } else { var appkey = args( 0 ) var lastdate = args( 1 ) df1 = loadDataFrame(sqlContext, appkey, "2016-04-10" , lastdate) df1.registerTempTable( "suuidTable" ) sqlContext.udf.register( "taginfo" , (a: String) = > userTagInfo(a)) sqlContext.udf.register( "intToString" , (b: Long ) = > intToString(b)) import sqlContext.implicits._ / / * * * 重点 * * * :将临时表中的suuid和自定函数中Json数据,放入UserTag中。 sqlContext.sql( " select distinct(suuid) AS suuid,taginfo(suuid) from suuidTable group by suuid" ). map { case Row(suuid: String, taginfo: String) = > val taginfoObj = JSON.parseObject(taginfo) UserTag(suuid.toString, taginfoObj.getFloat( "man" ), taginfoObj.getFloat( "woman" ), taginfoObj.getFloat( "age10_19" ), taginfoObj.getFloat( "age20_29" ), taginfoObj.getFloat( "age30_39" ), taginfoObj.getFloat( "age40_49" ), taginfoObj.getFloat( "age50_59" ), taginfoObj.getFloat( "game" ), taginfoObj.getFloat( "movie" ), taginfoObj.getFloat( "music" ), taginfoObj.getFloat( "art" ), taginfoObj.getFloat( "politics_news" ), taginfoObj.getFloat( "financial" ), taginfoObj.getFloat( "education_training" ), taginfoObj.getFloat( "health_care" ), taginfoObj.getFloat( "travel" ), taginfoObj.getFloat( "automobile" ), taginfoObj.getFloat( "house_property" ), taginfoObj.getFloat( "clothing_accessories" ), taginfoObj.getFloat( "beauty" ), taginfoObj.getFloat( "IT" ), taginfoObj.getFloat( "baby_Product" ), taginfoObj.getFloat( "food_service" ), taginfoObj.getFloat( "home_furnishing" ), taginfoObj.getFloat( "sports" ), taginfoObj.getFloat( "outdoor_activities" ), taginfoObj.getFloat( "medicine" ) )}.toDF().registerTempTable( "resultTable" ) val resultDF = sqlContext.sql(s "select '$appkey' AS APPKEY, '$lastdate' AS DATE,SUUID ,MAN,WOMAN,AGE10_19,AGE20_29,AGE30_39 ," + "AGE40_49 ,AGE50_59,GAME,MOVIE,MUSIC,ART,POLITICS_NEWS,FINANCIAL,EDUCATION_TRAINING,HEALTH_CARE,TRAVEL,AUTOMOBILE," + "HOUSE_PROPERTY,CLOTHING_ACCESSORIES,BEAUTY,IT,BABY_PRODUCT ,FOOD_SERVICE ,HOME_FURNISHING ,SPORTS ,OUTDOOR_ACTIVITIES ," + "MEDICINE from resultTable WHERE SUUID IS NOT NULL" ) resultDF.write.mode(SaveMode.Overwrite).options( Map ( "table" - > "USER_TAGS" , "zkUrl" - > conf.getString( "Hbase.url" )) ). format ( "org.apache.phoenix.spark" ).save() } } def intToString(suuid: Long ): String = { suuid.toString() } def userTagInfo(num1: String): String = { var de = new DecimalFormat( "0.00" ) var mannum = de. format (math.random).toFloat var man = mannum var woman = de. format ( 1 - mannum).toFloat var age10_19num = de. format (math.random * 0.2 ).toFloat var age20_29num = de. format (math.random * 0.2 ).toFloat var age30_39num = de. format (math.random * 0.2 ).toFloat var age40_49num = de. format (math.random * 0.2 ).toFloat var age10_19 = age10_19num var age20_29 = age20_29num var age30_39 = age30_39num var age40_49 = age40_49num var age50_59 = de. format ( 1 - age10_19num - age20_29num - age30_39num - age40_49num).toFloat var game = de. format (math.random * 1 ).toFloat var movie = de. format (math.random * 1 ).toFloat var music = de. format (math.random * 1 ).toFloat var art = de. format (math.random * 1 ).toFloat var politics_news = de. format (math.random * 1 ).toFloat var financial = de. format (math.random * 1 ).toFloat var education_training = de. format (math.random * 1 ).toFloat var health_care = de. format (math.random * 1 ).toFloat var travel = de. format (math.random * 1 ).toFloat var automobile = de. format (math.random * 1 ).toFloat var house_property = de. format (math.random * 1 ).toFloat var clothing_accessories = de. format (math.random * 1 ).toFloat var beauty = de. format (math.random * 1 ).toFloat var IT = de. format (math.random * 1 ).toFloat var baby_Product = de. format (math.random * 1 ).toFloat var food_service = de. format (math.random * 1 ).toFloat var home_furnishing = de. format (math.random * 1 ).toFloat var sports = de. format (math.random * 1 ).toFloat var outdoor_activities = de. format (math.random * 1 ).toFloat var medicine = de. format (math.random * 1 ).toFloat "{" + "\"man\"" + ":" + man + "," + "\"woman\"" + ":" + woman + "," + "\"age10_19\"" + ":" + age10_19 + "," + "\"age20_29\"" + ":" + age20_29 + "," + "\"age30_39\"" + ":" + age30_39 + "," + "\"age40_49\"" + ":" + age40_49 + "," + "\"age50_59\"" + ":" + age50_59 + "," + "\"game\"" + ":" + game + "," + "\"movie\"" + ":" + movie + "," + "\"music\"" + ":" + music + "," + "\"art\"" + ":" + art + "," + "\"politics_news\"" + ":" + politics_news + "," + "\"financial\"" + ":" + financial + "," + "\"education_training\"" + ":" + education_training + "," + "\"health_care\"" + ":" + health_care + "," + "\"travel\"" + ":" + travel + "," + "\"automobile\"" + ":" + automobile + "," + "\"house_property\"" + ":" + house_property + "," + "\"clothing_accessories\"" + ":" + clothing_accessories + "," + "\"beauty\"" + ":" + beauty + "," + "\"IT\"" + ":" + IT + "," + "\"baby_Product\"" + ":" + baby_Product + "," + "\"food_service\"" + ":" + food_service + "," + "\"home_furnishing\"" + ":" + home_furnishing + "," + "\"sports\"" + ":" + sports + "," + "\"outdoor_activities\"" + ":" + outdoor_activities + "," + "\"medicine\"" + ":" + medicine + "}" ; } def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = { val path = s "$REP_HOME/appstatistic" ctx.read.parquet(path) . filter (s "timestamp is not null and appkey='$appkey' and day>='$startDay' and day<='$endDay'" ) } } |
以上这篇DataFrame:通过SparkSql将scala类转为DataFrame的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/silentwolfyh/article/details/51966952