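This topic describes how to use Nexmark to test the performance of Realtime Compute for Apache Flink.
Performance
With 1 CU of compute resources, Realtime Compute for Apache Flink runs the 19 Nexmark queries at a minimum of 5,000 RPS (records per second) and a maximum of 55,000 RPS.
- Simple jobs (for example, single-stream filtering or string transformation): 1 CU can process 40,000 to 55,000 records per second.
- Complex jobs (for example, JOIN, GROUP BY, or window functions): 1 CU can process 5,000 to 10,000 records per second.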
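As a rough sizing aid, the per-CU ranges above can be turned into an estimate of how many CUs a target throughput might need. The following is a minimal sketch, not an official sizing formula: the function name `estimate_cus` and the 200,000 RPS target are hypothetical, and the sketch assumes that throughput scales linearly with the number of CUs, which the figures above do not guarantee.

```python
import math
from typing import Tuple

# Per-CU throughput ranges (records per second) quoted in this topic.
PER_CU_RPS = {
    "simple": (40_000, 55_000),   # single-stream filtering, string transformation
    "complex": (5_000, 10_000),   # JOIN, GROUP BY, window functions
}


def estimate_cus(target_rps: int, workload: str) -> Tuple[int, int]:
    """Return (optimistic, conservative) CU counts for a target rate.

    Assumption: throughput scales linearly with CU count.
    """
    low, high = PER_CU_RPS[workload]
    optimistic = math.ceil(target_rps / high)    # every CU reaches the upper bound
    conservative = math.ceil(target_rps / low)   # every CU reaches only the lower bound
    return optimistic, conservative


if __name__ == "__main__":
    # Hypothetical target of 200,000 records per second.
    for workload in PER_CU_RPS:
        best, worst = estimate_cus(200_000, workload)
        print(f"{workload}: between {best} and {worst} CUs")
```

Treat the result as a starting point for a test run rather than a guarantee; the benchmark below is the way to measure the real figure for a given query.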
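Prerequisites
- Java JDK 1.8.x or later is installed.
- Maven 3.8.2 is installed.
- Git is installed. For download instructions, see Git.
- Nexmark is installed. For more information about Nexmark, see Nexmark.
- A workspace is created. For more information, see Activate Realtime Compute for Apache Flink.
  Note: Nexmark and Git are hosted on third-party websites, which may respond slowly or be unreachable.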
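Test tools
- Nexmark source table: generates test data at the TPS that the test requires.
- Transformations: the 19 Nexmark queries.
- Blackhole result table: excludes the performance impact of upstream and downstream storage so that the test focuses on the performance of Flink itself.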
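Data preparation
- Run the following commands in a command-line console to download and compile the Nexmark source code.
    cd <path>
    git clone https://github.com/nexmark/nexmark.git
    cd nexmark/nexmark-flink
    mvn clean package
  Note: <path> is a directory of your choice in which the cloned nexmark repository is stored.
- Create the Nexmark connector.
  - Log on to the Realtime Compute for Apache Flink console.
  - In the Actions column of the target workspace, click Console.
  - On the right side of the Connectors page, click Create Custom Connector.
  - In the dialog box that appears, click Select File and upload the compiled JAR package.
    Note: The package is in the <path>/nexmark/nexmark-flink/target directory. nexmark-flink-0.2-SNAPSHOT.jar is the package produced by the initial build.
  - Click Next, and then click Finish.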
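The following table describes the Nexmark connector parameters.
| Parameter | Value | Description |
| first-event.rate | 55000 | The data generation rate. |
| next-event.rate | 55000 | The data generation rate. |
| events.num | 100000000 | The number of events to generate. |
| bid.proportion | 92% | The proportion of Bid events. |
| auction.proportion | 6% | The proportion of Auction events. |
| person.proportion | 2% | The proportion of Person events. |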
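To make these parameters concrete, the arithmetic below works out how many events of each type the settings produce and roughly how long generation takes at the configured rate. It is a back-of-the-envelope sketch: the names `expected_counts` and `generation_seconds` are illustrative, the three proportion values are treated as relative weights that sum to 100, and the generator is assumed to hold exactly the configured rate for the whole run.

```python
EVENTS_NUM = 100_000_000   # events.num
EVENT_RATE = 55_000        # first-event.rate and next-event.rate, events per second
PROPORTIONS = {"person": 2, "auction": 6, "bid": 92}  # assumed relative weights


def expected_counts(total, proportions):
    """Split the total event count by the relative weight of each event type."""
    weight_sum = sum(proportions.values())
    return {name: total * weight // weight_sum for name, weight in proportions.items()}


def generation_seconds(total, rate):
    """Time needed to emit all events if the rate is held constant."""
    return total / rate


counts = expected_counts(EVENTS_NUM, PROPORTIONS)
duration = generation_seconds(EVENTS_NUM, EVENT_RATE)

if __name__ == "__main__":
    for name, count in counts.items():
        print(f"{name}: {count:,} events")
    print(f"generation time: about {duration / 60:.1f} minutes")
```

Under these assumptions a full run emits 92,000,000 Bid events, 6,000,000 Auction events, and 2,000,000 Person events, and a job that keeps up with the source finishes in about half an hour. A job that takes noticeably longer is being limited by the query rather than by the source.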
Test procedure
- Create one job for each of the following queries. For more information about how to create a job, see SQL job development.
q0
CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
-- q0 (pass-through): forward every bid unchanged to measure the baseline overhead.
INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        bid;
q1
CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price DECIMAL(23, 3),
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
-- q1 (currency conversion): convert each bid price from dollars to euros.
INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        0.908 * price as price, -- convert dollar to euro
        channel,
        url,
        dateTime,
        extra 
    FROM
        bid;
q2
CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
-- q2 (selection): keep only bids on auctions whose ID is divisible by 123.
INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        bid 
    WHERE
        MOD(auction, 123) = 0;
q3
CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (name VARCHAR, city VARCHAR, state VARCHAR, id BIGINT) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW person AS
    SELECT 
        person.id,
        person.name,
        person.emailAddress,
        person.creditCard,
        person.city,
        person.state,
        dateTime,
        person.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 0;
CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;
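-- q3 (local item suggestion): sellers located in OR, ID, or CA who opened an auction in category 10.
INSERT INTO discard_sink
    SELECT 
        P.name, P.city, P.state, A.id 
    FROM
        auction AS A 
        INNER JOIN person AS P
            ON A.seller = P.id 
    WHERE
        A.category = 10 
            AND (P.state = 'OR' 
            OR P.state = 'ID' 
            OR P.state = 'CA');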
q4
CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (id BIGINT, final BIGINT) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
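-- q4 (average price per category): average the highest valid bid of each auction, grouped by category.
INSERT INTO discard_sink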
    SELECT 
        Q.category, AVG(Q.final) 
    FROM
        (
            SELECT 
                MAX(B.price) AS final, A.category 
            FROM
                auction A, bid B 
            WHERE
                A.id = B.auction 
                    AND B.dateTime BETWEEN A.dateTime AND A.expires 
            GROUP BY
                A.id, A.category
        ) Q 
    GROUP BY
        Q.category;
q5
CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (auction BIGINT, num BIGINT) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
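-- q5 (hot items): auctions with the most bids in each 10-second window that slides every 2 seconds.
INSERT INTO discard_sink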
    SELECT 
        AuctionBids.auction, AuctionBids.num 
    FROM
        (
            SELECT 
                B1.auction,
                count(*) AS num,
                HOP_START(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
                HOP_END(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime 
            FROM
                bid B1 
            GROUP BY
                B1.auction,
                HOP(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
        ) AS AuctionBids 
        JOIN (
            SELECT 
                max(CountBids.num) AS maxn,
                CountBids.starttime,
                CountBids.endtime 
            FROM
                (
                    SELECT 
                        count(*) AS num,
                        HOP_START(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
                        HOP_END(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime 
                    FROM
                        bid B2 
                    GROUP BY
                        B2.auction,
                        HOP(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
                ) AS CountBids 
            GROUP BY
                CountBids.starttime, CountBids.endtime
        ) AS MaxBids
            ON AuctionBids.starttime = MaxBids.starttime 
                AND AuctionBids.endtime = MaxBids.endtime 
                AND AuctionBids.num >= MaxBids.maxn;
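
q5 groups bids with HOP(dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND), a 10-second window that slides every 2 seconds, so each bid is counted in several overlapping windows. The sketch below illustrates that assignment with plain integer timestamps in milliseconds. It is not Flink's implementation: the function name `hop_windows` is hypothetical, and windows are assumed to be aligned to multiples of the slide.

```python
from typing import List, Tuple


def hop_windows(ts_ms: int, slide_ms: int = 2_000, size_ms: int = 10_000) -> List[Tuple[int, int]]:
    """Return every [start, end) window that contains ts_ms.

    Assumption: window starts are aligned to multiples of slide_ms.
    """
    # Latest window start that is not after the timestamp.
    start = ts_ms - (ts_ms % slide_ms)
    windows = []
    # Walk backwards one slide at a time while the window still covers ts_ms.
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return sorted(windows)


if __name__ == "__main__":
    for start, end in hop_windows(13_500):
        print(f"[{start}, {end})")
```

With a 10-second size and a 2-second slide, each bid lands in 10 / 2 = 5 windows, which is one reason hopping-window queries such as q5 cost more per input record than a simple filter.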
q7
CREATE TEMPORARY TABLE nexmark_table (  
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
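-- q7 (highest bid): bids whose price is the maximum of their 10-second tumbling window.
-- The columns are listed in the same order as the discard_sink columns.
INSERT INTO discard_sink
    SELECT 
        B.auction, B.bidder, B.price, B.dateTime, B.extra 
    FROM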
        bid B 
        JOIN (
            SELECT 
                MAX(B1.price) AS maxprice,
                TUMBLE_ROWTIME(B1.dateTime, INTERVAL '10' SECOND) as dateTime 
            FROM
                bid B1 
            GROUP BY
                TUMBLE(B1.dateTime, INTERVAL '10' SECOND)
        ) B1
            ON B.price = B1.maxprice 
    WHERE
        B.dateTime BETWEEN B1.dateTime - INTERVAL '10' SECOND AND B1.dateTime;
q8
CREATE TEMPORARY TABLE nexmark_table (   
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (id BIGINT, name VARCHAR, stime TIMESTAMP(3)) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW person AS
    SELECT 
        person.id,
        person.name,
        person.emailAddress,
        person.creditCard,
        person.city,
        person.state,
        dateTime,
        person.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 0;
CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;
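-- q8 (monitor new users): people who joined and opened an auction within the same 10-second tumbling window.
INSERT INTO discard_sink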
    SELECT 
        P.id, P.name, P.starttime 
    FROM
        (
            SELECT 
                P.id,
                P.name,
                TUMBLE_START(P.dateTime, INTERVAL '10' SECOND) AS starttime,
                TUMBLE_END(P.dateTime, INTERVAL '10' SECOND) AS endtime 
            FROM
                person P 
            GROUP BY
                P.id,
                P.name,
                TUMBLE(P.dateTime, INTERVAL '10' SECOND)
        ) P 
        JOIN (
            SELECT 
                A.seller,
                TUMBLE_START(A.dateTime, INTERVAL '10' SECOND) AS starttime,
                TUMBLE_END(A.dateTime, INTERVAL '10' SECOND) AS endtime 
            FROM
                auction A 
            GROUP BY
                A.seller,
                TUMBLE(A.dateTime, INTERVAL '10' SECOND)
        ) A
            ON P.id = A.seller 
                AND P.starttime = A.starttime 
                AND P.endtime = A.endtime;
q9
CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    id BIGINT,
    itemName VARCHAR,
    description VARCHAR,
    initialBid BIGINT,
    reserve BIGINT,
    dateTime TIMESTAMP(3),
    expires TIMESTAMP(3),
    seller BIGINT,
    category BIGINT,
    extra VARCHAR,
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    bid_dateTime TIMESTAMP(3),
    bid_extra VARCHAR
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
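-- q9 (winning bids): for each auction, the highest bid placed while the auction was open; the earliest bid wins ties.
INSERT INTO discard_sink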
    SELECT 
        id,
        itemName,
        description,
        initialBid,
        reserve,
        dateTime,
        expires,
        seller,
        category,
        extra,
        auction,
        bidder,
        price,
        bid_dateTime,
        bid_extra 
    FROM
        (
            SELECT 
                A.*,
                B.auction,
                B.bidder,
                B.price,
                B.dateTime AS bid_dateTime,
                B.extra AS bid_extra,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            A.id
                        ORDER BY
                            B.price DESC, B.dateTime ASC
                    ) AS rownum 
            FROM
                auction A, bid B 
            WHERE
                A.id = B.auction 
                    AND B.dateTime BETWEEN A.dateTime AND A.expires
        ) 
    WHERE
        rownum <= 1;
q11
CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    bidder BIGINT,
    bid_count BIGINT,
    starttime TIMESTAMP(3),
    endtime TIMESTAMP(3)
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
-- q11 (user sessions): number of bids per bidder in each session, with a 10-second session gap.
INSERT INTO discard_sink
    SELECT 
        B.bidder,
        count(*) as bid_count,
        SESSION_START(B.dateTime, INTERVAL '10' SECOND) as starttime,
        SESSION_END(B.dateTime, INTERVAL '10' SECOND) as endtime 
    FROM
        bid B 
    GROUP BY
        B.bidder, SESSION(B.dateTime, INTERVAL '10' SECOND);
q12
CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    bidder BIGINT,
    bid_count BIGINT,
    starttime TIMESTAMP(3),
    endtime TIMESTAMP(3)
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
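-- q12 (processing-time windows): number of bids per bidder in each 10-second tumbling window on processing time.
INSERT INTO discard_sink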
    SELECT 
        B.bidder,
        count(*) as bid_count,
        TUMBLE_START(B.p_time, INTERVAL '10' SECOND) as starttime,
        TUMBLE_END(B.p_time, INTERVAL '10' SECOND) as endtime 
    FROM
        (
            SELECT 
                *, PROCTIME() as p_time 
            FROM
                bid
        ) B 
    GROUP BY
        B.bidder, TUMBLE(B.p_time, INTERVAL '10' SECOND);
q15
CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    `day` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    total_bidders BIGINT,
    rank1_bidders BIGINT,
    rank2_bidders BIGINT,
    rank3_bidders BIGINT,
    total_auctions BIGINT,
    rank1_auctions BIGINT,
    rank2_auctions BIGINT,
    rank3_auctions BIGINT
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
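-- q15 (bidding statistics report): per-day counts of bids, bidders, and auctions, in total and in three price ranges.
INSERT INTO discard_sink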
    SELECT 
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        count(distinct bidder) AS total_bidders,
        count(distinct bidder) 
            filter(where price < 10000) AS rank1_bidders,
        count(distinct bidder) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bidders,
        count(distinct bidder) 
            filter(where price >= 1000000) AS rank3_bidders,
        count(distinct auction) AS total_auctions,
        count(distinct auction) 
            filter(where price < 10000) AS rank1_auctions,
        count(distinct auction) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_auctions,
        count(distinct auction) 
            filter(where price >= 1000000) AS rank3_auctions 
    FROM
        bid 
    GROUP BY
        DATE_FORMAT(dateTime, 'yyyy-MM-dd');
q16
CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    channel VARCHAR,
    `day` VARCHAR,
    `minute` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    total_bidders BIGINT,
    rank1_bidders BIGINT,
    rank2_bidders BIGINT,
    rank3_bidders BIGINT,
    total_auctions BIGINT,
    rank1_auctions BIGINT,
    rank2_auctions BIGINT,
    rank3_auctions BIGINT
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
-- q16 (channel statistics report): the same counts as q15, grouped by channel and day.
INSERT INTO discard_sink
    SELECT 
        channel,
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        max(DATE_FORMAT(dateTime, 'HH:mm')) as `minute`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        count(distinct bidder) AS total_bidders,
        count(distinct bidder) 
            filter(where price < 10000) AS rank1_bidders,
        count(distinct bidder) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bidders,
        count(distinct bidder) 
            filter(where price >= 1000000) AS rank3_bidders,
        count(distinct auction) AS total_auctions,
        count(distinct auction) 
            filter(where price < 10000) AS rank1_auctions,
        count(distinct auction) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_auctions,
        count(distinct auction) 
            filter(where price >= 1000000) AS rank3_auctions 
    FROM
        bid 
    GROUP BY
        channel, DATE_FORMAT(dateTime, 'yyyy-MM-dd');
q17
CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    `day` VARCHAR,
    total_bids BIGINT,
    rank1_bids BIGINT,
    rank2_bids BIGINT,
    rank3_bids BIGINT,
    min_price BIGINT,
    max_price BIGINT,
    avg_price BIGINT,
    sum_price BIGINT
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
INSERT INTO discard_sink
    SELECT 
        auction,
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        count(*) AS total_bids,
        count(*) 
            filter(where price < 10000) AS rank1_bids,
        count(*) 
            filter(where price >= 10000 
                and price < 1000000) AS rank2_bids,
        count(*) 
            filter(where price >= 1000000) AS rank3_bids,
        min(price) AS min_price,
        max(price) AS max_price,
        avg(price) AS avg_price,
        sum(price) AS sum_price 
    FROM
        bid 
    GROUP BY
        auction, DATE_FORMAT(dateTime, 'yyyy-MM-dd');
q18
CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
INSERT INTO discard_sink
    SELECT 
        auction, bidder, price, channel, url, dateTime, extra 
    FROM
        (
            SELECT 
                *,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            bidder, auction
                        ORDER BY
                            dateTime DESC
                    ) AS rank_number 
            FROM
                bid
        ) 
    WHERE
        rank_number <= 1;
q19
CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    dateTime TIMESTAMP(3),
    extra VARCHAR,
    rank_number BIGINT
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
INSERT INTO discard_sink
    SELECT 
        * 
    FROM
        (
            SELECT 
                *,
                ROW_NUMBER() 
                    OVER (
                        PARTITION BY
                            auction
                        ORDER BY
                            price DESC
                    ) AS rank_number 
            FROM
                bid
        ) 
    WHERE
        rank_number <= 10;
q20
CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    url VARCHAR,
    bid_dateTime TIMESTAMP(3),
    bid_extra VARCHAR,
    itemName VARCHAR,
    description VARCHAR,
    initialBid BIGINT,
    reserve BIGINT,
    auction_dateTime TIMESTAMP(3),
    expires TIMESTAMP(3),
    seller BIGINT,
    category BIGINT,
    auction_extra VARCHAR
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW auction AS
    SELECT 
        auction.id,
        auction.itemName,
        auction.description,
        auction.initialBid,
        auction.reserve,
        dateTime,
        auction.expires,
        auction.seller,
        auction.category,
        auction.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 1;
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        url,
        B.dateTime,
        B.extra,
        itemName,
        description,
        initialBid,
        reserve,
        A.dateTime,
        expires,
        seller,
        category,
        A.extra 
    FROM
        bid AS B 
        INNER JOIN auction AS A
            on B.auction = A.id 
    WHERE
        A.category = 10;
q21
CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    channel_id VARCHAR
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        CASE 
            WHEN lower(channel) = 'apple' 
            THEN '0' 
            WHEN lower(channel) = 'google' 
            THEN '1' 
            WHEN lower(channel) = 'facebook' 
            THEN '2' 
            WHEN lower(channel) = 'baidu' 
            THEN '3' 
            ELSE REGEXP_EXTRACT(url, '(&|^)channel_id=([^&]*)', 2) 
        END AS channel_id 
    FROM
        bid 
    where
        REGEXP_EXTRACT(url, '(&|^)channel_id=([^&]*)', 2) is not null 
            or lower(channel) in ('apple', 'google', 'facebook', 'baidu');
q22
CREATE TEMPORARY TABLE nexmark_table (
    event_type INT,
    person ROW < id BIGINT, name VARCHAR, emailAddress VARCHAR, creditCard VARCHAR, city VARCHAR, state VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    auction ROW < id BIGINT, itemName VARCHAR, description VARCHAR, initialBid BIGINT, reserve BIGINT, dateTime TIMESTAMP(3), expires TIMESTAMP(3), seller BIGINT, category BIGINT, extra VARCHAR >,
    bid ROW < auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR >,
    dateTime AS CASE 
        WHEN event_type = 0 
        THEN person.dateTime 
        WHEN event_type = 1 
        THEN auction.dateTime 
        ELSE bid.dateTime 
    END,
    WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
) 
WITH (
    'connector' = 'nexmark',
    'first-event.rate' = '55000',
    'next-event.rate' = '55000',
    'events.num' = '100000000',
    'person.proportion' = '2',
    'auction.proportion' = '6',
    'bid.proportion' = '92'
);
CREATE TEMPORARY TABLE discard_sink (
    auction BIGINT,
    bidder BIGINT,
    price BIGINT,
    channel VARCHAR,
    dir1 VARCHAR,
    dir2 VARCHAR,
    dir3 VARCHAR
) 
WITH ('connector' = 'blackhole');
CREATE TEMPORARY VIEW bid AS
    SELECT 
        bid.auction,
        bid.bidder,
        bid.price,
        bid.channel,
        bid.url,
        dateTime,
        bid.extra 
    FROM
        `vvp`.`default`.nexmark_table 
    WHERE
        event_type = 2;
INSERT INTO discard_sink
    SELECT 
        auction,
        bidder,
        price,
        channel,
        SPLIT_INDEX(url, '/', 3) as dir1,
        SPLIT_INDEX(url, '/', 4) as dir2,
        SPLIT_INDEX(url, '/', 5) as dir3 
    FROM
        bid;
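- On the right-side tab, open More Configurations and, under Engine Version, select vvr-6.0.4-flink-1.15. 
- In the upper-right corner of the SQL editor, click Deploy. 
- On the page, click each target job in turn. On the Deployment Details tab, in the Resources section, click Edit in the upper-right corner. Set Parallelism to 1, TaskManager Memory to 4 GiB, and TaskManager CPU Cores to 1 Core, as shown in the following figure. 
- After you finish the configuration, click Save in the upper-right corner of the Resources section. 
- For each target job, click Start in the Actions column. 
- When a target job reaches the Finished state, click the job name and open the Events tab to obtain the job's start time and end time. They are identified by the following messages: 
  - Start time: Job was successfully started. 
  - End time: Job has successfully finished. 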
Expected results
Duration is the end time minus the start time, in seconds. RPS is events.num divided by Duration.
| query | Start time | End time | Duration (seconds) | RPS | 
| q0 | 2023-02-07 11:55:13 | 2023-02-07 12:26:00 | 1847 | 54141 | 
| q1 | 2023-02-07 12:28:06 | 2023-02-07 12:58:53 | 1847 | 54141 | 
| q2 | 2023-02-07 12:29:13 | 2023-02-07 13:00:02 | 1849 | 54083 | 
| q3 | 2023-02-07 12:47:12 | 2023-02-07 13:18:01 | 1849 | 54083 | 
| q4 | 2023-02-07 13:30:08 | 2023-02-07 14:04:46 | 2078 | 48123 | 
| q5 | 2023-02-07 14:15:07 | 2023-02-07 14:45:45 | 1838 | 54406 | 
| q7 | 2023-02-07 14:24:13 | 2023-02-07 17:08:42 | 9869 | 10132 | 
| q8 | 2023-02-07 14:27:09 | 2023-02-07 14:57:47 | 1838 | 54406 | 
| q9 | 2023-02-07 14:35:53 | 2023-02-07 15:38:16 | 3743 | 26716 | 
| q11 | 2023-02-07 14:38:14 | 2023-02-07 15:08:53 | 1839 | 54377 | 
| q12 | 2023-02-07 14:40:36 | 2023-02-07 15:11:15 | 1839 | 54377 | 
| q15 | 2023-02-07 14:42:43 | 2023-02-07 15:13:22 | 1839 | 54377 | 
| q16 | 2023-02-07 14:46:17 | 2023-02-07 16:03:09 | 4012 | 24925 | 
| q17 | 2023-02-07 16:02:19 | 2023-02-07 16:33:07 | 1848 | 54112 | 
| q18 | 2023-02-07 16:04:43 | 2023-02-07 16:41:08 | 2185 | 45766 | 
| q19 | 2023-02-07 16:07:13 | 2023-02-07 16:40:01 | 1968 | 50813 | 
| q20 | 2023-02-07 16:13:29 | 2023-02-07 16:59:13 | 2744 | 36443 | 
| q21 | 2023-02-07 16:16:32 | 2023-02-07 16:47:11 | 1849 | 54083 | 
| q22 | 2023-02-07 16:18:53 | 2023-02-07 16:49:41 | 1848 | 54112 |
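
The Duration and RPS columns can be reproduced offline from the two timestamps taken from the job events. The following is a minimal Python sketch, not part of the product. The helper names `duration_seconds` and `rps` are hypothetical. The sketch assumes that timestamps use the `YYYY-MM-DD HH:MM:SS` format shown in the table and that RPS is rounded down to a whole number, which is what the q0 and q18 rows suggest.

```python
from datetime import datetime

# 'events.num' configured on the Nexmark source table in every query.
EVENTS_NUM = 100_000_000

# Assumed timestamp layout, matching the start/end times in the table.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def duration_seconds(start: str, end: str) -> int:
    """Return end time minus start time, in whole seconds."""
    delta = datetime.strptime(end, TIME_FORMAT) - datetime.strptime(start, TIME_FORMAT)
    return int(delta.total_seconds())


def rps(events_num: int, duration: int) -> int:
    """Return events.num divided by Duration, rounded down (assumption)."""
    return events_num // duration


# Worked example using the q0 row of the table.
q0_duration = duration_seconds("2023-02-07 11:55:13", "2023-02-07 12:26:00")
q0_rps = rps(EVENTS_NUM, q0_duration)
print(q0_duration, q0_rps)  # 1847 54141
```

If you change `events.num` in the source table definition, pass the new value to `rps` so that the result stays comparable with the table.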