Streamlit如何跳转第三方页面实现登陆鉴权

2023-02-28  本文已影响0人  越大大雨天

本文关键字:streamlit python cas authentication multi-page

Python可以使用streamlit框架实现0前端代码,全栈开发数据大屏等简单的可交互可视化web项目。

Streamlit: A faster way to build and share data apps

普通的提交表单用户名密码的方式实现登陆可以使用streamlit-authenticator第三方包轻松实现。
该文章主要记录了在使用streamlit实际场景中,遇到需要跳转第三方cas页面进行登陆的情况,改如何实现streamlit应用的鉴权登陆。

streamlit相关参考文档

  1. 官方文档:https://docs.streamlit.io/
  2. 实现表单登陆文档: https://blog.streamlit.io/streamlit-authenticator-part-1-adding-an-authentication-component-to-your-app/
  3. 实现跳转登陆参考文档: https://levelup.gitconnected.com/building-a-multi-page-app-with-streamlit-and-restricting-user-access-to-pages-using-aws-cognito-89a1fb5364a3

效果展示

代码实现

streamlit基本代码释义请参考官方文档,不再赘述。

代码结构

代码结构

代码展示

实现方法直接用代码来说明,在每个展示页面前,需要先判断鉴权,未登陆时则只提示并展示登陆按钮;登陆成功后展示页面内容。
直接参考以下代码,即可复用在其余场景中。

1. 配置文件

抽取的配置文件位于config.py

import os

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# 后端数据接口地址,用于请求实际的数据
API_HOST = "http://127.0.0.1:8080/api"

# streamlit运行后访问地址
FE_HOST = "http://127.0.0.1:8501"

# CAS跳转登陆地址
CAS_LOGIN_URL = "https://127.0.0.1:4436/sec/login"

# 允许访问的用户名
ALLOWED_USERS = [
    "user1",
    "user2",
    "user3"
]

2. 跳转鉴权组件代码

鉴权组件代码位于utils/authenticate.py

import streamlit as st
import requests
import json

from config import FE_HOST, CAS_LOGIN_URL, ALLOWED_USERS


# ------------------------------------
# Initialise Streamlit state variables
# ------------------------------------
def initialise_st_state_vars():
    if "auth_code" not in st.session_state:
        st.session_state["auth_code"] = ""
    if "authenticated" not in st.session_state:
        st.session_state["authenticated"] = False
    if "user_cognito_groups" not in st.session_state:
        st.session_state["user_cognito_groups"] = []


# ----------------------------------
# Get authorization code after login
# ----------------------------------
def get_auth_code():
    auth_query_params = st.experimental_get_query_params()
    try:
        auth_code = dict(auth_query_params)["sid"][0]
    except (KeyError, TypeError):
        auth_code = ""

    return auth_code

# -----------------------------
# Set Streamlit state variables
# -----------------------------
def set_st_state_vars():
    initialise_st_state_vars()
    auth_code = get_auth_code()
    user_info = get_auth_user(sid=auth_code) or {}

    if user_info.get("user"):
        st.session_state["auth_code"] = auth_code
        st.session_state["authenticated"] = True
        st.session_state["user"] = user_info.get("user")
        st.session_state["email"] = user_info.get("mail")
        st.session_state["display"] = user_info.get("display")


# -----------------------------
# Login/ Logout HTML components
# -----------------------------
login_link = f"{CAS_LOGIN_URL}?ref={FE_HOST}"


html_css_login = """
<style>
.button-login {
  background-color: skyblue;
  color: white !important;
  padding: 1em 1.5em;
  text-decoration: none;
  text-transform: uppercase;
}

.button-login:hover {
  background-color: #555;
  text-decoration: none;
}

.button-login:active {
  background-color: black;
}

</style>
"""

html_button_login = (
    html_css_login
    + f"<a href='{login_link}' class='button-login' target='_self'>Log In</a>"
)


def button_login():
    """

    Returns:
        Html of the login button.
    """
    _, col, _ = st.columns(3)
    return col.markdown(f"{html_button_login}", unsafe_allow_html=True)


def button_logout():
    """

    Returns:
        Html of the logout button.
    """
    def logout_click():
        st.session_state["authenticated"] = False
    st.sidebar.button("Logout", on_click=logout_click)
    print(st.session_state)


def get_auth_user(sid, ref=FE_HOST):
    cas_url = f"{CAS_LOGIN_URL}?sid=%s&ref=%s" % (sid, ref)
    if not sid or not ref:
        return

    user_info = requests.get(cas_url ).text
    try:
        user_dict = json.loads(user_info)
    except json.decoder.JSONDecodeError:
        return
    else:
        return user_dict


def is_allowed_user():
    if st.session_state["email"] in ALLOWED_USERS:
        return True
    return False
3. 入口主页面文件

入口位于Welcome.py
需要在所有展示的页面代码文件前都加上鉴权代码

import streamlit as st

from utils import authenticate
from utils.authenticate import is_allowed_user


st.set_page_config(
    page_title="Welcome",
    page_icon="👋",
)

# 初始化鉴权变量
authenticate.set_st_state_vars()

# Add login/logout buttons,点击可跳转
if not st.session_state.get("authenticated"):
    st.warning("Please login!")
    authenticate.button_login()
