会员可以在此提问,尚新途老师有问必答

对大家有帮助的问答会被标记为“推荐”,看完课程过来浏览一下别人提的问题,会帮你学得更全面

截止目前,同学们一共提了128799个问题
wechatlpjFYH2020-06-30 21:22:49

发现没有关闭防火墙

image.png

所有节点的状态是:

image.png


关闭所有的防火墙,问题解决了

都已经ssh免密登录了,还需要把每一个ip地址添加到信任区?

还是什么需要再配置什么?

相关课程:大数据全系列/第四阶段:Zookeeper分布式协调服务框架/ZooKeeper分布式集群实战
563612020-06-29 19:46:15
   val kafkaDDL :String = 
        """
        |create table MyTable (
        |componentType STRING,
        |endDate  STRING,
        |envName  STRING,
        |resultId  STRING,
        |spendTime  STRING,
        |returnDataNum  STRING,
        |startDate  STRING,
        |tableName  STRING,
        |tenantName  STRING
        |) with (
        |    'connector.type' = 'kafka',
        |    'connector.version' = '0.10',
        |    'connector.topic' = 'test',
        |    'connector.properties.1.key' = 'bootstrap.servers',
        |    'connector.properties.1.value' = 'localhost:9092',
        |    'connector.properties.0.key' = 'zookeeper.connect',
        |    'connector.properties.0.value' = 'localhost:2181',
        |    'connector.properties.2.key' = 'group.id',
        |    'connector.properties.2.value' = 'testGroup',
        |    'connector.startup-mode' = 'earliest-offset',
        |    'connector.property-version' = '1',
        |    'format.type' = 'json',
        |    'format.property-version' = '1',
        |    'update-mode' = 'append'
        |)
        """.stripMargin
        tableEnv.sqlUpdate(kafkaDDL)
    val hbaseDDL :String =
      """
        |Create table flink_log1 (
        |rowkey string,
        |cf ROW(f1 BIGINT,f2 BIGINT,f3 INT,f4 INT,f5 BIGINT,tenantName VARCHAR)
        |) with(
        |   'connector.type' = 'hbase',
        |   'connector.version' = '1.4.3',
        |   'connector.table-name' = 'flink_log1',
        |   'connector.zookeeper.quorum' = '....:2181,....:2181',
        |   'connector.zookeeper.znode.parent' = '/hbase',
        |   'connector.write.buffer-flush.max-size' = '10mb',
        |   'connector.write.buffer-flush.max-rows' = '1000'
        |)
      """.stripMargin
    tableEnv.sqlUpdate(hbaseDDL)
    val sql =
        "select CONCAT_WS('_',tenantName,tenantName) as tm_tenantName, " +
        "count(tenantName) as f1 ," +
        "count(case when resultId =  '2' then resultId else '' end) as f2 ,"+
        "avg(spendTime) as f3 ,"+
        "sum(returnDataNum) as f4 ,"+
        "count(case when resultId =  '1' then tenantName else '' end) as f5 ,"+
        "tenantName "+
        "from MyTable where substring(endDate,1,10)='2020-06-28' " +
        "group by CONCAT_WS('_',tenantName,tenantName) , tenantName"
    val table: Table = tableEnv.sqlQuery(sql)
    tableEnv.createTemporaryView("tmp",table)
    tableEnv.sqlUpdate("insert into flink_log1 " +
      "select tm_tenantName,ROW(f1,f2,f3,f4,f5,tenantName) as cf from tmp ")
    streamEnv.execute("my insert hbase sql")


这里的情况主要是插入hbase的时候group by 多个字段会导致

报:

Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has full primary keys if it is updated.

但是我group by 单个字段是的时候是没有问题的,也就是当我加上group by tenantName的时候就会报这个错误

麻烦老师看一下这是什么问题

谢谢~

相关课程:大数据全系列/第二十二阶段:Flink实时计算引擎/TableAPI和Flink的SQL
微信用户2020-06-07 23:11:30

代码里为什么zk连接设置了会话超时5秒,而服务生产端在session过期后还能持提供服务,zk连接中的sessiontimeout到底是什么意思?

相关课程:大数据全系列/第四阶段:Zookeeper分布式协调服务框架/分布式RMI协调实战
侯兆瑞18软12中北2020-05-10 18:28:02

老师网络配好了为什么xshell连不上虚拟机

相关课程:大数据全系列/第一阶段:Linux 操作系统/Linux操作系统概述与安装
Aaron2020-04-11 11:46:26

我这是带答疑的还是不带答疑的?为什么提问好几个月都没人回答呢?

相关课程:大数据全系列/第八阶段:Hive数仓 项目一 电信基站掉话率分析实战/运算符和函数、实战、动态分区、分桶
苦战1802020-04-10 12:06:02

