#!/usr/bin/env python3# -*- coding: utf-8 -*-## Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#importbase64importcollectionsimportjsonfromcopyimportdeepcopytry:importgraphlearnfromgraphlearnimportGraphasGLGraphexceptImportError:GLGraph=objectfromgraphscope.framework.errorsimportInvalidArgumentErrorfromgraphscope.framework.errorsimportcheck_argumentfromgraphscope.protoimportgraph_def_pb2
def __init__(self, graph, handle, config=None, object_id=None):
    """Initialize a graph for the learning engine using a handle.

    Args:
        graph: The GraphScope graph this learning graph is created from.
            Its ``_session`` attribute is stored as
            ``self.graphscope_session``.
        handle (dict | str): The graph handle, either an already decoded
            dict or a base64-encoded JSON string (see ``decode_arg``).
        config (dict | str | None): The learning configuration, in the
            same encodings as ``handle``. When omitted, ``handle["config"]``
            is used if present; otherwise an empty configuration (no node
            or edge selection, no attributes, no generated labels) is used.
        object_id: Identifier of the underlying object. Defaults to
            ``handle["vineyard_id"]``.
    """
    self.graph = graph
    self.graphscope_session = self.graph._session

    handle = self.decode_arg(handle)
    config = self.decode_arg(config)
    if config is None and "config" in handle:
        config = handle["config"]
    if config is None:
        # An explicit empty configuration. The previous fallback,
        # ``defaultdict(lambda: dict)``, produced the ``dict`` *class* for
        # every key, so ``config["node_attributes"].items()`` below raised
        # TypeError. The shape mirrors what ``preprocess_args`` returns
        # when nothing is selected.
        config = {
            "nodes": None,
            "edges": None,
            "node_attributes": {},
            "edge_attributes": {},
            "gen_labels": [],
        }
    if object_id is None:
        object_id = handle["vineyard_id"]

    self.handle = handle
    self.config = config
    self.object_id = object_id
    self.closed = False

    super(Graph, self).__init__()
    self.vineyard(handle, config["nodes"], config["edges"])

    # Each attribute entry is ``(attribute_names, [n_ints, n_floats, n_strings])``.
    for label, node_attr in config["node_attributes"].items():
        n_ints, n_floats, n_strings = (
            node_attr[1][0],
            node_attr[1][1],
            node_attr[1][2],
        )
        self.node_attributes(label, node_attr[0], n_ints, n_floats, n_strings)
    for label, edge_attr in config["edge_attributes"].items():
        n_ints, n_floats, n_strings = (
            edge_attr[1][0],
            edge_attr[1][1],
            edge_attr[1][2],
        )
        self.edge_attributes(label, edge_attr[0], n_ints, n_floats, n_strings)

    for mask, node_label, nsplit, split_range in config["gen_labels"]:
        self.node_view(node_label, mask, nsplit=nsplit, split_range=split_range)

    # NOTE(review): a single local worker is hard-coded here; confirm that
    # distributed clients are expected to go through ``get_handle`` instead.
    self.init_vineyard(worker_index=0, worker_count=1)
def decode_arg(self, arg):
    """Decode a handle or config argument into a Python object.

    Args:
        arg (None | dict | str | bytes): ``None`` and dicts are returned
            unchanged; anything else is treated as base64-encoded JSON.

    Returns:
        The decoded JSON value, or ``arg`` itself for ``None`` / ``dict``.
    """
    if arg is None or isinstance(arg, dict):
        return arg
    # Accept bytes as well as str, as ``preprocess_args`` does; bytes used to
    # fail with AttributeError on ``.encode``.
    if isinstance(arg, str):
        arg = arg.encode("utf-8", errors="ignore")
    return json.loads(base64.b64decode(arg).decode("utf-8", errors="ignore"))


def close(self):
    """Close this learning graph once; later calls are no-ops.

    Nothing is done when this graph is already closed or when the owning
    session is closed.
    """
    if self.closed or self.graphscope_session.closed:
        return
    self.closed = True
    super(Graph, self).close()  # close client first
    # close server instance
    self.graphscope_session._close_learning_instance(self)


