您的位置:首頁 > 軟件教程 > 教程 > 異源數(shù)據(jù)同步 → DataX 為什么要支持 kafka?

異源數(shù)據(jù)同步 → DataX 為什么要支持 kafka?

來源:好特整理 | 時間:2024-08-26 09:55:30 | 閱讀:168 |  標簽: a T K   | 分享到:

開心一刻 昨天發(fā)了一條朋友圈:酒吧有什么好去的,上個月在酒吧當服務員兼職,一位大姐看上了我,說一個月給我 10 萬,要我陪她去上海,我沒同意 朋友評論道:你沒同意,為什么在上海? 我回復到:上個月沒同意 前情回顧 關于 DataX,官網(wǎng)有很詳細的介紹,鄙人不才,也寫過幾篇文章 異構數(shù)據(jù)源同步之數(shù)據(jù)同

開心一刻

昨天發(fā)了一條朋友圈:酒吧有什么好去的,上個月在酒吧當服務員兼職,一位大姐看上了我,說一個月給我 10 萬,要我陪她去上海,我沒同意

朋友評論道:你沒同意,為什么在上海?

我回復到:上個月沒同意

前情回顧

關于 DataX ,官網(wǎng)有很詳細的介紹,鄙人不才,也寫過幾篇文章

異構數(shù)據(jù)源同步之數(shù)據(jù)同步 → datax 改造,有點意思

異構數(shù)據(jù)源同步之數(shù)據(jù)同步 → datax 再改造,開始觸及源碼

異構數(shù)據(jù)源同步之數(shù)據(jù)同步 → DataX 使用細節(jié)

異構數(shù)據(jù)源數(shù)據(jù)同步 → 從源碼分析 DataX 敏感信息的加解密

不了解的小伙伴可以按需去查看,所以了, DataX 就不做過多介紹了;官方提供了非常多的插件,囊括了絕大部分的數(shù)據(jù)源,基本可以滿足我們?nèi)粘P枰珨?shù)據(jù)源種類太多,DataX 插件不可能包含全部,比如 kafka ,DataX 官方是沒有提供讀寫插件的,大家知道為什么嗎?你們?nèi)绻麑?shù)據(jù)同步了解的比較多的話,一看到 kafka,第一反應往往想到的是 實時同步 ,而 DataX 針對的是 離線同步 ,所以 DataX 官方?jīng)]提供 kafka 插件是不是也就能理解了?因為不合適嘛!

但如果客戶非要離線同步也支持 kafka

你能怎么辦?直接懟過去:實現(xiàn)不了?

所以沒得選,那就只能給 DataX 開發(fā)一套 kafka 插件了;基于 DataX插件開發(fā)寶典 ,插件開發(fā)起來還是非常簡單的

