TA的每日心情 | 擦汗 10 小时前 |
---|
签到天数: 1046 天 连续签到: 4 天 [LV.10]测试总司令
|
需求使用背景, 业务系统自定义了一套数据处理语言,支持sql,elasticsearch查询数据,然后经过一系列自定义处理,自定义处理支持执行python脚本,最终返回给调用端。
具体执行流程,java服务执行页面输入的python语句,python语句调用java方法得到结果,python进行自定义处理后再返回给java服务,java服务最终返回具体的结果给客户端。
语法示例 其中, | dbquery , | python 为自定义语法示例,最终是通过java执行返回结果。
- | dbquery sql="select id,num,create_time from security_log"
- | python
- import sys
- from py4j.java_gateway import JavaGateway, GatewayParameters
- gateway = JavaGateway(gateway_parameters=GatewayParameters(port=int(sys.argv[1])))
- SESSION_ID = sys.argv[2]
-
- # 获取java 实例类
- entry= gateway.entry_point
- # 调用 java具体方法,java返回具体的json
- preDataJson = entry.getPreDateJson(SESSION_ID)
-
- # 以下开始处理java返回的结果json
- import pandas as pd
- df = pd.read_json(preDataJson)
- rtn_data = df[:2].to_json(orient='records')
- # 处理完成的rtn_data 再设置给java 实例
- entry.setDatasetJson(SESSION_ID,rtn_data )
复制代码
首先java服务中引用依赖。
- <dependency>
- <groupId>net.sf.py4j</groupId>
- <artifactId>py4j</artifactId>
- <version>0.10.7</version>
- </dependency>
复制代码
首先java端定义使用pythonServer网关入口。
- public class PythonServer {
-
- /**
- * python 网关服务
- */
- private GatewayServer gatewayServer;
-
- /**
- * python 服务启动端口
- */
- private int port = GatewayServer.DEFAULT_PORT;
-
- /**
- * 缓存当前几个 python reader
- */
- private Map<String, PythonComd> pythonComdMap = new ConcurrentHashMap<>();
-
- public PythonServer() {
- for (int i = 0; i < 10; i++) {
- if (gatewayServer == null) {
- try {
- gatewayServer = new GatewayServer(new EntryPoint(), port);
- gatewayServer.start();
- log.info("Python 网关启动成功,端口:" + port);
- } catch (Exception e) {
- log.warn("Python 网关启动失败,端口:" + port);
- port += 2;//每次加2
- log.warn("Python 网关尝试下一个启动端口:" + port);
- }
- }
- }
- if (gatewayServer == null) {
- log.error("Python 网关启动失败 10 次,不再进行重试,终止启动。");
- System.exit(-1);
- }
- }
-
- /**
- * 打开一个 java to python 的会话,实际只是 put 一个识别自身的 uuid
- * @param uuid
- * @param pythonComd
- */
- public void openSession(String uuid, PythonComd pythonComd) {
- pythonComdMap.put(uuid, pythonComd);
- }
-
- /**
- * 关闭一个 java to python 的会话
- * @param uuid
- */
- public void closeSession(String uuid) {
- pythonComdMap.remove(uuid);
- }
-
- /**
- * 用于 python 与 java GatewayServer 通信的端口号
- * @return
- */
- public int getPort() {
- return port;
- }
-
- /**
- * 当 java 进程关闭时,结束 python 网关。
- */
- @PreDestroy
- public void shutdown() {
- gatewayServer.shutdown();
- }
-
- class EntryPoint {
-
- /**
- * 可以传递 json 给 python,但限制最大加载条数为 ENGINE_QUERY_LIMIT
- * 优点:结构化数据,支持嵌套数组。缺点:空数组时没有字段名。
- * @param uuid
- * @return
- */
- public String getPreDataJson(String uuid) {
- PythonComd pythonComd = pythonComdMap.get(uuid);
- if (pythonComd == null) {
- return null;
- }
- Dataset<Row> preDataset = pythonComd.getPreDateset();
- if (preDataset == null) {
- return null;
- }
- try {
- SparkContext sparkContext = preDataset.sparkSession().sparkContext();
- PythonConf conf = pythonComd.getConf();
- String taskId = conf.getTaskId();
- String description = AppConfig.ENGINE_ADDR + "/search?taskId=" + taskId;// 仅用于 spark web ui 显示,暂时没有扩展此接口。
- sparkContext.setJobGroup(uuid, description, true);
- sparkContext.setLocalProperty(CallSite.SHORT_FORM(), "Python 进程获取 JSON 格式结果集。");
-
- List<Map<String, Object>> list = DatasetUtil.datasetToList(preDataset, AppConfig.ENGINE_QUERY_LIMIT, taskId, "python 进程获取 json 格式结果集。");
- return JsonUtil.objToJson(list);
- } catch (GplException e) {
- log.error(e.getMessage(), e);//2021-02-04,暂时没想明白 python 拉取 java 时,异常传递给前端
- }
- return null;
- }
-
-
-
- /**
- * 接收 python 返回的 json。
- * @param uuid
- * @param json
- */
- public void setDataJson(String uuid, String json) {
- PythonComd pythonComd = pythonComdMap.get(uuid);
- if (pythonComd == null) {
- return;
- }
- if (json == null || json.isEmpty()) {
- PythonConf conf = pythonComd.getConf();
- log.error("任务 id:" + conf.getTaskId() + "," + conf.getComdName() + " 命令,Python 进程返回的 JSON 数据为空!");
- return;
- }
- List<String> list = new ArrayList<>();
- list.add(json);
- SparkSession sparkSession = BeanFactory.getSparkSession();
- Dataset<String> datasetTmp = sparkSession.createDataset(list, Encoders.STRING());
- Dataset<Row> dataset = sparkSession.read().json(datasetTmp);
- pythonComd.setReturnResult(dataset);
- }
- }
- }
复制代码
java端执行python代码。
- public class PythonComd extends BaseComd<PythonConf> {
-
- /**
- * 单例,用于和 python 进程通信。
- */
- @Resource
- private PythonServer pythonServer;
-
- /**
- * python 进程执行结束后,回调 pythonComd.setReturnResult(dataset);
- * execSparkDataset() 方法,返回 this.returnResult;
- */
- @Setter
- private Dataset<Row> returnResult;
-
- /**
- * 输出流字符串。
- */
- private StringBuilder outputStringBuilder = new StringBuilder();
-
- /**
- * 错误流字符串。
- */
- private StringBuilder errorStringBuilder = new StringBuilder();
-
- /**
- * 构造方法,传入配置类。
- * @param conf
- */
- public PythonComd(PythonConf conf) {
- super(conf);
- }
-
-
- /**
- * 以命令行的方式,执行 python 进程。
- * @param preDataset 上一条命令生成的结果集对象
- * @return
- * @throws GplException
- */
- @Override
- public Dataset<Row> execSparkDataset(Dataset<Row> preDataset) throws GplException {
- if (preDataset == null) {
- throw new GplException("python 命令,未找到结果集!");
- }
- String script = conf.getExpression();
- String fileName = conf.getFilename();
- if (StringUtils.isEmpty(script) && StringUtils.isEmpty(fileName)) {
- throw new GplException("python 命令,脚本为空!");
- }
- if (StringUtils.isEmpty(script) && !StringUtils.isEmpty(fileName)) {//如果是执行py文件
- File pyFile = new File(NConst.PYTHON_PATH + fileName);
- if (!pyFile.exists()) {
- throw new GplException("python 命令,GPL 的 python 目录找不到脚本文件:" + fileName);
- }
- }
- if (!StringUtils.isEmpty(AppConfig.PYTHON_ENV)) {
- File file = new File(AppConfig.PYTHON_ENV);
- if (!file.exists()) {
- throw new GplException("python 命令,自定义环境变量,路径不存在:" + AppConfig.PYTHON_ENV);
- }
- }
-
- String uuid = conf.getUuid();
- pythonServer.openSession(uuid, this);
- String containerName = uuid;
- try {
- String scriptPath = null;
- if (!StringUtils.isEmpty(script)) {//优先执行脚本,如果没有脚本语句赐执行文件
- String md5 = DigestUtils.md5Hex(script);
- String tmpName = md5 + ".py";
- scriptPath = NConst.PYTHON_PATH + tmpName;
- File tmpFile = new File(scriptPath);//临时文件不再结束调用后删除,以后完善成定期清理
- FileUtils.write(tmpFile, script, NConst.CHARSET_UTF_8);//把脚本写成临时文件
- } else {
- scriptPath = NConst.PYTHON_PATH + fileName;
- }
-
- List<String> command = new ArrayList<>();
- if (!StringUtils.isEmpty(conf.getDocker())) {
- // docker run --name containerName -v /home/nyx:/home/nyx --net host python:slim python /home/nyx/test.py 25333 uuid
- command.add("docker");
- command.add("run");
- command.add("--name");
- command.add(containerName);//容器名
- command.add("-e");
- command.add("TZ=Asia/Shanghai");//设置时区
- command.add("-v");
- command.add(NConst.PYTHON_PATH + ":" + NConst.PYTHON_PATH);// 绑定挂载目录
- command.add("--net");
- command.add("host");
- command.add(conf.getDocker());// docker 镜像名
- }
-
- if (StringUtils.isEmpty(AppConfig.PYTHON_ENV)) {
- command.add("python");
- } else {
- command.add(AppConfig.PYTHON_ENV);
- }
- command.add(scriptPath);//py文件路径
- command.add(String.valueOf(pythonServer.getPort()));//端口号
- command.add(uuid);
- String cmd = String.join(" ", command);
- log.info("任务 id:" + conf.getTaskId() + ",python 执行命令:" + cmd);
- ProcessBuilder processBuilder = new ProcessBuilder(command);
- Process process = processBuilder.start();
- this.printProcessStream(process);
- int timeout = conf.getTimeout();
- log.info("任务 id:" + conf.getTaskId() + ",python 脚本执行超时参数为:" + timeout);
- if (timeout > 0) {
- boolean isExited = process.waitFor(timeout, TimeUnit.MILLISECONDS);//脚本执行超时退出
- if (!isExited) {//如果命令行子进程没有退出,销毁子进程。
- process.destroy();
- }
- } else {
- process.waitFor();
- }
- this.clearTmpFile();//最后执行一下清理目录
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- } finally {
- if (!StringUtils.isEmpty(conf.getDocker())) {
- try {
- Process process = new ProcessBuilder("docker", "rm", "-f", containerName).start();
- this.printProcessStream(process);
- process.waitFor();
- log.info("任务 id:" + conf.getTaskId() + ",docker 执行 python 结束!删除容器:" + containerName);
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- }
- }
- pythonServer.closeSession(uuid);
- }
- if (errorStringBuilder.length() > 0) {
- String error = errorStringBuilder.toString();
- throw new GplException("python 命令,脚本执行异常!", error);
- }
- if (outputStringBuilder.length() > 0) {
- String input = outputStringBuilder.toString();
- comdTask.addWarnMsg("python 命令,控制台打印:" + input);
- }
- if (returnResult == null) {
- throw new RuntimeException("python 命令,脚本执行结束,脚本返回结果集为空!");
- }
- // 此处不要计算 count(),会消耗掉几十秒,直接打印一下字段名就行了。
- Map<String, String> fieldMap = new HashMap<>();
- StructType structType = returnResult.schema();
- StructField[] structFieldArray = structType.fields();
- for (StructField structField : structFieldArray) {
- fieldMap.put(structField.name(), structField.dataType().simpleString());
- }
- log.info("任务 id:" + conf.getTaskId() + ",python 命令,返回结果集字段:" + fieldMap.toString());
- return this.returnResult;
- }
-
- /**
- * 清理生成的临时文件
- */
- private void clearTmpFile() {
- File pyPath = new File(NConst.PYTHON_PATH);
- File[] fileArray = pyPath.listFiles(f -> {
- return f.getName().length() == 35//文件名长度 35 的是临时文件
- && System.currentTimeMillis() - f.lastModified() > 86400000;//删除大于 1 天的文件
- });
- for (File file : fileArray) {
- file.delete();
- }
- }
-
- /**
- * 启动两个后台线程,用于接收 python 进程的输出流。
- * @param process
- */
- private void printProcessStream(Process process) {
- Thread inputThread = new Thread(() -> {
- try (BufferedReader inputReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
- String input = null;
- while ((input = inputReader.readLine()) != null) {
- outputStringBuilder.append("\n");
- outputStringBuilder.append(input);
- }
- if (outputStringBuilder.length() > 0) {
- log.info("任务 id:" + conf.getTaskId() + ",python print(): " + outputStringBuilder.toString());
- }
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- }
- });
- inputThread.setName("gpl-py-input-stream-task-id=" + conf.getTaskId());
- inputThread.setDaemon(true);
- inputThread.start();
-
- Thread errorThread = new Thread(() -> {
- try (BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
- String error = null;
- while ((error = errorReader.readLine()) != null) {
- errorStringBuilder.append("\n");
- errorStringBuilder.append(error);
- }
- if (errorStringBuilder.length() > 0) {
- log.error("任务 id:" + conf.getTaskId() + ",python error: " + errorStringBuilder.toString());
- }
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- }
- });
- errorThread.setName("py-error-stream-task-id=" + conf.getTaskId());
- errorThread.setDaemon(true);
- errorThread.start();
- }
- }
复制代码
|
|