@staticmethod  # noqa: C901
def preprocess_args(handle, nodes, edges, gen_labels):  # noqa: C901
    """Normalize user-supplied node/edge/label selections into a config dict.

    Args:
        handle (str | bytes): Base64-encoded JSON graph handle. Its
            ``node_schema``, ``node_attribute_types`` and
            ``edge_attribute_types`` entries are consulted.
        nodes (list | None): Each item is a node label (``str``) or a tuple
            ``(label, [attribute, ...])``. When ``None``, all labels found
            in ``handle["node_schema"]`` are used.
        edges (list | None): Each item is one of

            - ``label`` (``str``): source and destination are the single
              known node label;
            - a tuple whose first two items are ``str`` (kept as given,
              e.g. ``(src, label, dst)``);
            - ``(label, [attribute, ...])``: as the ``str`` form, with
              attributes;
            - ``((src, label, dst), [attribute, ...])``.
        gen_labels (list | None): Items of length 3
            ``(mask, node_label, count)`` or length 4; items of one node
            label must all have the same length.

    Returns:
        dict: With keys ``nodes``, ``edges`` (each ``None`` when empty),
        ``node_attributes``, ``edge_attributes`` and ``gen_labels``.
        Attribute values are ``(names, [n_int, n_float, n_string])``;
        3-item gen labels are expanded to
        ``(mask, node_label, total, (start, end))``.

    Raises:
        InvalidArgumentError: On duplicate node labels, malformed items, or
            when an edge's endpoints cannot be inferred.
    """
    handle = json.loads(base64.b64decode(handle).decode("utf-8", errors="ignore"))
    node_names = []
    node_attributes = {}
    edge_names = []
    edge_attributes = {}

    def selected_property_schema(attr_types, attributes):
        # Count the selected attributes per storage kind ("i", "f", "s").
        kinds = collections.Counter(attr_types[attr] for attr in attributes)
        return [kinds["i"], kinds["f"], kinds["s"]]

    def sole_node_type():
        # A bare edge label is only unambiguous with exactly one node type.
        if len(node_names) > 1:
            raise InvalidArgumentError(
                "Cannot inference edge type when multiple kinds of nodes exists"
            )
        if not node_names:
            # Previously surfaced as a bare IndexError on ``node_names[0]``.
            raise InvalidArgumentError(
                "Cannot inference edge type when no node type is available"
            )
        return node_names[0]

    if nodes is not None:
        for node in nodes:
            if isinstance(node, str):
                if node in node_names:
                    raise InvalidArgumentError("Duplicate node type: %s" % node)
                node_names.append(node)
            elif isinstance(node, tuple):
                if node[0] in node_names:
                    raise InvalidArgumentError("Duplicate node type: %s" % node[0])
                node_names.append(node[0])
                attr_types = handle["node_attribute_types"][node[0]]
                attr_schema = selected_property_schema(attr_types, node[1])
                node_attributes[node[0]] = (node[1], attr_schema)
            else:
                raise InvalidArgumentError(
                    "The node parameter is in bad format: %s" % node
                )
    else:
        # Schema entries start with the label, followed by ":"-separated fields.
        for node in handle["node_schema"]:
            node_names.append(node.split(":")[0])

    if edges is not None:
        for edge in edges:
            if isinstance(edge, str):
                endpoint = sole_node_type()
                edge_names.append((endpoint, edge, endpoint))
            elif (
                isinstance(edge, tuple)
                and isinstance(edge[0], str)
                and isinstance(edge[1], str)
            ):
                edge_names.append(edge)
            elif (
                isinstance(edge, tuple)
                and isinstance(edge[0], str)
                and isinstance(edge[1], list)
            ):
                endpoint = sole_node_type()
                edge_names.append((endpoint, edge[0], endpoint))
                attr_types = handle["edge_attribute_types"][edge[0]]
                attr_schema = selected_property_schema(attr_types, edge[1])
                edge_attributes[edge[0]] = (edge[1], attr_schema)
            elif (
                isinstance(edge, tuple)
                and isinstance(edge[0], (list, tuple))
                and isinstance(edge[1], list)
            ):
                edge_names.append(edge[0])
                attr_types = handle["edge_attribute_types"][edge[0][1]]
                attr_schema = selected_property_schema(attr_types, edge[1])
                edge_attributes[edge[0][1]] = (edge[1], attr_schema)
            else:
                # ``edge`` may itself be a tuple; wrap it so that %-formatting
                # does not fail with TypeError before the error is raised.
                raise InvalidArgumentError(
                    "The edge parameter is in bad format: %s" % (edge,)
                )

    # Group the label splits by node label (second item of each entry).
    split_groups = collections.defaultdict(list)
    if gen_labels is not None:
        for label in gen_labels:
            if len(label) in (3, 4):
                split_groups[label[1]].append(label)
            else:
                raise InvalidArgumentError(
                    "Bad gen_labels arguments: %s" % (gen_labels,)
                )

    split_labels = []
    for group in split_groups.values():
        lengths = [len(split) for split in group]
        check_argument(lengths[:-1] == lengths[1:], "Invalid gen labels: %s" % group)
        if len(group[0]) == 3:
            # Turn per-split counts into consecutive (start, end) ranges over
            # the total of all counts for this node label.
            total = sum(split[2] for split in group)
            start, ranges = 0, []
            for split in group:
                ranges.append((start, start + split[2]))
                start += split[2]
            group = [
                (split[0], split[1], total, split_range)
                for split, split_range in zip(group, ranges)
            ]
        split_labels.extend(group)

    return {
        "nodes": node_names if node_names else None,
        "edges": edge_names if edge_names else None,
        "node_attributes": node_attributes,
        "edge_attributes": edge_attributes,
        "gen_labels": split_labels,
    }
