Category: pyflink

When I submit a simple Python program to Flink it works properly, but it fails as soon as I try to use a simple UDF:

    s_env = StreamExecutionEnvironment …
    st_env = StreamTableEnvironment …

    @udf(result_type=DataTypes.STRING())
    def so_what(w: str):
        try:
            j = json.loads(w)
            return j[0]
        except ValueError:
            return ""

    # query
    st_env.from_path("source") \
        .select(col("when"), col("who"), so_what(col("what")).alias("what")) \
        .insert_into("sink")

It only works in a local cluster ..

Read more
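A minimal, self-contained version of that pattern, for comparison (sample data and the JSON key are made up; when the job runs locally but not on a real cluster, the usual culprit is the Python environment on the TaskManagers rather than the UDF itself):

    from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col
    from pyflink.table.udf import udf

    @udf(result_type=DataTypes.STRING())
    def so_what(w: str):
        import json  # imported inside the UDF so it resolves on the workers
        try:
            return json.loads(w)["what"]  # hypothetical key
        except ValueError:
            return ""

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t = t_env.from_elements([('{"what": "hello"}',)], ['what'])
    t.select(so_what(col('what')).alias('what')).execute().print()

If this runs locally but fails when submitted, check that apache-flink (plus any dependencies the UDF imports) is installed in every TaskManager's Python environment, or ship the dependencies with set_python_requirements.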

Just wanted to check whether my code is wrong or this is a pyflink 1.11.x issue. When I try to count the number of elements in a group-by query (either ‘GroupBy Aggregation’ or ‘GroupBy Window Aggregation’), PyFlink throws the following error:

    py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
    : java.lang.AssertionError: Conversion to relational algebra failed ..

Read more
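Without the full query this is hard to reproduce, but a plain GroupBy count of the same shape looks like the sketch below (table and column names are invented), which can help isolate whether the assertion comes from the aggregation itself or from the source schema:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.create_temporary_view(
        'clicks',
        t_env.from_elements([('alice', 1), ('bob', 2), ('alice', 3)], ['name', 'n']))

    # 'GroupBy Aggregation': count the elements per group
    t_env.execute_sql(
        "SELECT name, COUNT(*) AS cnt FROM clicks GROUP BY name").print()

The full AssertionError usually prints both the validated and the converted row types; comparing the two is the quickest way to spot the column whose type the 1.11 planner failed to convert.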

When I run the code, I have a problem using the table’s print function. The content of my CSV file is:

    0 syslog-tcp:0:0: key=None value=b'test kafka sink….'
    1 syslog-tcp:0:1: key=None value=b'test kafka sink1….'
    2 syslog-tcp:0:2: key=None value=b'test kafka sink2….'
    3 syslog-tcp:0:3: key=None value=b'systemd: Stopping System Logging Service…'

Source table:

    t_env.execute_sql(f"""
        CREATE TABLE source(
            line STRING ..

Read more
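The DDL here is cut off, but an end-to-end "read lines, print them" pipeline can be sketched as follows (the path is hypothetical; the 'raw' format maps each input line to a single STRING column, and the print connector is a convenient debugging sink):

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    t_env.execute_sql("""
        CREATE TABLE source (
            line STRING
        ) WITH (
            'connector' = 'filesystem',
            'path' = '/tmp/input.csv',
            'format' = 'raw'
        )
    """)
    t_env.execute_sql("""
        CREATE TABLE sink (
            line STRING
        ) WITH ('connector' = 'print')
    """)

    # wait() blocks until the job finishes, so the printed rows actually appear
    t_env.execute_sql("INSERT INTO sink SELECT line FROM source").wait()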

I’m trying to call an external function through a custom UDAF in PyFlink. The function I use requires the data to be in a dictionary object. I tried to use row(t.rowtime, t.b, t.c).cast(schema) to achieve this. Outside the UDAF this expression works well; inside the UDAF it is translated to an InternalRow, which cannot be ..

Read more
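General Python UDAFs (func_type='general', available since Flink 1.12) hand plain Python values to accumulate, so instead of casting a ROW expression it is usually easier to pass the needed columns separately and rebuild the dictionary inside the function. A minimal sketch modeled on the documented AggregateFunction pattern (names and the aggregation logic here are made up):

    from pyflink.common import Row
    from pyflink.table import DataTypes
    from pyflink.table.udf import AggregateFunction, udaf

    class DictMean(AggregateFunction):
        def create_accumulator(self):
            return Row(0.0, 0)  # (sum, count)

        def get_value(self, accumulator):
            return accumulator[0] / accumulator[1] if accumulator[1] else None

        def accumulate(self, accumulator, b, c):
            record = {'b': b, 'c': c}  # rebuild the dict the external function expects
            accumulator[0] += record['b'] * record['c']
            accumulator[1] += 1

    dict_mean = udaf(DictMean(),
                     result_type=DataTypes.DOUBLE(),
                     accumulator_type=DataTypes.ROW([
                         DataTypes.FIELD('s', DataTypes.DOUBLE()),
                         DataTypes.FIELD('c', DataTypes.BIGINT())]),
                     func_type='general')

It is then used like any built-in aggregate, e.g. t.group_by(col('a')).select(dict_mean(col('b'), col('c'))).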

I am using PyCharm to run a simple pyflink 1.13 (latest) example; it is only two lines. For the interpreter I created a virtual environment and use it in my PyCharm project. I also downloaded Java 8. I tried many steps to solve the issue, but

    from pyflink.common.serialization import Encoder
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment ..

Read more
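The excerpt stops before the error, but the imports match the official file-sink tutorial, so a complete version of that small example looks roughly like this (the output path is invented); in PyCharm the usual pitfalls are a project interpreter without apache-flink installed, or a JAVA_HOME pointing at an unsupported JDK:

    from pyflink.common.serialization import Encoder
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import StreamingFileSink

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # a tiny bounded stream written out as plain text rows
    ds = env.from_collection(['hello', 'pyflink'], type_info=Types.STRING())
    ds.add_sink(StreamingFileSink
                .for_row_format('/tmp/out', Encoder.simple_string_encoder())
                .build())

    env.execute('file_sink_job')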

I’m trying to read JSON data from a Kafka topic into a Flink DataStream using Python. Here is what the JSON objects look like (modified out of privacy concerns):

    {
      "data": {
        "text": "text",
        "public_metrics": {"retweet_count": 0, "reply_count": 0, "like_count": 0, "quote_count": 0},
        "author_id": "1",
        "id": "1",
        "created_at": "2030-05-11T09:19:08.000Z",
        "source": "Twitter for Android",
        "lang": "in"
      },
      "includes": {
        "users": [{
          "protected": false,
          "id": "1",
          "name": "A",
          "created_at": "2030-05-11T09:19:08.000Z",
          "public_metrics": {"followers_count": 0, "following_count": 0, "tweet_count": 557, "listed_count": 0},
          "username": "A"
        }]
      },
      "matching_rules": [{"id": 1, "tag": "A"}]
    }

I have tried the following:

    from pyflink.common.serialization import JsonRowDeserializationSchema
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.common.typeinfo import Types ..

Read more
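Since the excerpt cuts off before the consumer is built, here is one way the pieces usually fit together (topic name, group id, and the jar path are assumptions; JsonRowDeserializationSchema only extracts the fields declared in the type info, so a subset of the nested schema is enough):

    from pyflink.common.serialization import JsonRowDeserializationSchema
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FlinkKafkaConsumer

    env = StreamExecutionEnvironment.get_execution_environment()
    # the Kafka connector jar has to be on the classpath, e.g.:
    # env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")

    # declare only the part of the nested JSON we care about
    row_type_info = Types.ROW_NAMED(
        ['data'],
        [Types.ROW_NAMED(
            ['text', 'author_id', 'id', 'created_at', 'source', 'lang'],
            [Types.STRING()] * 6)])

    deserialization_schema = JsonRowDeserializationSchema.builder() \
        .type_info(row_type_info).build()

    consumer = FlinkKafkaConsumer(
        topics='tweets',  # hypothetical topic name
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'demo'})

    env.add_source(consumer).print()
    env.execute('read_json_from_kafka')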

Thanks a lot for any help!!! Code:

    from pyflink.common.typeinfo import RowTypeInfo, Types, BasicTypeInfo, TupleTypeInfo
    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    # create the environment in streaming mode
    env_settings_stream = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
    env_stream = StreamTableEnvironment.create(environment_settings=env_settings_stream)

    table1 = env_stream.from_elements([(1, 23.4, 'lili'), (2, 33.4, 'er'), (3, 45.6, 'yu')],
                                      ['id', 'order_amt', 'name'])
    table2 = env_stream.from_elements([(1, 43.4, 'xixi'), (2, 53.4, 'rr'), (3, 65.6, 'ww')],
                                      ['id2', 'order_amt2', ..

Read more
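The snippet breaks off before the actual query, so for reference, a plain inner join of those two tables can be written as below (table2's third field name is cut off in the excerpt, so 'name2' is assumed; on recent releases the blink planner is the default and the settings can stay simple):

    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    table1 = t_env.from_elements(
        [(1, 23.4, 'lili'), (2, 33.4, 'er'), (3, 45.6, 'yu')],
        ['id', 'order_amt', 'name'])
    table2 = t_env.from_elements(
        [(1, 43.4, 'xixi'), (2, 53.4, 'rr'), (3, 65.6, 'ww')],
        ['id2', 'order_amt2', 'name2'])  # 'name2' is assumed

    # inner join on the id columns, keeping a few fields from each side
    result = (table1.join(table2)
              .where(col('id') == col('id2'))
              .select(col('id'), col('order_amt'), col('order_amt2')))
    result.execute().print()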