package com.iailab.module.data.influxdb.service.impl;

import com.iailab.module.data.api.dto.ApiPointValueQueryDTO;
import com.iailab.module.data.common.utils.ExcelUtil;
import com.iailab.framework.common.util.date.DateUtils;
import com.iailab.module.data.api.dto.ApiExportValueDTO;
import com.iailab.module.data.api.dto.ApiTagValueQueryDTO;
import com.iailab.module.data.influxdb.common.config.InfluxDBInstance;
import com.iailab.module.data.influxdb.common.enums.DataMeasurement;
import com.iailab.module.data.influxdb.common.utils.PointValueUtils;
import com.iailab.module.data.influxdb.common.utils.TagValueUtils;
import com.iailab.module.data.influxdb.pojo.InfluxPointValuePOJO;
import com.iailab.module.data.influxdb.pojo.InfluxTagValuePOJO;
import com.iailab.module.data.influxdb.service.InfluxDBService;
import com.iailab.module.data.point.dto.DaPointDTO;
import com.iailab.module.data.point.service.DaPointService;
import com.iailab.module.data.point.vo.PointValueExportVO;
import com.influxdb.client.InfluxQLQueryApi;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.InfluxQLQuery;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import com.influxdb.query.InfluxQLQueryResult;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.math.BigDecimal;
import java.util.*;
import java.util.stream.Collectors;

/**
 * InfluxDB操作类
 */
@Slf4j
@Service
public class InfluxDBServiceImpl implements InfluxDBService {

    @Resource
    private InfluxDBInstance influxDBInstance;

    private WriteApi writeApi;

    @Autowired
    private DaPointService daPointService;

    private WriteApiBlocking writeApiBlocking;

    private QueryApi queryApi;

    private InfluxQLQueryApi influxQLQueryApi;

    public static final String VALUE = "value";

    public static final String TIME = "time";

    private int rawOffset = TimeZone.getDefault().getRawOffset();

    private int pas_ms = 1000;

    @Override
    public void syncWriteFloatValue(String pointNo, String dataValue, long time) {
        if (writeApiBlocking == null) {
            writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
        }
        Point point = Point.measurement(DataMeasurement.t_da_sim_value.name());
        point.addTag("point", pointNo);
        point.addField("value", Double.parseDouble(dataValue));
        point.time(time, WritePrecision.MS);
        writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
    }

    @Override
    public void syncWriteIntValue(String pointNo, String dataValue, long time) {
        if (writeApiBlocking == null) {
            writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
        }
        Point point = Point.measurement(DataMeasurement.t_da_dig_value.name());
        point.addTag("point", pointNo);
        point.addField("value", Integer.parseInt(dataValue));
        point.time(time, WritePrecision.MS);
        writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
    }

    @Override
    public void syncWriteBooleanValue(String pointNo, String dataValue, long time) {
        if (writeApiBlocking == null) {
            writeApiBlocking = influxDBInstance.getClient().getWriteApiBlocking();
        }
        Point point = Point.measurement(DataMeasurement.t_da_bool_value.name());
        point.addTag("point", pointNo);
        point.addField("value", Boolean.parseBoolean(dataValue));
        point.time(time, WritePrecision.MS);
        writeApiBlocking.writePoint(influxDBInstance.getBucket(), influxDBInstance.org, point);
    }

    @Override
    public void asyncWritePointValues(List<InfluxPointValuePOJO> pointValues) {
        if (writeApi == null) {
            writeApi = influxDBInstance.getClient().makeWriteApi();
        }
        if (!CollectionUtils.isEmpty(pointValues)) {
            pointValues.forEach(item -> {
                String bucket = influxDBInstance.getBucket();
                writeApi.writeMeasurement(bucket, influxDBInstance.org, WritePrecision.MS, item);
            });
        }
        writeApi.flush();
    }

    @Override
    public void asyncWriteTagValues(List<InfluxTagValuePOJO> tagValues) {
        if (writeApi == null) {
            writeApi = influxDBInstance.getClient().makeWriteApi();
        }
        if (!CollectionUtils.isEmpty(tagValues)) {
            tagValues.forEach(item -> {
                log.info("bucket==="+ influxDBInstance.getBucket() + ",org===" + influxDBInstance.org);
                writeApi.writeMeasurement(influxDBInstance.getBucket(), influxDBInstance.org, WritePrecision.MS, item);
            });
        }
        writeApi.flush();
    }

    @Override
    public Map<String, List<Object>> queryTagsValues(List<InfluxTagValuePOJO> influxParams, Date startTime, Date endTime) {
        if (queryApi == null) {
            queryApi = influxDBInstance.getClient().getQueryApi();
        }
        if (CollectionUtils.isEmpty(influxParams)) {
            return null;
        }
        Map<String, List<Object>> result = new HashMap<>();
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.MILLISECOND, 0);
        // String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
        String start = startTime.getTime() - calendar.getTime().getTime() - pas_ms + "ms";
        if (endTime==null){
            endTime= new Date();
        }
        String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";