def get_handle(self, worker_count=1):
    """Return a base64-encoded handle for distributed training.

    The result is a shallow copy of ``self.handle`` with ``"config"`` set to
    ``self.config`` and ``"client_count"`` set to ``worker_count``,
    serialized as JSON and then base64-encoded. ``self.handle`` itself is
    not modified.

    Args:
        worker_count (int): Value stored under ``"client_count"``.

    Returns:
        str: The base64 text of the JSON-serialized handle.
    """
    payload = self.handle.copy()
    payload["config"] = self.config
    payload["client_count"] = worker_count

    serialized = json.dumps(payload)
    encoded = base64.b64encode(serialized.encode("utf-8", errors="ignore"))
    return encoded.decode("utf-8", errors="ignore")
def V(self, t, feed=None, node_from=None, mask=None):
    """Entry of GSL, starting from VERTEX.

    Args:
        t (string): The type of node which is the entry of query or the type
            of edge when node is from edge source or dst.
        feed (None | numpy.ndarray | types.GeneratorType | `Nodes`): When
            `feed` is not `None`, the `type` should be a node type, which
            means query the attributes of the specified node ids.

            - None: Default. Sample nodes with the following .shuffle and
              .batch API.
            - numpy.ndarray: Any shape of ids. Get nodes of the given ids
              and node_type.
            - types.Generator: A generator of numpy.ndarray. Get nodes of
              generated ids and given node_type.
            - `Nodes`: A `Nodes` object.
        node_from (NodeFrom): Default is `NodeFrom.NODE` (selected when the
            argument is left as `None`), which means sample or iterate node
            from node. `NodeFrom.EDGE_SRC` means sample or iterate node from
            source node of edge, and `NodeFrom.EDGE_DST` means sample or
            iterate node from destination node of edge. If node is from
            edge, the `type` must be an edge type.
        mask (NONE | TRAIN | TEST | VAL): The given node set is indexed by
            both the raw node type and mask value. The default mask value is
            NONE (selected when the argument is left as `None`), which plays
            nothing on the index.
    """
    # The defaults are resolved lazily: evaluating ``graphlearn.*`` in the
    # signature ran at definition time and raised NameError whenever the
    # guarded ``import graphlearn`` at the top of the file had failed.
    if node_from is None or mask is None:
        import graphlearn

        if node_from is None:
            node_from = graphlearn.pywrap.NodeFrom.NODE
        if mask is None:
            mask = graphlearn.python.utils.Mask.NONE
    return super(Graph, self).V(t, feed, node_from, mask)
def E(self, edge_type, feed=None, reverse=False):
    """Entry of GSL, starting from EDGE.

    Args:
        edge_type (string): The type of edge which is the entry of query.
        feed (None | (np.ndarray, np.ndarray) | types.GeneratorType | `Edges`):

            - None: Default. Sample edges with the following .shuffle and
              .batch API.
            - (np.ndarray, np.ndarray): src_ids, dst_ids. Get edges of the
              given (src_ids, dst_ids) and given edge_type. src_ids and
              dst_ids must be the same shape, dtype is int.
            - types.Generator: A generator of (numpy.ndarray, numpy.ndarray).
              Get edges of generated (src_ids, dst_ids) and given edge_type.
            - `Edges`: An `Edges` object.
        reverse (bool): Forwarded unchanged to the parent implementation.
            NOTE(review): presumably requests the reversed edge direction;
            confirm against the GraphLearn documentation.
    """
    parent = super(Graph, self)
    return parent.E(edge_type, feed, reverse)
