druid0.17入门(3)——数据接入指南

  • Post category:other

Druid 0.17 入门(3) – 数据接入指南

本攻略将详细讲解Druid 0.17的数据接入指南,包括数据源的配置、数据导入和数据查询等内容。

数据源的配置

Druid支持多种数据源,包括Hadoop、Kafka、JDBC等。以下是配置JDBC数据源的步骤:

  1. 在conf/druid/_common.properties文件中添加以下配置:

properties
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.metadata.storage.connector.user=root
druid.metadata.storage.connector.password=root

  • druid.metadata.storage.type=mysql 表示使用MySQL作为元数据存储。
  • druid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid 表示连接到MySQL的druid数据库。
  • druid.metadata.storage.connector.user=root 表示使用root用户连接MySQL。
  • druid.metadata.storage.connector.password=root 表示使用root用户的密码连接MySQL。

  • 在conf/druid/_common.properties文件中添加以下配置:

properties
druid.storage.type=metadata
druid.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.storage.connector.user=root
druid.storage.connector.password=root

  • druid.storage.type=metadata 表示使用元数据存储。
  • druid.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid 表示连接到MySQL的druid数据库。
  • druid.storage.connector.user=root 表示使用root用户连接MySQL。
  • druid.storage.connector.password=root 表示使用root用户的密码连接MySQL。

  • 在conf/druid/_common.properties文件中添加以下配置:

properties
druid.indexer.type=realtime
druid.indexer.task.logsDir=/var/druid/tasklogs
druid.indexer.logs.type=file
druid.indexer.logs.directory=/var/druid/indexing-logs

  • druid.indexer.type=realtime 表示使用实时索引。
  • druid.indexer.task.logsDir=/var/druid/tasklogs 表示任务日志的存储目录。
  • druid.indexer.logs.type=file 表示使用文件存储索引日志。
  • druid.indexer.logs.directory=/var/druid/indexing-logs 表示索引日志的存储目录。

数据导入

以下是使用JDBC数据源导入数据的步骤:

  1. 在conf/druid/_common.properties文件中添加以下配置:

properties
druid.sql.enable=true
druid.sql.connection-url=jdbc:mysql://localhost:3306/test
druid.sql.username=root
druid.sql.password=root

  • druid.sql.enable=true 表示启用JDBC数据源。
  • druid.sql.connection-url=jdbc:mysql://localhost:3306/test 表示连接到MySQL的test数据库。
  • druid.sql.username=root 表示使用root用户连接MySQL。
  • druid.sql.password=root 表示使用root用户的密码连接MySQL。

  • 在conf/druid/hadoop-hdfs-storage.xml文件中添加以下配置:

xml
<property>
<name>druid.storage.type</name>
<value>hdfs</value>
</property>
<property>
<name>druid.storage.storageDirectory</name>
<value>/druid/segments</value>
</property>
<property>
<name>druid.storage.storageDirectoryLockTimeoutMs</name>
<value>PT1M</value>
</property>

  • druid.storage.type=hdfs 表示使用HDFS存储数据。
  • druid.storage.storageDirectory=/druid/segments 表示数据存储的目录。
  • druid.storage.storageDirectoryLockTimeoutMs=PT1M 表示目录锁定的超时时间。

  • 在conf/druid/hadoop-timeline-emitter.xml文件中添加以下配置:

xml
<property>
<name>druid.emitter</name>
<value>timeline</value>
</property>
<property>
<name>druid.emitter.timeline.server</name>
<value>localhost:8090</value>
</property>
<property>
<name>druid.emitter.timeline.useHttps</name>
<value>false</value>
</property>

  • druid.emitter=timeline 表示使用Timeline作为数据的发布者。
  • druid.emitter.timeline.server=localhost:8090 表示Timeline服务器的地址。
  • druid.emitter.timeline.useHttps=false 表示不使用HTTPS协议。

  • 在conf/druid/realtime.spec文件中添加以下配置:

