当前位置: 首页 > news >正文

网站建设项目可行性万户网站建设公司

网站建设项目可行性,万户网站建设公司,wordpress主题安装慢,seo文章目录 举个例子 连接器 下载连接器#xff08;connector#xff09;和格式#xff08;format#xff09;jar 包 依赖管理 如何使用连接器 举个例子 StreamExecutionEnvironment集成了DataStream API#xff0c;通过额外的函数扩展了TableEnvironment。 下面代码演示两…目录 举个例子 连接器 下载连接器connector和格式formatjar 包 依赖管理 如何使用连接器 举个例子 StreamExecutionEnvironment集成了DataStream API通过额外的函数扩展了TableEnvironment。 下面代码演示两种API如何互转 from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment from pyflink.common.typeinfo import Typesenv StreamExecutionEnvironment.get_execution_environment() t_env StreamTableEnvironment.create(env) # create a DataStream ds env.from_collection([Alice, Bob, John], Types.STRING())# interpret the insert-only DataStream as a Table t t_env.from_data_stream(ds)# register the Table object as a view and query it t_env.create_temporary_view(InputTable, t) res_table t_env.sql_query(SELECT UPPER(f0) FROM InputTable)# interpret the insert-only Table as a DataStream again res_ds t_env.to_data_stream(res_table)# add a printing sink and execute in DataStream API res_ds.print()env.execute() TableEnvironment将采用StreamExecutionEnvironment所有的配置选项。 建议在转换为Table API之前设置DataStream API的所有配置选项如下代码。 from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment from pyflink.datastream.checkpointing_mode import CheckpointingMode# create Python DataStream API env StreamExecutionEnvironment.get_execution_environment()# set various configuration early env.set_max_parallelism(256)env.get_config().add_default_kryo_serializer(type_class_name, serializer_class_name) env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)# then switch to Python Table API t_env StreamTableEnvironment.create(env) # set configuration early t_env.get_config().set_local_timezone(Europe/Berlin)# start defining your pipelines in both APIs... 连接器 下载连接器connector和格式formatjar 包 由于Flink是一个基于 Java/Scala 的项目连接器connector和格式format的实现是作为 jar 包存在的 要在 PyFlink 作业中使用首先需要将其指定为作业的依赖。 如果使用第三方JAR可以在Python Table API中指定JAR如下所示 table_env.get_config().get_configuration().set_string(pipeline.jars, file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar) or table_env.get_config().get_configuration().set_string(pipeline.classpaths, file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar) 依赖管理 需要在Python API程序中使用依赖项。例如Python用户自定义函数中使用第三方Python库。此外在用机器学习模型预测等场景中用户可能希望在Python自定义函数中加载机器学习模型。 当PyFlink作业在本地执行时可以将第三方Python库安装到本地Python环境中将机器学习模型下载到本地等等。 然而当用户想要将PyFlink任务提交到远程集群时这种方法并不奏效。 除了Table API 在Python DataStream API中则如下配置 stream_execution_environment.add_jars(file:///my/jar/path/connector1.jar, file:///my/jar/path/connector2.jar) stream_execution_environment.add_jars(file:///E:/my/jar/path/connector1.jar, file:///E:/my/jar/path/connector2.jar) # NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the # URLs are accessible on both the client and the cluster. stream_execution_environment.add_classpaths(file:///my/jar/path/connector1.jar, file:///my/jar/path/connector2.jar) 如何使用连接器 在 PyFlink Table API 中DDL 是定义 source 和 sink 比较推荐的方式这可以通过 TableEnvironment 中的 execute_sql() 方法来完成然后就可以在作业中使用这张表了。 --下面是如何在 PyFlink 中使用 Kafka source/sink 和 JSON 格式的完整示例。 from pyflink.table import TableEnvironment, Environmentsettingsdef log_processing():env_settings Environmentsettings.in_streaming_mode()t_env TableEnvironment.create(env_settings)# specify connector and format jarst_env.get_config().get_configuration().set_string(pipeline.jars, file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar)source_ddl CREATE TABLE source_table(a VARCHAR,b INT) WITH (connector kafka,topic source_topic,properties.bootstrap.servers kafka:9092,properties.group.id test_3,scan.startup.mode latest-offset,format json)sink_ddl CREATE TABLE sink_table(a VARCHAR) WITH (connector kafka,topic sink_topic,properties.bootstrap.servers kafka:9092,format json)t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query(SELECT a FROM source_table) \.execute_insert(sink_table).wait()if __name__ __main__:log_processing()
http://www.pierceye.com/news/996140/

相关文章:

  • 河南有名的做网站公司有哪些做设计找素材的+网站有哪些
  • 网站建设规划设计任务书网站开发的费用申请
  • 淮阳住房城乡建设局网站网页模板的作用
  • 知识问答网站开发不用编程做APP和响应式网站
  • 免费创建个人商城网站吗中国互联网前100名企业
  • 贵阳网站建设端觉有做数学题的网站吗
  • 网站备案格式网站开发工程师适合女生吗
  • 江门网站建设自助建站广播电台网站建设板块
  • 淮北市建设安全监督站网站文员工作内容
  • 先做网站还是app海北网站建设
  • 网站中转页怎么做做网页需要什么
  • 台州城乡建设规划网站房产管理局官网入口
  • 徐州手机建站模板宁波公司招聘
  • 类似 wordpress 建站哪里有培训班
  • 广州建设六马路小学网站微营销软件免费下载
  • 广州网站推广解决方案网站建设标志头像图片
  • 网站建设 中企动力成都qq空间wordpress
  • 什么是定制网站php网站开发面试
  • 网站建设推广专家服务重庆万泰建设集团有限公司
  • 2017两学一做竞赛网站手游游戏推广平台
  • 贵州灵溪seo整站优化wordpress开发文档(chm)
  • iis7 网站权限设置亚马逊网站开发设计
  • 贵阳做网站哪家好复古网站设计
  • 网站跳转是什么意思58这样网站怎么做
  • 易语言网站批量注册怎么做百度模板网站模板
  • 海伦市网站山西大川建设有限公司网站
  • 快速搭建网站域名绑定设置网站优化是往新闻中心发新闻吗
  • 复刻手表网站公众号快速涨10000粉丝方法
  • 珠海网站系统建设项目制作网页的网站推荐
  • 做网站公司怎么选宁波外贸公司排行