Fetching the last few hours of data using nscala
import org.apache.spark.sql.DataFrame
import com.github.nscala_time.time.Imports._
import org.apache.spark.sql.functions._

// Union the DataFrames for each of the past `interval` hours into one.
def getDataFrameForInterval(interval: Int): DataFrame = {
  var unionDf: DataFrame = null
  val now: DateTime = DateTime.now.withZone(DateTimeZone.UTC)
  (1 to interval)
    .map(hour => now.minusHours(hour))
    .foreach { date =>
      try {
        val targetPath = "your path"
        val targetTime = date.toString("yyyy/MM/dd/HH")
        val newDf = getDataFrame(targetPath, targetTime)
        if (unionDf == null) unionDf = newDf
        else unionDf = unionDf.union(newDf)
      } catch {
        case ex: Exception => println(ex) // skip hours whose data is missing
      }
    }
  unionDf
}
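
Both snippets rely on a getDataFrame(path, time) helper that is not shown in the post. A minimal sketch of what it might look like, assuming the hourly logs are stored as Parquet under a path/yyyy/MM/dd/HH layout (the storage format and layout here are assumptions, not part of the original code):

// Hypothetical helper (not shown in the original): loads one hour of logs,
// assuming Parquet files laid out as <basePath>/yyyy/MM/dd/HH.
def getDataFrame(basePath: String, targetTime: String): DataFrame =
  spark.read.parquet(s"$basePath/$targetTime")

With a helper like that in place, pulling the last 24 hours is a single call:

val last24h = getDataFrameForInterval(24)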
Fetching data from a start date to an end date
import org.joda.time.format.DateTimeFormat
import org.apache.spark.sql.DataFrame
import org.joda.time.Hours

// Notebook widgets for the time range and an optional log-type filter.
dbutils.widgets.removeAll()
dbutils.widgets.text("start", "", "start(yyyy/MM/dd/HH)")
dbutils.widgets.text("end", "", "end(yyyy/MM/dd/HH)")
dbutils.widgets.text("logtype", "", "logType(lowercase)")
// dbutils.widgets.help("text")

val formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH")
val startDate = formatter.parseDateTime(dbutils.widgets.get("start"))
val endDate = formatter.parseDateTime(dbutils.widgets.get("end"))
val logType = dbutils.widgets.get("logtype")

var unionDf: DataFrame = null
// +1 so the range is inclusive of the end hour.
val hoursCount = Hours.hoursBetween(startDate, endDate).getHours() + 1
(0 until hoursCount).map(startDate.plusHours(_)).foreach { date =>
  println(date)
  try {
    val targetPath = "your path"
    val targetTime = date.toString("yyyy/MM/dd/HH")
    var newDf: DataFrame = getDataFrame(targetPath, targetTime)
    if (logType != "") {
      newDf = newDf.filter(newDf("rawLogType") === logType)
    }
    if (unionDf == null) unionDf = newDf
    else unionDf = unionDf.union(newDf)
  } catch {
    case ex: Exception => println(ex) // skip hours whose data is missing
  }
}
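
Both loops accumulate into a null-initialized var, which works in a notebook but is easy to get wrong. As a sketch of a more idiomatic alternative (under the same assumptions, reusing the hypothetical getDataFrame helper above), you can build the per-hour DataFrames first and fold them together with union, returning an Option so the "nothing loaded" case stays explicit:

import scala.util.Try
import org.joda.time.DateTime
import org.apache.spark.sql.DataFrame

// Functional variant: load each hour's DataFrame, drop the hours that fail,
// then fold the rest together with union. None means no hour was loaded.
def getDataFrameForRange(hours: Seq[DateTime]): Option[DataFrame] =
  hours
    .flatMap(date => Try(getDataFrame("your path", date.toString("yyyy/MM/dd/HH"))).toOption)
    .reduceOption(_ union _)

For example, the interval version becomes getDataFrameForRange((1 to interval).map(now.minusHours(_))), and the caller decides what an empty result should mean instead of checking for null.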