
How can a custom daemon access multiple users' OneDrive files through the Graph REST API?


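We are developing a daemon service that will periodically connect to the Microsoft Graph API on its own and list every file containing sensitive content across all users' drives. We have set up a custom application in our Azure / Office 365 tenant account with many permissions enabled (all Graph and SharePoint permissions, plus a few others, for testing purposes).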

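Using the Graph Explorer tool with my personal login, I can list the files in my own drive through both the /me/drive/root/children endpoint and the /users('<user-id>')/drive/root/children endpoint (when the user ID is my own). When I connect with curl instead, using the client_credentials grant type with the client_id and client_secret of our custom Azure application, /users('<user-id>')/drive returns the correct drive ID, but /users('<user-id>')/drive/root/children returns only an empty list of children.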

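Is there some permission I am missing that we need to set somewhere?

Is this a limitation of the current state of the Graph API?

Is this a limitation of the client_credentials grant type?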
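
For reference, the request pattern described above can be sketched as follows. This is only a minimal sketch under stated assumptions: it targets the Azure AD v1 token endpoint with a `resource` parameter and the Graph `v1.0` base URL, and the helper names (`build_token_request`, `drive_children_url`, `bearer_headers`) are hypothetical, not part of any library. It builds the requests without sending them.

```python
from urllib.parse import urlencode

GRAPH_RESOURCE = 'https://graph.microsoft.com'


def build_token_request(tenant, client_id, client_secret):
    """Return (url, form_body) for a client_credentials token request.

    Assumption: the Azure AD v1 endpoint, which expects a 'resource' field.
    """
    url = 'https://login.microsoftonline.com/%s/oauth2/token' % tenant
    form = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        'resource': GRAPH_RESOURCE,
    }
    return url, urlencode(form)


def drive_children_url(user_id, version='v1.0'):
    """Return the Graph URL listing the root children of a user's drive."""
    return "%s/%s/users('%s')/drive/root/children" % (
        GRAPH_RESOURCE, version, user_id)


def bearer_headers(access_token):
    """Return the Authorization header for a Graph call."""
    return {'Authorization': 'Bearer %s' % access_token}
```

The token URL and form body would be sent as a POST, and the returned access token would then go into the Authorization header of a GET against the children URL.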

2 Answers

  • 3

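    This is a limitation of the current state of the Graph API: there is no app-only permission scope, usable with the client credentials flow, that would allow an application to access any user's drive or files. The Files.* scopes are available only as delegated permissions; see https://graph.microsoft.io/en-us/docs/authorization/permission_scopes.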

  • 2

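    Today this is possible (with application permissions) by using the new Microsoft App Dev Portal and following the instructions here. Alternatively, if you created (registered) your app in the Azure portal, you must use an X509 certificate instead of a shared secret (client secret). The most useful resources, at least for me, were: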

    Here is some Python code (for the second case) that generates a URL for the user to visit so she can authorize your app, and then requests an access token:

    import calendar
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    from datetime import datetime, timedelta
    import jwt
    from jwt.exceptions import InvalidTokenError
    from oauthlib.common import generate_nonce, generate_token
    from oauthlib.oauth2 import BackendApplicationClient
    import requests
    from requests.exceptions import RequestException
    from requests_oauthlib import OAuth2Session
    from urllib.parse import urlencode
    import uuid

    def to_unix(obj):
        '''Convert a datetime to seconds since the Unix epoch (UTC).'''
        if isinstance(obj, datetime):
            if obj.utcoffset() is not None:
                # Normalize timezone-aware values to UTC first
                obj = obj - obj.utcoffset()
        seconds = calendar.timegm(obj.timetuple()) + obj.microsecond / 1e6
        return seconds

    def validate_id_token(token, client_id):
        '''Validates the given id token.

        Args:
            token (str): An encoded ID token.
            client_id (str): The application ID, expected as the audience.
        Returns:
            The decoded token which is a dict.
        '''
        # Extract kid from token header
        try:
            header = jwt.get_unverified_header(token)
        except InvalidTokenError as e:
            raise Exception('No valid id token provided: %s' % e)
        else:
            kid = header.get('kid', '')

        if not kid:
            raise Exception("Unable to find 'kid' claim in token header.")

        # Fetch public key info
        url = 'https://login.microsoftonline.com/common/discovery/keys'
        try:
            response = requests.get(url)
        except RequestException as e:
            raise Exception('Failed to get public key info: %s' % e)
        else:
            if not response.ok:
                raise Exception('Failed to get public key info: %s' %
                                response.content)
            else:
                public_keys = response.json().get('keys', [])

        # Find public key, used to sign id token
        public_key = None
        for k in public_keys:
            if kid == k['kid']:
                public_key = k['x5c'][0]
                break
        if not public_key:
            raise Exception("Unable to find public key for given kid '%s'" % kid)

        # Verify id token signature
        # NOTE: The x5c value is actually an X509 certificate. The public key
        # could also be generated from the n (modulus) and e (exponent) values.
        # But that's more involved.
        cert_string = ('-----BEGIN CERTIFICATE-----\n' +
                       public_key +
                       '\n-----END CERTIFICATE-----').encode('UTF-8')
        try:
            cert = x509.load_pem_x509_certificate(
                cert_string, default_backend())
        except ValueError as e:
            raise Exception('Failed to load certificate for token signature '
                            'verification: %s' % e)
        else:
            public_key = cert.public_key()

        try:
            # The audience of an id token is the application's client ID
            decoded = jwt.decode(token, public_key, algorithms=['RS256'],
                                 audience=client_id)
        except InvalidTokenError as e:
            raise Exception('Failed to decode token: %s' % e)
        else:
            return decoded

    def generate_client_assertion(client_id, tenant_id, fp_hash, private_key,
                                  private_key_passphrase=None):
        """Generate a client assertion (jwt token).

        This token is required to fetch an oauth app-only access token.

        Args:
            client_id (str): The application ID, used as issuer and subject.
            tenant_id (str): The tenant to which this token is bound.
            fp_hash (str): Base64 encoded SHA1 hash of certificate fingerprint
            private_key (str): PEM private key used to sign the jwt token
            private_key_passphrase (str): Passphrase of the private key, if any
        Returns:
            On success a tuple of the client assertion and the token type
            indicator.
        """
        # nbf and exp are numeric dates (whole seconds since the epoch)
        valid_from = int(to_unix(datetime.utcnow() - timedelta(0, 1)))
        expires_at = int(to_unix(datetime.utcnow() + timedelta(7)))
        jwt_payload = {
            'aud': ('https://login.microsoftonline.com/%s/'
                    'oauth2/token' % tenant_id),
            'iss': client_id,
            'sub': client_id,
            'jti': str(uuid.uuid1()),
            'nbf': valid_from,
            'exp': expires_at,
        }
        headers = {
            'x5t': fp_hash
        }

        if not private_key_passphrase:
            secret = private_key
        else:
            try:
                # cryptography expects bytes for both the key and the password
                secret = serialization.load_pem_private_key(
                    private_key.encode('utf-8'),
                    password=private_key_passphrase.encode('utf-8'),
                    backend=default_backend())
            except Exception as e:
                raise Exception('Failed to load private key: %s' % e)

        try:
            client_assertion = jwt.encode(jwt_payload, secret,
                                          algorithm='RS256', headers=headers)
        except ValueError as e:
            raise Exception('Failed to encode jwt_payload: %s' % e)

        client_assertion_type = ('urn:ietf:params:oauth:client-assertion-type:'
                                 'jwt-bearer')

        return client_assertion, client_assertion_type

    def generate_auth_url(client_id, redirect_uri):
        '''Build the admin consent URL; keep the nonce to check the id token.'''
        nonce = generate_nonce()
        state = generate_token()
        query_params = {
            'client_id': client_id,
            'nonce': nonce,
            'prompt': 'admin_consent',
            'redirect_uri': redirect_uri,
            'response_mode': 'fragment',
            'response_type': 'id_token',
            'scope': 'openid',
            'state': state
        }
        tenant = 'common'
        auth_url = ('https://login.microsoftonline.com/%s'
                    '/oauth2/authorize?%s') % (tenant, urlencode(query_params))

        return nonce, auth_url

    def get_access_token(client_id, id_token, nonce, fp_hash, private_key,
                         private_key_passphrase=None):
        '''id_token is returned w/ the url after the user authorized the app'''
        decoded_id_token = validate_id_token(id_token, client_id)

        # Compare the nonce values, to mitigate token replay attacks
        if not nonce:
            raise Exception("No nonce value provided.")
        elif nonce != decoded_id_token['nonce']:
            raise Exception("Nonce values don't match!")

        # Prepare the JWT token for fetching an access token
        tenant_id = decoded_id_token['tid']
        client_assertion, client_assertion_type = generate_client_assertion(
            client_id, tenant_id, fp_hash, private_key, private_key_passphrase)

        # Fetch the access token
        client = BackendApplicationClient(client_id)
        oauth = OAuth2Session(client=client)
        resource = 'https://graph.microsoft.com/'
        url = 'https://login.microsoftonline.com/common/oauth2/token'
        query_params = {
            'client_id': client_id,
            'client_assertion': client_assertion,
            'client_assertion_type': client_assertion_type,
            'resource': resource
        }
        try:
            fetch_token_response = oauth.fetch_token(url, **query_params)
        except Exception as e:
            raise Exception('Failed to obtain access token: %s' % e)
        else:
            return fetch_token_response
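
The `fp_hash` argument above is described only as the Base64-encoded SHA-1 hash of the certificate fingerprint. A minimal sketch of how such a value might be derived is shown below. It follows the `x5t` definition in RFC 7515 (base64url of the SHA-1 digest of the DER-encoded certificate, without padding); whether the padding should be stripped for your tenant is an assumption worth confirming, and both helper names are hypothetical.

```python
import base64
import binascii
import hashlib


def x5t_from_thumbprint(hex_thumbprint):
    """Encode a hex SHA-1 thumbprint ('AB:CD:...' or 'abcd...') as x5t."""
    cleaned = hex_thumbprint.replace(':', '').replace(' ', '')
    raw = binascii.unhexlify(cleaned)
    if len(raw) != 20:
        raise ValueError('A SHA-1 thumbprint must be 20 bytes long.')
    # Assumption: unpadded base64url, as RFC 7515 specifies for x5t
    return base64.urlsafe_b64encode(raw).decode('ascii').rstrip('=')


def x5t_from_der(der_bytes):
    """Compute the same value directly from DER-encoded certificate bytes."""
    digest = hashlib.sha1(der_bytes).digest()
    return base64.urlsafe_b64encode(digest).decode('ascii').rstrip('=')
```

Either helper returns a 27-character string that could be passed as `fp_hash`; the first suits a thumbprint copied from a certificate viewer, the second suits a certificate file already loaded as DER bytes.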
    
