Home scala 코드예제
Post
Cancel

scala 코드예제

nscala-time을 사용해서 지난 N시간 동안의 데이터 가져오기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.spark.sql.DataFrame
import com.github.nscala_time.time.Imports._
import org.apache.spark.sql.functions._

/**
 * Loads the hourly data for the last `interval` hours and unions it into one DataFrame.
 *
 * Hours are counted back from the current UTC time: hour 1 is `now - 1h`, ..., hour
 * `interval` is `now - interval h`; each is formatted as `yyyy/MM/dd/HH` and passed to
 * `getDataFrame` (defined elsewhere in the notebook).
 *
 * Loading is best-effort: if reading an hour (or unioning it onto the accumulated
 * result) fails with a non-fatal error, the error is printed together with the hour
 * and that hour is skipped.
 *
 * @param interval   number of past hours to load; values <= 0 load nothing
 * @param targetPath base path handed to `getDataFrame` for every hour
 *                   (defaults to the original placeholder value)
 * @return the positional `union` of every successfully loaded hour, or `null` when no
 *         hour could be loaded (kept for compatibility with existing callers)
 */
def getDataFrameForInterval(interval: Int, targetPath: String = "your path"): DataFrame = {
  import scala.util.control.NonFatal

  val now: DateTime = DateTime.now.withZone(DateTimeZone.UTC)

  (1 to interval)
    .foldLeft(Option.empty[DataFrame]) { (acc, hour) =>
      val targetTime = now.minusHours(hour).toString("yyyy/MM/dd/HH")
      try {
        val newDf = getDataFrame(targetPath, targetTime)
        // The union stays inside the try so an hour whose schema cannot be unioned is
        // skipped rather than aborting the whole load (same as the original loop).
        Some(acc.fold(newDf)(_.union(newDf)))
      } catch {
        case NonFatal(ex) =>
          println(s"Failed to load $targetTime: $ex")
          acc
      }
    }
    .orNull
}

start date부터 end date까지(양 끝 포함) 시간 단위 데이터 가져오기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.joda.time.format.DateTimeFormat
import org.apache.spark.sql.DataFrame
import org.joda.time.Hours

import scala.util.control.NonFatal

// Reset and (re)declare the notebook input widgets.
dbutils.widgets.removeAll()

dbutils.widgets.text("start", "", "start(yyyy/MM/dd/HH)")
dbutils.widgets.text("end", "", "end(yyyy/MM/dd/HH)")
dbutils.widgets.text("logtype", "", "logType(lowercase)")

// Parse in UTC so hour arithmetic and the generated yyyy/MM/dd/HH partition strings
// do not depend on the cluster's default time zone (DST gaps / repeated hours), and
// agree with the UTC hours used by getDataFrameForInterval.
val formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH").withZoneUTC()

val startText = dbutils.widgets.get("start")
val endText = dbutils.widgets.get("end")
require(
  startText.nonEmpty && endText.nonEmpty,
  "Both the 'start' and 'end' widgets must be set (format yyyy/MM/dd/HH)"
)

val startDate = formatter.parseDateTime(startText)
val endDate = formatter.parseDateTime(endText)
require(!endDate.isBefore(startDate), s"'end' ($endText) must not be before 'start' ($startText)")

// Empty string means "do not filter by log type".
val logType = dbutils.widgets.get("logtype")

// Number of hourly partitions in [startDate, endDate], both ends inclusive.
val hoursCount = Hours.hoursBetween(startDate, endDate).getHours() + 1

// Best-effort load: an hour that fails to load/filter/union is printed and skipped.
// Kept as a `var` (and `null` when nothing loads) for compatibility with later cells.
var unionDf: DataFrame = (0 until hoursCount)
  .map(startDate.plusHours(_))
  .foldLeft(Option.empty[DataFrame]) { (acc, date) =>
    println(date)
    val targetTime = date.toString("yyyy/MM/dd/HH")
    try {
      val rawDf: DataFrame = getDataFrame("your path", targetTime)
      val newDf =
        if (logType.nonEmpty) rawDf.filter(rawDf("rawLogType") === logType)
        else rawDf
      Some(acc.fold(newDf)(_.union(newDf)))
    } catch {
      case NonFatal(ex) =>
        println(s"Failed to load $targetTime: $ex")
        acc
    }
  }
  .orNull

This post is licensed under CC BY 4.0 by the author.

Trending Tags