json
{
"dataSources": [
{
"spec": {
"dataSchema": {
"dataSource": "test",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"host",
"user"
]
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "hour",
"queryGranularity": "minute"
}
},
"ioConfig": {
"type": "jdbc",
"jdbcDriverClass": "com.mysql.jdbc.Driver",
"connectionString": "jdbc:mysql://localhost:3306/test",
"user": "root",
"password": "root",
"sql": "SELECT * FROM test",
"pollPeriod": "PT1M"
},
"tuningConfig": {
"type": "realtime",
"maxRowsInMemory": "100000",
"intermediatePersistPeriod": "PT10M",
"windowPeriod": "PT10M",
"basePersistDirectory": "/druid/segments",
"maxPendingPersists": "0",
"buildV9Directly": true
}
}
}
]
}

  • dataSchema.dataSource=test 表示数据源的名称为test。
  • parser.type=string 表示使用字符串解析器。
  • parser.parseSpec.format=json 表示数据格式为JSON。
  • parser.parseSpec.timestampSpec.column=timestamp 表示时间戳所在的列。
  • parser.parseSpec.dimensionsSpec.dimensions=[“host”, “user”] 表示维度为host和user。
  • metricsSpec.type=count 表示使用计数器。
  • granularitySpec.segmentGranularity=hour 表示分段的时间粒度为小时。
  • granularitySpec.queryGranularity=minute 表示查询的时间粒度为分钟。
  • ioConfig.type=jdbc 表示使用JDBC数据源。
  • ioConfig.jdbcDriverClass=com.mysql.jdbc.Driver 表示使用MySQL的JDBC驱动。
  • ioConfig.connectionString=jdbc:mysql://localhost:3306/test 表示连接到MySQL的test数据库。
  • ioConfig.user=root 表示使用root用户连接MySQL。
  • ioConfig.password=root 表示使用root用户的密码连接MySQL。
  • ioConfig.sql=SELECT * FROM test 表示要导入的数据。
  • ioConfig.pollPeriod=PT1M 表示轮询的时间间隔为1分钟。
  • tuningConfig.type=realtime 表示使用实时索引。
  • tuningConfig.maxRowsInMemory=100000 表示内存中最多存储10万行数据。
  • tuningConfig.intermediatePersistPeriod=PT10M 表示中间持久化的时间间隔为10分钟。
  • tuningConfig.windowPeriod=PT10M 表示窗口的时间间隔为10分钟。
  • tuningConfig.basePersistDirectory=/druid/segments 表示数据存储的目录。
  • tuningConfig.maxPendingPersists=0 表示最多同时进行0个持久化操作。
  • tuningConfig.buildV9Directly=true 表示直接构建V9索引。

  • 启动实时节点

bash
bin/start-micro-quickstart

数据查询

以下是使用Druid查询数据的步骤:

  1. 在conf/druid/_common.properties文件中添加以下配置:

properties
druid.query.groupBy.maxLimit=100000

  • druid.query.groupBy.maxLimit=100000 表示最多返回10万行数据。

  • 在conf/druid/broker.properties文件中添加以下配置:

properties
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
druid.broker.cache.cacheDirectory=/var/druid/cache
druid.broker.cache.maxSize=1000000000
druid.broker.cache.objectMapper=com.fasterxml.jackson.databind.ObjectMapper

  • druid.broker.cache.useCache=true 表示启用查询缓存。
  • druid.broker.cache.populateCache=true 表示自动填充查询缓存。
  • druid.broker.cache.cacheDirectory=/var/druid/cache 表示查询缓存的存储目录。
  • druid.broker.cache.maxSize=1000000000 表示查询缓存的最大大小。
  • druid.broker.cache.objectMapper=com.fasterxml.jackson.databind.ObjectMapper 表示使用Jackson作为对象映射器。

  • 在conf/druid/historical.properties文件中添加以下配置:

properties
druid.server.http.numThreads=100
druid.server.http.maxIdleTime=PT5M
druid.server.http.maxRequestHeaderSize=8192
druid.server.http.maxRequestSize=10485760
druid.server.http.maxResponseSize=10485760
druid.server.http.compressionLevel=3
druid.server.http.compressionEnabled=true
druid.server.http.compressionMimeTypes=application/json,application/javascript,text/javascript,text/plain,text/xml,text/css

  • druid.server.http.numThreads=100 表示HTTP服务器的线程数为100。
  • druid.server.http.maxIdleTime=PT5M 表示HTTP连接的最大空闲时间为5分钟。
  • druid.server.http.maxRequestHeaderSize=8192 表示HTTP请求头的最大大小为8KB。
  • druid.server.http.maxRequestSize=10485760 表示HTTP请求的最大大小为10MB。
  • druid.server.http.maxResponseSize=10485760 表示HTTP响应的最大大小为10MB。
  • druid.server.http.compressionLevel=3 表示HTTP响应的压缩级别为3。
  • druid.server.http.compressionEnabled=true 表示启用HTTP响应的压缩。
  • druid.server.http.compressionMimeTypes=application/json,application/javascript,text/javascript,text/plain,text/xml,text/css 表示需要压缩的MIME类型。

  • 启动查询节点