如下图一样在搭建kylin时检测环境一直没有反应,不知道怎么回事该怎么解决呢?我hadoop是2.6.5 hive1.21 hbase 1.29 

KW8Z9W%(T]MR(4%A6S)JM01.png

相关课程:大数据全系列/第二十五阶段:Kylin 数据仓库分析引擎(扩展)/Kylin架构原理及实战操作
林见鹿2020-03-24 18:47:50

老师 忘机删除后面的rules文件 就重启了 后来再删除来得及不

相关课程:大数据全系列/第一阶段:Linux 操作系统/Linux操作系统概述与安装
YXFvmmZpZ2qdbmw=2020-03-15 07:39:53

这几集的笔记和资料能不能补发一下下下下?老师

相关课程:大数据全系列/第二十四阶段:CDH集群管理/Cloudera Manager 安装及CDH安装
Aaron2020-03-09 17:42:06

MapTask的数量是由切片数决定的,maptask中的分区数是由什么决定的呢?

相关课程:大数据全系列/第七阶段:Hadoop 分布式计算MapReduce和资源管理Yarn/MapReduce 原理和搭建
YXFvmmZpZ2qdbmw=2020-03-09 12:37:07

这个视频里面的离线文档能不能共享一下???

相关课程:大数据全系列/第十阶段:分布式数据库 HBase/hbase第1章
沉默是金2020-02-20 14:53:17
from pyspark import SparkConf, SparkContext


def myprint(one):
    print(one)


def top2list(one):
    website = one[0]
    locals = one[1]
    localdic = {}
    for local in locals:
        if local in localdic:
            localdic[local] += 1
        else:
            localdic[local] = 1
    site_locallist = sorted(localdic.items(), key=lambda tp: tp[1], reverse=True)
    returnlist = []
    if (len(site_locallist) > 2):
        for i in range(2):
            returnlist.append(site_locallist[i])
    else:
        returnlist = site_locallist
    return website, returnlist


def get_site_uid_count(one):
    uid = one[0]
    sites = one[1]
    siteDic = {}
    for site in sites:
        if site in siteDic:
            siteDic[site] += 1
        else:
            siteDic[site] = 1
    returnlist = []
    for site, count in siteDic.items():
        returnlist.append((site, (uid, count)))
    return returnlist


def top3list(one):
    website = one[0]
    uid_count_itr = one[1]
    top3lists = ['', '', '']
    for uid_count in uid_count_itr:
        count = uid_count[1]
        for i in range(len(top3lists)):
            if top3lists[i] == "":
                top3lists[i] = uid_count
                break
            elif top3lists[i][1] < count://运行时总是提示这行,str与int之间不能比较,打印type(top3lists[i][1])为int)
                for j in range(2, i, -1):
                    top3lists[j] = top3lists[j - 1]
                top3lists = uid_count
                break

    return website, top3lists


def getApv():
    conf = SparkConf()
    conf.setMaster("local")
    conf.setAppName("pv")
    sc = SparkContext(conf=conf)
    linesRDD = sc.textFile("D:\mypython\date\website")
    # linesRDD.map(lambda line:(line.split("\t")[2],1)).reduceByKey(lambda v1,v2:v1+v2).\
    #     sortBy(lambda tp:tp[1],ascending=False).foreach(lambda one:myprint(one=one))
    # 每个网站访问最多的top2地区
    # site_local = linesRDD.map(lambda line:(line.split("\t")[4],line.split("\t")[2])).groupByKey()
    # site_local.map(lambda one:top2list(one)).foreach(print)
    # 每个网站最活跃的用户top3
    linesRDD.map(lambda line: (line.split("\t")[3], line.split("\t")[4])).groupByKey() \
        .flatMap(lambda one: get_site_uid_count(one)).groupByKey().map(lambda one: top3list(one)).foreach(print)


if __name__ == "__main__":
    getApv()


相关课程:大数据全系列/(隐藏)第十三阶段:机器学习及推荐系统实战/PySpark及线性回归算法
2020-02-06 19:55:40

为什么在装好xshell 连上linux服务器后,70-persistent-net.rules依然存在呢?我又重新删除了一遍

相关课程:大数据全系列/第一阶段:Linux 操作系统/Linux操作系统概述与安装
林先生2020-01-13 14:26:36

image.png

我这个版本的VMware怎么是这样,上面怎么没有编辑、查看什么的啊

相关课程:大数据全系列/第一阶段:Linux 操作系统/Linux操作系统概述与安装

©2014-2023 百战汇智(北京)科技有限公司 All Rights Reserved 北京亦庄经济开发区科创十四街 赛蒂国际工业园
网站维护:百战汇智(北京)科技有限公司
京公网安备 11011402011233号    京ICP备13018289号-12    营业执照    经营许可证:京B2-20212637