kafkawriter

  1. 編程接口

    自定義 Kafkawriter 繼承 DataX 的 Writer ,實現(xiàn) job、task 對應的接口即可

    /**
     * @author 青石路
     */
    public class KafkaWriter extends Writer {
    
        public static class Job extends Writer.Job {
    
            private Configuration conf = null;
    
            @Override
            public List split(int mandatoryNumber) {
                List configurations = new ArrayList(mandatoryNumber);
                for (int i = 0; i < mandatoryNumber; i++) {
                    configurations.add(this.conf.clone());
                }
                return configurations;
            }
    
            private void validateParameter() {
                this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
                this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
            }
    
            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                this.validateParameter();
            }
    
    
            @Override
            public void destroy() {
    
            }
        }
    
        public static class Task extends Writer.Task {
            private static final Logger logger = LoggerFactory.getLogger(Task.class);
            private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
    
            private Producer producer;
            private Configuration conf;
            private Properties props;
            private String fieldDelimiter;
            private List columns;
            private String writeType;
    
            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
                columns = conf.getList(Key.COLUMN, String.class);
                writeType = conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null);
                if (CollUtil.isEmpty(columns)) {
                    throw DataXException.asDataXException(KafkaWriterErrorCode.REQUIRED_VALUE,
                            String.format("您提供配置文件有誤,[%s]是必填參數(shù),不允許為空或者留白 .", Key.COLUMN));
                }
    
                props = new Properties();
                props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
                //這意味著leader需要等待所有備份都成功寫入日志,這種策略會保證只要有一個備份存活就不會丟失數(shù)據(jù)。這是最強的保證。
                props.put(ProducerConfig.ACKS_CONFIG, conf.getUnnecessaryValue(Key.ACK, "0", null));
                props.put(CommonClientConfigs.RETRIES_CONFIG, conf.getUnnecessaryValue(Key.RETRIES, "0", null));
                props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
                props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
    
                Configuration saslConf = conf.getConfiguration(Key.SASL);
                if (ObjUtil.isNotNull(saslConf)) {
                    logger.info("配置啟用了SASL認證");
                    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaWriterErrorCode.REQUIRED_VALUE));
                    props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaWriterErrorCode.REQUIRED_VALUE));
                    String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaWriterErrorCode.REQUIRED_VALUE);
                    String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaWriterErrorCode.REQUIRED_VALUE);
                    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
                }
    
                producer = new KafkaProducer(props);
            }
    
            @Override
            public void prepare() {
                if (Boolean.parseBoolean(conf.getUnnecessaryValue(Key.NO_TOPIC_CREATE, "false", null))) {
    
                    ListTopicsResult topicsResult = AdminClient.create(props).listTopics();
                    String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
    
                    try {
                        if (!topicsResult.names().get().contains(topic)) {
                            new NewTopic(
                                    topic,
                                    Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
                                    Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
                            );
                            List newTopics = new ArrayList();
                            AdminClient.create(props).createTopics(newTopics);
                        }
                    } catch (Exception e) {
                        throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE.getDescription());
                    }
                }
            }
    
            @Override
            public void startWrite(RecordReceiver lineReceiver) {
                logger.info("start to writer kafka");
                Record record = null;
                while ((record = lineReceiver.getFromReader()) != null) {//說明還在讀取數(shù)據(jù),或者讀取的數(shù)據(jù)沒處理完
                    //獲取一行數(shù)據(jù),按照指定分隔符 拼成字符串 發(fā)送出去
                    if (writeType.equalsIgnoreCase(WriteType.TEXT.name())) {
                        producer.send(new ProducerRecord(this.conf.getString(Key.TOPIC),
                                recordToString(record),
                                recordToString(record))
                        );
                    } else if (writeType.equalsIgnoreCase(WriteType.JSON.name())) {
                        producer.send(new ProducerRecord(this.conf.getString(Key.TOPIC),
                                recordToString(record),
                                recordToKafkaJson(record))
                        );
                    }
                    producer.flush();
                }
            }
    
            @Override
            public void destroy() {
                logger.info("producer close");
                if (producer != null) {
                    producer.close();
                }
            }
    
            /**
             * 數(shù)據(jù)格式化
             *
             * @param record
             * @return
             */
            private String recordToString(Record record) {
                int recordLength = record.getColumnNumber();
                if (0 == recordLength) {
                    return NEWLINE_FLAG;
                }
                Column column;
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < recordLength; i++) {
                    column = record.getColumn(i);
                    sb.append(column.asString()).append(fieldDelimiter);
                }
    
                sb.setLength(sb.length() - 1);
                sb.append(NEWLINE_FLAG);
    
                return sb.toString();
            }
    
            private String recordToKafkaJson(Record record) {
                int recordLength = record.getColumnNumber();
                if (recordLength != columns.size()) {
                    throw DataXException.asDataXException(KafkaWriterErrorCode.ILLEGAL_PARAM,
                            String.format("您提供配置文件有誤,列數(shù)不匹配[record columns=%d, writer columns=%d]", recordLength, columns.size()));
                }
                List kafkaColumns = new ArrayList<>();
                for (int i = 0; i < recordLength; i++) {
                    KafkaColumn column = new KafkaColumn(record.getColumn(i), columns.get(i));
                    kafkaColumns.add(column);
                }
                return JSONUtil.toJsonStr(kafkaColumns);
            }
        }
    }
    

    DataX 框架按照如下的順序執(zhí)行 Job 和 Task 的接口

    重點看 Task 的接口實現(xiàn)

    • init:讀取配置項,然后創(chuàng)建 Producer 實例

    • prepare:判斷 Topic 是否存在,不存在則創(chuàng)建

    • startWrite:通過 RecordReceiver 從 Channel 獲取 Record,然后寫入 Topic

      支持兩種寫入格式: text 、 json ,細節(jié)請看下文中的 kafkawriter.md

    • destroy:關閉 Producer 實例

    實現(xiàn)不難,相信大家都能看懂

  2. 插件定義

    resources 下新增 plugin.json

    {
        "name": "kafkawriter",
        "class": "com.qsl.datax.plugin.writer.kafkawriter.KafkaWriter",
        "description": "write data to kafka",
        "developer": "qsl"
    }
    

    強調(diào)下 class ,是 KafkaWriter 的全限定類名,如果你們沒有完全拷貝我的,那么要改成你們自己的

  3. 配置文件

    resources 下新增 plugin_job_template.json

    {
        "name": "kafkawriter",
        "parameter": {
            "bootstrapServers": "",
            "topic": "",
            "ack": "all",
            "batchSize": 1000,
            "retries": 0,
            "fieldDelimiter": ",",
            "writeType": "json",
            "column": [
                "const_id",
                "const_field",
                "const_field_value"
            ],
            "sasl": {
                "securityProtocol": "SASL_PLAINTEXT",
                "mechanism": "PLAIN",
                "username": "",
                "password": ""
            }
        }
    }
    

    配置項說明: kafkawriter.md

  4. 打包發(fā)布

    可以參考官方的 assembly 配置,利用 assembly 來打包

