准备数据

这次用到的数据,是一个csv文件,csv中总共有7列,最后一列为时间戳。

CSV

统计目标

按Hermes值分组,统计每种Hermes出现的次数,效果等同于如下SQL:

select Hermes, count(1) from csv group by Hermes;

代码实现

准备依赖

   <!-- Flink Java API artifact; the batch job in this file obtains its ExecutionEnvironment from Flink. -->
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.9.0</version>
    </dependency>
    <!-- Flink streaming Java API; the _2.11 suffix is the Scala version the artifact was built against. -->
    <!-- Keep this version identical to flink-java above. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.9.0</version>
    </dependency>
    <!-- Lombok: supplies the @Data/@Builder/@AllArgsConstructor/@NoArgsConstructor annotations used on VamEntity. -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.8</version>
    </dependency>

建立CSV中数据的映射实体

/**
 * Entity that one row of the input CSV is mapped onto.
 *
 * <p>Lombok generates the getters/setters ({@code @Data}), a builder, an all-args
 * constructor and a no-args constructor. The field declaration order defines the
 * parameter order of the generated all-args constructor, so do not reorder fields.
 *
 * <p>The batch job in this file only populates {@code hermes}, {@code ciamId} and
 * {@code timeStamp}; the remaining fields keep their default values there.
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class VamEntity {
 
    // NOTE(review): the first four fields presumably correspond to CSV columns 1-4,
    // which the batch job skips - confirm against the actual CSV header.
    private String vin;
    private long serviceId;
    private long subServiceId;
    private String ntg;
    // Grouping key of the batch job: rows are counted per distinct hermes value.
    private String hermes;
    private String ciamId;
    // Mapped from the last CSV column, which the article describes as a timestamp.
    // NOTE(review): unit (seconds vs. milliseconds) is not shown here - confirm.
    private long timeStamp;
}

实现功能

public class BatchTask {
 
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
        DataSource<VamEntity> vamEntityDataSource = env.readCsvFile(""D:\FTP\TSI_DaiVB_Usage_ttt.csv"").fieldDelimiter("","")
            //忽略首行,本次csv文件中的第一行是列名
                .ignoreFirstLine()
            //选择要读取的列,总共有7列,本次读了后三列
                .includeFields(false, false, false, false, true, true, true)
            //将读取的结果映射为实体,后三个参数是实体类的属性名
                .pojoType(VamEntity.class, ""hermes"", ""ciamId"", ""timeStamp"");
 
        AggregateOperator<Tuple2<String, Long>> sum = vamEntityDataSource
            //将读取到的数据,使用flatmap转换为多个目标对象,这里目标实体是Tuple2
            .flatMap(new FlatMapFunction<VamEntity, Tuple2<String, Long>>() {
                //因为要统计出现的次数,分组条件是hermes,是一个string格式
                //所以这里一条Tuple2<String, Long>相当于<""hermes"",1>,表示这条数据出现了1次
            @Override
            public void flatMap(VamEntity value, Collector<Tuple2<String, Long>> out) throws Exception {
                out.collect(new Tuple2<String, Long>(value.getHermes(), 1L));
            }
        })
            //按照tuple的第0个元素进行分组
                .groupBy(0)
            //分组后,对每组内数据的第1个元素就行累加求和
                .sum(1);
        //输出计算结果
        sum.print();
    }
 
    private static long parse(String s) {
        try {
            long l = Long.parseLong(s);
            return l;
        } catch (Exception e) {
            return 0L;
        }
 
    }
}