Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include: package:flutter_lints/flutter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Cache {

String _constructCacheIdentifier() {
final rawPrefix =
'${_settings.storage}-${dataConnect.app.options.projectId}-${dataConnect.app.name}-${dataConnect.connectorConfig.serviceId}-${dataConnect.connectorConfig.connector}-${dataConnect.connectorConfig.location}-${dataConnect.transport.transportOptions.host}';
'${_settings.storage}-${dataConnect.app.options.projectId}-${dataConnect.app.name}-${dataConnect.connectorConfig.serviceId}-${dataConnect.connectorConfig.connector}-${dataConnect.connectorConfig.location}-${dataConnect.transport?.transportOptions.host}';
Comment thread
aashishpatil-g marked this conversation as resolved.
final prefixSha = convertToSha256(rawPrefix);
final rawSuffix = dataConnect.auth?.currentUser?.uid ?? 'anon';
final suffixSha = convertToSha256(rawSuffix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,11 +332,11 @@ class EntityNode {
srcListMap.forEach((key, value) {
List<EntityNode> enodeList = [];
List<dynamic> jsonList = value as List<dynamic>;
jsonList.forEach((jsonObj) {
for (var jsonObj in jsonList) {
Map<String, dynamic> jmap = jsonObj as Map<String, dynamic>;
EntityNode en = EntityNode.fromJson(jmap, cacheProvider);
enodeList.add(en);
});
}
objLists?[key] = enodeList;
});
}
Expand Down Expand Up @@ -367,9 +367,9 @@ class EntityNode {
if (nestedObjectLists != null) {
nestedObjectLists!.forEach((key, edoList) {
List<Map<String, dynamic>> jsonList = [];
edoList.forEach((edo) {
for (var edo in edoList) {
jsonList.add(edo.toJson(mode: mode));
});
}
jsonData[key] = jsonList;
});
}
Expand All @@ -396,9 +396,9 @@ class EntityNode {
Map<String, dynamic> nestedObjectListsJson = {};
nestedObjectLists!.forEach((key, edoList) {
List<Map<String, dynamic>> jsonList = [];
edoList.forEach((edo) {
for (var edo in edoList) {
jsonList.add(edo.toJson(mode: mode));
});
}
nestedObjectListsJson[key] = jsonList;
});
jsonData[listsKey] = nestedObjectListsJson;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ abstract class DataConnectTransport {

/// Invokes corresponding query endpoint.
Future<ServerResponse> invokeQuery<Data, Variables>(
String operationId,
String queryName,
Deserializer<Data> deserializer,
Serializer<Variables> serializer,
Expand All @@ -111,6 +112,17 @@ abstract class DataConnectTransport {

/// Invokes corresponding mutation endpoint.
Future<ServerResponse> invokeMutation<Data, Variables>(
String operationId,
String queryName,
Deserializer<Data> deserializer,
Serializer<Variables> serializer,
Variables? vars,
String? token,
);

/// Invokes corresponding stream query endpoint.
Stream<ServerResponse> invokeStreamQuery<Data, Variables>(
String operationId,
String queryName,
Deserializer<Data> deserializer,
Serializer<Variables> serializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ class DataConnectError extends FirebaseException {

/// Error thrown when an operation is partially successful.
class DataConnectOperationError<T> extends DataConnectError {
DataConnectOperationError(
DataConnectErrorCode code, String message, this.response)
: super(code, message);
DataConnectOperationError(super.code, super.message, this.response);
final DataConnectOperationFailureResponse<T> response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,51 @@ abstract class OperationRef<Data, Variables> {
this.serializer,
this.variables,
);
Variables? variables;
String operationName;
DataConnectTransport _transport;
Deserializer<Data> deserializer;
Serializer<Variables> serializer;
final Variables? variables;
final String operationName;
final DataConnectTransport _transport;
Comment thread
aashishpatil-g marked this conversation as resolved.
final Deserializer<Data> deserializer;
final Serializer<Variables> serializer;
String? _lastToken;

FirebaseDataConnect dataConnect;
final FirebaseDataConnect dataConnect;

late final String operationId =
createOperationId(operationName, variables, serializer);

Future<OperationResult<Data, Variables>> execute(
{QueryFetchPolicy fetchPolicy = QueryFetchPolicy.preferCache});
static dynamic _sortKeys(dynamic value) {
if (value is Map) {
final sortedMap = <String, dynamic>{};
final sortedKeys = value.keys.toList()..sort();
for (final key in sortedKeys) {
sortedMap[key.toString()] = _sortKeys(value[key]);
}
return sortedMap;
} else if (value is List) {
return value.map(_sortKeys).toList();
}
return value;
}

static String createOperationId<Variables>(String operationName,
Variables? vars, Serializer<Variables>? serializer) {
if (vars != null && serializer != null) {
try {
final decoded = jsonDecode(serializer(vars));
final sortedStr = jsonEncode(_sortKeys(decoded));
Comment thread
aashishpatil-g marked this conversation as resolved.
final hashVars = convertToSha256(sortedStr);
return '$operationName::$hashVars';
} catch (_) {
final rawVars = serializer(vars);
final hashVars = convertToSha256(rawVars);
return '$operationName::$hashVars';
}
} else {
return operationName;
}
}

Future<OperationResult<Data, Variables>> execute();

Future<bool> _shouldRetry() async {
String? newToken;
Expand Down Expand Up @@ -152,7 +186,7 @@ class QueryManager {
try {
await queryRef.execute(fetchPolicy: QueryFetchPolicy.cacheOnly);
} catch (e) {
log('Error executing impacted query $e');
log('Error executing impacted query $queryId $e');
}
}
}
Expand All @@ -175,24 +209,20 @@ class QueryManager {
StreamController<QueryResult<Data, Variables>> addQuery<Data, Variables>(
QueryRef<Data, Variables> ref,
) {
final queryId = ref._queryId;
final queryId = ref.operationId;
trackedQueries[queryId] = ref;

final streamController =
StreamController<QueryResult<Data, Variables>>.broadcast();
StreamController<QueryResult<Data, Variables>>.broadcast(
onCancel: () {
trackedQueries.remove(queryId);
ref._onAllSubscribersCancelled();
Comment thread
aashishpatil-g marked this conversation as resolved.
},
);

return streamController;
}

static String createQueryId<QueryVariables>(String queryName,
QueryVariables? vars, Serializer<QueryVariables> varSerializer) {
if (vars != null) {
return '$queryName::${varSerializer(vars)}';
} else {
return queryName;
}
}

void dispose() {
_impactedQueriesSubscription?.cancel();
}
Expand All @@ -216,7 +246,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
variables,
);

QueryManager _queryManager;
final QueryManager _queryManager;

@override
Future<QueryResult<Data, Variables>> execute(
Expand All @@ -239,9 +269,6 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
}
}

String get _queryId =>
QueryManager.createQueryId(operationName, variables, serializer);

Future<QueryResult<Data, Variables>> _executeFromCache(
QueryFetchPolicy fetchPolicy) async {
if (dataConnect.cacheManager == null) {
Expand All @@ -251,7 +278,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
final cacheManager = dataConnect.cacheManager!;
bool allowStale = fetchPolicy ==
QueryFetchPolicy.cacheOnly; //if its cache only, we always allow stale
final cachedData = await cacheManager.resultTree(_queryId, allowStale);
final cachedData = await cacheManager.resultTree(operationId, allowStale);

if (cachedData != null) {
try {
Expand Down Expand Up @@ -280,6 +307,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
try {
ServerResponse serverResponse =
await _transport.invokeQuery<Data, Variables>(
operationId,
operationName,
deserializer,
serializer,
Expand All @@ -288,7 +316,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
);

if (dataConnect.cacheManager != null) {
await dataConnect.cacheManager!.update(_queryId, serverResponse);
await dataConnect.cacheManager!.update(operationId, serverResponse);
}
Data typedData = _convertBodyJsonToData(serverResponse.data);

Expand All @@ -307,22 +335,109 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
}

StreamController<QueryResult<Data, Variables>>? _streamController;
Stream<ServerResponse>? _serverStream;
StreamSubscription<ServerResponse>? _serverStreamSubscription;

void _onAllSubscribersCancelled() {
_serverStreamSubscription?.cancel();
_serverStreamSubscription = null;
_serverStream = null;
log("QueryRef $operationId: All subscribers cancelled. Unsubscribed from server stream.");
}

Stream<QueryResult<Data, Variables>> subscribe() {
_streamController ??= _queryManager.addQuery(this);

execute();
final stream =
_streamController!.stream.cast<QueryResult<Data, Variables>>();
Comment thread
aashishpatil-g marked this conversation as resolved.

// Return the stream to the caller, then execute fetches
Future.microtask(() async {
if (dataConnect.cacheManager != null) {
try {
await _executeFromCache(QueryFetchPolicy.cacheOnly);
} catch (err) {
log("Error fetching from cache during subscribe $err");
// Ignore cache misses here, server stream will provide latest data
}
}

// Initiate Web Socket stream only if not already streaming
if (_serverStream == null) {
_streamFromServer();
}
});

return stream;
}

void _streamFromServer() async {
bool shouldRetry = await _shouldRetry();
log("QueryRef $operationId _streamFromServer loop started.");
try {
_serverStream = _transport.invokeStreamQuery<Data, Variables>(
operationId,
operationName,
deserializer,
serializer,
variables,
_lastToken,
);

return _streamController!.stream.cast<QueryResult<Data, Variables>>();
_serverStreamSubscription = _serverStream!.listen(
Comment thread
aashishpatil-g marked this conversation as resolved.
(serverResponse) async {
log("QueryRef $operationId _streamFromServer loop received snapshot.");
if (dataConnect.cacheManager != null) {
try {
await dataConnect.cacheManager!
.update(operationId, serverResponse);
} catch (e) {
log("QueryRef $operationId _streamFromServer loop cache update failed: $e");
}
}
Data typedData = _convertBodyJsonToData(serverResponse.data);

QueryResult<Data, Variables> res =
QueryResult(dataConnect, typedData, DataSource.server, this);
publishResultToStream(res);
},
onError: (e) {
_serverStreamSubscription?.cancel();
_serverStreamSubscription = null;
_serverStream = null;

if (shouldRetry &&
e is DataConnectError &&
e.code == DataConnectErrorCode.unauthorized.toString()) {
_streamFromServer();
} else {
publishErrorToStream(e);
}
},
onDone: () {
_serverStreamSubscription?.cancel();
_serverStreamSubscription = null;
_serverStream = null;
},
);
} catch (e) {
_serverStreamSubscription?.cancel();
_serverStreamSubscription = null;
_serverStream = null;
log("QueryRef $operationId _streamFromServer loop Unknown loop failure: $e");
publishErrorToStream(e);
}
}

void publishResultToStream(QueryResult<Data, Variables> result) {
if (_streamController != null) {
_streamController?.add(result);
} else {
log("QueryRef $operationId _streamFromServer loop _streamController is null");
}
}

void publishErrorToStream(Error err) {
void publishErrorToStream(Object err) {
if (_streamController != null) {
_streamController?.addError(err);
}
Expand All @@ -331,24 +446,16 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {

class MutationRef<Data, Variables> extends OperationRef<Data, Variables> {
MutationRef(
FirebaseDataConnect dataConnect,
String operationName,
DataConnectTransport transport,
Deserializer<Data> deserializer,
Serializer<Variables> serializer,
Variables? variables,
) : super(
dataConnect,
operationName,
transport,
deserializer,
serializer,
variables,
);
super.dataConnect,
super.operationName,
super.transport,
super.deserializer,
super.serializer,
super.variables,
);

@override
Future<OperationResult<Data, Variables>> execute(
{QueryFetchPolicy fetchPolicy = QueryFetchPolicy.serverOnly}) async {
Future<OperationResult<Data, Variables>> execute() async {
bool shouldRetry = await _shouldRetry();
try {
// Logic below is duplicated due to the fact that `executeOperation` returns
Expand All @@ -370,6 +477,7 @@ class MutationRef<Data, Variables> extends OperationRef<Data, Variables> {
) async {
ServerResponse serverResponse =
await _transport.invokeMutation<Data, Variables>(
operationId,
operationName,
deserializer,
serializer,
Expand Down
Loading
Loading