머신러닝을 활용한 빅데이터 분석 #3

Apachi nifi에서 취득한 json 데이터를 kafka broker를 이용해 메세징 처리합니다.
이후, druid indexing처리를 이용하여 분석을 위한 데이터셋를 작성합니다.

전체 분석 flow

#1 Kafka topic 만들기

이하 명령을 실행하여 suicide4 라는 카프카 항목을 만들고 여기에 데이터를 보내십시오
## list kafka topic
bin/kafka-topics.sh –list –zookeeper localhost

## delete kafka topic
bin/kafka-topics.sh –delete –zookeeper localhost –topic suicides2

## 위의 명령으로 삭제 되지 않을 경우, zookeeper shell을 이용하여 삭제
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[suicides3, suicides4, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 3] rmr /brokers/topics/suicides3
[zk: localhost:2181(CONNECTED) 4] rmr /brokers/topics/suicides4
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[zk: localhost:2181(CONNECTED) 6]
## create kafka topic
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic suicides4

#2 Druid Kafka ingestion

Druid의 Kafka 인덱싱 서비스를 사용하여 #1에서 작성한 suicides4에서 메시지를 수집합니다.
vi sucides4-kafka-supervisor.json
  “type”: “kafka”,
  “dataSchema”: {
    “dataSource”: “suicides4”,
    “parser”: {
      “type”: “string”,
      “parseSpec”: {
        “format”: “json”,
        “timestampSpec”: {
          “column”: “time”,
          “format”: “auto”
        “dimensionsSpec”: {     
        “timestampSpec”: {
          “column”: “time”,
          “format”: “auto”
          “dimensions”: [“time”,”country”,”year”,”sex”,”agegroup”,”count_of_suicides”,“population”,
    “metricsSpec” : [],
    “granularitySpec”: {
      “type”: “uniform”,
      “segmentGranularity”: “DAY”,
      “queryGranularity”: “NONE”,
      “rollup”: false
  “tuningConfig”: {
    “type”: “kafka”,
    “reportParseExceptions”: false
  “ioConfig”: {
    “topic”: “suicides4”,
    “replicas”: 2,
    “taskDuration”: “PT10M”,
    “completionTimeout”: “PT20M”,
    “consumerProperties”: {
      “bootstrap.servers”: “localhost:9092”
## Enable Druid Kafka ingestion
curl -XPOST -H’Content-Type: application/json’ -d @/home/min/work/sucides4-kafka-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor

#3 Result

kafka consumer 확인
./bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic suicides4