至此, kafkawriter 就算完成了

kafkareader

  1. 編程接口

    自定義 Kafkareader 繼承 DataX 的 Reader ,實現(xiàn) job、task 對應的接口即可

    /**
     * @author 青石路
     */
    public class KafkaReader extends Reader {
    
        public static class Job extends Reader.Job {
    
            private Configuration originalConfig = null;
    
            @Override
            public void init() {
                this.originalConfig = super.getPluginJobConf();
                this.validateParameter();
            }
    
            @Override
            public void destroy() {
    
            }
    
            @Override
            public List split(int adviceNumber) {
                List configurations = new ArrayList<>(adviceNumber);
                for (int i=0; i consumer;
            private String topic;
            private Configuration conf;
            private int maxPollRecords;
            private String fieldDelimiter;
            private String readType;
            private List columnTypes;
    
            @Override
            public void destroy() {
                logger.info("consumer close");
                if (Objects.nonNull(consumer)) {
                    consumer.close();
                }
            }
    
            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                this.topic = conf.getString(Key.TOPIC);
                this.maxPollRecords = conf.getInt(Key.MAX_POLL_RECORDS, 500);
                fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
                readType = conf.getUnnecessaryValue(Key.READ_TYPE, ReadType.JSON.name(), null);
                if (!ReadType.JSON.name().equalsIgnoreCase(readType)
                        && !ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                            String.format("您提供配置文件有誤,不支持的readType[%s]", readType));
                }
                if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                    List columnTypeList = conf.getList(Key.COLUMN_TYPE, String.class);
                    if (CollUtil.isEmpty(columnTypeList)) {
                        throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                                String.format("您提供配置文件有誤,readType是JSON時[%s]是必填參數(shù),不允許為空或者留白 .", Key.COLUMN_TYPE));
                    }
                    convertColumnType(columnTypeList);
                }
                Properties props = new Properties();
                props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
                props.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getNecessaryValue(Key.GROUP_ID, KafkaReaderErrorCode.REQUIRED_VALUE));
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
                Configuration saslConf = conf.getConfiguration(Key.SASL);
                if (ObjUtil.isNotNull(saslConf)) {
                    logger.info("配置啟用了SASL認證");
                    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaReaderErrorCode.REQUIRED_VALUE));
                    props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaReaderErrorCode.REQUIRED_VALUE));
                    String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaReaderErrorCode.REQUIRED_VALUE);
                    String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaReaderErrorCode.REQUIRED_VALUE);
                    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
                }
                consumer = new KafkaConsumer<>(props);
            }
    
            @Override
            public void startRead(RecordSender recordSender) {
                consumer.subscribe(CollUtil.newArrayList(topic));
                int pollTimeoutMs = conf.getInt(Key.POLL_TIMEOUT_MS, 1000);
                int retries = conf.getInt(Key.RETRIES, 5);
                if (retries < 0) {
                    logger.info("joinGroupSuccessRetries 配置有誤[{}], 重置成默認值[5]", retries);
                    retries = 5;
                }
                /**
                 * consumer 每次都是新創(chuàng)建,第一次poll時會重新加入消費者組,加入過程會進行Rebalance,而 Rebalance 會導致同一 Group 內(nèi)的所有消費者都不能工作
                 * 所以 poll 拉取的過程中,即使topic中有數(shù)據(jù)也不一定能拉到,因為 consumer 正在加入消費者組中
                 * kafka-clients 沒有對應的API、事件機制來知道 consumer 成功加入消費者組的確切時間
                 * 故增加重試
                 */
                ConsumerRecords records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                int i = 0;
                if (CollUtil.isEmpty(records)) {
                    for (; i < retries; i++) {
                        records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                        logger.info("第 {} 次重試,獲取消息記錄數(shù)[{}]", i + 1, records.count());
                        if (!CollUtil.isEmpty(records)) {
                            break;
                        }
                    }
                }
                if (i >= retries) {
                    logger.info("重試 {} 次后,仍未獲取到消息,請確認是否有數(shù)據(jù)、配置是否正確", retries);
                    return;
                }
                transferRecord(recordSender, records);
                do {
                    records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                    transferRecord(recordSender, records);
                } while (!CollUtil.isEmpty(records) && records.count() >= maxPollRecords);
            }
    
            private void transferRecord(RecordSender recordSender, ConsumerRecords records) {
                if (CollUtil.isEmpty(records)) {
                    return;
                }
                for (ConsumerRecord record : records) {
                    Record sendRecord = recordSender.createRecord();
                    String msgValue = record.value();
                    if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                        transportJsonToRecord(sendRecord, msgValue);
                    } else if (ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                        // readType = text,全當字符串類型處理
                        String[] columnValues = msgValue.split(fieldDelimiter);
                        for (String columnValue : columnValues) {
                            sendRecord.addColumn(new StringColumn(columnValue));
                        }
                    }
                    recordSender.sendToWriter(sendRecord);
                }
                consumer.commitAsync();
            }
    
            private void convertColumnType(List columnTypeList) {
                columnTypes = new ArrayList<>();
                for (String columnType : columnTypeList) {
                    switch (columnType.toUpperCase()) {
                        case "STRING":
                            columnTypes.add(Column.Type.STRING);
                            break;
                        case "LONG":
                            columnTypes.add(Column.Type.LONG);
                            break;
                        case "DOUBLE":
                            columnTypes.add(Column.Type.DOUBLE);
                        case "DATE":
                            columnTypes.add(Column.Type.DATE);
                            break;
                        case "BOOLEAN":
                            columnTypes.add(Column.Type.BOOL);
                            break;
                        case "BYTES":
                            columnTypes.add(Column.Type.BYTES);
                            break;
                        default:
                            throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                    String.format("您提供的配置文件有誤,datax不支持數(shù)據(jù)類型[%s]", columnType));
                    }
                }
            }
    
            private void transportJsonToRecord(Record sendRecord, String msgValue) {
                List kafkaColumns = JSONUtil.toList(msgValue, KafkaColumn.class);
                if (columnTypes.size() != kafkaColumns.size()) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                            String.format("您提供的配置文件有誤,readType是JSON時[%s列數(shù)=%d]與[json列數(shù)=%d]的數(shù)量不匹配", Key.COLUMN_TYPE, columnTypes.size(), kafkaColumns.size()));
                }
                for (int i=0; i

    重點看 Task 的接口實現(xiàn)

    • init:讀取配置項,然后創(chuàng)建 Consumer 實例

    • startWrite:從 Topic 拉取數(shù)據(jù),通過 RecordSender 寫入到 Channel 中

      這里有幾個細節(jié)需要注意下

      1. Consumer 每次都是新創(chuàng)建的,拉取數(shù)據(jù)的時候,如果消費者還未加入到指定的消費者組中,那么它會先加入到消費者組中,加入過程會進行 Rebalance,而 Rebalance 會導致同一消費者組內(nèi)的所有消費者都不能工作,此時即使 Topic 中有可拉取的消息,也拉取不到消息,所以引入了重試機制來盡量保證那一次同步任務拉取的時候,消費者能正常拉取消息
      2. 一旦 Consumer 拉取到消息,則會循環(huán)拉取消息,如果某一次的拉取數(shù)據(jù)量小于最大拉取量(maxPollRecords),說明 Topic 中的消息已經(jīng)被拉取完了,那么循環(huán)終止;這與常規(guī)使用(Consumer 會一直主動拉取或被動接收)是有差別的
      3. 支持兩種讀取格式: text 、 json ,細節(jié)請看下文的配置文件說明
      4. 為了保證寫入 Channel 數(shù)據(jù)的完整,需要配置列的數(shù)據(jù)類型(DataX 的數(shù)據(jù)類型)
    • destroy:

      關閉 Consumer 實例

  2. 插件定義

    resources 下新增 plugin.json

    {
        "name": "kafkareader",
        "class": "com.qsl.datax.plugin.reader.kafkareader.KafkaReader",
        "description": "read data from kafka",
        "developer": "qsl"
    }
    

    class KafkaReader 的全限定類名

  3. 配置文件

    resources 下新增 plugin_job_template.json

    {
        "name": "kafkareader",
        "parameter": {
            "bootstrapServers": "",
            "topic": "test-kafka",
            "groupId": "test1",
            "writeType": "json",
            "pollTimeoutMs": 2000,
            "columnType": [
                "LONG",
                "STRING",
                "STRING"
            ],
            "sasl": {
                "securityProtocol": "SASL_PLAINTEXT",
                "mechanism": "PLAIN",
                "username": "",
                "password": "2"
            }
        }
    }
    

    配置項說明: kafkareader.md

  4. 打包發(fā)布

    可以參考官方的 assembly 配置,利用 assembly 來打包

至此, kafkareader 也完成了

總結

  1. 完整代碼: qsl-datax
  2. kafkareader 重試機制只能降低拉取不到數(shù)據(jù)的概率,并不能杜絕;另外,如果上游一直往 Topic 中發(fā)消息,kafkareader 每次拉取的數(shù)據(jù)量都等于最大拉取量,那么同步任務會一直進行而不會停止,這還是離線同步嗎?
  3. 離線同步,不推薦走 kafka,因為用 kafka 走實時同步更香
小編推薦閱讀

好特網(wǎng)發(fā)布此文僅為傳遞信息,不代表好特網(wǎng)認同期限觀點或證實其描述。

a 1.0
a 1.0
類型:休閑益智  運營狀態(tài):正式運營  語言:中文   

游戲攻略

游戲禮包

游戲視頻

游戲下載

游戲活動

《alittletotheleft》官網(wǎng)正版是一款備受歡迎的休閑益智整理游戲。玩家的任務是對日常生活中的各種雜亂物
K
K
類型:角色扮演  運營狀態(tài):封測  語言:中文   

游戲攻略

游戲禮包

游戲視頻

游戲下載

游戲活動

《K》是由樂次元開發(fā)的一款日系動漫RPG游戲,游戲根據(jù)同名動漫改編而來,高水準的漫畫和音樂是這款游戲的

相關視頻攻略

更多

掃二維碼進入好特網(wǎng)手機版本!

掃二維碼進入好特網(wǎng)微信公眾號!

本站所有軟件,都由網(wǎng)友上傳,如有侵犯你的版權,請發(fā)郵件[email protected]

湘ICP備2022002427號-10 湘公網(wǎng)安備:43070202000427號© 2013~2025 haote.com 好特網(wǎng)