        for (int i = 0; i < influxParams.size(); i++) {
            List<Object> dataList = new ArrayList<>();
            InfluxTagValuePOJO tag = influxParams.get(i);
            String measurement = TagValueUtils.getMeasurement(tag.getType());
            StringBuilder sb = new StringBuilder();
            sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
            sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
            sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
            sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
            sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
            sb.append("|> sort(columns: [\"_time\"]) ");
            sb.append("|> yield(name: \"mean\")");
            List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);

            for (FluxTable table : tables) {
                List<FluxRecord> records = table.getRecords();
                for (FluxRecord record : records) {
                    Map<String, Object> dataIem = new HashMap<>(2);
                    dataIem.put(VALUE, record.getValueByKey("_value"));
                    dataIem.put(TIME, Date.from(record.getTime()));
                    dataList.add(dataIem);
                }
            }
            result.put(tag.getId(), dataList);
        }
        return result;
    }

    private List<ApiExportValueDTO> getExportValue(InfluxTagValuePOJO tag, Date startTime, Date endTime) {
        List<ApiExportValueDTO> dataList = new ArrayList<>();
        if (queryApi == null) {
            queryApi = influxDBInstance.getClient().getQueryApi();
        }
        Map<String, List<Object>> result = new HashMap<>();
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.MILLISECOND, 0);
        String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
        String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";


        String measurement = TagValueUtils.getMeasurement(tag.getType());
        StringBuilder sb = new StringBuilder();
        sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
        sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
        sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
        sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
        sb.append("|> filter(fn: (r) => r[\"id\"] == \"" + tag.getId() + "\")");
        sb.append("|> sort(columns: [\"_time\"]) ");
        sb.append("|> yield(name: \"mean\")");
        List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);

        for (FluxTable table : tables) {
            List<FluxRecord> records = table.getRecords();
            for (FluxRecord record : records) {
                ApiExportValueDTO dataIem = new ApiExportValueDTO();
                dataIem.setDataValue(record.getValueByKey("_value").toString());
                dataIem.setDataTime(DateUtils.format(Date.from(record.getTime()), DateUtils.FORMAT_YEAR_MONTH_DAY));
                dataList.add(dataIem);
            }
        }
        return dataList;
    }

    @Override
    public boolean exportTagValue(HttpServletResponse response, HttpServletRequest request, ApiTagValueQueryDTO queryDto) {
        //构建参数
        Map<String, Object> params = new HashMap<>(1);
        params.put("pointNos", queryDto.getTagIds());

        //查询point列表
        List<DaPointDTO> pointList = daPointService.list(params);
        if (CollectionUtils.isEmpty(pointList)) {
            return true;
        }

        //插入pointType
        List<InfluxPointValuePOJO> influxParams = pointList.stream().map(item -> {
            InfluxPointValuePOJO pojo = new InfluxPointValuePOJO();
            pojo.setPoint(item.getTagNo());
            pojo.setType(item.getDataType());
            return pojo;
        }).collect(Collectors.toList());

        //查询
        Map<String, List<Map<String, Object>>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd());

        //提取list
        List<Map<String, Object>> list = data.get(queryDto.getTagIds().get(0));

        //导出
        try{
            String sheetTitle = "采集数据";
            String[] title = new String[]{"值", "时间"};
            ExcelUtil.exportPointValue(sheetTitle, title, new ArrayList<>(list), response);
        } catch (Exception ex) {
            return false;
        }
        return true;
    }

    @Override
    public List<PointValueExportVO> exportPointValue(ApiPointValueQueryDTO queryDto) {

        List<PointValueExportVO> pointValueExportList = new ArrayList<>();

        //构建参数
        Map<String, Object> params = new HashMap<>(1);
        params.put("pointNos", queryDto.getPointNos());

        //查询point列表
        List<DaPointDTO> pointList = daPointService.list(params);
        if (CollectionUtils.isEmpty(pointList)) {
            return pointValueExportList;
        }

        //插入pointType
        List<InfluxPointValuePOJO> influxParams = pointList.stream().map(item -> {
            InfluxPointValuePOJO pojo = new InfluxPointValuePOJO();
            pojo.setPoint(item.getPointNo());
            pojo.setType(item.getDataType());
            return pojo;
        }).collect(Collectors.toList());

        //查询
        Map<String, List<Map<String, Object>>> data = queryPointsValues(influxParams, queryDto.getStart(), queryDto.getEnd());

        //提取list
        List<Map<String, Object>> list = data.get(queryDto.getPointNos().get(0));
        for(Map<String, Object> map : list){
            PointValueExportVO dto = new PointValueExportVO();
            dto.setDatatime(map.get("time").toString());
            dto.setDatavalue(map.get("value").toString());
            pointValueExportList.add(dto);
        }

        return pointValueExportList;
    }

    @Override
    public Map<String, Object> queryPointsLastValue(List<InfluxPointValuePOJO> influxParams) {
        Map<String, Object> result = new HashMap<>(influxParams.size());
        if (influxQLQueryApi == null) {
            influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
        }
        for (int i = 0; i < influxParams.size(); i++) {
            InfluxPointValuePOJO point = influxParams.get(i);
            String measurement = PointValueUtils.getMeasurement(point.getType());
            StringBuilder sql = new StringBuilder();
            sql.append("SELECT LAST(value) FROM ");
            sql.append(measurement);
            sql.append(" WHERE point = '");
            sql.append(point.getPoint());
            sql.append("'");
            InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
            Object value = data.getResults().get(0).getSeries().get(0).getValues().get(0).getValueByKey("last");
            result.put(point.getPoint(), value);
        }
        return result;
    }

    @Override
    public Object queryPointMaxValue(InfluxPointValuePOJO point, Date startTime) {
        if (influxQLQueryApi == null) {
            influxQLQueryApi = influxDBInstance.getClient().getInfluxQLQueryApi();
        }
        long utcMillis = startTime.getTime() -rawOffset;
        String utsStart = DateUtils.format(new Date(utcMillis), DateUtils.FORMAT_YEAR_MONTH_DAY);
        log.info("utsStart=" + utsStart);
        String measurement = PointValueUtils.getMeasurement(point.getType());
        StringBuilder sql = new StringBuilder();
        sql.append("SELECT MAX(value) FROM ");
        sql.append(measurement);
        sql.append(" WHERE point = '");
        sql.append(point.getPoint());
        sql.append("' AND time >= '" + utsStart +"'");
        InfluxQLQueryResult data = influxQLQueryApi.query(new InfluxQLQuery(sql.toString(), influxDBInstance.getBucket()));
        if (data == null) {
            return null;
        }
        return data.getResults().get(0).getSeries().get(0).getValues().get(0).getValues()[1];
    }

    @Override
    public List<Map<String, Object>> queryPointValues(InfluxPointValuePOJO pojo, Date startTime, Date endTime) {
        List<InfluxPointValuePOJO> influxParams = new ArrayList<>();
        influxParams.add(pojo);
        Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
        return data.get(pojo.getPoint());
    }

    @Override
    public Map<String, List<Map<String, Object>>> queryPointsValues(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
        if (queryApi == null) {
            queryApi = influxDBInstance.getClient().getQueryApi();
        }
        if (CollectionUtils.isEmpty(influxParams)) {
            return null;
        }
        Map<String, List<Map<String, Object>>> result = new HashMap<>();
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.MILLISECOND, 0);
        String start = startTime.getTime() - calendar.getTime().getTime() + "ms";
        if (endTime==null){
            endTime= new Date();
        }
        String stop = endTime.getTime() - calendar.getTime().getTime() + "ms";

        for (int i = 0; i < influxParams.size(); i++) {
            List<Map<String, Object>> dataList = new ArrayList<>();
            InfluxPointValuePOJO point = influxParams.get(i);
            String measurement = PointValueUtils.getMeasurement(point.getType());
            StringBuilder sb = new StringBuilder();
            sb.append("from(bucket:\"" + influxDBInstance.getBucket() + "\") ");
            sb.append("|> range(start: ").append(start).append(", stop: ").append(stop).append(") ");
            sb.append("|> filter(fn: (r) => r[\"_measurement\"] == \"" + measurement + "\")");
            sb.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
            sb.append("|> filter(fn: (r) => r[\"point\"] == \"" + point.getPoint() + "\")");
            sb.append("|> sort(columns: [\"_time\"]) ");
            sb.append("|> yield(name: \"mean\")");
            System.out.println("influxdbSql===============" + sb.toString());
            List<FluxTable> tables = queryApi.query(sb.toString(), influxDBInstance.org);

            for (FluxTable table : tables) {
                List<FluxRecord> records = table.getRecords();
                for (FluxRecord record : records) {
                    Map<String, Object> dataIem = new HashMap<>(2);
                    dataIem.put(VALUE, record.getValueByKey("_value"));
                    dataIem.put(TIME, DateUtils.format(Date.from(record.getTime()), DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND));
                    dataList.add(dataIem);
                }
            }
            result.put(point.getPoint(), dataList);
        }
        return result;
    }

    @Override
    public Map<String, Object> queryPointsSpread(List<InfluxPointValuePOJO> influxParams, Date startTime, Date endTime) {
        Map<String, Object> result = new HashMap<>();
        Map<String, List<Map<String, Object>>> data = this.queryPointsValues(influxParams, startTime, endTime);
        if (CollectionUtils.isEmpty(data)) {
            return result;
        }
        data.forEach((k, v) -> {
            if (!CollectionUtils.isEmpty(v)) {
                BigDecimal lastValue = new BigDecimal(v.get(v.size() - 1).get(VALUE).toString());
                BigDecimal firstValue = new BigDecimal(v.get(0).get(VALUE).toString());
                BigDecimal spread = (lastValue.subtract(firstValue));
                result.put(k, spread.compareTo(BigDecimal.ZERO) < 0 ? lastValue : spread);
            }
            log.info(k + ",isEmpty");
        });
        return result;
    }
}