def get_gl_handle(schema, vineyard_id, engine_hosts, engine_config, fragments=None):
    """Dump a handler for GraphLearn for interaction.

    Fields in :code:`schema` are:

    + the name of node type or edge type
    + whether the graph is weighted graph
    + whether the graph is labeled graph
    + the number of int attributes
    + the number of float attributes
    + the number of string attributes

    An example of the graph handle:

    .. code:: python

        {
            "hosts": "127.0.0.1:8888,127.0.0.1:8889",
            "client_count": 1,
            "vineyard_socket": "/var/run/vineyard.sock",
            "vineyard_id": 13278328736,
            "fragments": [13278328736, ...],  # fragment ids
            "node_schema": [
                "user:false:false:10:0:0",
                "item:true:false:0:0:5"
            ],
            "edge_schema": [
                "user:click:item:true:false:0:0:0",
                "user:buy:item:true:true:0:0:0",
                "item:similar:item:false:false:10:0:0"
            ],
            "node_attribute_types": {
                "person": {
                    "age": "i",
                    "name": "s",
                },
            },
            "edge_attribute_types": {
                "knows": {
                    "weight": "f",
                },
            },
        }

    The handle can be decoded using:

    .. code:: python

       json.loads(base64.b64decode(handle.encode('ascii', errors="ignore")).decode('ascii', errors="ignore"))

    Note that the ports are selected from a range :code:`(8000, 9000)`.

    Args:
        schema: The graph schema.
        vineyard_id: The object id of graph stored in vineyard.
        engine_hosts: A list of hosts for GraphScope engine workers. An
            already comma-joined string is accepted as well and used as is.
        engine_config: dict of config for GAE engine. Only its
            ``"vineyard_socket"`` entry is read here.
        fragments: Stored unchanged under the ``"fragments"`` key of the
            handle. Defaults to ``None``.

    Returns:
        str: Base64 encoded handle
    """

    def group_property_types(props):
        # Classify each property as string ("s"), float ("f") or, for every
        # other type, int ("i"), and note whether properties named "weight"
        # or "label" exist.
        weighted, labeled = "false", "false"
        counts = {"i": 0, "f": 0, "s": 0}
        attr_types = {}
        for prop in props:
            if prop.type == graph_def_pb2.STRING:
                kind = "s"
            elif prop.type in (graph_def_pb2.FLOAT, graph_def_pb2.DOUBLE):
                kind = "f"
            else:
                kind = "i"
            counts[kind] += 1
            attr_types[prop.name] = kind
            if prop.name == "weight":
                weighted = "true"
            elif prop.name == "label":
                labeled = "true"
        return weighted, labeled, counts["i"], counts["f"], counts["s"], attr_types

    node_schema, node_attribute_types = [], {}
    for label in schema.vertex_labels:
        weighted, labeled, i, f, s, attr_types = group_property_types(
            schema.get_vertex_properties(label)
        )
        node_schema.append(
            "{}:{}:{}:{}:{}:{}".format(label, weighted, labeled, i, f, s)
        )
        node_attribute_types[label] = attr_types

    edge_schema, edge_attribute_types = [], {}
    for label in schema.edge_labels:
        weighted, labeled, i, f, s, attr_types = group_property_types(
            schema.get_edge_properties(label)
        )
        # One schema entry per (source, destination) pair of this edge label.
        for rel in schema.get_relationships(label):
            edge_schema.append(
                "{}:{}:{}:{}:{}:{}:{}:{}".format(
                    rel[0], label, rel[1], weighted, labeled, i, f, s
                )
            )
        edge_attribute_types[label] = attr_types

    # Joining a plain string would interleave commas between its characters,
    # so only join real collections of hosts.
    if not isinstance(engine_hosts, str):
        engine_hosts = ",".join(engine_hosts)

    handle = {
        "hosts": engine_hosts,
        "client_count": 1,
        "vineyard_id": vineyard_id,
        "vineyard_socket": engine_config["vineyard_socket"],
        "node_schema": node_schema,
        "edge_schema": edge_schema,
        "node_attribute_types": node_attribute_types,
        "edge_attribute_types": edge_attribute_types,
        "fragments": fragments,
    }
    handle_json_string = json.dumps(handle)
    return base64.b64encode(
        handle_json_string.encode("utf-8", errors="ignore")
    ).decode("utf-8", errors="ignore")