Druid 0.17 入门(3) – 数据接入指南
本攻略将详细讲解Druid 0.17的数据接入指南,包括数据源的配置、数据导入和数据查询等内容。
数据源的配置
Druid支持多种数据源,包括Hadoop、Kafka、JDBC等。以下是配置JDBC数据源的步骤:
- 在conf/druid/_common.properties文件中添加以下配置:
properties
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.metadata.storage.connector.user=root
druid.metadata.storage.connector.password=root
- druid.metadata.storage.type=mysql 表示使用MySQL作为元数据存储。
- druid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid 表示连接到MySQL的druid数据库。
- druid.metadata.storage.connector.user=root 表示使用root用户连接MySQL。
-
druid.metadata.storage.connector.password=root 表示使用root用户的密码连接MySQL。
-
在conf/druid/_common.properties文件中添加以下配置:
properties
druid.storage.type=metadata
druid.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.storage.connector.user=root
druid.storage.connector.password=root
- druid.storage.type=metadata 表示使用元数据存储。
- druid.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid 表示连接到MySQL的druid数据库。
- druid.storage.connector.user=root 表示使用root用户连接MySQL。
-
druid.storage.connector.password=root 表示使用root用户的密码连接MySQL。
-
在conf/druid/_common.properties文件中添加以下配置:
properties
druid.indexer.type=realtime
druid.indexer.task.logsDir=/var/druid/tasklogs
druid.indexer.logs.type=file
druid.indexer.logs.directory=/var/druid/indexing-logs
- druid.indexer.type=realtime 表示使用实时索引。
- druid.indexer.task.logsDir=/var/druid/tasklogs 表示任务日志的存储目录。
- druid.indexer.logs.type=file 表示使用文件存储索引日志。
- druid.indexer.logs.directory=/var/druid/indexing-logs 表示索引日志的存储目录。
数据导入
以下是使用JDBC数据源导入数据的步骤:
- 在conf/druid/_common.properties文件中添加以下配置:
properties
druid.sql.enable=true
druid.sql.connection-url=jdbc:mysql://localhost:3306/test
druid.sql.username=root
druid.sql.password=root
- druid.sql.enable=true 表示启用JDBC数据源。
- druid.sql.connection-url=jdbc:mysql://localhost:3306/test 表示连接到MySQL的test数据库。
- druid.sql.username=root 表示使用root用户连接MySQL。
-
druid.sql.password=root 表示使用root用户的密码连接MySQL。
-
在conf/druid/hadoop-hdfs-storage.xml文件中添加以下配置:
xml
<property>
<name>druid.storage.type</name>
<value>hdfs</value>
</property>
<property>
<name>druid.storage.storageDirectory</name>
<value>/druid/segments</value>
</property>
<property>
<name>druid.storage.storageDirectoryLockTimeoutMs</name>
<value>PT1M</value>
</property>
- druid.storage.type=hdfs 表示使用HDFS存储数据。
- druid.storage.storageDirectory=/druid/segments 表示数据存储的目录。
-
druid.storage.storageDirectoryLockTimeoutMs=PT1M 表示目录锁定的超时时间。
-
在conf/druid/hadoop-timeline-emitter.xml文件中添加以下配置:
xml
<property>
<name>druid.emitter</name>
<value>timeline</value>
</property>
<property>
<name>druid.emitter.timeline.server</name>
<value>localhost:8090</value>
</property>
<property>
<name>druid.emitter.timeline.useHttps</name>
<value>false</value>
</property>
- druid.emitter=timeline 表示使用Timeline作为数据的发布者。
- druid.emitter.timeline.server=localhost:8090 表示Timeline服务器的地址。
-
druid.emitter.timeline.useHttps=false 表示不使用HTTPS协议。
-
在conf/druid/realtime.spec文件中添加以下配置:
json
{
"dataSources": [
{
"spec": {
"dataSchema": {
"dataSource": "test",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"host",
"user"
]
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "hour",
"queryGranularity": "minute"
}
},
"ioConfig": {
"type": "jdbc",
"jdbcDriverClass": "com.mysql.jdbc.Driver",
"connectionString": "jdbc:mysql://localhost:3306/test",
"user": "root",
"password": "root",
"sql": "SELECT * FROM test",
"pollPeriod": "PT1M"
},
"tuningConfig": {
"type": "realtime",
"maxRowsInMemory": "100000",
"intermediatePersistPeriod": "PT10M",
"windowPeriod": "PT10M",
"basePersistDirectory": "/druid/segments",
"maxPendingPersists": "0",
"buildV9Directly": true
}
}
}
]
}
- dataSchema.dataSource=test 表示数据源的名称为test。
- parser.type=string 表示使用字符串解析器。
- parser.parseSpec.format=json 表示数据格式为JSON。
- parser.parseSpec.timestampSpec.column=timestamp 表示时间戳所在的列。
- parser.parseSpec.dimensionsSpec.dimensions=[“host”, “user”] 表示维度为host和user。
- metricsSpec.type=count 表示使用计数器。
- granularitySpec.segmentGranularity=hour 表示分段的时间粒度为小时。
- granularitySpec.queryGranularity=minute 表示查询的时间粒度为分钟。
- ioConfig.type=jdbc 表示使用JDBC数据源。
- ioConfig.jdbcDriverClass=com.mysql.jdbc.Driver 表示使用MySQL的JDBC驱动。
- ioConfig.connectionString=jdbc:mysql://localhost:3306/test 表示连接到MySQL的test数据库。
- ioConfig.user=root 表示使用root用户连接MySQL。
- ioConfig.password=root 表示使用root用户的密码连接MySQL。
- ioConfig.sql=SELECT * FROM test 表示要导入的数据。
- ioConfig.pollPeriod=PT1M 表示轮询的时间间隔为1分钟。
- tuningConfig.type=realtime 表示使用实时索引。
- tuningConfig.maxRowsInMemory=100000 表示内存中最多存储10万行数据。
- tuningConfig.intermediatePersistPeriod=PT10M 表示中间持久化的时间间隔为10分钟。
- tuningConfig.windowPeriod=PT10M 表示窗口的时间间隔为10分钟。
- tuningConfig.basePersistDirectory=/druid/segments 表示数据存储的目录。
- tuningConfig.maxPendingPersists=0 表示最多同时进行0个持久化操作。
-
tuningConfig.buildV9Directly=true 表示直接构建V9索引。
-
启动实时节点
bash
bin/start-micro-quickstart
数据查询
以下是使用Druid查询数据的步骤:
- 在conf/druid/_common.properties文件中添加以下配置:
properties
druid.query.groupBy.maxLimit=100000
-
druid.query.groupBy.maxLimit=100000 表示最多返回10万行数据。
-
在conf/druid/broker.properties文件中添加以下配置:
properties
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
druid.broker.cache.cacheDirectory=/var/druid/cache
druid.broker.cache.maxSize=1000000000
druid.broker.cache.objectMapper=com.fasterxml.jackson.databind.ObjectMapper
- druid.broker.cache.useCache=true 表示启用查询缓存。
- druid.broker.cache.populateCache=true 表示自动填充查询缓存。
- druid.broker.cache.cacheDirectory=/var/druid/cache 表示查询缓存的存储目录。
- druid.broker.cache.maxSize=1000000000 表示查询缓存的最大大小。
-
druid.broker.cache.objectMapper=com.fasterxml.jackson.databind.ObjectMapper 表示使用Jackson作为对象映射器。
-
在conf/druid/historical.properties文件中添加以下配置:
properties
druid.server.http.numThreads=100
druid.server.http.maxIdleTime=PT5M
druid.server.http.maxRequestHeaderSize=8192
druid.server.http.maxRequestSize=10485760
druid.server.http.maxResponseSize=10485760
druid.server.http.compressionLevel=3
druid.server.http.compressionEnabled=true
druid.server.http.compressionMimeTypes=application/json,application/javascript,text/javascript,text/plain,text/xml,text/css
- druid.server.http.numThreads=100 表示HTTP服务器的线程数为100。
- druid.server.http.maxIdleTime=PT5M 表示HTTP连接的最大空闲时间为5分钟。
- druid.server.http.maxRequestHeaderSize=8192 表示HTTP请求头的最大大小为8KB。
- druid.server.http.maxRequestSize=10485760 表示HTTP请求的最大大小为10MB。
- druid.server.http.maxResponseSize=10485760 表示HTTP响应的最大大小为10MB。
- druid.server.http.compressionLevel=3 表示HTTP响应的压缩级别为3。
- druid.server.http.compressionEnabled=true 表示启用HTTP响应的压缩。
-
druid.server.http.compressionMimeTypes=application/json,application/javascript,text/javascript,text/plain,text/xml,text/css 表示需要压缩的MIME类型。
-
启动查询节点
bash
bin/start-micro-quickstart
- 查询数据
sql
SELECT host, COUNT(*) FROM test WHERE __time >= '2022-01-01T00:00:00.000Z' AND __time < '2022-02-01T00:00:00.000Z' GROUP BY host
- __time 表示时间戳列。
示例说明
以下是两个示例说明:
示例1:使用Druid导入CSV数据
假设需要使用Druid导入CSV数据。以下是实现步骤:
- 准备数据
准备一个CSV文件,例如:
csv
timestamp,host,user
1640995200000,localhost,user1
1640995200000,localhost,user2
1640995200000,localhost,user3
- 在conf/druid/_common.properties文件中添加以下配置:
properties
druid.sql.enable=true
druid.sql.connection-url=jdbc:mysql://localhost:3306/test
druid.sql.username=root
druid.sql.password=root
- 在conf/druid/hadoop-hdfs-storage.xml文件中添加以下配置:
xml
<property>
<name>druid.storage.type</name>
<value>hdfs</value>
</property>
<property>
<name>druid.storage.storageDirectory</name>
<value>/druid/segments</value>
</property>
<property>
<name>druid.storage.storageDirectoryLockTimeoutMs</name>
<value>PT1M</value>
</property>
- 在conf/druid/hadoop-timeline-emitter.xml文件中添加以下配置:
xml
<property>
<name>druid.emitter</name>
<value>timeline</value>
</property>
<property>
<name>druid.emitter.timeline.server</name>
<value>localhost:8090</value>
</property>
<property>
<name>druid.emitter.timeline.useHttps</name>
<value>false</value>
</property>
- 在conf/druid/realtime.spec文件中添加以下配置:
json
{
"dataSources": [
{
"spec": {
"dataSchema": {
"dataSource": "test",
"parser": {
"type": "string",
"parseSpec": {
"format": "csv",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"host",
"user"
]
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "hour",
"queryGranularity": "minute"
}
},
"ioConfig": {
"type": "hadoop",
"inputFormat": "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"inputPaths": "/path/to/csv",
"badRecordsPath": "/path/to/bad",
"ignoreBadRows": true,
"badRecordTracker": {
"type": "log",
"config": {}
},
"inputColumns": [
"timestamp",
"host",
"user"
],
"inputTimestampSpec": {
"column": "timestamp",
"format": "auto"
},
"inputFormatConfig": {
"columns": "timestamp,host,user"
}
},
"tuningConfig": {
"type": "realtime",
"maxRowsInMemory": "100000",
"intermediatePersistPeriod": "PT10M",
"windowPeriod": "PT10M",
"basePersistDirectory": "/druid/segments",
"maxPendingPersists": "0",
"buildV9Directly": true
}
}
}
]
}
- 启动实时节点
bash
bin/start-micro-quickstart
示例2:使用Druid查询数据
假设需要使用Druid查询数据。以下是实现步骤:
- 在conf/druid/_common.properties文件中添加以下配置:
properties
druid.query.groupBy.maxLimit=100000
- 在conf/druid/broker.properties文件中添加以下配置:
properties
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
druid.broker.cache.cacheDirectory=/var/druid/cache
druid.broker.cache.maxSize=1000000000
druid.broker.cache.objectMapper=com.fasterxml.jackson.databind.ObjectMapper
- 在conf/druid/historical.properties文件中添加以下配置:
properties
druid.server.http.numThreads=100
druid.server.http.maxIdleTime=PT5M
druid.server.http.maxRequestHeaderSize=8192
druid.server.http.maxRequestSize=10485760
druid.server.http.maxResponseSize=10485760
druid.server.http.compressionLevel=3
druid.server.http.compressionEnabled=true
druid.server.http.compressionMimeTypes=application/json,application/javascript,text/javascript,text/plain,text/xml,text/css
- 启动查询节点
bash
bin/start-micro-quickstart
- 查询数据
sql
SELECT host, COUNT(*) FROM test WHERE __time >= '2022-01-01T00:00:00.000Z' AND __time < '2022-02-01T00:00:00.000Z' GROUP BY host
通过以上示例说明,我们可以看到Druid是一个非常强大的数据处理和分析工具,可以帮助我们快速地处理和分析大量的数据。