4.4、使用HSET命令作为sink表写入命令(指定key)@Testpublic void testHSetWithKey() throws Exception {long start = System.currentTimeMillis();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);String source ="CREATE TABLE students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"proctime as PROCTIME() \n" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'key'='students',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'command'='BLPOP'\n" +" )";String daeamon ="CREATE TABLE classes\n" +"(\n" +"schoolstring, \n" +"class_idBIGINT,\n" +"class_namestring" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'lookup.cache.max-rows'='1000',\n" +"'lookup.cache.ttl'='3600',\n" +"'lookup.cache.load-all'='true',\n" +"'database'='0',\n" +"'command'='HGET'\n" +" )";/***1、这里因为command是HSET,所以需要一个key和一个field,这里配置项指定了key,那么主键拼接就作为field,*使用hset保存到redis*2、作为sink有一个sink.key.ttl参数可以设置key保存在redis的ttl生存时间,单位秒 , 默认-1表示长期保存*/String sink ="CREATE TABLE sink_students\n" +"(\n" +"schoolstring, \n" +"numberBIGINT ,\n" +"namestring,\n" +"class_idBIGINT, \n" +"class_namestring, \n" +"primary key(number) not enforced" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'format'='json',\n" +"'key'='sink_students_hset',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'sink.parallelism' = '16',\n" +"'sink.key.ttl' = '300',\n" +"'command'='HSET'\n" +" )";tEnv.executeSql(source);tEnv.executeSql(daeamon);tEnv.executeSql(sink);String sql =" insert into sink_students "+ " select s.school,s.number,s.name,s.class_id,d.class_namefrom students s"+ " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";TableResult tableResult = tEnv.executeSql(sql);tableResult.getJobClient().get().getJobExecutionResult().get();long end = System.currentTimeMillis();System.out.println("耗时:" + (end - start) + "ms");}5、配置说明配置项描述hostredis的hostportredis的portpasswordredis的passwordcluster-nodesredis的集群节点,ip和端口之间用英文冒号分隔,多个ip端口用英文逗号分割master.nameredis的sentinel模式的master节点的名称sentinels.inforedis的sentinel模式的info信息sentinels.passwordredis的sentinel模式的密码databaseredis的database,一般是0~15commandredis的命令,作为流表时支持BLPOP、BRPOP、LPOP、RPOP、SPOP;作为维表时支持GET、HGET;作为sink表时支持LPUSH、RPUSH、SADD、SET、HSETredis-moderedis的部署模式,single、cluster、sentinelkeyredis需要访问的key , 比如数据是以某个固定的key存放在redis里,值是一个list;redis执行lpush、rpush、sadd、hset等sink使用的命令时的key;timeout连接redis的超时时间 , 单位毫秒max-total连接redis的连接池的最大连接数max-idle连接redis的连接池的最大空闲数min-idle连接redis的连接池的最小空闲数format格式化数据格式 , 如json、csvbatch-fetch-rows像LPOP、BLPOP、RPOP、BRPOP这种命令每次从redis拿到数据的条数lookup.cache.max-rows作为维表lookup模式,缓存在内存中的数据的最大条数lookup.cache.ttl作为维表lookup模式,缓存在内存中的数据的ttl超时时间,单位秒lookup.max-retries作为维表lookup模式,查找数据的失败重试次数lookup.cache.load-all作为维表lookup模式,查找数据是否加载所有 , 主要是针对hget命令 , 如:HGET KEY_NAME FIELD_NAME;是否根据key查出所有field的值,这里可以根据实际hash表的大小决定是否要查询所有出来缓存起来sink.max-retriesredis作为sink源时,最大重试次数sink.parallelismredis作为sink源时,sink的并行数,默认并行度为核心数sink.key.ttlredis作为sink源时,sink的数据保存在redis的ttl超时时间,单位秒,默认为-1表示长期保存lookup.max-retries作为维表lookup模式,查找数据的失败重试次数源码地址:https://gitee.com/rongdi/flinksql-connector-redis/
【flinksql读写redis】
推荐阅读
- 追求性能极致:Redis6.0的多线程模型
- spring boot集成redis基础入门
- STM32的SPI口的DMA读写[原创www.cnblogs.com/helesheng]
- CentOS 7.9 安装 redis-6.2.0
- Redis实现布隆过滤器解析
- 深入底层C源码 Redis核心设计原理
- Redis高并发分布式锁详解
- 利用msg_msg实现任意地址读写
- 原生Redis跨数据中心双向同步优化实践
- Java 读写锁 ReadWriteLock 原理与应用场景详解