A lighthearted moment
Yesterday I posted on WeChat Moments: what's so great about bars? Last month I worked part-time as a waiter in a bar, and an older lady took a liking to me. She offered me 100,000 yuan a month to accompany her to Shanghai, and I turned her down.
A friend commented: if you turned her down, why are you in Shanghai?
I replied: I turned her down last month.
Recap
The official site already documents DataX in great detail, and I have written a few posts about it myself:
Heterogeneous data source sync → reworking DataX, quite interesting
Heterogeneous data source sync → reworking DataX again, starting to touch the source code
Heterogeneous data source sync → DataX usage details
Heterogeneous data source sync → analyzing how DataX encrypts and decrypts sensitive information, from the source code
If any of that is unfamiliar, read those posts as needed; I will not introduce DataX itself any further here. The official project ships a large number of plugins covering the vast majority of data sources, which is usually enough for everyday work. But there are far too many kinds of data sources for the plugin set to cover them all. Kafka, for example, has no official DataX reader or writer plugin. Do you know why? If you have done a fair amount of data synchronization work, your first association with Kafka is probably real-time synchronization, whereas DataX targets offline (batch) synchronization. Seen that way, it is understandable that DataX does not ship a Kafka plugin: it simply is not a natural fit.
But what if a customer insists that offline synchronization must support Kafka as well? Do you just fire back with "that can't be done"? There is no real choice here, so the only option is to develop a pair of Kafka plugins for DataX. Following the official plugin development guide (DataX 插件開發(fā)寶典), writing a plugin is actually quite simple.
Programming interface
Define a custom KafkaWriter that extends DataX's Writer and implements the corresponding Job and Task interfaces
/**
* @author 青石路
*/
public class KafkaWriter extends Writer {
public static class Job extends Writer.Job {
private Configuration conf = null;
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++) {
configurations.add(this.conf.clone());
}
return configurations;
}
private void validateParameter() {
this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
}
@Override
public void init() {
this.conf = super.getPluginJobConf();
this.validateParameter();
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private static final Logger logger = LoggerFactory.getLogger(Task.class);
private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
private Producer<String, String> producer;
private Configuration conf;
private Properties props;
private String fieldDelimiter;
private List<String> columns;
private String writeType;
@Override
public void init() {
this.conf = super.getPluginJobConf();
fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
columns = conf.getList(Key.COLUMN, String.class);
writeType = conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null);
if (CollUtil.isEmpty(columns)) {
throw DataXException.asDataXException(KafkaWriterErrorCode.REQUIRED_VALUE,
String.format("您提供配置文件有誤,[%s]是必填參數(shù),不允許為空或者留白 .", Key.COLUMN));
}
props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
//acks=all means the leader waits until all in-sync replicas have written the record; as long as one replica survives, no data is lost (the strongest guarantee).
//Note that the default used here is "0" (fire-and-forget), so set ack to "all" in the job config if you need that guarantee.
props.put(ProducerConfig.ACKS_CONFIG, conf.getUnnecessaryValue(Key.ACK, "0", null));
props.put(CommonClientConfigs.RETRIES_CONFIG, conf.getUnnecessaryValue(Key.RETRIES, "0", null));
props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
Configuration saslConf = conf.getConfiguration(Key.SASL);
if (ObjUtil.isNotNull(saslConf)) {
logger.info("配置啟用了SASL認證");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaWriterErrorCode.REQUIRED_VALUE));
props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaWriterErrorCode.REQUIRED_VALUE));
String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaWriterErrorCode.REQUIRED_VALUE);
String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaWriterErrorCode.REQUIRED_VALUE);
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
}
producer = new KafkaProducer<>(props);
}
@Override
public void prepare() {
if (Boolean.parseBoolean(conf.getUnnecessaryValue(Key.NO_TOPIC_CREATE, "false", null))) {
String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
try (AdminClient adminClient = AdminClient.create(props)) {
ListTopicsResult topicsResult = adminClient.listTopics();
if (!topicsResult.names().get().contains(topic)) {
NewTopic newTopic = new NewTopic(
topic,
Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
);
// register the new topic and wait for the creation to finish
List<NewTopic> newTopics = new ArrayList<>();
newTopics.add(newTopic);
adminClient.createTopics(newTopics).all().get();
}
} catch (Exception e) {
throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.CREATE_TOPIC.getDescription());
}
}
}
@Override
public void startWrite(RecordReceiver lineReceiver) {
logger.info("start to writer kafka");
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {//說明還在讀取數(shù)據(jù),或者讀取的數(shù)據(jù)沒處理完
//獲取一行數(shù)據(jù),按照指定分隔符 拼成字符串 發(fā)送出去
if (writeType.equalsIgnoreCase(WriteType.TEXT.name())) {
producer.send(new ProducerRecord<>(this.conf.getString(Key.TOPIC),
recordToString(record),
recordToString(record))
);
} else if (writeType.equalsIgnoreCase(WriteType.JSON.name())) {
producer.send(new ProducerRecord<>(this.conf.getString(Key.TOPIC),
recordToString(record),
recordToKafkaJson(record))
);
}
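// flush() blocks until every buffered record has been sent; flushing once per record guarantees delivery before the task ends, at the cost of batching and throughput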
producer.flush();
}
}
@Override
public void destroy() {
logger.info("producer close");
if (producer != null) {
producer.close();
}
}
/**
* Format a record as delimited text
*
* @param record the record received from the reader
* @return the column values joined by fieldDelimiter, terminated with a line separator
*/
private String recordToString(Record record) {
int recordLength = record.getColumnNumber();
if (0 == recordLength) {
return NEWLINE_FLAG;
}
Column column;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < recordLength; i++) {
column = record.getColumn(i);
sb.append(column.asString()).append(fieldDelimiter);
}
sb.setLength(sb.length() - 1);
sb.append(NEWLINE_FLAG);
return sb.toString();
}
private String recordToKafkaJson(Record record) {
int recordLength = record.getColumnNumber();
if (recordLength != columns.size()) {
throw DataXException.asDataXException(KafkaWriterErrorCode.ILLEGAL_PARAM,
String.format("您提供配置文件有誤,列數(shù)不匹配[record columns=%d, writer columns=%d]", recordLength, columns.size()));
}
List<KafkaColumn> kafkaColumns = new ArrayList<>();
for (int i = 0; i < recordLength; i++) {
KafkaColumn column = new KafkaColumn(record.getColumn(i), columns.get(i));
kafkaColumns.add(column);
}
return JSONUtil.toJsonStr(kafkaColumns);
}
}
}
The DataX framework invokes the Job and Task interfaces in the following order
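The original post shows this order with a diagram; as a rough substitute, here is a conceptual sketch of how the framework drives a writer plugin. It is simplified and sequential: the real framework instantiates plugins via reflection, injects the plugin configuration and the RecordReceiver itself, and runs the tasks in parallel channels, so treat this as an illustration of the call order rather than runnable DataX code (imports omitted, as in the rest of the post).

// Conceptual call order only, not real DataX framework code
static void driveWriterPlugin(int channelNumber, RecordReceiver recordReceiver) {
    Writer.Job job = new KafkaWriter.Job();
    job.init();                                                // load and validate the job configuration
    job.prepare();                                             // not overridden by KafkaWriter.Job
    List<Configuration> taskConfs = job.split(channelNumber);  // one Configuration per task
    for (Configuration taskConf : taskConfs) {                 // the framework hands each taskConf to a task as its plugin job conf
        Writer.Task task = new KafkaWriter.Task();
        task.init();                                           // build the KafkaProducer from the task configuration
        task.prepare();                                        // here: create the topic if it does not exist yet
        task.startWrite(recordReceiver);                       // pull records from the channel and send them to Kafka
        task.post();
        task.destroy();                                        // close the producer
    }
    job.post();
    job.destroy();
}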
Focus on the Task interface implementations:
init: read the configuration items, then create the Producer instance
prepare: check whether the Topic exists and create it if it does not
startWrite: fetch Records from the Channel through the RecordReceiver and write them to the Topic
Two write formats are supported, text and json; the details are covered in kafkawriter.md below, and a rough illustration of both payloads follows this list
destroy: close the Producer instance
None of this is hard to implement; I trust it is easy to follow
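To make the two writeType options more concrete, here is a rough illustration of what a three-column record might end up looking like on the topic. It assumes fieldDelimiter is "," and that KafkaColumn simply carries a column name and a column value; the JSON field names are my guess rather than the plugin's actual KafkaColumn structure, so check that class for the real layout.

// writeType = text: column values joined by fieldDelimiter, ending with a line separator
String textPayload = "1,height,180" + System.lineSeparator();

// writeType = json: a JSON array serialized from List<KafkaColumn> (field names here are illustrative)
String jsonPayload = "[{\"name\":\"const_id\",\"value\":\"1\"},"
        + "{\"name\":\"const_field\",\"value\":\"height\"},"
        + "{\"name\":\"const_field_value\",\"value\":\"180\"}]";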
Plugin definition
Add plugin.json under resources
{
"name": "kafkawriter",
"class": "com.qsl.datax.plugin.writer.kafkawriter.KafkaWriter",
"description": "write data to kafka",
"developer": "qsl"
}
Note the class field: it is the fully qualified class name of KafkaWriter, so if you did not copy my code verbatim, change it to your own.
Configuration file
Add plugin_job_template.json under resources
{
"name": "kafkawriter",
"parameter": {
"bootstrapServers": "",
"topic": "",
"ack": "all",
"batchSize": 1000,
"retries": 0,
"fieldDelimiter": ",",
"writeType": "json",
"column": [
"const_id",
"const_field",
"const_field_value"
],
"sasl": {
"securityProtocol": "SASL_PLAINTEXT",
"mechanism": "PLAIN",
"username": "",
"password": ""
}
}
}
Configuration reference: kafkawriter.md
Packaging and release
You can follow the official assembly configuration and use assembly to package the plugin
With that, the kafkawriter is done
Programming interface
Now for the reader side: define a custom KafkaReader that extends DataX's Reader and implements the corresponding Job and Task interfaces
/**
* @author 青石路
*/
public class KafkaReader extends Reader {
public static class Job extends Reader.Job {
private Configuration originalConfig = null;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
this.validateParameter();
}
@Override
public void destroy() {
}
@Override
public List<Configuration> split(int adviceNumber) {
List<Configuration> configurations = new ArrayList<>(adviceNumber);
for (int i = 0; i < adviceNumber; i++) {
configurations.add(this.originalConfig.clone());
}
return configurations;
}
private void validateParameter() {
this.originalConfig.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaReaderErrorCode.REQUIRED_VALUE);
this.originalConfig.getNecessaryValue(Key.TOPIC, KafkaReaderErrorCode.REQUIRED_VALUE);
}
}
public static class Task extends Reader.Task {
private static final Logger logger = LoggerFactory.getLogger(Task.class);
private Consumer<String, String> consumer;
private String topic;
private Configuration conf;
private int maxPollRecords;
private String fieldDelimiter;
private String readType;
private List<Column.Type> columnTypes;
@Override
public void destroy() {
logger.info("consumer close");
if (Objects.nonNull(consumer)) {
consumer.close();
}
}
@Override
public void init() {
this.conf = super.getPluginJobConf();
this.topic = conf.getString(Key.TOPIC);
this.maxPollRecords = conf.getInt(Key.MAX_POLL_RECORDS, 500);
fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
readType = conf.getUnnecessaryValue(Key.READ_TYPE, ReadType.JSON.name(), null);
if (!ReadType.JSON.name().equalsIgnoreCase(readType)
&& !ReadType.TEXT.name().equalsIgnoreCase(readType)) {
throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
String.format("您提供配置文件有誤,不支持的readType[%s]", readType));
}
if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
List<String> columnTypeList = conf.getList(Key.COLUMN_TYPE, String.class);
if (CollUtil.isEmpty(columnTypeList)) {
throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
String.format("您提供配置文件有誤,readType是JSON時[%s]是必填參數(shù),不允許為空或者留白 .", Key.COLUMN_TYPE));
}
convertColumnType(columnTypeList);
}
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
props.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getNecessaryValue(Key.GROUP_ID, KafkaReaderErrorCode.REQUIRED_VALUE));
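// offsets are committed manually (commitAsync in transferRecord) only after records have been handed to the channel,
// and auto.offset.reset=earliest lets a brand-new consumer group start from the beginning of the topic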
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
Configuration saslConf = conf.getConfiguration(Key.SASL);
if (ObjUtil.isNotNull(saslConf)) {
logger.info("配置啟用了SASL認證");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaReaderErrorCode.REQUIRED_VALUE));
props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaReaderErrorCode.REQUIRED_VALUE));
String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaReaderErrorCode.REQUIRED_VALUE);
String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaReaderErrorCode.REQUIRED_VALUE);
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
}
consumer = new KafkaConsumer<>(props);
}
@Override
public void startRead(RecordSender recordSender) {
consumer.subscribe(CollUtil.newArrayList(topic));
int pollTimeoutMs = conf.getInt(Key.POLL_TIMEOUT_MS, 1000);
int retries = conf.getInt(Key.RETRIES, 5);
if (retries < 0) {
logger.info("joinGroupSuccessRetries 配置有誤[{}], 重置成默認值[5]", retries);
retries = 5;
}
/**
* The consumer is created from scratch on every run, so the first poll triggers (re)joining the consumer group, which causes a rebalance,
* and during a rebalance none of the consumers in the group can make progress.
* As a result the first polls may return nothing even though the topic has data, because the consumer is still joining the group.
* kafka-clients offers no API or event to tell exactly when the consumer has successfully joined the group,
* hence the retry loop below.
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
int i = 0;
if (CollUtil.isEmpty(records)) {
for (; i < retries; i++) {
records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
logger.info("第 {} 次重試,獲取消息記錄數(shù)[{}]", i + 1, records.count());
if (!CollUtil.isEmpty(records)) {
break;
}
}
}
if (CollUtil.isEmpty(records)) { // bail out only when no records were fetched at all
logger.info("still no records after {} retries, please check that the topic has data and that the configuration is correct", retries);
return;
}
transferRecord(recordSender, records);
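// keep polling until a batch comes back smaller than maxPollRecords, which this offline reader treats as having caught up with the topic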
do {
records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
transferRecord(recordSender, records);
} while (!CollUtil.isEmpty(records) && records.count() >= maxPollRecords);
}
private void transferRecord(RecordSender recordSender, ConsumerRecords<String, String> records) {
if (CollUtil.isEmpty(records)) {
return;
}
for (ConsumerRecord<String, String> record : records) {
Record sendRecord = recordSender.createRecord();
String msgValue = record.value();
if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
transportJsonToRecord(sendRecord, msgValue);
} else if (ReadType.TEXT.name().equalsIgnoreCase(readType)) {
// readType = text: treat every column as a string
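// note: String.split treats fieldDelimiter as a regular expression, so delimiters such as "|" or "." must be escaped (e.g. with Pattern.quote) in the job configuration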
String[] columnValues = msgValue.split(fieldDelimiter);
for (String columnValue : columnValues) {
sendRecord.addColumn(new StringColumn(columnValue));
}
}
recordSender.sendToWriter(sendRecord);
}
consumer.commitAsync();
}
private void convertColumnType(List<String> columnTypeList) {
columnTypes = new ArrayList<>();
for (String columnType : columnTypeList) {
switch (columnType.toUpperCase()) {
case "STRING":
columnTypes.add(Column.Type.STRING);
break;
case "LONG":
columnTypes.add(Column.Type.LONG);
break;
case "DOUBLE":
columnTypes.add(Column.Type.DOUBLE);
break;
case "DATE":
columnTypes.add(Column.Type.DATE);
break;
case "BOOLEAN":
columnTypes.add(Column.Type.BOOL);
break;
case "BYTES":
columnTypes.add(Column.Type.BYTES);
break;
default:
throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
String.format("您提供的配置文件有誤,datax不支持數(shù)據(jù)類型[%s]", columnType));
}
}
}
private void transportJsonToRecord(Record sendRecord, String msgValue) {
List<KafkaColumn> kafkaColumns = JSONUtil.toList(msgValue, KafkaColumn.class);
if (columnTypes.size() != kafkaColumns.size()) {
throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
String.format("您提供的配置文件有誤,readType是JSON時[%s列數(shù)=%d]與[json列數(shù)=%d]的數(shù)量不匹配", Key.COLUMN_TYPE, columnTypes.size(), kafkaColumns.size()));
}
for (int i = 0; i < columnTypes.size(); i++) {
// convert the i-th JSON column into a DataX Column of the configured type and add it to sendRecord (a sketch of this conversion follows the class)
}
}
}
}
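The body of that loop is truncated in the source. Below is a minimal sketch of what the per-type conversion could look like, assuming KafkaColumn exposes its serialized value through a getter named getColumnValue() — that accessor name is my assumption, as is the use of Hutool's DateUtil (already used elsewhere in the plugin) to parse dates; imports are omitted as in the rest of the post.

// Hypothetical completion of transportJsonToRecord; adjust the accessor to the real KafkaColumn class
for (int i = 0; i < columnTypes.size(); i++) {
    String value = kafkaColumns.get(i).getColumnValue();   // assumed accessor
    switch (columnTypes.get(i)) {
        case STRING:
            sendRecord.addColumn(new StringColumn(value));
            break;
        case LONG:
            sendRecord.addColumn(new LongColumn(value));
            break;
        case DOUBLE:
            sendRecord.addColumn(new DoubleColumn(value));
            break;
        case DATE:
            // DateColumn has no String constructor, so parse the text into a java.util.Date first
            sendRecord.addColumn(new DateColumn(DateUtil.parse(value)));
            break;
        case BOOL:
            sendRecord.addColumn(new BoolColumn(value));
            break;
        case BYTES:
            sendRecord.addColumn(new BytesColumn(value.getBytes(StandardCharsets.UTF_8)));
            break;
        default:
            throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                    String.format("Unsupported column type [%s]", columnTypes.get(i)));
    }
}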
Focus on the Task interface implementations:
init: read the configuration items, then create the Consumer instance
startRead: pull data from the Topic and write it into the Channel through the RecordSender; there are a few details to watch here (see the comments in startRead above)
Two read formats are supported, text and json; see the configuration notes below for the specifics
destroy: close the Consumer instance
Plugin definition
Add plugin.json under resources
{
"name": "kafkareader",
"class": "com.qsl.datax.plugin.reader.kafkareader.KafkaReader",
"description": "read data from kafka",
"developer": "qsl"
}
class is the fully qualified class name of KafkaReader
Configuration file
Add plugin_job_template.json under resources
{
"name": "kafkareader",
"parameter": {
"bootstrapServers": "",
"topic": "test-kafka",
"groupId": "test1",
"writeType": "json",
"pollTimeoutMs": 2000,
"columnType": [
"LONG",
"STRING",
"STRING"
],
"sasl": {
"securityProtocol": "SASL_PLAINTEXT",
"mechanism": "PLAIN",
"username": "",
"password": "2"
}
}
}
Configuration reference: kafkareader.md
Packaging and release
You can follow the official assembly configuration and use assembly to package the plugin
With that, the kafkareader is done as well