bash
bin/start-micro-quickstart

  1. 查询数据

sql
SELECT host, COUNT(*) FROM test WHERE __time >= '2022-01-01T00:00:00.000Z' AND __time < '2022-02-01T00:00:00.000Z' GROUP BY host

  • __time 表示时间戳列。

示例说明

以下是两个示例说明:

示例1:使用Druid导入CSV数据

假设需要使用Druid导入CSV数据。以下是实现步骤:

  1. 准备数据

准备一个CSV文件,例如:

csv
timestamp,host,user
1640995200000,localhost,user1
1640995200000,localhost,user2
1640995200000,localhost,user3

  1. 在conf/druid/_common.properties文件中添加以下配置:

properties
druid.sql.enable=true
druid.sql.connection-url=jdbc:mysql://localhost:3306/test
druid.sql.username=root
druid.sql.password=root

  1. 在conf/druid/hadoop-hdfs-storage.xml文件中添加以下配置:

xml
<property>
<name>druid.storage.type</name>
<value>hdfs</value>
</property>
<property>
<name>druid.storage.storageDirectory</name>
<value>/druid/segments</value>
</property>
<property>
<name>druid.storage.storageDirectoryLockTimeoutMs</name>
<value>PT1M</value>
</property>

  1. 在conf/druid/hadoop-timeline-emitter.xml文件中添加以下配置:

xml
<property>
<name>druid.emitter</name>
<value>timeline</value>
</property>
<property>
<name>druid.emitter.timeline.server</name>
<value>localhost:8090</value>
</property>
<property>
<name>druid.emitter.timeline.useHttps</name>
<value>false</value>
</property>

  1. 在conf/druid/realtime.spec文件中添加以下配置:

json
{
"dataSources": [
{
"spec": {
"dataSchema": {
"dataSource": "test",
"parser": {
"type": "string",
"parseSpec": {
"format": "csv",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"host",
"user"
]
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "hour",
"queryGranularity": "minute"
}
},
"ioConfig": {
"type": "hadoop",
"inputFormat": "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"inputPaths": "/path/to/csv",
"badRecordsPath": "/path/to/bad",
"ignoreBadRows": true,
"badRecordTracker": {
"type": "log",
"config": {}
},
"inputColumns": [
"timestamp",
"host",
"user"
],
"inputTimestampSpec": {
"column": "timestamp",
"format": "auto"
},
"inputFormatConfig": {
"columns": "timestamp,host,user"
}
},
"tuningConfig": {
"type": "realtime",
"maxRowsInMemory": "100000",
"intermediatePersistPeriod": "PT10M",
"windowPeriod": "PT10M",
"basePersistDirectory": "/druid/segments",
"maxPendingPersists": "0",
"buildV9Directly": true
}
}
}
]
}

  1. 启动实时节点

bash
bin/start-micro-quickstart

示例2:使用Druid查询数据

假设需要使用Druid查询数据。以下是实现步骤:

  1. 在conf/druid/_common.properties文件中添加以下配置:

properties
druid.query.groupBy.maxLimit=100000

  1. 在conf/druid/broker.properties文件中添加以下配置:

properties
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
druid.broker.cache.cacheDirectory=/var/druid/cache
druid.broker.cache.maxSize=1000000000
druid.broker.cache.objectMapper=com.fasterxml.jackson.databind.ObjectMapper

  1. 在conf/druid/historical.properties文件中添加以下配置:

properties
druid.server.http.numThreads=100
druid.server.http.maxIdleTime=PT5M
druid.server.http.maxRequestHeaderSize=8192
druid.server.http.maxRequestSize=10485760
druid.server.http.maxResponseSize=10485760
druid.server.http.compressionLevel=3
druid.server.http.compressionEnabled=true
druid.server.http.compressionMimeTypes=application/json,application/javascript,text/javascript,text/plain,text/xml,text/css

  1. 启动查询节点

bash
bin/start-micro-quickstart

  1. 查询数据

sql
SELECT host, COUNT(*) FROM test WHERE __time >= '2022-01-01T00:00:00.000Z' AND __time < '2022-02-01T00:00:00.000Z' GROUP BY host

通过以上示例说明,我们可以看到Druid是一个非常强大的数据处理和分析工具,可以帮助我们快速地处理和分析大量的数据。