else:
    authenticate.button_logout()
    if not is_allowed_user():
        st.error("You do not have access. Please contact the administrator.")
    else:
        # else,页面展示代码位于通过鉴权后
        st.title("欢迎使用XX仪表盘 👋")

        st.markdown(
            """
           
            该项目为streamlit跳转登陆测试项目\n
            **👈 请从侧边栏进入功能页**
            ### 官方参考文档
            - Streamlit: [Streamlit](https://docs.streamlit.io/)
            - 表单登陆: [streamlit-authenticator](https://blog.streamlit.io/streamlit-authenticator-part-1-adding-an-authentication-component-to-your-app/)
        
            ### 实现跳转登陆参考文档
            - [参考文档](https://levelup.gitconnected.com/building-a-multi-page-app-with-streamlit-and-restricting-user-access-to-pages-using-aws-cognito-89a1fb5364a3)
    
        """
        )
4. 分页面文件

分页示例文件位于pages/1_📊__Overview.py

import datetime

import streamlit as st

from utils import dict_to_dataframe, authenticate
from utils.authenticate import is_allowed_user

from utils.plotly_utils import plotly_pie, plotly_bar

st.set_page_config(
    page_title="Overview",
    page_icon="👋",
    layout="wide"
)

authenticate.set_st_state_vars()

# Add login/logout buttons
if not st.session_state.get("authenticated"):
    st.warning("Please login!")
    authenticate.button_login()
else:
    authenticate.button_logout()
    # 可以继续限制允许登陆的用户
    if not is_allowed_user():
        st.error("You do not have access. Please contact the administrator.")
    # 通过登陆后的页面代码
    else:
        st.subheader("数据量概览")
        overview_data = {
            "updateTime": "2023-01-01 00:00:00",
            "total": "12345614",
            "todaySeen": "12345",
            "recent7daysSeen": "1234561",
            "black": "2435"
        }
        overview_group_data = {
            'all': {
                'source': {
                    'source1': 13467,
                    'source2': 56900,
                    'source3': 89580409,
                    'source4': 25405
                },
                'tag1st': {
                    'tag1': 1414953,
                    'tag2': 3112059,
                    'tag3': 48486,
                    'tag4': 23226,
                    'tag5': 4907815,
                    'tag6': 9690544
                },
                'category': {
                    'red': 1382345,
                    'green': 258362
                },
                'updateTime': '2023-03-01T08:31:00.345000Z'
            },
            'recent7Days': {
                'source': {
                    'source1': 245,
                    'source2': 2457,
                },
                'tag1st': {
                    'tag1': 12345,
                    'tag3': 12345,
                    'tag4': 1235,
                },
                'category': {
                    'red': 1341,
                    'green': 3456
                },
                'updateTime': '2023-03-01T08:31:00.345000Z'
            },
            'calculateDate': '20230301'
        }

        st.markdown(f"`概览统计时间:{overview_data['updateTime']}`")

        recent_7_group_data = overview_group_data["recent7Days"]
        all_group_data = overview_group_data["all"]
        overview_group_date = overview_group_data["calculateDate"]

        col1, col2, col3, col4 = st.columns(4)
        col1.metric("数据总量", overview_data["total"])
        col2.metric("今日生效", overview_data["todaySeen"])
        col3.metric("最近7日生效", overview_data["recent7daysSeen"])
        col4.metric("无效数据", overview_data["black"])

        st.markdown("### 数据分类统计")
        st.markdown(f"`分类数据生效时间:{datetime.datetime.strptime(overview_group_date, '%Y%m%d')}`")
        date_range = st.sidebar.radio(
            "您关注哪一时间段数据?",
            ('最近七日', '全量'),
            index=1
        )

        display_type = st.sidebar.multiselect(
            "您想使用哪些方式展示数据?",
            ('图形', '表格'),
            default=('图形', '表格')
        )

        if date_range == "最近七日":
            group_data = recent_7_group_data
        else:
            group_data = all_group_data

        source_data = group_data["source"]
        tag1st_data = group_data["tag1st"]
        category_data = group_data["category"]


        def write_overview_data_charts(data):
            tab1, tab2 = st.tabs(["Pie(default)", "Bar"])
            with tab1:
                fig_pie = plotly_pie(data)
                st.plotly_chart(fig_pie, use_container_width=True)
            with tab2:
                fig_bar = plotly_bar(data)
                st.plotly_chart(fig_bar, use_container_width=True)


        def write_overview_data_table(data, key_name):
            tab1, tab2 = st.tabs(["table(default)", "json"])
            with tab1:
                st.dataframe(dict_to_dataframe(data, key_name), use_container_width=True)
            with tab2:
                st.json(data)


        def write_overview_to_column(data, title, table_key):
            st.markdown(f"#####  {title}")
            if "图形" in display_type:
                write_overview_data_charts(data, )
            if "表格" in display_type:
                write_overview_data_table(data, key_name=table_key)


        if not display_type:
            st.sidebar.error("请至少选择一种展示方式")
            st.error("请在左侧复选框选择至少一种展示方式")

        if display_type:
            col5, col6, col7 = st.columns(3)
            with col5:
                write_overview_to_column(source_data, title="情报源分布", table_key="数据源")
            with col6:
                write_overview_to_column(tag1st_data, title="标签分布", table_key="标签")
            with col7:
                write_overview_to_column(category_data, title="数据类别分布", table_key="数据类别")

最后

上一篇下一篇

猜你喜欢

热点阅读